"""Background threads: notification emailer, Git repository fetcher, and
periodic notification expiry."""

import datetime
import json
import os
import re
import subprocess
import threading
import time
import traceback

from model import Database, Notification, User
from utils import plural_s

pjoin = os.path.join

# Matches an hour range of the form "BEGIN-END", e.g. "22-6".
HOUR_RANGE = re.compile(r'(\d+)-(\d+)')


def is_hour_in_range(hour, hour_range):
    """Return True if `hour` falls inside the inclusive range `hour_range`.

    Args:
        hour: Hour number to test.
        hour_range: String of the form "BEGIN-END". When BEGIN is not less
            than END the range is treated as wrapping around midnight.

    Returns:
        True if the hour is inside the range; False if it is outside, or if
        `hour_range` is missing or cannot be parsed.
    """
    try:
        m = HOUR_RANGE.match(hour_range)
        begin, end = int(m[1]), int(m[2])
    except (TypeError, ValueError):
        # `hour_range` is None / not a string, or did not match the pattern.
        return False
    try:
        if begin < end:
            return begin <= hour <= end
        # Range crosses midnight.
        # NOTE(review): begin == end also lands here, which makes every hour
        # match (e.g. "5-5" covers the whole day) -- confirm this is intended.
        return hour >= begin or hour <= end
    except TypeError:
        # `hour` is not comparable with an int.
        return False


