import re
import urllib.parse as urlparse
from model import Notification, Segment, User, Post, Subspace, \
    FOLLOW_POST, FOLLOW_SUBSPACE, FOLLOW_USER, MUTE_POST, MUTE_SUBSPACE, MUTE_USER
from utils import plural, plural_s, clean_query, is_empty_query, shorten_text, \
    strip_links, clean_title


def user_actions(session):
    """Dispatch a user-initiated action request based on the request path.

    The part of ``session.req.path`` that follows ``session.path`` selects
    the action: like/unlike, lock/unlock, react/unreact, remind, report,
    transmit, thanks, undo-thanks, vote, follow/unfollow/mute/unmute, or
    notif.

    Returns:
        Either a ``(status, meta)`` tuple (for example ``(30, url)`` for a
        redirect or ``(59, 'Bad request')`` for a malformed path), or a page
        source string. Falls through and returns ``None`` when the path
        matches none of the handled actions.
    """
    db = session.db
    req = session.req
    user = session.user
    base = session.path
    # Request path relative to the session root; all patterns match against this.
    rel_path = req.path[len(base):]

    if req.path.startswith((base + 'like/', base + 'unlike/')):
        if not user:
            return 60, 'Login required'
        if not session.is_likes_enabled():
            return 50, 'Likes are disabled'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r'^(like|unlike)/(\d+)$', rel_path)
        if not found:
            return 59, 'Bad request'
        action = found.group(1)
        post_id = int(found.group(2))
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        db.modify_likes(session.user, post, add=(action == 'like'))
        return 30, post.page_url()

    elif req.path.startswith((base + 'lock/', base + 'unlock/')):
        if not user:
            return 60, 'Login required'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r'^(lock|unlock)/(\d+)$', rel_path)
        if not found:
            return 59, 'Bad request'
        action = found.group(1)
        post_id = int(found.group(2))
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        if not session.is_lockable(post):
            # 61 matches every other 'Not authorized' response in this handler;
            # 60 is reserved for 'Login required'.
            return 61, 'Not authorized'
        flags = post.flags
        # Admin sets both lock flags, while normal users can only set the normal lock flag.
        lock_flag = Post.LOCKED_FLAG | (Post.ADMIN_LOCKED_FLAG if user.role == User.ADMIN else 0)
        if action == 'lock':
            flags |= lock_flag
        else:
            flags &= ~lock_flag
        db.update_post(post, flags=flags)
        return 30, post.page_url() + '/more'

    elif req.path.startswith((base + 'react/', base + 'unreact/')):
        if not user:
            return 60, 'Login required'
        if not session.is_reactions_enabled():
            return 50, 'Reactions are disabled'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r'^(react|unreact)/(\d+)$', rel_path)
        if not found:
            return 59, 'Bad request'
        action = found[1]
        post_id = int(found[2])
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        if action == 'unreact':
            db.modify_reactions(session.user, post, None)
        else:
            available_reactions = session.bubble.user_reactions.split()
            if is_empty_query(req):
                # No selection yet: show the chooser page.
                selected = db.get_user_reaction(post, session.user.id)
                page = 'Choose a reaction to:\n'
                page += session.gemini_feed_entry(post, session.context)
                page += '\n'
                for idx, react in enumerate(available_reactions, start=1):
                    if selected == react:
                        page += f'=> /unreact/{post.id} ❌ Reacted: {selected}\n'
                    else:
                        page += f'=> /react/{post.id}?{idx} {react}\n'
                page += f'\n=> {post.page_url()} Back to post\n'
                return page
            # The query is the 1-based index of the chosen reaction. Reject
            # non-numeric and out-of-range values; without the range check,
            # 0 or a negative number would index from the end of the list.
            try:
                idx = int(clean_query(req))
            except ValueError:
                return 59, 'Bad request'
            if not 1 <= idx <= len(available_reactions):
                return 59, 'Bad request'
            selected = available_reactions[idx - 1]
            db.modify_reactions(user, post, selected)
        return 30, base + f'react/{post.id}'

    elif req.path.startswith(base + 'remind/'):
        if not user:
            return 60, 'Login required'
        found = re.match(r"^remind/(\d+)$", rel_path)
        if not found:
            return 59, 'Bad request'
        post_id = int(found.group(1))
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        db.notify_reminder(session.user, post)
        return 30, post.page_url()

    elif req.path.startswith(base + 'report/'):
        if not user:
            return 60, 'Login required'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r"^report/(\d+)(/([A-Za-z0-9]+))?$", rel_path)
        if not found:
            return 59, 'Bad request'
        post_id = int(found[1])
        token = found[3]
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        if token and db.verify_token(user, token):
            # Confirmed via a valid token: actually send the report.
            db.notify_moderators(Notification.REPORT, session.user, post)
            return 30, post.page_url()
        # No (valid) token: show the confirmation page with a tokenized link.
        subspace = db.get_subspace(id=post.subspace)
        kind = 'comment' if post.parent \
            else 'issue' if subspace.flags & Subspace.ISSUE_TRACKER \
            else 'post'
        page = f'# ⚠️\n\nYou are about to report the following {kind} to the moderators:\n'
        page += session.gemini_feed_entry(post, session.context)
        page += f'\n=> /report/{post.id}/{session.get_token()} ⚠️ Send the report\n'
        page += f'=> {post.page_url()} Back to post\n'
        return page

    elif req.path.startswith(base + 'transmit/'):
        if not user:
            return 60, 'Login required'
        if not session.is_antenna_enabled():
            return 50, 'Not available'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        # Parse arguments.
        found = re.match(r"^transmit/(\d+)/(post/(\d+)|feed/(([\w-]+)(/([\w-]+))?))",
                         rel_path)
        # Any failure below (no regex match, unknown post, antenna index out of
        # range, ...) is deliberately reported as 'Not found'.
        try:
            antenna_index = int(found[1])
            post_id = int(found[3]) if found[3] else None
            username = found[5]
            tag_filter = found[7]
            if post_id is not None:
                kind = 'post'
                post = db.get_post(id=post_id)
                if post.user != user.id:
                    return 61, "Cannot submit other users' posts"
            else:
                kind = 'feed'
                if username != user.name:
                    return 61, "Cannot submit other users' feeds"
            # Confirmation page.
            # The label itself is not used; the lookup is kept because it
            # raises for an out-of-range antenna index.
            antenna_label = session.bubble.antenna_labels[antenna_index]
            page = '# 📡\n'
            page += 'You are about to submit the following:\n'
            if post_id is not None:
                page += session.gemini_feed_entry(post, plain=True)
                post_sub = db.get_subspace(id=post.subspace)
                sub_path = f"u/{session.user.name}" if post_sub.owner == user.id \
                    else f"s/{post_sub.name}"
                post_ref = post.issueid if post.issueid else post.id
                antenna_feed = f"gemini://{session.bubble.hostname}{base}{sub_path}/{post_ref}/antenna"
            else:
                usub = db.get_subspace(name=username)
                antenna_feed = f"{session.server_root()}{base}u/{username}{'/tag/' + tag_filter if tag_filter else ''}?feed"
                page += f"=> {antenna_feed} {usub.info if usub.info else usub.name}{' [#' + tag_filter + ']' if tag_filter else ''}\n"
            page += '\n' + session.bubble.antenna_links(kind, antenna_feed)[antenna_index]
            page += f'\n=> {session.bubble.antenna_abouts[antenna_index]} More information\n'
            if post_id is not None:
                page += f'=> {post.page_url()} Back to post\n'
            else:
                page += f'=> {base}u/{username} Back to u/{username}\n'
        except Exception as x:
            print(x)
            return 51, 'Not found'
        return page

    elif req.path.startswith(base + 'thanks/'):
        if not user:
            return 60, 'Login required'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r"^thanks/(\d+)$", rel_path)
        if not found:
            return 59, 'Bad request'
        post_id = int(found.group(1))
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        db.notify_thanks(session.user, post)
        page = '# 🙏\n\nYou thanked\n'
        poster = db.get_user(id=post.user)
        page += f'=> /u/{poster.name} {poster.avatar} {poster.name}\n'
        page += 'for:\n'
        page += session.gemini_feed_entry(post, session.context)
        page += f'\n=> {base}undo-thanks/{post.id} Undo (cancel notification!)\n'
        return page

    elif req.path.startswith(base + 'undo-thanks/'):
        if not user:
            return 60, 'Login required'
        found = re.match(r"^undo-thanks/(\d+)$", rel_path)
        if not found:
            return 59, 'Bad request'
        post_id = int(found.group(1))
        post = db.get_post(id=post_id)
        if not post:
            return 51, 'Not found'
        db.undo_thanks(session.user, post)
        return 30, post.page_url()

    elif req.path.startswith(base + 'vote/'):
        try:
            if not user:
                return 60, 'Login required'
            if user.role == User.LIMITED:
                return 61, 'Not authorized'
            m = re.search(r'/vote/(\d+)/([0-9a-zA-Z]{10})$', req.path)
            if not m:
                return 59, 'Bad request'
            seg_id = int(m[1])
            token = m[2]
            if not db.verify_token(user, token):
                return 61, 'Expired link'
            segment = db.get_segment(id=seg_id)
            if not segment or segment.type != Segment.POLL:
                return 51, 'Not found'
            db.modify_vote(user, segment)
            return 30, db.get_post(id=segment.post).page_url()
        except ValueError:
            return 59, 'Bad request'

    elif re.search(r'^(follow|unfollow|mute|unmute)/', rel_path):
        if not session.user:
            return 60, 'Login required'
        if user.role == User.LIMITED:
            return 61, 'Not authorized'
        found = re.match(r'(follow|unfollow|mute|unmute)/(post/(\d+)|([\w%-]+)|(u/[\w%-]+)|(s/[\w%-]+))$',
                         rel_path)
        if not found:
            return 59, 'Bad request'
        action = found[1]
        post_id = found[3]
        u_name = urlparse.unquote(found[4]) if found[4] else None
        u_sub = urlparse.unquote(found[5]) if found[5] else None
        s_sub = urlparse.unquote(found[6]) if found[6] else None
        is_follow = action in ('follow', 'unfollow')

        # Resolve the target; an unknown user/post/subspace is reported as
        # 'Not found' rather than failing on a missing object.
        if u_name is not None:
            target_type = FOLLOW_USER if is_follow else MUTE_USER
            target_user = db.get_user(name=u_name)
            if not target_user:
                return 51, 'Not found'
            target_id = target_user.id
            dst = f'/u/{u_name}'
        elif post_id is not None:
            target_type = FOLLOW_POST if is_follow else MUTE_POST
            target_id = int(post_id)
            post = db.get_post(target_id)
            if not post:
                return 51, 'Not found'
            dst = post.page_url() + '/more'
        else:
            target_type = FOLLOW_SUBSPACE if is_follow else MUTE_SUBSPACE
            # Strip the leading 'u/' or 's/' prefix.
            sub_name = (u_sub if u_sub is not None else s_sub)[2:]
            sub = db.get_subspace(name=sub_name)
            if not sub:
                return 51, 'Not found'
            target_id = sub.id
            dst = '/' + sub.title()

        if is_follow:
            db.modify_follows(session.user, action == 'follow', target_type, target_id)
        else:
            db.modify_mutes(session.user, action == 'mute', target_type, target_id)
        return 30, dst

    elif req.path.startswith(base + 'notif/'):
        if not session.user:
            return 60, 'Login required'
        found = re.match(r'^notif/((\d+)|(clear|history|feed))$', rel_path)
        if not found:
            return 59, 'Bad request'
        notif_id = int(found[2]) if found[2] else None
        action = found[3]

        if action == 'clear':
            db.get_notifications(session.user, clear=True)
            db.update_user(session.user, active=True)
            return 30, '/dashboard'

        if action == 'history':
            page = '# Notification History\n'
            page += '=> /dashboard Back to Dashboard\n'
            # Resolve by event so that duplicate notification types for the same event
            # are collapsed, while distinct events on the same post are kept separate.
            notifs = db.get_notifications(session.user, include_hidden=True,
                                          sort_desc=True, by_event=True)
            cur_ymd = None
            for notif in notifs:
                ymd = notif.ymd_date(tz=session.tz)
                link, label = notif.entry(show_age=False, with_time=True, tz=session.tz)
                if cur_ymd != ymd:
                    # Start a new date section.
                    cur_ymd = ymd
                    page += f'\n## {ymd}\n\n'
                page += f'=> {link} {label}\n'
            return page

        if action == 'feed':
            page = f'# @{session.user.name}\n'
            page += '\n## Notifications\n'
            notifs = db.get_notifications(session.user)
            if not notifs:
                page += '\nNo activity.\n'
            for notif in notifs:
                link, label = notif.entry(show_age=False)
                page += f'=> {link} {notif.ymd_date(tz=session.tz)} {label}\n'
            return page

        # Numeric ID: clear that notification and redirect to its subject.
        notif = db.get_notification(session.user, notif_id, clear=True)
        db.update_user(session.user, active=True)
        if req.query:
            # The query may carry further dot-separated notification IDs to
            # clear; non-numeric parts are ignored.
            extra_ids = []
            for part in req.query.split('.'):
                try:
                    extra_ids.append(int(part))
                except ValueError:
                    pass
            db.clear_notifications(session.user, extra_ids[:64])
        if not notif:
            return 30, '/dashboard'
        if notif.type == Notification.CONTENT_DELETED:
            return 30, f'{base}conduct'
        if notif.comment:
            cmt = db.get_post(id=notif.comment)
            if not cmt:
                return 51, 'Not found'
            return 30, cmt.page_url()
        if notif.post:
            post = db.get_post(id=notif.post)
            if not post:
                return 51, 'Not found'
            return 30, post.page_url()
        if notif.subspace:
            subs = db.get_subspace(id=notif.subspace)
            if not subs:
                return 51, 'Not found'
            return 30, base + subs.title()
        notif_src = db.get_user(id=notif.src)
        if notif_src:
            if notif.type == Notification.USER_FLAIR_CHANGED:
                return 30, f'{base}settings/flair/{notif_src.name}'
            return 30, f'{base}u/{notif_src.name}'
        return 42, 'Invalid notification'


