diff options
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | cat.py | 214 | ||||
| -rw-r--r-- | catdb.py | 44 | ||||
| -rw-r--r-- | catquery.py | 16 | ||||
| -rw-r--r-- | config.py | 16 | ||||
| -rw-r--r-- | main.py | 198 |
6 files changed, 293 insertions, 197 deletions
@@ -3,3 +3,5 @@ __pycache__ notes.txt oauth.txt ./oauth.txt +test_* +data/ @@ -0,0 +1,214 @@ +from mastodon import Mastodon +import config as cfg +import time +import re +import random +import os +from patterns import * +import hnnews +import catdb + +# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll +# leave it to the future... +def init_app(config): + Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL, + to_file=config.CLIENTID) + session = login_refresh_token() + +def login_refresh_token(config): + session = Mastodon(client_id=config.CLIENTID) + session.log_in(username=config.UNAME, + password=config.PW, to_file=config.TOKEN) + return session + +def restore_session(config): + return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma') + + +class VnilCat: + def __init__(self, config=cfg): + self.config = cfg + self.session = restore_session(self.config) + self.news = [] + self.tl_lastseen_sid = None + print("[booting] checking client data") + if not os.path.isfile(self.config.CLIENTID): + print("[booting] client data doesn't exist..creating...") + init_app(self.config) + try: + self.session.account_verify_credentials() + except: + try: + self.session = login_refresh_token(self.config) + except: + exit() + + print("[booting] check DB .. using sqlite3") + try: + self.db = catdb.DBHandler(config) + except Exception as e: + print("[ERROR] failed to setup DB ...",e) + print("exit ...") + exit() + + # try to fetch the lastest status from the timeline + # so that we can skip the ones before we start. + # This is not a persistent state machine, + # we don't know if a older status has been processed + # or not! + try: + tl = self.session.timeline_home(limit=1) + if (len(tl)) != 0: + print("[booting] set init status id to...", tl[0]["id"]) + self.tl_lastseen_sid = tl[0]["id"] + except Exception as e: + print("[booting] can't init timeline, continue anyways", e) + print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human") + print("-------------") + + def not_mine(self, status): + return status['account']['acct'] != self.config.UNAME + + def is_mine(self, status): + return status['account']['acct'] == self.config.UNAME + + def reply_meow(self, ori_status): + print("replying meow to ", ori_status["id"]) + self.session.status_reply( + to_status=ori_status, status=random.choice(cat_sounds)) + self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"]) + + def post_hn_news(self, amount=3): + print("posting news") + if amount <= 0: + return + if amount > len(self.news): + self.news = hnnews.get_topnews(20) + if amount > len(self.news): + amount = len(self.news) + status = "Hear ye, hear ye \n\n" + for i in range(amount): + n = self.news.pop(random.randrange(len(self.news))) + status += n["title"] + status += "\n" + status += n["url"] + status += "\n\n" + self.session.toot(status) + + + def catch_birds(self, status, content): + if re_contains_bird.search(content) is not None and len(content) < 10: + print("i see a bird", status["id"], " from ", status["account"]["acct"]) + try: + s = self.session.status_reply( + to_status=status, status=random.choice(bird_sounds)) + self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"]) + return True + except: + print("fail to post") + return False + + def handle_command(self, status, content): + print("i see a command", content) + cmd = content[1:].strip().split() + actor = status["account"]["acct"] + # handle cmds + if cmd[0] == "news": + if actor not in self.config.ADMINS: + return + self.post_hn_news() + else: + print("I don't understand") + + def handle_home_status(self, status): + self.tl_lastseen_sid = status["id"] + if self.is_mine(status): + print("this one is from myself, skipping") + return + acc = status["account"] + content = cleanhtml(status["content"]) + # only one action is taken. if one succeed then return + if self.catch_birds(status,content): + return + + def scantimeline(self): + # print("scanning timeline, lastseen=",self.tl_lastseen_sid) + tl = self.session.timeline_home(since_id=self.tl_lastseen_sid) + # important! make sure to iterate from older status to newer + # otherwise the 'last_seen' won't be updated correctly + for status in reversed(tl): + # skip if it's from myself + self.handle_home_status(status=status) + + # For now we don't handle other interaction types + + def handle_mention(self, notification): + nid = notification["id"] + acc = notification["account"] + status = notification["status"] + sid = status["id"] + print("we have a mention > ", nid) + content = cleanhtml(status["content"]).replace("@cat", "").lstrip() + print(content) + if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH: + print("invalid status content") + + # prioritized to commands + elif content[0] == "!": + self.handle_command(status, content) + + elif re_contains_meow.search(content) is not None: + self.reply_meow(status) + + print("dismissing notification ", nid) + self.session.notifications_dismiss(nid) + + def handle_follow(self, notification): + nid = notification["id"] + uid = notification['account']['id'] + rel = self.session.account_relationships(uid) + if rel[0]['following'] == False: + print("follow ", uid) + self.session.account_follow(uid, reblogs=False, notify=False) + else: + print("already following", uid) + self.session.notifications_dismiss(nid) + + def handle_fav(self, notification): + nid = notification["id"] + acct = notification['account']['acct'] + print(f"{acct} is petting me! purrr") + self.db.insert_event(_type="fav", remarks="",correspond=acct) + self.session.notifications_dismiss(nid) + + def handle_notification(self): + ns = self.session.notifications() + for n in ns: + if n['type'] == "mention": + self.handle_mention(n) + if n['type'] == "follow": + self.handle_follow(n) + if n['type'] == 'favourite': + self.handle_fav(n) + + def run(self): + print("[info] running...") + self.epoch = 3 + while True: + try: + self.handle_notification() + self.scantimeline() + if self.epoch % 360 == 1 : + self.post_hn_news(3) + except Exception as e: + print("something wrong...") + print(e) + time.sleep(self.config.POLL_INTERVAL) + self.epoch += 1 + + +if __name__ == "__main__": + # check session data + cat = VnilCat(cfg) + cat.run() + diff --git a/catdb.py b/catdb.py new file mode 100644 index 0000000..14843f8 --- /dev/null +++ b/catdb.py @@ -0,0 +1,44 @@ +import config as cfg +from catquery import * +import sqlite3 + +class DBHandler(): + def __init__(self,config): + self.DB = cfg.DB_NAME + self.init_db() + return + + def init_db(self): + self.commit(DB_SCHEMA) + + # this wrapper doesn't handle exception + # catch them in the caller. + # another remark: we don't expect much DB throughput + # and it's not necessary to maintain a long-lived connection + def commit(self, str, data=None): + conn = sqlite3.connect(self.DB) + cur = conn.cursor() + if data == None: + cur.execute(str) + else: + cur.execute(str,data) + conn.commit() + conn.close() + + # read-only access + def query(self,str): + conn = sqlite3.connect(self.DB) + cur = conn.cursor() + res = cur.execute(str) + conn.close() + return res + + # the insert_event exception should be handled here + # because this is used as a log, and should have no effect + # on the programs even if it fails + def insert_event(self, _type, remarks, correspond): + try: + self.commit(QUERY_INSERT_EVENT, (_type,remarks,correspond)) + except Exception as e: + print("ERROR ","failed to insert event to the db, ignoring ",e) + diff --git a/catquery.py b/catquery.py new file mode 100644 index 0000000..7b3bc5c --- /dev/null +++ b/catquery.py @@ -0,0 +1,16 @@ +DB_SCHEMA = \ +""" +CREATE TABLE IF NOT EXISTS + events( + id INTEGER PRIMARY KEY AUTOINCREMENT, + type TEXT, + remarks TEXT, + correspond TEXT + ) +""" + +QUERY_INSERT_EVENT = \ +""" +INSERT INTO events(type, remarks, correspond) VALUES (?,?,?) + +""" @@ -1,6 +1,7 @@ APPNAME = 'catbot' BASEURL = 'https://vnil.de' +## FEDIVERSE CONFIGS --------------------- # no need to change the following --- CLIENTID = 'client.secret' TOKEN = 'token.secret' @@ -18,11 +19,22 @@ ACCOUNT_SECRET = "account.secret" try: f = open(ACCOUNT_SECRET) lines = f.readlines() - UNAME = lines[0] - PW = lines[1] + UNAME = lines[0].strip() + PW = lines[1].strip() except: print("account secret not found, please manually input:") UNAME = input("username or email") PW = input("password (not concealed)") ADMINS = ["shrik3"] + + + +## DB CONFIGS ----------------------------- +# TODO, currently the bot and the db handler is sharing +# the same set of config, which is not deserable, expecially +# when the bot config has embeded script (the try..except block +# above). For the prototyle I'm keeping it this way. May change +# later. + +DB_NAME = "data/data.db" @@ -1,196 +1,4 @@ -from mastodon import Mastodon -import config -import time -import re -import random -import os -from patterns import * -import hnnews - - -def login_refresh_token(): - session = Mastodon(client_id=config.CLIENTID) - session.log_in(username=config.UNAME, - password=config.PW, to_file=config.TOKEN) - return session - -def restore_session(): - return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma') - - -class VnilCat: - def __init__(self, config): - self.session = restore_session() - self.tl_lastseen_sid = None - self.config = config - self.news = [] - try: - self.session.account_verify_credentials() - except: - try: - self.session = login_refresh_token() - except: - exit() - # try to fetch the lastest status from the timeline - # so that we can skip the ones before we start. - # This is not a persistent state machine, - # we don't know if a older status has been processed - # or not! - try: - tl = self.session.timeline_home(limit=1) - if (len(tl)) != 0: - print("[booting] set init status id to...", tl[0]["id"]) - self.tl_lastseen_sid = tl[0]["id"] - except Exception as e: - print("[booting] can't init timeline, continue anyways", e) - print(f"Cat booted, all system green, my name is {self.config.UNAME}, prepare to die, human") - print("-------------") - - def not_mine(self, status): - return status['account']['acct'] != self.config.UNAME - - def is_mine(self, status): - return status['account']['acct'] == self.config.UNAME - - def reply_meow(self, ori_status): - print("replying meow to ", ori_status["id"]) - self.session.status_reply( - to_status=ori_status, status=random.choice(cat_sounds)) - - def post_hn_news(self, amount=3): - print("posting news") - if amount <= 0: - return - if amount > len(self.news): - self.news = hnnews.get_topnews(20) - if amount > len(self.news): - amount = len(self.news) - status = "Hear ye, hear ye \n\n" - for i in range(amount): - n = self.news.pop(random.randrange(len(self.news))) - status += n["title"] - status += "\n" - status += n["url"] - status += "\n\n" - self.session.toot(status) - - - def catch_birds(self, status, content): - if re_contains_bird.search(content) is not None and len(content) < 10: - print("i see a bird", status["id"], " from ", status["account"]["acct"]) - try: - s = self.session.status_reply( - to_status=status, status=random.choice(bird_sounds)) - return True - except: - print("fail to post") - return False - - def handle_command(self, status, content): - print("i see a command", content) - cmd = content[1:].strip().split() - actor = status["account"]["acct"] - # handle cmds - if cmd[0] == "news": - if actor not in config.ADMINS: - return - self.post_hn_news() - else: - print("I don't understand") - - def handle_home_status(self, status): - self.tl_lastseen_sid = status["id"] - if self.is_mine(status): - print("this one is from myself, skipping") - return - acc = status["account"] - content = cleanhtml(status["content"]) - # only one action is taken. if one succeed then return - if self.catch_birds(status,content): - return - - def scantimeline(self): - # print("scanning timeline, lastseen=",self.tl_lastseen_sid) - tl = self.session.timeline_home(since_id=self.tl_lastseen_sid) - # important! make sure to iterate from older status to newer - # otherwise the 'last_seen' won't be updated correctly - for status in reversed(tl): - # skip if it's from myself - self.handle_home_status(status=status) - - # For now we don't handle other interaction types - - def handle_mention(self, notification): - nid = notification["id"] - acc = notification["account"] - status = notification["status"] - sid = status["id"] - print("we have a mention > ", nid) - content = cleanhtml(status["content"]).replace("@cat", "").lstrip() - print(content) - if len(content) == 0 or len(content) > config.MAX_STATUS_LENGTH: - print("invalid status content") - - # prioritized to commands - elif content[0] == "!": - self.handle_command(status, content) - - elif re_contains_meow.search(content) is not None: - self.reply_meow(status) - - print("dismissing notification ", nid) - self.session.notifications_dismiss(nid) - - def handle_follow(self, notification): - nid = notification["id"] - uid = notification['account']['id'] - rel = self.session.account_relationships(uid) - if rel[0]['following'] == False: - print("follow ", uid) - self.session.account_follow(uid, reblogs=False, notify=False) - else: - print("already following", uid) - self.session.notifications_dismiss(nid) - - def handle_notification(self): - ns = self.session.notifications() - for n in ns: - if n['type'] == "mention": - self.handle_mention(n) - if n['type'] == "follow": - self.handle_follow(n) - - def run(self): - self.epoch = 3 - while True: - try: - self.handle_notification() - self.scantimeline() - if self.epoch % 360 == 1 : - self.post_hn_news(3) - except Exception as e: - print("something wrong...") - print(e) - time.sleep(self.config.POLL_INTERVAL) - self.epoch += 1 - -# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll -# leave it to the future... -def init_bot(): - Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL, - to_file=config.CLIENTID) - session = login_refresh_token() - -# TODO use sched -def run(): - cat = VnilCat(config) - cat.run() - - +import cat if __name__ == "__main__": - # check session data - if not os.path.isfile(config.CLIENTID): - print("client data doesn't exist..creating...") - init_bot() - - run() + cat = cat.VnilCat() + cat.run() |