class Emailer(threading.Thread):
    """Thread that sends confirmation and notification emails.

    Mail is delivered by piping the message to the external command configured
    as `email.cmd`; when that setting is empty the thread exits immediately.
    """

    def __init__(self, capsule, hostname, port, cfg):
        """Read the emailer settings from `cfg`.

        Args:
            capsule: Object providing `shutdown_event()`; the returned event is
                waited on to pace the loop and to stop the thread.
            hostname: Server hostname used when building gemini:// links.
            port: Server port; appended to links when it is not 1965.
            cfg: Configuration object with a dict-like `get(key, default)`.
        """
        super().__init__()
        self.capsule = capsule
        self.cfg = cfg
        self.hostname = hostname
        self.server_link = f'gemini://{hostname}'
        if port != 1965:
            self.server_link += f':{port}'
        # FIXME: These defaults are also in 50_bubble.py, should be DRY.
        self.site_icon = cfg.get('icon', '💬')
        self.site_name = cfg.get('name', 'Bubble')
        self.site_info = cfg.get('info', "Bulletin Boards for Gemini")
        self.email_interval = int(cfg.get("email.interval", 60 * 5))
        self.email_cmd = cfg.get("email.cmd", "")
        self.email_from = cfg.get('email.from', 'nobody@example.com')
        self.email_footer = f'\n---\n{self.site_name}: {self.site_info}\n'

    @staticmethod
    def _notif_sort_key(notif):
        """Sort key: higher-priority notification types first, then by time."""
        prio = Notification.PRIORITY.get(notif.type, 10)
        return (-prio, notif.ts)

    @staticmethod
    def _header_value(value):
        """Collapse CR/LF runs so a value cannot inject extra mail headers."""
        return re.sub(r'[\r\n]+', ' ', str(value))

    def send_notifications(self, db):
        """Email a digest of unsent notifications to every eligible user.

        Selects users that have unsent notifications, a confirmed non-empty
        email address, notifications enabled, and whose per-user interval
        since the last email has elapsed. Users whose `email_range` contains
        the current UTC hour are skipped. All selected users' notifications
        are marked as sent and the database is closed *before* the mails are
        actually sent.

        Args:
            db: Open `Database`; it is closed by this method.
        """
        cur = db.conn.cursor()

        # Find users with unsent notifications.
        cur.execute("""
            SELECT
                u.id, name, email, email_range, notif
            FROM users u
            JOIN notifs n ON u.id=n.dst AND n.is_sent=FALSE
            WHERE
                email!='' AND
                notif!=0 AND
                (flags & ?) != 0 AND
                TIMESTAMPDIFF(MINUTE, ts_email, CURRENT_TIMESTAMP())>email_inter
        """, (User.EMAIL_CONFIRMED_FLAG,))

        # The JOIN yields one row per unsent notification, so the same user
        # can appear several times; keep one entry per user id (in row order).
        pending_users = {}
        cur_hour = datetime.datetime.now(datetime.timezone.utc).hour
        for (user_id, name, email, email_range, enabled_types) in cur:
            if user_id in pending_users:
                continue
            # Check that the current hour is not excluded.
            if is_hour_in_range(cur_hour, email_range):
                continue
            pending_users[user_id] = User(
                user_id, name,
                None, None, None, None, None, None, None,
                enabled_types, email, None, email_range,
                None, None, None, None, None, None, None, None, None)

        footer = (
            f'\nView notifications in your Dashboard:\n=> {self.server_link}/dashboard\n'
            f'\nChange notification settings:\n=> {self.server_link}/settings/notif\n'
            + self.email_footer)

        # One (recipient, subject, body) entry per user; a list rather than a
        # dict so that users sharing an address do not overwrite each other.
        messages = []
        for user in pending_users.values():
            notifs = db.get_notifications(user, only_unsent=True)
            labels = [notif.entry(show_age=False)[1]
                      for notif in sorted(notifs, key=self._notif_sort_key)]
            count = len(labels)
            if count:
                subject = f'{user.name}: {count} new notification{plural_s(count)}'
                body = ''.join(label + '\n\n' for label in labels)
                messages.append((user.email, subject, body + footer))

        # Mark everything as sent.
        if pending_users:
            user_ids = tuple(pending_users)
            placeholders = ','.join('?' * len(user_ids))
            cur.execute(
                f"UPDATE notifs SET is_sent=TRUE WHERE dst IN ({placeholders})",
                user_ids)
            cur.execute(
                f"UPDATE users SET ts_email=CURRENT_TIMESTAMP() WHERE id IN ({placeholders})",
                user_ids)
            db.commit()

        # Release the cursor and connection before running the mail command.
        cur = None
        db.close()

        for email, subject, body in messages:
            try:
                self.send_mail(email, subject, body)
            except Exception as x:
                print('Emailer error:', x)

    def send_mail(self, to, subject, body):
        """Send one message by piping it to the configured mail command.

        The command is invoked as `<email.cmd> -i <to>` with the full message
        (headers and body) on standard input. If `email.cmd` is the literal
        string 'stdout', the arguments and message are printed instead.

        Args:
            to: Recipient address.
            subject: Subject line.
            body: Message body text.

        Raises:
            subprocess.CalledProcessError: The mail command exited non-zero.
            OSError: The mail command could not be started.
        """
        # Header values must stay on one line; strip embedded line breaks.
        clean = self._header_value
        msg = (f'From: {clean(self.site_name)} <{clean(self.email_from)}>\n'
               f'To: {clean(to)}\n'
               f'Subject: {clean(subject)}\n\n' +
               body)
        args = [self.email_cmd, '-i', to]
        if self.email_cmd == 'stdout':
            print(args, msg)
        else:
            subprocess.check_output(args, input=msg, encoding='utf-8')

    def send_pending_confirmations(self, db):
        """Send a confirmation link to each user with a queued confirm task.

        Args:
            db: Open `Database`; queued `TASK_CONFIRM_EMAIL` tasks are popped
                from it. Users that no longer exist or have no email address
                are skipped.
        """
        for user_id in db.pop_tasks(Database.TASK_CONFIRM_EMAIL):
            user = db.get_user(user_id)
            if not user or not user.email:
                continue
            subject = 'Confirm your email address'
            body = ('To confirm your notification email address, open this link:\n\n'
                    f'=> {self.server_link}/confirm-email/{db.get_token(user)}\n\n'
                    'The link expires in 1 hour.\n' + self.email_footer)
            try:
                self.send_mail(user.email, subject, body)
            except Exception as x:
                print('Emailer error (confirmation):', x)

    def run(self):
        """Thread main loop.

        Wakes up every 5 seconds until the capsule's shutdown event is set.
        Confirmation emails are handled on every wake-up; notification digests
        at most once per `email.interval` seconds.
        """
        if not self.email_cmd:
            # Emailer is disabled.
            return
        print(" Emailer is running")
        next_notif = 0.0
        while not self.capsule.shutdown_event().wait(5):
            db = Database(self.cfg)
            try:
                self.send_pending_confirmations(db)
            except Exception as x:
                print('Emailer error (confirmation):', x)
            finally:
                db.close()

            now = time.monotonic()
            if now >= next_notif:
                db = Database(self.cfg)
                try:
                    self.send_notifications(db)
                except Exception:
                    # print_exc() reports the exception being handled;
                    # print_last() only works for uncaught exceptions.
                    traceback.print_exc()
                finally:
                    # NOTE(review): send_notifications() already closes `db`
                    # on success, so this may be a second close -- confirm
                    # that Database.close() tolerates that.
                    db.close()
                next_notif = time.monotonic() + self.email_interval