class Subgrouping:
    """Sorts notifications by related post ID and descending time."""

    def __init__(self, user, notifs):
        """Bucket `notifs` by their `post` attribute, newest first in each bucket.

        Args:
            user: The viewing user; `user.id` is compared against
                `post_subowner` when rendering headings.
            notifs: Iterable of notifications exposing at least `ts` and `post`.
        """
        self.user = user
        self.by_post = {}
        # Sorting by negated timestamp gives descending time order.
        for notif in sorted(notifs, key=lambda n: -n.ts):
            self.by_post.setdefault(notif.post, []).append(notif)

    def render(self):
        """Return the grouped notifications as link lines, ending with a newline.

        A post with more than one notification gets a common heading line.
        Under such a heading, groupable notifications of the same type are
        collapsed into a single link naming all distinct source users.
        """
        src = []
        top_head = False
        prev_top_head = False
        for post_id, notifs in self.by_post.items():
            prev_top_head = top_head
            top_head = post_id and len(notifs) > 1

            # Print a common heading.
            if top_head:
                n = notifs[0]
                if n.post_subname and n.post_subowner != self.user.id:
                    sub_text = f" in {'u/' if n.post_subowner else 's/'}{n.post_subname}"
                else:
                    sub_text = ''
                if src:
                    src.append('')
                src.append(f'"{n.title_text()}"{sub_text}:')
            elif prev_top_head:
                # Separate from previous grouping.
                src.append('')

            # Print the notifications.
            if top_head:
                grouped_by_type = {}
                non_groupable = []
                # Separate the groupable and non-groupable notifications.
                for n in notifs:
                    if n.is_groupable():
                        grouped_by_type.setdefault(n.type, []).append(n)
                    else:
                        non_groupable.append(n)

                # Grouped first (comments occur after post, and post notifs are not grouped).
                for type_notifs in grouped_by_type.values():
                    if len(type_notifs) > 1:
                        # Collect the list of user names, keeping first-seen order.
                        seen_names = set()
                        names = []
                        for n in type_notifs:
                            if n.src_name not in seen_names:
                                seen_names.add(n.src_name)
                                names.append(n.src_name)
                        if len(names) == 1:
                            # Several notifications from the same user: a lone
                            # name must not get the ", and" list separator.
                            names_text = names[0]
                        elif len(names) == 2:
                            names_text = f'{names[0]} and {names[1]}'
                        else:
                            names_text = ', '.join(names[:-1]) + f', and {names[-1]}'
                        # Compose a single link with the names.
                        icon, label = type_notifs[0].grouped_label()
                        oldest = type_notifs[-1]
                        link, _ = oldest.entry(with_title=False)
                        # The IDs are included so they can all be cleared when opening the
                        # notification link.
                        all_ids = '.'.join(str(n.id) for n in type_notifs[:100]
                                           if n.id != oldest.id)
                        src.append(f'=> {link}?{all_ids} {icon}{names_text} {label} · {oldest.age()}')
                    else:
                        src.append('=> %s %s' % type_notifs[0].entry(with_title=False))

                # Non-groupable afterwards.
                for notif in non_groupable:
                    src.append('=> %s %s' % notif.entry(with_title=False))
            else:
                for notif in notifs:
                    src.append('=> %s %s' % notif.entry(with_title=not top_head))
        return '\n'.join(src) + '\n'


