summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTianhao Wang <shrik3@riseup.net>2023-03-29 14:27:53 +0200
committerTianhao Wang <shrik3@riseup.net>2023-03-29 14:27:53 +0200
commit04c0fd709b868400a826bca05c679a8c4f1abb8d (patch)
treec2af369e9b3ec53f011db93350fcc505a31b354c
parent7c19ce4ea2c9ae542d743cbdd19e4c0fcf4e3daa (diff)
add sqlite3 support
-rw-r--r--.gitignore2
-rw-r--r--cat.py214
-rw-r--r--catdb.py44
-rw-r--r--catquery.py16
-rw-r--r--config.py16
-rw-r--r--main.py198
6 files changed, 293 insertions, 197 deletions
diff --git a/.gitignore b/.gitignore
index 906ee43..6dc20f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,5 @@ __pycache__
notes.txt
oauth.txt
./oauth.txt
+test_*
+data/
diff --git a/cat.py b/cat.py
new file mode 100644
index 0000000..544cbc7
--- /dev/null
+++ b/cat.py
@@ -0,0 +1,214 @@
+from mastodon import Mastodon
+import config as cfg
+import time
+import re
+import random
+import os
+from patterns import *
+import hnnews
+import catdb
+
+# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll
+# leave it to the future...
+def init_app(config):
+ Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL,
+ to_file=config.CLIENTID)
+ session = login_refresh_token()
+
+def login_refresh_token(config):
+ session = Mastodon(client_id=config.CLIENTID)
+ session.log_in(username=config.UNAME,
+ password=config.PW, to_file=config.TOKEN)
+ return session
+
+def restore_session(config):
+ return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma')
+
+
+class VnilCat:
+ def __init__(self, config=cfg):
+ self.config = cfg
+ self.session = restore_session(self.config)
+ self.news = []
+ self.tl_lastseen_sid = None
+ print("[booting] checking client data")
+ if not os.path.isfile(self.config.CLIENTID):
+ print("[booting] client data doesn't exist..creating...")
+ init_app(self.config)
+ try:
+ self.session.account_verify_credentials()
+ except:
+ try:
+ self.session = login_refresh_token(self.config)
+ except:
+ exit()
+
+ print("[booting] check DB .. using sqlite3")
+ try:
+ self.db = catdb.DBHandler(config)
+ except Exception as e:
+ print("[ERROR] failed to setup DB ...",e)
+ print("exit ...")
+ exit()
+
+ # try to fetch the lastest status from the timeline
+ # so that we can skip the ones before we start.
+ # This is not a persistent state machine,
+ # we don't know if a older status has been processed
+ # or not!
+ try:
+ tl = self.session.timeline_home(limit=1)
+ if (len(tl)) != 0:
+ print("[booting] set init status id to...", tl[0]["id"])
+ self.tl_lastseen_sid = tl[0]["id"]
+ except Exception as e:
+ print("[booting] can't init timeline, continue anyways", e)
+ print(f"[booting] Cat booted, all systems green, my name is {self.config.UNAME}, prepare to die, human")
+ print("-------------")
+
+ def not_mine(self, status):
+ return status['account']['acct'] != self.config.UNAME
+
+ def is_mine(self, status):
+ return status['account']['acct'] == self.config.UNAME
+
+ def reply_meow(self, ori_status):
+ print("replying meow to ", ori_status["id"])
+ self.session.status_reply(
+ to_status=ori_status, status=random.choice(cat_sounds))
+ self.db.insert_event(_type="meow",remarks="",correspond=ori_status["account"]["acct"])
+
+ def post_hn_news(self, amount=3):
+ print("posting news")
+ if amount <= 0:
+ return
+ if amount > len(self.news):
+ self.news = hnnews.get_topnews(20)
+ if amount > len(self.news):
+ amount = len(self.news)
+ status = "Hear ye, hear ye \n\n"
+ for i in range(amount):
+ n = self.news.pop(random.randrange(len(self.news)))
+ status += n["title"]
+ status += "\n"
+ status += n["url"]
+ status += "\n\n"
+ self.session.toot(status)
+
+
+ def catch_birds(self, status, content):
+ if re_contains_bird.search(content) is not None and len(content) < 10:
+ print("i see a bird", status["id"], " from ", status["account"]["acct"])
+ try:
+ s = self.session.status_reply(
+ to_status=status, status=random.choice(bird_sounds))
+ self.db.insert_event(_type="bird",remarks="",correspond=status["account"]["acct"])
+ return True
+ except:
+ print("fail to post")
+ return False
+
+ def handle_command(self, status, content):
+ print("i see a command", content)
+ cmd = content[1:].strip().split()
+ actor = status["account"]["acct"]
+ # handle cmds
+ if cmd[0] == "news":
+ if actor not in self.config.ADMINS:
+ return
+ self.post_hn_news()
+ else:
+ print("I don't understand")
+
+ def handle_home_status(self, status):
+ self.tl_lastseen_sid = status["id"]
+ if self.is_mine(status):
+ print("this one is from myself, skipping")
+ return
+ acc = status["account"]
+ content = cleanhtml(status["content"])
+ # only one action is taken. if one succeed then return
+ if self.catch_birds(status,content):
+ return
+
+ def scantimeline(self):
+ # print("scanning timeline, lastseen=",self.tl_lastseen_sid)
+ tl = self.session.timeline_home(since_id=self.tl_lastseen_sid)
+ # important! make sure to iterate from older status to newer
+ # otherwise the 'last_seen' won't be updated correctly
+ for status in reversed(tl):
+ # skip if it's from myself
+ self.handle_home_status(status=status)
+
+ # For now we don't handle other interaction types
+
+ def handle_mention(self, notification):
+ nid = notification["id"]
+ acc = notification["account"]
+ status = notification["status"]
+ sid = status["id"]
+ print("we have a mention > ", nid)
+ content = cleanhtml(status["content"]).replace("@cat", "").lstrip()
+ print(content)
+ if len(content) == 0 or len(content) > self.config.MAX_STATUS_LENGTH:
+ print("invalid status content")
+
+ # prioritized to commands
+ elif content[0] == "!":
+ self.handle_command(status, content)
+
+ elif re_contains_meow.search(content) is not None:
+ self.reply_meow(status)
+
+ print("dismissing notification ", nid)
+ self.session.notifications_dismiss(nid)
+
+ def handle_follow(self, notification):
+ nid = notification["id"]
+ uid = notification['account']['id']
+ rel = self.session.account_relationships(uid)
+ if rel[0]['following'] == False:
+ print("follow ", uid)
+ self.session.account_follow(uid, reblogs=False, notify=False)
+ else:
+ print("already following", uid)
+ self.session.notifications_dismiss(nid)
+
+ def handle_fav(self, notification):
+ nid = notification["id"]
+ acct = notification['account']['acct']
+ print(f"{acct} is petting me! purrr")
+ self.db.insert_event(_type="fav", remarks="",correspond=acct)
+ self.session.notifications_dismiss(nid)
+
+ def handle_notification(self):
+ ns = self.session.notifications()
+ for n in ns:
+ if n['type'] == "mention":
+ self.handle_mention(n)
+ if n['type'] == "follow":
+ self.handle_follow(n)
+ if n['type'] == 'favourite':
+ self.handle_fav(n)
+
+ def run(self):
+ print("[info] running...")
+ self.epoch = 3
+ while True:
+ try:
+ self.handle_notification()
+ self.scantimeline()
+ if self.epoch % 360 == 1 :
+ self.post_hn_news(3)
+ except Exception as e:
+ print("something wrong...")
+ print(e)
+ time.sleep(self.config.POLL_INTERVAL)
+ self.epoch += 1
+
+
+if __name__ == "__main__":
+ # check session data
+ cat = VnilCat(cfg)
+ cat.run()
+
diff --git a/catdb.py b/catdb.py
new file mode 100644
index 0000000..14843f8
--- /dev/null
+++ b/catdb.py
@@ -0,0 +1,44 @@
+import config as cfg
+from catquery import *
+import sqlite3
+
+class DBHandler():
+ def __init__(self,config):
+ self.DB = cfg.DB_NAME
+ self.init_db()
+ return
+
+ def init_db(self):
+ self.commit(DB_SCHEMA)
+
+ # this wrapper doesn't handle exception
+ # catch them in the caller.
+ # another remark: we don't expect much DB throughput
+ # and it's not necessary to maintain a long-lived connection
+ def commit(self, str, data=None):
+ conn = sqlite3.connect(self.DB)
+ cur = conn.cursor()
+ if data == None:
+ cur.execute(str)
+ else:
+ cur.execute(str,data)
+ conn.commit()
+ conn.close()
+
+ # read-only access
+ def query(self,str):
+ conn = sqlite3.connect(self.DB)
+ cur = conn.cursor()
+ res = cur.execute(str)
+ conn.close()
+ return res
+
+ # the insert_event exception should be handled here
+ # because this is used as a log, and should have no effect
+ # on the programs even if it fails
+ def insert_event(self, _type, remarks, correspond):
+ try:
+ self.commit(QUERY_INSERT_EVENT, (_type,remarks,correspond))
+ except Exception as e:
+ print("ERROR ","failed to insert event to the db, ignoring ",e)
+
diff --git a/catquery.py b/catquery.py
new file mode 100644
index 0000000..7b3bc5c
--- /dev/null
+++ b/catquery.py
@@ -0,0 +1,16 @@
+DB_SCHEMA = \
+"""
+CREATE TABLE IF NOT EXISTS
+ events(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ type TEXT,
+ remarks TEXT,
+ correspond TEXT
+ )
+"""
+
+QUERY_INSERT_EVENT = \
+"""
+INSERT INTO events(type, remarks, correspond) VALUES (?,?,?)
+
+"""
diff --git a/config.py b/config.py
index c550dfc..985370e 100644
--- a/config.py
+++ b/config.py
@@ -1,6 +1,7 @@
APPNAME = 'catbot'
BASEURL = 'https://vnil.de'
+## FEDIVERSE CONFIGS ---------------------
# no need to change the following ---
CLIENTID = 'client.secret'
TOKEN = 'token.secret'
@@ -18,11 +19,22 @@ ACCOUNT_SECRET = "account.secret"
try:
f = open(ACCOUNT_SECRET)
lines = f.readlines()
- UNAME = lines[0]
- PW = lines[1]
+ UNAME = lines[0].strip()
+ PW = lines[1].strip()
except:
print("account secret not found, please manually input:")
UNAME = input("username or email")
PW = input("password (not concealed)")
ADMINS = ["shrik3"]
+
+
+
+## DB CONFIGS -----------------------------
+# TODO, currently the bot and the db handler is sharing
+# the same set of config, which is not deserable, expecially
+# when the bot config has embeded script (the try..except block
+# above). For the prototyle I'm keeping it this way. May change
+# later.
+
+DB_NAME = "data/data.db"
diff --git a/main.py b/main.py
index 7124564..7b38d77 100644
--- a/main.py
+++ b/main.py
@@ -1,196 +1,4 @@
-from mastodon import Mastodon
-import config
-import time
-import re
-import random
-import os
-from patterns import *
-import hnnews
-
-
-def login_refresh_token():
- session = Mastodon(client_id=config.CLIENTID)
- session.log_in(username=config.UNAME,
- password=config.PW, to_file=config.TOKEN)
- return session
-
-def restore_session():
- return Mastodon(client_id=config.CLIENTID, access_token=config.TOKEN, feature_set='pleroma')
-
-
-class VnilCat:
- def __init__(self, config):
- self.session = restore_session()
- self.tl_lastseen_sid = None
- self.config = config
- self.news = []
- try:
- self.session.account_verify_credentials()
- except:
- try:
- self.session = login_refresh_token()
- except:
- exit()
- # try to fetch the lastest status from the timeline
- # so that we can skip the ones before we start.
- # This is not a persistent state machine,
- # we don't know if a older status has been processed
- # or not!
- try:
- tl = self.session.timeline_home(limit=1)
- if (len(tl)) != 0:
- print("[booting] set init status id to...", tl[0]["id"])
- self.tl_lastseen_sid = tl[0]["id"]
- except Exception as e:
- print("[booting] can't init timeline, continue anyways", e)
- print(f"Cat booted, all system green, my name is {self.config.UNAME}, prepare to die, human")
- print("-------------")
-
- def not_mine(self, status):
- return status['account']['acct'] != self.config.UNAME
-
- def is_mine(self, status):
- return status['account']['acct'] == self.config.UNAME
-
- def reply_meow(self, ori_status):
- print("replying meow to ", ori_status["id"])
- self.session.status_reply(
- to_status=ori_status, status=random.choice(cat_sounds))
-
- def post_hn_news(self, amount=3):
- print("posting news")
- if amount <= 0:
- return
- if amount > len(self.news):
- self.news = hnnews.get_topnews(20)
- if amount > len(self.news):
- amount = len(self.news)
- status = "Hear ye, hear ye \n\n"
- for i in range(amount):
- n = self.news.pop(random.randrange(len(self.news)))
- status += n["title"]
- status += "\n"
- status += n["url"]
- status += "\n\n"
- self.session.toot(status)
-
-
- def catch_birds(self, status, content):
- if re_contains_bird.search(content) is not None and len(content) < 10:
- print("i see a bird", status["id"], " from ", status["account"]["acct"])
- try:
- s = self.session.status_reply(
- to_status=status, status=random.choice(bird_sounds))
- return True
- except:
- print("fail to post")
- return False
-
- def handle_command(self, status, content):
- print("i see a command", content)
- cmd = content[1:].strip().split()
- actor = status["account"]["acct"]
- # handle cmds
- if cmd[0] == "news":
- if actor not in config.ADMINS:
- return
- self.post_hn_news()
- else:
- print("I don't understand")
-
- def handle_home_status(self, status):
- self.tl_lastseen_sid = status["id"]
- if self.is_mine(status):
- print("this one is from myself, skipping")
- return
- acc = status["account"]
- content = cleanhtml(status["content"])
- # only one action is taken. if one succeed then return
- if self.catch_birds(status,content):
- return
-
- def scantimeline(self):
- # print("scanning timeline, lastseen=",self.tl_lastseen_sid)
- tl = self.session.timeline_home(since_id=self.tl_lastseen_sid)
- # important! make sure to iterate from older status to newer
- # otherwise the 'last_seen' won't be updated correctly
- for status in reversed(tl):
- # skip if it's from myself
- self.handle_home_status(status=status)
-
- # For now we don't handle other interaction types
-
- def handle_mention(self, notification):
- nid = notification["id"]
- acc = notification["account"]
- status = notification["status"]
- sid = status["id"]
- print("we have a mention > ", nid)
- content = cleanhtml(status["content"]).replace("@cat", "").lstrip()
- print(content)
- if len(content) == 0 or len(content) > config.MAX_STATUS_LENGTH:
- print("invalid status content")
-
- # prioritized to commands
- elif content[0] == "!":
- self.handle_command(status, content)
-
- elif re_contains_meow.search(content) is not None:
- self.reply_meow(status)
-
- print("dismissing notification ", nid)
- self.session.notifications_dismiss(nid)
-
- def handle_follow(self, notification):
- nid = notification["id"]
- uid = notification['account']['id']
- rel = self.session.account_relationships(uid)
- if rel[0]['following'] == False:
- print("follow ", uid)
- self.session.account_follow(uid, reblogs=False, notify=False)
- else:
- print("already following", uid)
- self.session.notifications_dismiss(nid)
-
- def handle_notification(self):
- ns = self.session.notifications()
- for n in ns:
- if n['type'] == "mention":
- self.handle_mention(n)
- if n['type'] == "follow":
- self.handle_follow(n)
-
- def run(self):
- self.epoch = 3
- while True:
- try:
- self.handle_notification()
- self.scantimeline()
- if self.epoch % 360 == 1 :
- self.post_hn_news(3)
- except Exception as e:
- print("something wrong...")
- print(e)
- time.sleep(self.config.POLL_INTERVAL)
- self.epoch += 1
-
-# TODO re-organize auth mgmt. But it involves a lot of mamual testing so I'll
-# leave it to the future...
-def init_bot():
- Mastodon.create_app(config.APPNAME, api_base_url=config.BASEURL,
- to_file=config.CLIENTID)
- session = login_refresh_token()
-
-# TODO use sched
-def run():
- cat = VnilCat(config)
- cat.run()
-
-
+import cat
if __name__ == "__main__":
- # check session data
- if not os.path.isfile(config.CLIENTID):
- print("client data doesn't exist..creating...")
- init_bot()
-
- run()
+ cat = cat.VnilCat()
+ cat.run()