class RepoFetcher:
    """Keeps bare clones of tracked Git repositories up to date and records
    their commits (and the issue references found in them) in the database."""

    INTERVAL = 60 * 20  # Minimum number of seconds between fetches of a repo.

    class Git:
        """Helper for running git in a specific directory."""

        def __init__(self, cmd, path):
            """Remember the git executable `cmd` and the repository `path`."""
            self.cmd = cmd
            self.path = path

        def run(self, args, as_bytes=False, without_path=False):
            """Run git with `args` and return its standard output.

            Args:
                args: List of git arguments.
                as_bytes: Return the raw bytes instead of decoded text.
                without_path: Do not pass `-C <path>` to git.

            Returns:
                The output as bytes, or decoded as UTF-8 with trailing
                whitespace stripped.

            Raises:
                subprocess.CalledProcessError: git exited non-zero.
            """
            result = subprocess.check_output(
                ([self.cmd] if without_path else [self.cmd, '-C', self.path]) + args
            )
            if as_bytes:
                return result
            return result.decode('utf-8').rstrip()

        def log(self, count=None, skip=0):
            """Return commits from all refs as a list of dicts.

            Each dict has the keys 'fullHash', 'message', 'body' and
            'commitDate'. The git output is produced with a custom pretty
            format that marks string boundaries with `^@^`, and is then
            converted into JSON.

            Args:
                count: Maximum number of commits; no limit if None or 0.
                skip: Number of commits to skip from the start.

            Returns:
                List of commit dicts, or an empty list if git fails or its
                output cannot be parsed (the error is printed).
            """
            try:
                count_arg = [f'-n{count}'] if count else []
                out = self.run([
                    'log', '--all'] + count_arg + [f'--skip={skip}',
                    "--pretty=format:{^@^fullHash^@^:^@^%H^@^,^@^message^@^:^@^%s^@^,^@^body^@^:^@^%b^@^,^@^commitDate^@^:^@^%ai^@^},^@&@^"
                ])
                # Remove git comment lines (lines starting with #).
                out = re.sub(r'(?m)^#[^\n]*', '', out)
                # Remove record separators.
                out = re.sub(r'\^@&@\^\n?', '', out)
                # Replace ^@^...^@^ placeholders with properly JSON-escaped strings.
                # json.dumps handles all special characters including control characters.
                out = re.sub(r'\^@\^(.*?)\^@\^',
                             lambda m: json.dumps(m.group(1)),
                             out, flags=re.DOTALL)
                # Remove trailing comma if present. endswith() is also safe
                # when the output is empty (e.g. a repository with no commits).
                if out.endswith(','):
                    out = out[:-1]
                out = '[' + out + ']'
                return json.loads(out)
            except json.JSONDecodeError as x:
                pos = x.pos
                print(f'RepoFetcher: {x}\n ...{repr(out[max(0,pos-40):pos+40])}...')
                return []
            except Exception as x:
                print('RepoFetcher:', x)
                return []

    def __init__(self, cfg):
        """Read the cache directory (`repo.cachedir`) and git executable
        (`repo.git`) from `cfg`."""
        self.cache_dir = cfg.get("repo.cachedir", "")
        self.git_cmd = cfg.get("repo.git", "/usr/bin/git")

    def fetch_pending(self, db, repo):
        """Fetch `repo` if it is due and store its commits in the database.

        Does nothing if the repository was fetched less than `INTERVAL`
        seconds ago or has no clone URL. On the first fetch the repository is
        cloned as a bare repo into `<cache_dir>/<repo.id>` and the complete
        history is read; afterwards only the 100 most recent commits are.

        Args:
            db: Open `Database`.
            repo: Repository record; `id`, `clone_url`, `ts_fetch` and
                `idlabel` are read.

        Raises:
            subprocess.CalledProcessError: `git clone`, `git config` or
                `git fetch` failed.
        """
        # NOTE(review): assumes ts_fetch is a Unix timestamp in seconds, since
        # it is compared against time.time() -- confirm.
        if repo.ts_fetch is not None and \
                time.time() - repo.ts_fetch < RepoFetcher.INTERVAL:
            return
        if not repo.clone_url:
            return

        # It's time to fetch now.
        cache_path = pjoin(self.cache_dir, str(repo.id))
        os.makedirs(cache_path, exist_ok=True)
        git = RepoFetcher.Git(self.git_cmd, cache_path)
        if not os.path.exists(pjoin(cache_path, 'config')):
            git.run(['clone', '--bare', repo.clone_url, cache_path], without_path=True)
            git.run(['config', 'remote.origin.fetch', 'refs/heads/*:refs/heads/*'])  # enable fetch
            num_commits = None
        else:
            git.run(['fetch', '--prune'])
            num_commits = 100  # Since the last `INTERVAL` mins, so probably enough.

        # Update the fetch timestamp.
        cur = db.conn.cursor()
        cur.execute("UPDATE repos SET ts_fetch=CURRENT_TIMESTAMP() WHERE id=?", (repo.id,))
        db.commit()

        # The label is literal text, so escape any regex metacharacters in it.
        issue_pattern = re.compile(r'\b' + re.escape(repo.idlabel) + r'\s*#(\d+)\b')

        # Read the history to find out about commits.
        for commit in git.log(num_commits):
            commit_hash = commit['fullHash']
            date = commit['commitDate']
            message = commit['message']
            body = commit['body']
            issuerefs = map(int, issue_pattern.findall(message + '\n' + body))
            cur.execute("INSERT IGNORE INTO commits (repo, hash, msg, ts) VALUES (?, ?, ?, ?)",
                        (repo.id, commit_hash, message, date))
            for issue in issuerefs:
                cur.execute("INSERT IGNORE INTO issuerefs (repo, commit, issue) VALUES (?, ?, ?)",
                            (repo.id, commit_hash, issue))
        db.commit()

    def fetch_all(self, db):
        """Run `fetch_pending` for every repository known to `db`.

        A failing git command is reported and does not prevent the remaining
        repositories from being processed.
        """
        for repo in db.get_repositories():
            try:
                self.fetch_pending(db, repo)
            except subprocess.CalledProcessError:
                print('Error when fetching repository:', repo.clone_url)