def make_dashboard_page(session):
    """Build the Dashboard page for the signed-in user.

    Lists links to administration (admins only), settings and the user page,
    followed by unread notifications, drafts, moderated subspaces and index
    links. Also marks the user's notifications as sent.

    Returns:
        ``(60, "Login required")`` when there is no `session.user`, otherwise
        the page source as a string.
    """
    if not session.user:
        return 60, "Login required"
    user = session.user
    db = session.db

    session.cleanup()  # TODO: Should be done via background tasks.
    session.mark_user_active()

    page = f'# {session.user.name}: Dashboard\n'
    if session.user.role == User.ADMIN:
        page += f'=> /admin/ 🔧 {session.bubble.site_name} administration\n'
    page += '=> /settings ⚙️ Settings\n'
    page += f'=> /u/{session.user.name} {session.user.avatar} u/{session.user.name}\n'

    notifs = db.get_notifications(session.user)
    n = len(notifs)
    page += f'\n## {n} Notification{plural_s(n)}\n'
    if n:
        # Group in two: @user and Followed.
        yours = []
        moderated = []
        followed = []
        for notif in notifs:
            if notif.type not in Notification.PRIORITY or \
                    notif.type == Notification.REACTION or \
                    Notification.PRIORITY[notif.type] >= 10:
                yours.append(notif)
            elif notif.post_subid in user.moderated_subspace_ids:
                moderated.append(notif)
            else:
                followed.append(notif)
        if yours:
            page += f'\n### @{session.user.name}\n'
            page += Subgrouping(user, yours).render()
        if moderated or followed:
            subtitle = []
            if moderated:
                subtitle.append('Moderated')
            if followed:
                subtitle.append('Followed')
            page += f'\n### {"/".join(subtitle)}\n'
            page += Subgrouping(user, moderated + followed).render()
        page += '\n=> /notif/clear 🧹 Clear all\n'
    page += '=> /notif/feed Notifications feed\nSubscribe to this Gemini feed to view your unread notifications in a feed reader.\n'
    page += '=> /notif/history 🕓 View history\n'

    # Visiting the Dashboard will prevent emails from being sent for these notifications.
    db.mark_notifications_sent(session.user)

    drafts = db.get_posts(user=session.user, draft=True)
    n = len(drafts)
    page += f'\n## {n} Draft{plural_s(n)}\n'
    for post in drafts:
        # Only top-level drafts in a non-user subspace show the subspace name.
        if not post.sub_owner and post.parent == 0:
            sub = f' · s/{post.sub_name}'
        else:
            sub = ''
        page += f'=> /edit/{post.id} {post.ymd_date(tz=session.tz)} {post.title_text()}{sub}\n'

    subs = db.get_subspaces(mod=session.user.id)
    n = len(subs)
    if n:
        page += f'\n## {n} Subspace{plural_s(n)}\n'
        for sub in subs:
            page += sub.subspace_link()

    page += '\n## Index\n'
    page += f'=> /u/{user.name}/index/posts?feed {plural(db.count_posts(user=user, ignore_omit_flags=True), "post")}\n'
    page += f'=> /u/{user.name}/index/comments?feed {plural(db.count_posts(user=user, is_comment=True), "comment")}\n'
    return page


