"""A small Mastodon/Pleroma bot that behaves like a cat.

The bot polls its notifications and home timeline, replies to mentions and
to statuses matching the patterns from ``patterns``, follows back, records
interactions through ``catdb`` and periodically posts news from ``hnnews``.
"""
import os
import random
import re
import time

from mastodon import Mastodon

import catdb
import config as cfg
import hnnews
from patterns import *


# TODO re-organize auth mgmt. But it involves a lot of manual testing so I'll
# leave it to the future...
def init_app(config):
    """Register the client application and perform an initial login.

    The client credentials are written to ``config.CLIENTID`` and the access
    token to ``config.TOKEN``.

    Returns:
        The logged-in ``Mastodon`` session.
    """
    Mastodon.create_app(config.APPNAME,
                        api_base_url=config.BASEURL,
                        to_file=config.CLIENTID)
    # The config must be forwarded: login_refresh_token has no default for it.
    return login_refresh_token(config)


def login_refresh_token(config):
    """Log in with username/password and store the token in ``config.TOKEN``.

    Returns:
        The logged-in ``Mastodon`` session.
    """
    session = Mastodon(client_id=config.CLIENTID)
    session.log_in(username=config.UNAME,
                   password=config.PW,
                   to_file=config.TOKEN)
    return session


def restore_session(config):
    """Build a session from the stored client credentials and access token."""
    return Mastodon(client_id=config.CLIENTID,
                    access_token=config.TOKEN,
                    feature_set='pleroma')