class Worker(threading.Thread):
    """Thread for periodic maintenance: expiring old notifications and, when
    `repo.cachedir` is configured, fetching Git repositories."""

    EXPIRY_INTERVAL = 60 * 60 * 24  # Once per day.

    def __init__(self, capsule, cfg):
        """Store the capsule and configuration.

        Args:
            capsule: Object providing `shutdown_event()`.
            cfg: Configuration object with a dict-like `get(key, default)`;
                a `RepoFetcher` is created only if `repo.cachedir` is set.
        """
        super().__init__()
        self.capsule = capsule
        self.cfg = cfg
        self.fetcher = RepoFetcher(cfg) if cfg.get("repo.cachedir", "") else None

    def expire_notifications(self, db):
        """Delete old notifications.

        `NEW_POLL` notifications are deleted once older than 14 days; all
        other types once older than 90 days.
        """
        cur = db.conn.cursor()
        cur.execute("""
            DELETE FROM notifs
            WHERE type != ? AND TIMESTAMPDIFF(DAY, ts, CURRENT_TIMESTAMP()) > 90
        """, (Notification.NEW_POLL,))
        cur.execute("""
            DELETE FROM notifs
            WHERE type = ? AND TIMESTAMPDIFF(DAY, ts, CURRENT_TIMESTAMP()) > 14
        """, (Notification.NEW_POLL,))
        db.commit()

    def run(self):
        """Thread main loop.

        Wakes up every 60 seconds until the capsule's shutdown event is set.
        Notifications are expired when an expiry task has been queued or once
        per `EXPIRY_INTERVAL`; repositories are checked on every wake-up.
        """
        print(" Worker is running", "(with RepoFetcher)" if self.fetcher else "")
        next_expiry = 0.0
        while not self.capsule.shutdown_event().wait(60):
            now = time.monotonic()
            db = Database(self.cfg)
            try:
                # Expire old notifications.
                user_ids = db.pop_tasks(Database.TASK_EXPIRE_NOTIFICATIONS)
                if user_ids or now >= next_expiry:
                    self.expire_notifications(db)
                    if now >= next_expiry:
                        next_expiry = now + Worker.EXPIRY_INTERVAL

                # Update Git repositories.
                if self.fetcher:
                    self.fetcher.fetch_all(db)
            except Exception as x:
                print('Worker error:', x)
            finally:
                db.close()