summaryrefslogtreecommitdiff
path: root/cat.py
diff options
context:
space:
mode:
authorTianhao Wang <shrik3@riseup.net>2023-03-29 14:27:53 +0200
committerTianhao Wang <shrik3@riseup.net>2023-03-29 14:27:53 +0200
commit04c0fd709b868400a826bca05c679a8c4f1abb8d (patch)
treec2af369e9b3ec53f011db93350fcc505a31b354c /cat.py
parent7c19ce4ea2c9ae542d743cbdd19e4c0fcf4e3daa (diff)
add sqlite3 support
Diffstat (limited to 'cat.py')
-rw-r--r--cat.py214
1 files changed, 214 insertions, 0 deletions
diff --git a/cat.py b/cat.py
new file mode 100644
index 0000000..544cbc7
--- /dev/null
+++ b/cat.py
@@ -0,0 +1,214 @@
+from mastodon import Mastodon
+import config as cfg
+import time
+import re
+import random
+import os
+from patterns import *
+import hnnews
+import catdb
+
+# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll
+# leave it to the future...
+def init_app(config):
+ Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL,
+ to_file=config.CLIENTID)
+ session = login_refresh_token()
+
+def login_refresh_token(config):
+ session = Mastodon(client_id=config.CLIENTID)
+ session.log_in(username=config.UNAME,
+ password=config.PW, to_file=config.TOKEN)
+ return session
+
+def restore_session(config):
+ return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma')
+
+
+class VnilCat:
+ def __init__(self, config=cfg):
+ self.config = cfg
+ self.session = restore_session(self.config)
+ self.news = []
+ self.tl_lastseen_sid = None
+ print("[booting] checking client data")
+ if not os.path.isfile(self.config.CLIENTID):
+ print("[booting] client data doesn't exist..creating...")
+ init_app(self.config)
+ try:
+ self.session.account_verify_credentials()
+ except:
+ try:
+ self.session = login_refresh_token(self.config)
+ except:
+ exit()
+
+ print("[booting] check DB .. using sqlite3")
+ try:
+ self.db = catdb.DBHandler(config)
+ except Exception as e:
+ print("[ERROR] failed to setup DB ...",e)
+ print("exit ...")
+ exit()
+
+ # try to fetch the lastest status from the timeline
+ # so that we can skip the ones before we start.
+ # This is not a persistent state machine,
+ # we don't know if a older status has been processed
+ # or not!
+ try:
+ tl = self.session.timeline_home(limit=1)
+ if (len(tl)) != 0:
+ print("[booting] set init status id to...", tl[0]["id"])
+ self.tl_lastseen_sid = tl[0]["id"]
+ except Exception as e:
+ print("[booting] can't init timeline, continue anyways", e)
+ print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human")
+ print("-------------")
+
+ def not_mine(self, status):
+ return status['account']['acct'] != self.config.UNAME
+
+ def is_mine(self, status):
+ return status['account']['acct'] == self.config.UNAME
+
+ def reply_meow(self, ori_status):
+ print("replying meow to ", ori_status["id"])
+ self.session.status_reply(
+ to_status=ori_status, status=random.choice(cat_sounds))
+ self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"])
+
+ def post_hn_news(self, amount=3):
+ print("posting news")
+ if amount <= 0:
+ return
+ if amount > len(self.news):
+ self.news = hnnews.get_topnews(20)
+ if amount > len(self.news):
+ amount = len(self.news)
+ status = "Hear ye, hear ye \n\n"
+ for i in range(amount):
+ n = self.news.pop(random.randrange(len(self.news)))
+ status += n["title"]
+ status += "\n"
+ status += n["url"]
+ status += "\n\n"
+ self.session.toot(status)
+
+
+ def catch_birds(self, status, content):
+ if re_contains_bird.search(content) is not None and len(content) < 10:
+ print("i see a bird", status["id"], " from ", status["account"]["acct"])
+ try:
+ s = self.session.status_reply(
+ to_status=status, status=random.choice(bird_sounds))
+ self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"])
+ return True
+ except:
+ print("fail to post")
+ return False
+
+ def handle_command(self, status, content):
+ print("i see a command", content)
+ cmd = content[1:].strip().split()
+ actor = status["account"]["acct"]
+ # handle cmds
+ if cmd[0] == "news":
+ if actor not in self.config.ADMINS:
+ return
+ self.post_hn_news()
+ else:
+ print("I don't understand")
+
+ def handle_home_status(self, status):
+ self.tl_lastseen_sid = status["id"]
+ if self.is_mine(status):
+ print("this one is from myself, skipping")
+ return
+ acc = status["account"]
+ content = cleanhtml(status["content"])
+ # only one action is taken. if one succeed then return
+ if self.catch_birds(status,content):
+ return
+
+ def scantimeline(self):
+ # print("scanning timeline, lastseen=",self.tl_lastseen_sid)
+ tl = self.session.timeline_home(since_id=self.tl_lastseen_sid)
+ # important! make sure to iterate from older status to newer
+ # otherwise the 'last_seen' won't be updated correctly
+ for status in reversed(tl):
+ # skip if it's from myself
+ self.handle_home_status(status=status)
+
+ # For now we don't handle other interaction types
+
+ def handle_mention(self, notification):
+ nid = notification["id"]
+ acc = notification["account"]
+ status = notification["status"]
+ sid = status["id"]
+ print("we have a mention > ", nid)
+ content = cleanhtml(status["content"]).replace("@cat", "").lstrip()
+ print(content)
+ if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH:
+ print("invalid status content")
+
+ # prioritized to commands
+ elif content[0] == "!":
+ self.handle_command(status, content)
+
+ elif re_contains_meow.search(content) is not None:
+ self.reply_meow(status)
+
+ print("dismissing notification ", nid)
+ self.session.notifications_dismiss(nid)
+
+ def handle_follow(self, notification):
+ nid = notification["id"]
+ uid = notification['account']['id']
+ rel = self.session.account_relationships(uid)
+ if rel[0]['following'] == False:
+ print("follow ", uid)
+ self.session.account_follow(uid, reblogs=False, notify=False)
+ else:
+ print("already following", uid)
+ self.session.notifications_dismiss(nid)
+
+ def handle_fav(self, notification):
+ nid = notification["id"]
+ acct = notification['account']['acct']
+ print(f"{acct} is petting me! purrr")
+ self.db.insert_event(_type="fav", remarks="",correspond=acct)
+ self.session.notifications_dismiss(nid)
+
+ def handle_notification(self):
+ ns = self.session.notifications()
+ for n in ns:
+ if n['type'] == "mention":
+ self.handle_mention(n)
+ if n['type'] == "follow":
+ self.handle_follow(n)
+ if n['type'] == 'favourite':
+ self.handle_fav(n)
+
+ def run(self):
+ print("[info] running...")
+ self.epoch = 3
+ while True:
+ try:
+ self.handle_notification()
+ self.scantimeline()
+ if self.epoch % 360 == 1 :
+ self.post_hn_news(3)
+ except Exception as e:
+ print("something wrong...")
+ print(e)
+ time.sleep(self.config.POLL_INTERVAL)
+ self.epoch += 1
+
+
+if __name__ == "__main__":
+ # check session data
+ cat = VnilCat(cfg)
+ cat.run()
+