diff options
Diffstat (limited to 'cat.py')
| -rw-r--r-- | cat.py | 214 |
1 files changed, 214 insertions, 0 deletions
@@ -0,0 +1,214 @@ +from mastodon import Mastodon +import config as cfg +import time +import re +import random +import os +from patterns import * +import hnnews +import catdb + +# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll +# leave it to the future... +def init_app(config): + Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL, + to_file=config.CLIENTID) + session = login_refresh_token() + +def login_refresh_token(config): + session = Mastodon(client_id=config.CLIENTID) + session.log_in(username=config.UNAME, + password=config.PW, to_file=config.TOKEN) + return session + +def restore_session(config): + return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma') + + +class VnilCat: + def __init__(self, config=cfg): + self.config = cfg + self.session = restore_session(self.config) + self.news = [] + self.tl_lastseen_sid = None + print("[booting] checking client data") + if not os.path.isfile(self.config.CLIENTID): + print("[booting] client data doesn't exist..creating...") + init_app(self.config) + try: + self.session.account_verify_credentials() + except: + try: + self.session = login_refresh_token(self.config) + except: + exit() + + print("[booting] check DB .. using sqlite3") + try: + self.db = catdb.DBHandler(config) + except Exception as e: + print("[ERROR] failed to setup DB ...",e) + print("exit ...") + exit() + + # try to fetch the lastest status from the timeline + # so that we can skip the ones before we start. + # This is not a persistent state machine, + # we don't know if a older status has been processed + # or not! + try: + tl = self.session.timeline_home(limit=1) + if (len(tl)) != 0: + print("[booting] set init status id to...", tl[0]["id"]) + self.tl_lastseen_sid = tl[0]["id"] + except Exception as e: + print("[booting] can't init timeline, continue anyways", e) + print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human") + print("-------------") + + def not_mine(self, status): + return status['account']['acct'] != self.config.UNAME + + def is_mine(self, status): + return status['account']['acct'] == self.config.UNAME + + def reply_meow(self, ori_status): + print("replying meow to ", ori_status["id"]) + self.session.status_reply( + to_status=ori_status, status=random.choice(cat_sounds)) + self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"]) + + def post_hn_news(self, amount=3): + print("posting news") + if amount <= 0: + return + if amount > len(self.news): + self.news = hnnews.get_topnews(20) + if amount > len(self.news): + amount = len(self.news) + status = "Hear ye, hear ye \n\n" + for i in range(amount): + n = self.news.pop(random.randrange(len(self.news))) + status += n["title"] + status += "\n" + status += n["url"] + status += "\n\n" + self.session.toot(status) + + + def catch_birds(self, status, content): + if re_contains_bird.search(content) is not None and len(content) < 10: + print("i see a bird", status["id"], " from ", status["account"]["acct"]) + try: + s = self.session.status_reply( + to_status=status, status=random.choice(bird_sounds)) + self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"]) + return True + except: + print("fail to post") + return False + + def handle_command(self, status, content): + print("i see a command", content) + cmd = content[1:].strip().split() + actor = status["account"]["acct"] + # handle cmds + if cmd[0] == "news": + if actor not in self.config.ADMINS: + return + self.post_hn_news() + else: + print("I don't understand") + + def handle_home_status(self, status): + self.tl_lastseen_sid = status["id"] + if self.is_mine(status): + print("this one is from myself, skipping") + return + acc = status["account"] + content = cleanhtml(status["content"]) + # only one action is taken. if one succeed then return + if self.catch_birds(status,content): + return + + def scantimeline(self): + # print("scanning timeline, lastseen=",self.tl_lastseen_sid) + tl = self.session.timeline_home(since_id=self.tl_lastseen_sid) + # important! make sure to iterate from older status to newer + # otherwise the 'last_seen' won't be updated correctly + for status in reversed(tl): + # skip if it's from myself + self.handle_home_status(status=status) + + # For now we don't handle other interaction types + + def handle_mention(self, notification): + nid = notification["id"] + acc = notification["account"] + status = notification["status"] + sid = status["id"] + print("we have a mention > ", nid) + content = cleanhtml(status["content"]).replace("@cat", "").lstrip() + print(content) + if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH: + print("invalid status content") + + # prioritized to commands + elif content[0] == "!": + self.handle_command(status, content) + + elif re_contains_meow.search(content) is not None: + self.reply_meow(status) + + print("dismissing notification ", nid) + self.session.notifications_dismiss(nid) + + def handle_follow(self, notification): + nid = notification["id"] + uid = notification['account']['id'] + rel = self.session.account_relationships(uid) + if rel[0]['following'] == False: + print("follow ", uid) + self.session.account_follow(uid, reblogs=False, notify=False) + else: + print("already following", uid) + self.session.notifications_dismiss(nid) + + def handle_fav(self, notification): + nid = notification["id"] + acct = notification['account']['acct'] + print(f"{acct} is petting me! purrr") + self.db.insert_event(_type="fav", remarks="",correspond=acct) + self.session.notifications_dismiss(nid) + + def handle_notification(self): + ns = self.session.notifications() + for n in ns: + if n['type'] == "mention": + self.handle_mention(n) + if n['type'] == "follow": + self.handle_follow(n) + if n['type'] == 'favourite': + self.handle_fav(n) + + def run(self): + print("[info] running...") + self.epoch = 3 + while True: + try: + self.handle_notification() + self.scantimeline() + if self.epoch % 360 == 1 : + self.post_hn_news(3) + except Exception as e: + print("something wrong...") + print(e) + time.sleep(self.config.POLL_INTERVAL) + self.epoch += 1 + + +if __name__ == "__main__": + # check session data + cat = VnilCat(cfg) + cat.run() + |