class VnilCat:
    """The bot itself: owns the API session, the DB handle and the poll loop."""

    def __init__(self, config=cfg):
        """Boot the bot: set up credentials, session, DB and timeline cursor.

        Args:
            config: configuration object; defaults to the ``config`` module.

        Raises:
            SystemExit: if logging in or opening the database fails.
        """
        self.config = config
        self.news = []
        self.tl_lastseen_sid = None
        self.intimacy = {}

        # The client credential file must exist before a session can be
        # restored from it, so create the app first when it is missing.
        print("[booting] checking client data")
        if not os.path.isfile(self.config.CLIENTID):
            print("[booting] client data doesn't exist..creating...")
            init_app(self.config)
        self.session = restore_session(self.config)

        try:
            self.session.account_verify_credentials()
        except Exception as verify_err:
            # The stored token did not work; fall back to a password login.
            print("[booting] stored token not usable, logging in again ...",
                  verify_err)
            try:
                self.session = login_refresh_token(self.config)
            except Exception as login_err:
                print("[ERROR] failed to log in ...", login_err)
                raise SystemExit(1) from login_err

        print("[booting] check DB .. using sqlite3")
        try:
            self.db = catdb.DBHandler(self.config)
        except Exception as db_err:
            print("[ERROR] failed to setup DB ...", db_err)
            print("exit ...")
            raise SystemExit(1) from db_err

        print("[booting] init intimacy")
        self.update_intimacy()

        # try to fetch the latest status from the timeline
        # so that we can skip the ones before we start.
        # This is not a persistent state machine,
        # we don't know if an older status has been processed
        # or not!
        try:
            tl = self.session.timeline_home(limit=1)
            if tl:
                print("[booting] set init status id to...", tl[0]["id"])
                self.tl_lastseen_sid = tl[0]["id"]
        except Exception as e:
            print("[booting] can't init timeline, continue anyways", e)

        print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human")
        print("-------------")

    def update_intimacy(self):
        """Refresh ``self.intimacy`` from the DB; keep the old value on None."""
        res = self.db.count_interaction()
        if res is not None:
            self.intimacy = res

    def not_mine(self, status):
        """Return True when ``status`` was not posted by the bot account."""
        return status['account']['acct'] != self.config.UNAME

    def is_mine(self, status):
        """Return True when ``status`` was posted by the bot account."""
        return status['account']['acct'] == self.config.UNAME

    def reply_meow(self, ori_status):
        """Reply to ``ori_status`` with a random cat sound and log the event."""
        print("replying meow to ", ori_status["id"])
        self.session.status_reply(
            to_status=ori_status,
            status=random.choice(cat_sounds))
        self.db.insert_event(_type="meow", remarks="",
                             correspond=ori_status["account"]["acct"])

    def post_hn_news(self, amount=3):
        """Post up to ``amount`` randomly picked news items in one status.

        The local news cache is refilled from ``hnnews`` when it holds fewer
        than ``amount`` items. Posted items are removed from the cache.
        Nothing is posted when ``amount`` is not positive or no news is
        available.
        """
        print("posting news")
        if amount <= 0:
            return
        if amount > len(self.news):
            self.news = hnnews.get_topnews(20)
        amount = min(amount, len(self.news))
        if amount == 0:
            # Avoid posting a status that contains only the header.
            print("no news available, skipping")
            return

        picked = [self.news.pop(random.randrange(len(self.news)))
                  for _ in range(amount)]
        status = "Hear ye, hear ye \n\n" + "".join(
            f"{item['title']}\n{item['url']}\n\n" for item in picked)
        self.session.toot(status)

    def catch_birds(self, status, content):
        """Reply with a bird sound when ``content`` mentions a bird.

        Only content shorter than ``config.CATCH_BIRD_MAX_LENGTH`` is
        considered.

        Returns:
            True if a reply was posted and recorded, False otherwise.
        """
        if re_contains_bird.search(content) is None:
            return False
        if len(content) >= self.config.CATCH_BIRD_MAX_LENGTH:
            return False

        print("i see a bird", status["id"], " from ", status["account"]["acct"])
        try:
            self.session.status_reply(
                to_status=status,
                status=random.choice(bird_sounds))
            self.db.insert_event(_type="bird", remarks="",
                                 correspond=status["account"]["acct"])
        except Exception as e:
            # Best effort: a failed reply must not stop the timeline scan.
            print("fail to post", e)
            return False
        return True

    def handle_command(self, status, content):
        """Run a ``!command`` sent in a mention.

        ``content`` starts with the command prefix character. Known commands
        (``news``, ``stats``) are only executed for accounts listed in
        ``config.ADMINS``; requests from anyone else are silently ignored.
        """
        print("i see a command", content)
        cmd = content[1:].strip().split()
        # A bare prefix ("!") yields no tokens; indexing cmd[0] would raise.
        if not cmd or cmd[0] not in ("news", "stats"):
            print("I don't understand")
            return

        actor = status["account"]["acct"]
        if actor not in self.config.ADMINS:
            return

        if cmd[0] == "news":
            self.post_hn_news()
        else:
            self.session.status_reply(to_status=status,
                                      status=str(self.intimacy))

    def handle_home_status(self, status):
        """Process one home-timeline status and advance the last-seen id."""
        self.tl_lastseen_sid = status["id"]
        if self.is_mine(status):
            print("this one is from myself, skipping")
            return
        content = cleanhtml(status["content"])
        # only one action is taken. if one succeeds then return
        if self.catch_birds(status, content):
            return

    def scantimeline(self):
        """Handle every home-timeline status newer than the last one seen."""
        # print("scanning timeline, lastseen=",self.tl_lastseen_sid)
        tl = self.session.timeline_home(since_id=self.tl_lastseen_sid)
        # important! make sure to iterate from older status to newer
        # otherwise the 'last_seen' won't be updated correctly
        for status in reversed(tl):
            self.handle_home_status(status=status)

    # For now we don't handle other interaction types
    def handle_mention(self, notification):
        """React to a mention (command or meow), then dismiss it."""
        nid = notification["id"]
        status = notification["status"]
        print("we have a mention > ", nid)
        content = cleanhtml(status["content"]).replace("@cat", "").lstrip()
        print(content)
        if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH:
            print("invalid status content")
        # prioritized to commands
        elif content[0] == "!":
            self.handle_command(status, content)
        elif re_contains_meow.search(content) is not None:
            self.reply_meow(status)
        print("dismissing notification ", nid)
        self.session.notifications_dismiss(nid)

    def handle_follow(self, notification):
        """Follow the new follower back if needed, then dismiss."""
        nid = notification["id"]
        uid = notification['account']['id']
        rel = self.session.account_relationships(uid)
        if not rel[0]['following']:
            print("follow ", uid)
            self.session.account_follow(uid, reblogs=False, notify=False)
        else:
            print("already following", uid)
        self.session.notifications_dismiss(nid)

    def handle_fav(self, notification):
        """Record a favourite as a ``fav`` event, then dismiss."""
        nid = notification["id"]
        acct = notification['account']['acct']
        print(f"{acct} is petting me! purrr")
        self.db.insert_event(_type="fav", remarks="", correspond=acct)
        self.session.notifications_dismiss(nid)

    def handle_notification(self):
        """Dispatch every pending notification to its handler.

        Notification types other than mention, follow and favourite are left
        untouched. A failure while handling one notification is reported and
        does not prevent the remaining ones from being handled.
        """
        for n in self.session.notifications():
            try:
                if n['type'] == "mention":
                    self.handle_mention(n)
                elif n['type'] == "follow":
                    self.handle_follow(n)
                elif n['type'] == 'favourite':
                    self.handle_fav(n)
            except Exception as e:
                print("failed to handle notification", n.get("id"), e)

    def run(self):
        """Poll forever, sleeping ``config.POLL_INTERVAL`` between rounds."""
        print("[info] running...")
        self.epoch = 3
        while True:
            try:
                self.handle_notification()
                self.scantimeline()
                # Fires once every 360 polls, i.e. roughly every 2 hours with
                # a 20 s POLL_INTERVAL. The counter starts at 3, so the first
                # run comes slightly before 360 polls have elapsed.
                if self.epoch % 360 == 1:
                    self.post_hn_news(3)
                    self.update_intimacy()
            except Exception as e:
                print("something wrong...")
                print(e)
            time.sleep(self.config.POLL_INTERVAL)
            self.epoch += 1


if __name__ == "__main__":
    # check session data
    cat = VnilCat(cfg)
    cat.run()