From dc9d78058e72e2483a8f57f03046cc8eb7eceadd Mon Sep 17 00:00:00 2001 From: Tianhao Wang Date: Tue, 4 Apr 2023 09:58:28 +0200 Subject: retab, move meow handler to timeline scan --- cat.py | 418 +++++++++++++++++++++++++++++++----------------------------- config.py | 3 +- patterns.py | 2 +- 3 files changed, 218 insertions(+), 205 deletions(-) diff --git a/cat.py b/cat.py index 86e9af4..3eec4b8 100644 --- a/cat.py +++ b/cat.py @@ -11,217 +11,229 @@ import catdb # TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll # leave it to the future... def init_app(config): - Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL, - to_file=config.CLIENTID) - session = login_refresh_token() + Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL, + to_file=config.CLIENTID) + session = login_refresh_token() def login_refresh_token(config): - session = Mastodon(client_id=config.CLIENTID) - session.log_in(username=config.UNAME, - password=config.PW, to_file=config.TOKEN) - return session + session = Mastodon(client_id=config.CLIENTID) + session.log_in(username=config.UNAME, + password=config.PW, to_file=config.TOKEN) + return session def restore_session(config): - return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma') + return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma') class VnilCat: - def __init__(self, config=cfg): - self.config = cfg - self.session = restore_session(self.config) - self.news = [] - self.tl_lastseen_sid = None - print("[booting] checking client data") - if not os.path.isfile(self.config.CLIENTID): - print("[booting] client data doesn't exist..creating...") - init_app(self.config) - try: - self.session.account_verify_credentials() - except: - try: - self.session = login_refresh_token(self.config) - except: - exit() - - print("[booting] check DB .. using sqlite3") - try: - self.db = catdb.DBHandler(config) - except Exception as e: - print("[ERROR] failed to setup DB ...",e) - print("exit ...") - exit() - self.intimacy = {} - print("[booting] init intimacy") - self.update_intimacy() - - # try to fetch the lastest status from the timeline - # so that we can skip the ones before we start. - # This is not a persistent state machine, - # we don't know if a older status has been processed - # or not! - try: - tl = self.session.timeline_home(limit=1) - if (len(tl)) != 0: - print("[booting] set init status id to...", tl[0]["id"]) - self.tl_lastseen_sid = tl[0]["id"] - except Exception as e: - print("[booting] can't init timeline, continue anyways", e) - print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human") - print("-------------") - - def update_intimacy(self): - res = self.db.count_interaction() - if res != None: - self.intimacy = res - - def not_mine(self, status): - return status['account']['acct'] != self.config.UNAME - - def is_mine(self, status): - return status['account']['acct'] == self.config.UNAME - - def reply_meow(self, ori_status): - print("replying meow to ", ori_status["id"]) - self.session.status_reply( - to_status=ori_status, status=random.choice(cat_sounds)) - self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"]) - - def post_hn_news(self, amount=3): - print("posting news") - if amount <= 0: - return - if amount > len(self.news): - self.news = hnnews.get_topnews(20) - if amount > len(self.news): - amount = len(self.news) - status = "Hear ye, hear ye \n\n" - for i in range(amount): - n = self.news.pop(random.randrange(len(self.news))) - status += n["title"] - status += "\n" - status += n["url"] - status += "\n\n" - self.session.toot(status) - - - def catch_birds(self, status, content): - if re_contains_bird.search(content) is not None and len(content) < self.config.CATCH_BIRD_MAX_LENGTH: - print("i see a bird", status["id"], " from ", status["account"]["acct"]) - try: - s = self.session.status_reply( - to_status=status, status=random.choice(bird_sounds)) - self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"]) - return True - except: - print("fail to post") - return False - - def handle_command(self, status, content): - print("i see a command", content) - cmd = content[1:].strip().split() - actor = status["account"]["acct"] - # handle cmds - if cmd[0] == "news": - if actor not in self.config.ADMINS: - return - self.post_hn_news() - elif cmd[0] == "stats": - if actor not in self.config.ADMINS: - return - self.session.status_reply(to_status=status, status=str(self.intimacy)) - else: - print("I don't understand") - - def handle_home_status(self, status): - self.tl_lastseen_sid = status["id"] - if self.is_mine(status): - print("this one is from myself, skipping") - return - acc = status["account"] - content = cleanhtml(status["content"]) - # only one action is taken. if one succeed then return - if self.catch_birds(status,content): - return - - def scantimeline(self): - # print("scanning timeline, lastseen=",self.tl_lastseen_sid) - tl = self.session.timeline_home(since_id=self.tl_lastseen_sid) - # important! make sure to iterate from older status to newer - # otherwise the 'last_seen' won't be updated correctly - for status in reversed(tl): - self.handle_home_status(status=status) - - # For now we don't handle other interaction types - - def handle_mention(self, notification): - nid = notification["id"] - acc = notification["account"] - status = notification["status"] - sid = status["id"] - print("we have a mention > ", nid) - content = cleanhtml(status["content"]).replace("@cat", "").lstrip() - print(content) - if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH: - print("invalid status content") - - # prioritized to commands - elif content[0] == "!": - self.handle_command(status, content) - - elif re_contains_meow.search(content) is not None: - self.reply_meow(status) - - print("dismissing notification ", nid) - self.session.notifications_dismiss(nid) - - def handle_follow(self, notification): - nid = notification["id"] - uid = notification['account']['id'] - rel = self.session.account_relationships(uid) - if rel[0]['following'] == False: - print("follow ", uid) - self.session.account_follow(uid, reblogs=False, notify=False) - else: - print("already following", uid) - self.session.notifications_dismiss(nid) - - def handle_fav(self, notification): - nid = notification["id"] - acct = notification['account']['acct'] - print(f"{acct} is petting me! purrr") - self.db.insert_event(_type="fav", remarks="",correspond=acct) - self.session.notifications_dismiss(nid) - - def handle_notification(self): - ns = self.session.notifications() - for n in ns: - if n['type'] == "mention": - self.handle_mention(n) - if n['type'] == "follow": - self.handle_follow(n) - if n['type'] == 'favourite': - self.handle_fav(n) - - def run(self): - print("[info] running...") - self.epoch = 3 - while True: - try: - self.handle_notification() - self.scantimeline() - if self.epoch % 360 == 1 : - # happens roughtly every 2 hours - self.post_hn_news(3) - self.update_intimacy() - except Exception as e: - print("something wrong...") - print(e) - time.sleep(self.config.POLL_INTERVAL) - self.epoch += 1 + def __init__(self, config=cfg): + self.config = cfg + self.session = restore_session(self.config) + self.news = [] + self.tl_lastseen_sid = None + print("[booting] checking client data") + if not os.path.isfile(self.config.CLIENTID): + print("[booting] client data doesn't exist..creating...") + init_app(self.config) + try: + self.session.account_verify_credentials() + except: + try: + self.session = login_refresh_token(self.config) + except: + exit() + + print("[booting] check DB .. using sqlite3") + try: + self.db = catdb.DBHandler(config) + except Exception as e: + print("[ERROR] failed to setup DB ...",e) + print("exit ...") + exit() + self.intimacy = {} + print("[booting] init intimacy") + self.update_intimacy() + + # try to fetch the lastest status from the timeline + # so that we can skip the ones before we start. + # This is not a persistent state machine, + # we don't know if a older status has been processed + # or not! + try: + tl = self.session.timeline_home(limit=1) + if (len(tl)) != 0: + print("[booting] set init status id to...", tl[0]["id"]) + self.tl_lastseen_sid = tl[0]["id"] + except Exception as e: + print("[booting] can't init timeline, continue anyways", e) + print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human") + print("-------------") + + def update_intimacy(self): + res = self.db.count_interaction() + if res != None: + self.intimacy = res + + def not_mine(self, status): + return status['account']['acct'] != self.config.UNAME + + def is_mine(self, status): + return status['account']['acct'] == self.config.UNAME + + def reply_meow(self, ori_status): + print("replying meow to ", ori_status["id"]) + self.session.status_reply( + to_status=ori_status, status=random.choice(cat_sounds)) + self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"]) + + def post_hn_news(self, amount=3): + print("posting news") + if amount <= 0: + return + if amount > len(self.news): + self.news = hnnews.get_topnews(20) + if amount > len(self.news): + amount = len(self.news) + status = "Hear ye, hear ye \n\n" + for i in range(amount): + n = self.news.pop(random.randrange(len(self.news))) + status += n["title"] + status += "\n" + status += n["url"] + status += "\n\n" + self.session.toot(status) + + + def catch_birds(self, status, content): + if re_contains_bird.search(content) is not None and len(content) < self.config.CATCH_BIRD_MAX_LENGTH: + print("i see a bird", status["id"], " from ", status["account"]["acct"]) + try: + s = self.session.status_reply( + to_status=status, status=random.choice(bird_sounds)) + self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"]) + return True + except: + print("fail to post") + return False + + def catch_cats(self, status, content): + if re_contains_meow.search(content) is not None and len(content) < self.config.CATCH_CAT_MAX_LENGTH: + print("i see a cat", status["id"], " from ", status["account"]["acct"]) + try: + self.reply_meow(status) + return True + except: + print("fail to post") + return False + + def handle_command(self, status, content): + print("i see a command", content) + cmd = content[1:].strip().split() + actor = status["account"]["acct"] + # handle cmds + if cmd[0] == "news": + if actor not in self.config.ADMINS: + return + self.post_hn_news() + elif cmd[0] == "stats": + if actor not in self.config.ADMINS: + return + self.session.status_reply(to_status=status, status=str(self.intimacy)) + else: + print("I don't understand") + + def handle_home_status(self, status): + self.tl_lastseen_sid = status["id"] + if self.is_mine(status): + print("this one is from myself, skipping") + return + acc = status["account"] + content = cleanhtml(status["content"]) + # only one action is taken. if one succeed then return + if self.catch_birds(status,content): + return + elif self.catch_cats(status,content): + return + + def scantimeline(self): + # print("scanning timeline, lastseen=",self.tl_lastseen_sid) + tl = self.session.timeline_home(since_id=self.tl_lastseen_sid) + # important! make sure to iterate from older status to newer + # otherwise the 'last_seen' won't be updated correctly + for status in reversed(tl): + self.handle_home_status(status=status) + + # For now we don't handle other interaction types + + def handle_mention(self, notification): + nid = notification["id"] + acc = notification["account"] + status = notification["status"] + sid = status["id"] + print("we have a mention > ", nid) + content = cleanhtml(status["content"]).replace("@cat", "").lstrip() + print(content) + if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH: + print("invalid status content") + + # prioritized to commands + elif content[0] == "!": + self.handle_command(status, content) + # moved to "handle_home_status" + # elif re_contains_meow.search(content) is not None: + # self.reply_meow(status) + + print("dismissing notification ", nid) + self.session.notifications_dismiss(nid) + + def handle_follow(self, notification): + nid = notification["id"] + uid = notification['account']['id'] + rel = self.session.account_relationships(uid) + if rel[0]['following'] == False: + print("follow ", uid) + self.session.account_follow(uid, reblogs=False, notify=False) + else: + print("already following", uid) + self.session.notifications_dismiss(nid) + + def handle_fav(self, notification): + nid = notification["id"] + acct = notification['account']['acct'] + print(f"{acct} is petting me! purrr") + self.db.insert_event(_type="fav", remarks="",correspond=acct) + self.session.notifications_dismiss(nid) + + def handle_notification(self): + ns = self.session.notifications() + for n in ns: + if n['type'] == "mention": + self.handle_mention(n) + if n['type'] == "follow": + self.handle_follow(n) + if n['type'] == 'favourite': + self.handle_fav(n) + + def run(self): + print("[info] running...") + self.epoch = 3 + while True: + try: + self.handle_notification() + self.scantimeline() + if self.epoch % 360 == 1 : + # happens roughtly every 2 hours + self.post_hn_news(3) + self.update_intimacy() + except Exception as e: + print("something wrong...") + print(e) + time.sleep(self.config.POLL_INTERVAL) + self.epoch += 1 if __name__ == "__main__": - # check session data - cat = VnilCat(cfg) - cat.run() + # check session data + cat = VnilCat(cfg) + cat.run() diff --git a/config.py b/config.py index 12e3a2f..f68e7fa 100644 --- a/config.py +++ b/config.py @@ -34,7 +34,8 @@ INTIMACY_ACTIVE = 50 INTIMACY_LOVE = 100 # BEHAVIOURS .. -CATCH_BIRD_MAX_LENGTH = 30 +CATCH_BIRD_MAX_LENGTH = 20 +CATCH_CAT_MAX_LENGTH = 20 ## DB CONFIGS ----------------------------- # TODO, currently the bot and the db handler is sharing diff --git a/patterns.py b/patterns.py index 7520a12..2bf400d 100644 --- a/patterns.py +++ b/patterns.py @@ -1,7 +1,7 @@ import re #### RE PATTERNS #### -re_contains_meow = re.compile(r'(me+o+w|喵)',re.IGNORECASE) +re_contains_meow = re.compile(r'(me+o+w|喵|にゃん|nya+n)',re.IGNORECASE) re_contains_bird = re.compile(r'(鸟|bird|鳥)',re.IGNORECASE) #### MISC STRING PRESETS #### cat_sounds = ["Meow!", "Mrrrow!", "Purr...", "Meee-OW!", "Mreoww!", "Nya~", "Mew?", "Rowr?", "Prrrr...", "Maow-maow!"] -- cgit v1.2.3-70-g09d2