def make_user_index_page(session, mode):
    """Build the index page of posts or comments for `session.c_user`.

    Args:
        session: Request session; provides `db`, `c_user`, `req`, `tz` and
            `user`.
        mode: ``'comments'`` lists comments; any other value lists posts.

    The request query ``feed`` lists entries by date; otherwise entries are
    grouped under subspace headings.

    Returns:
        The page source as a string.
    """
    db = session.db
    user = session.c_user
    is_posts = mode != 'comments'
    is_feed = clean_query(session.req) == 'feed'

    page = f'# {user.name}: {"Posts" if is_posts else "Comments"}\n'
    page += '=> /dashboard Back to Dashboard\n'
    page += '=> ? List by subspace\n' if is_feed else '=> ?feed List by date\n'

    if is_posts:
        posts = db.get_posts(user=user, draft=False, comment=False,
                             sort_by_subspace=not is_feed)
        cur_sub = None
        ymd = None
        for post in posts:
            # Headings.
            if not is_feed and cur_sub != post.subspace:
                cur_sub = post.subspace
                ymd = None  # Force a fresh month heading under the new subspace.
                page += f"\n## {post.sub_name}\n"
            post_ymd = post.ymd_date(tz=session.tz)
            # First seven characters of the date string (YYYY-MM, presumably).
            if ymd != post_ymd[:7]:
                ymd = post_ymd[:7]
                page += f"\n### {ymd}\n"

            # List entry linking to post
            prefix = f'{"u" if post.sub_owner else "s"}/{post.sub_name}: ' if is_feed else ''
            title = post.title if post.title \
                else shorten_text(strip_links(clean_title(post.summary)), 120)
            if post.issueid:
                title = f'[#{post.issueid}] ' + title
            SEP = " · "
            meta = []
            if post.num_cmts:
                meta.append(f'💬 {post.num_cmts}')
            if post.num_likes:
                meta.append(f'👍 {post.num_likes}')
            if post.tags:
                meta.append(post.tags)
            page += f'=> {post.page_url()} {post_ymd} {prefix}{title}{SEP + SEP.join(meta) if meta else ""}\n'
    else:
        comments = db.get_posts(user=user, draft=False, comment=True,
                                sort_by_subspace=not is_feed,
                                sort_by_post=not is_feed)
        cur_parent = None
        cur_sub = None
        parent_post = None
        parent_deleted = False
        for cmt in comments:
            # Headings.
            if not is_feed and cur_sub != cmt.subspace:
                cur_sub = cmt.subspace
                page += f"\n## {cmt.sub_name}\n"

            is_new_parent = cur_parent != cmt.parent
            if is_new_parent:
                cur_parent = cmt.parent
                parent_post = db.get_post(id=cur_parent)
                parent_deleted = not parent_post

            # Don't show other people's comments on deleted posts. This is
            # checked for every comment, not only the first one under a parent.
            if parent_deleted and (not session.user or session.user.id != cmt.user):
                continue

            if is_new_parent:
                if parent_post:
                    prefix = f'{"u" if cmt.sub_owner else "s"}/{cmt.sub_name}: ' if is_feed else ''
                    parent_title = parent_post.title if parent_post.title \
                        else f'"{shorten_text(strip_links(clean_title(parent_post.summary)), 120)}"'
                    if parent_post.issueid:
                        parent_title = f'[#{parent_post.issueid}] ' + parent_title
                else:
                    prefix = ''
                    parent_title = f"Deleted post (ID:{cur_parent})"
                page += f"\n{prefix}{parent_title}\n"

            cmt_ymd = cmt.ymd_date(tz=session.tz)
            title = shorten_text(strip_links(clean_title(cmt.summary)), 120)
            page += f'=> {cmt.page_url()} {cmt_ymd} "{title}"\n'

    return page