import os
import random
import re
import time

from mastodon import Mastodon

import config
# NOTE: this wildcard import supplies cat_sounds, bird_sounds, cleanhtml,
# re_contains_bird and re_contains_meow, which are used below.
from patterns import *
import hnnews


def login_refresh_token():
    """Log in with the configured username/password.

    The access token obtained by the login is written to ``config.TOKEN``.

    Returns:
        The logged-in ``Mastodon`` session.
    """
    session = Mastodon(client_id=config.CLIENTID)
    session.log_in(username=config.UNAME,
                   password=config.PW,
                   to_file=config.TOKEN)
    return session


def restore_session():
    """Build a session from the stored client id and access token."""
    return Mastodon(client_id=config.CLIENTID,
                    access_token=config.TOKEN,
                    feature_set='pleroma')


class VnilCat:
    """A bot that meows at mentions, answers birds and posts HN news.

    State kept on the instance:
      * ``session``         -- the Mastodon API session
      * ``tl_lastseen_sid`` -- id of the newest home-timeline status seen
      * ``config``          -- the configuration object passed to ``__init__``
      * ``news``            -- news items fetched but not posted yet
    """

    def __init__(self, config):
        """Restore (or re-create) the session and remember the timeline head.

        Args:
            config: configuration object; ``UNAME`` is read from it to
                recognise the bot's own statuses.

        Raises:
            SystemExit: when the stored token is rejected and a fresh
                login fails as well.
        """
        self.session = restore_session()
        self.tl_lastseen_sid = None
        self.config = config
        self.news = []

        try:
            self.session.account_verify_credentials()
        except Exception:
            # The stored token did not work; fall back to a password login.
            try:
                self.session = login_refresh_token()
            except Exception as err:
                print("can't log in, giving up:", err)
                raise SystemExit(1)

        # Try to fetch the latest status from the timeline
        # so that we can skip the ones before we start.
        # This is not a persistent state machine,
        # we don't know if an older status has been processed
        # or not!
        try:
            tl = self.session.timeline_home(limit=1)
            if len(tl) != 0:
                print("set init status id to begin with...", tl[0]["id"])
                self.tl_lastseen_sid = tl[0]["id"]
        except Exception as e:
            print("can't init timeline, continue anyways", e)

    def not_mine(self, status):
        """Return True when ``status`` was not posted by the bot account."""
        return status['account']['acct'] != self.config.UNAME

    def is_mine(self, status):
        """Return True when ``status`` was posted by the bot account."""
        return status['account']['acct'] == self.config.UNAME

    def reply_meow(self, ori_status):
        """Reply to ``ori_status`` with a random entry of ``cat_sounds``."""
        print("replying meow to ", ori_status["id"])
        self.session.status_reply(
            to_status=ori_status, status=random.choice(cat_sounds))

    def post_hn_news(self, amount=3):
        """Post up to ``amount`` randomly chosen news items in one status.

        Items are taken (and removed) from ``self.news``; the list is
        refilled through ``hnnews.get_topnews(20)`` when it holds fewer
        than ``amount`` items. Nothing is posted when ``amount`` is not
        positive or when no news is available.

        Args:
            amount: maximum number of news items to include.
        """
        if amount <= 0:
            return
        if amount > len(self.news):
            self.news = hnnews.get_topnews(20)
        amount = min(amount, len(self.news))
        if amount == 0:
            # Avoid posting a headline with no news below it.
            print("no news available, nothing to post")
            return

        parts = ["Hear ye, hear ye \n\n"]
        for _ in range(amount):
            item = self.news.pop(random.randrange(len(self.news)))
            parts.append(item["title"] + "\n" + item["url"] + "\n\n")
        self.session.toot("".join(parts))

    def handle_command(self, status, content):
        """Run a ``!command`` sent to the bot in a mention.

        Args:
            status: the status that carried the command.
            content: cleaned status text, starting with ``!``.
        """
        print("we have a command", content)
        cmd = content[1:].strip().split()
        if not cmd:
            # A lone "!" has no command word; without this guard cmd[0]
            # raises IndexError and the mention is never dismissed.
            print("empty command, ignoring")
            return
        actor = status["account"]["acct"]

        # handle cmds
        if cmd[0] == "news":
            # Only accounts listed in config.ADMINS may trigger a news post.
            if actor not in config.ADMINS:
                return
            self.post_hn_news()

    def handle_home_status(self, status):
        """Process one home-timeline status.

        Records the status id as last seen, then answers short statuses
        (fewer than 10 characters after cleaning) that match
        ``re_contains_bird`` with a random entry of ``bird_sounds``.
        Statuses from the bot itself are ignored.
        """
        self.tl_lastseen_sid = status["id"]
        if self.is_mine(status):
            return

        content = cleanhtml(status["content"])
        if re_contains_bird.search(content) is not None and len(content) < 10:
            print("i see a bird", status["id"])
            try:
                self.session.status_reply(
                    to_status=status, status=random.choice(bird_sounds))
            except Exception as e:
                # Best effort: a failed reply must not stop the scan.
                print("fail to post", e)

    def scantimeline(self):
        """Handle every home-timeline status newer than the last seen one."""
        print("scanning timeline, lastseen=", self.tl_lastseen_sid)
        tl = self.session.timeline_home(since_id=self.tl_lastseen_sid)
        # important! make sure to iterate from older status to newer
        # otherwise the 'last_seen' won't be updated correctly
        for status in reversed(tl):
            # statuses from the bot itself are skipped inside the handler
            self.handle_home_status(status=status)

    # For now we don't handle other interaction types
    def handle_mention(self, notification):
        """React to a mention notification, then dismiss it.

        The text is cleaned and "@cat" is removed. Empty or over-long
        content is rejected, a leading "!" is treated as a command, and
        content matching ``re_contains_meow`` gets a meow reply.
        The dismissal happens after handling, so if handling raises the
        notification is left in place.
        """
        nid = notification["id"]
        status = notification["status"]
        print("we have a mention > ", nid)

        content = cleanhtml(status["content"]).replace("@cat", "").lstrip()
        print(content)

        if len(content) == 0 or len(content) > config.MAX_STATUS_LENGTH:
            print("invalid status content")
        # prioritized to commands
        elif content[0] == "!":
            self.handle_command(status, content)
        elif re_contains_meow.search(content) is not None:
            self.reply_meow(status)

        print("dismissing notification ", nid)
        self.session.notifications_dismiss(nid)

    def handle_follow(self, notification):
        """Follow back the account behind a follow notification, then dismiss it."""
        nid = notification["id"]
        uid = notification['account']['id']
        rel = self.session.account_relationships(uid)
        if not rel[0]['following']:
            print("follow ", uid)
            self.session.account_follow(uid, reblogs=False, notify=False)
        else:
            print("already following", uid)
        self.session.notifications_dismiss(nid)

    def handle_notification(self):
        """Fetch notifications and dispatch mentions and follows."""
        ns = self.session.notifications()
        for n in ns:
            if n['type'] == "mention":
                self.handle_mention(n)
            elif n['type'] == "follow":
                self.handle_follow(n)


# TODO re-organize auth mgmt.
# But it involves a lot of manual testing so I'll
# leave it to the future...


def init_bot():
    """Register the app and perform a first login.

    The client credentials are written to ``config.CLIENTID``; the login
    itself is delegated to ``login_refresh_token``.
    """
    Mastodon.create_app(
        config.APPNAME,
        api_base_url=config.BASEURL,
        to_file=config.CLIENTID,
    )
    login_refresh_token()


# TODO use sched
def run():
    """Poll forever: notifications, then the home timeline.

    News is posted on the first pass and on every 360th pass after it.
    Any exception raised during a pass is printed and the loop goes on
    after sleeping ``config.POLL_INTERVAL``.
    """
    cat = VnilCat(config)
    epoch = 0
    while True:
        epoch += 1
        try:
            cat.handle_notification()
            cat.scantimeline()
            if epoch % 360 == 1:
                cat.post_hn_news(3)
        except Exception as e:
            print("something wrong...")
            print(e)
        time.sleep(config.POLL_INTERVAL)


if __name__ == "__main__":
    # check session data
    have_client_data = os.path.isfile(config.CLIENTID)
    if not have_client_data:
        print("client data doesn't exist..creating...")
        init_bot()
    run()