"""Helpers for Gemtext clean-up, relative times, Atom/HTML output and Gemini fetching."""

import datetime
import email.message
import hashlib
import re
import socket
import ssl
import time
import unicodedata
import urllib.parse as urlparse

try:
    import cgi  # Removed from the standard library in Python 3.13 (PEP 594).
except ImportError:
    cgi = None

try:
    from OpenSSL import crypto
except ImportError:
    # pyOpenSSL is only needed by the certificate fingerprint helpers below.
    crypto = None

UTC = datetime.timezone.utc


def _build_math_translation():
    """Build a `str.translate` table mapping MATHEMATICAL letters to plain letters.

    Only code points whose Unicode name ends in a single alphabetic character
    are mapped; names containing the word SMALL map to lowercase.
    """
    table = {}
    for cp in range(0x1D400, 0x1D800):
        name = unicodedata.name(chr(cp), '')
        if name.startswith('MATHEMATICAL'):
            parts = name.split()
            letter = parts[-1]
            if len(letter) == 1 and letter.isalpha():
                table[cp] = letter.lower() if 'SMALL' in parts else letter
    return str.maketrans(table)


_MATH_TRANS = _build_math_translation()

# Line-leading Gemtext markup: link arrow, bullet, quote, or heading marks.
GEMTEXT_MARKUP = re.compile(r'^(\s*=>\s*|\* |>\s*|##?#?)')
# A URI with one of the recognized schemes, optionally followed by a dash separator.
URI_PATTERN = re.compile(
    r'(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+(\s+—)?')
# Same schemes, but not when directly preceded by a `=>` link arrow.
NONLINK_URI_PATTERN = re.compile(
    r'(?<!=>)(?<!=>\s)(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`")\s]+')
INNER_LINK_PREFIX = '— '


class GeminiError (Exception):
    """Exception carrying a numeric `code` in addition to the message."""

    def __init__(self, code, msg):
        super().__init__(msg)
        self.code = code


def unescape_ini_gemtext(src):
    """Convert escaped Gemtext back to Gemtext.

    Each line is stripped; a lone `|` becomes an empty line and leading
    `&`, `&&`, `&&&` become `#`, `##`, `###`.
    """
    unesc = []
    for line in src.split('\n'):
        line = line.strip()
        if line == '|':
            line = ''
        elif line.startswith('&&&'):
            line = '###' + line[3:]
        elif line.startswith('&&'):
            line = '##' + line[2:]
        elif line.startswith('&'):
            line = '#' + line[1:]
        unesc.append(line)
    return '\n'.join(unesc)


def is_valid_name(name):
    """Return True if `name` is 2-30 characters of word characters and hyphens."""
    if len(name) < 2 or len(name) > 30:
        return False
    # fullmatch: `$` with re.match would also accept a trailing newline.
    return re.fullmatch(r'[\w-]+', name) is not None


def plural_s(i, suffix='s'):
    """Return `suffix` unless `i` is exactly 1."""
    return '' if i == 1 else suffix


def plural(i, word, suffix='s'):
    """Return `i` followed by `word`, with `suffix` appended unless `i` is 1."""
    return f'{i} {word}{plural_s(i, suffix)}'


def plural_form(i, singular, plural=None):
    """Return `i` followed by `singular` or `plural` (default: `singular` + 's')."""
    if plural is None:
        plural = singular + 's'
    return f'{i} {singular if i == 1 else plural}'


def parse_at_names(text) -> list:
    """Return the unique lowercased names mentioned as `@name` or `u/name`."""
    pattern = re.compile(r'(\bu/|@)([\w-]+)')
    return list({found[2].lower() for found in pattern.finditer(text)})


def parse_likely_commit_hashes(text) -> list:
    """Return the unique lowercased hex words of at least seven digits in `text`."""
    pattern = re.compile(r'\b[0-9a-fA-F]{7,}\b')
    return list({found[0].lower() for found in pattern.finditer(text)})


def clean_text(text):
    """Strip invalid characters, close an unterminated preformatted block, and rstrip."""
    text = strip_invalid(text)
    # Clean up the text: ensure that preformatted is closed.
    pre = False
    for line in text.split('\n'):
        if line[:3] == '```':
            pre = not pre
    if pre:
        # Close the preformatted block.
        if not text.endswith('\n'):
            text += '\n'
        text += '```'
    return text.rstrip()


def split_paragraphs(text) -> list:
    """Split paragraphs unless the empty lines are found inside a preformatted block."""
    paragraphs = []
    pre = False
    start = 0
    empty_count = 0
    lines = text.split('\n')
    for i, line in enumerate(lines):
        if not pre:
            if len(line.strip()) == 0:
                empty_count += 1
                continue
            if empty_count >= 1:
                # First non-empty line after a gap: flush the preceding paragraph.
                para = '\n'.join(lines[start:i]).strip()
                if len(para):
                    paragraphs.append(para)
                start = i
                empty_count = 0
        if line[:3] == '```':
            pre = not pre
    last = '\n'.join(lines[start:]).strip()
    if len(last):
        paragraphs.append(last)
    return paragraphs if len(paragraphs) > 0 else ['']


def remove_prefix(text, prefix):
    """Return `text` without a leading `prefix`, or unchanged if it is absent."""
    if text.startswith(prefix):
        return text[len(prefix):]
    return text


def strip_links(text, placeholder=True):
    """Replace URIs in `text` with `[scheme link]`, or remove them if not `placeholder`."""
    return URI_PATTERN.sub(r'[\1 link]' if placeholder else '', text)


def parse_nonlink_uris(text) -> list:
    """Return URIs in `text` that are not directly preceded by a `=>` link arrow."""
    return [found[0] for found in NONLINK_URI_PATTERN.finditer(text)]


def clean_unicode_math(text):
    """Replace Unicode mathematical A-Z letters (bold, italic, etc.) with plain ASCII."""
    return text.translate(_MATH_TRANS)


def clean_title(title):
    """Flatten Gemtext into a single-line title.

    Line markup is removed, a preformatted block is replaced by its bracketed
    label (or `[preformatted]`), and lines are joined with spaces.
    """
    # Strip `=>` and other Gemini syntax.
    cleaned = []
    pre = False
    # A link whose label merely repeats its URL.
    unlabeled_link_pattern = re.compile(r'(\w+://[^ ]+) — \1')
    for line in title.split('\n'):
        if line[:3] == '```':
            if not pre:
                pre_label = line[3:].strip()
                if len(pre_label) == 0:
                    pre_label = 'preformatted'
                cleaned.append(f'[{pre_label}]')
            pre = not pre
            continue
        if pre:
            continue
        found = GEMTEXT_MARKUP.match(line)
        if found:
            line = line[found.end():]
        line = unlabeled_link_pattern.sub(r'\1', line)
        line = line.replace('\t', ' ').replace('\r', '')
        cleaned.append(line)
    title = ' '.join(cleaned).strip()
    return clean_unicode_math(title)


def clean_description(desc):
    """Strip each line and drop link lines that are outside preformatted blocks."""
    # Strip links but keep other formatting.
    cleaned = []
    pre = False
    for line in desc.split('\n'):
        line = line.strip()
        if line.startswith('```'):
            pre = not pre
        if not pre and line.startswith('=>'):
            continue
        cleaned.append(line)
    return '\n'.join(cleaned)


def clean_tinylog(text):
    """Turn level 1 and 2 headings outside preformatted blocks into level 3."""
    # Clean it up as per Tinylog specification.
    clean = []
    pre = False
    for line in text.split('\n'):
        if line.startswith('```'):
            clean.append(line)
            pre = not pre
            continue
        if pre:
            clean.append(line)
            continue
        m = re.search(r'^(##?)[^#]', line)  # only level 3 headings allowed
        if m:
            line = '###' + line[len(m[1]):]
        clean.append(line)
    return '\n'.join(clean)


def prefix_links(src, prefix):
    """Add a prefix to link labels."""
    if not prefix:
        return src
    lines = []
    pattern = re.compile(r'^\s*=>\s*([^ ]+)(\s+(.*))?$')
    pre = False
    for line in src.split('\n'):
        if line.startswith('```'):
            pre = not pre
        elif not pre:
            m = pattern.match(line)
            if m:
                label = m[3].strip() if m[3] and len(m[3]) else ''
                if len(label) == 0:
                    # Unlabeled link: use the URL itself as the label.
                    label = m[1]
                    # Omit gemini scheme.
                    if label.startswith('gemini://'):
                        label = label[9:]
                line = f'=> {m[1]} {prefix}{label}'
        lines.append(line)
    return '\n'.join(lines)


def strip_multibreaks(src):
    """Collapse runs of blank lines outside preformatted blocks into one blank line."""
    lines = []
    pre = False
    is_break = False
    last_was_break = False
    for line in src.split('\n'):
        if line.startswith('```'):
            pre = not pre
            is_break = False
        elif not pre:
            is_break = len(line.strip()) == 0
            if is_break and last_was_break:
                continue
        last_was_break = is_break
        lines.append(line)
    return '\n'.join(lines)


def strip_invalid(src):
    """Remove NUL and carriage return characters."""
    return src.replace('\x00', '').replace('\r', '')


def shorten_text(text, n):
    """Truncate and cut at white or word boundary."""
    if len(text) > n:
        text = text[:n]
        # `not text` guards the index below when nothing is left (n == 0).
        if not text or text[-1] == ' ':
            return text.strip() + '…'
        # Drop the trailing partial word so the cut falls on a boundary.
        m = re.search(r'[\w,.]+$', text)
        if m:
            return text[:m.start()].rstrip() + '…'
        return text.rstrip() + '…'
    return text.strip()


def extract_title_heading(session, body):
    """Split a leading title off `body`.

    A first line that is a level 1 heading becomes the title. Otherwise, if
    `session.is_context_tracker` is true, the first line is used verbatim.

    Returns:
        `(title, body)`; `title` is None and `body` unchanged if no title was found.
    """
    lines = body.split('\n')
    title = None
    found = re.match(r'^\s*#\s*(.+)$', lines[0])
    # The level 1 pattern also matches deeper headings, so rule those out.
    foundH2 = re.match(r'^\s*##\s*(.+)$', lines[0])
    if found and not foundH2:
        title = found[1]
        body = '\n'.join(lines[1:]).strip()
    elif session.is_context_tracker:
        title = lines[0]
        body = '\n'.join(lines[1:]).strip()
    return title, body


def time_delta_text(sec, date_ts, suffix='ago', now='Now', date_prefix='',
                    date_fmt='%Y-%m-%d', date_sep=' · ', short_date_fmt='%b %d',
                    tz=None):
    """Describe an age of `sec` seconds in words.

    Up to 24 hours only the relative age is returned. Beyond that the date of
    `date_ts` (UTC, or `tz` if given) is prepended, using `short_date_fmt`
    when the date falls in the current year and `date_fmt` otherwise.
    """
    if sec < 2:
        return now
    if sec < 60:
        return f'{sec} seconds {suffix}'
    mins = int(sec / 60)
    if sec < 3600:
        return f'{mins} minute{plural_s(mins)} {suffix}'
    hours = int(sec / 3600)
    if hours <= 24:
        return f'{hours} hour{plural_s(hours)} {suffix}'
    days = round(sec / 3600 / 24)
    dt = datetime.datetime.fromtimestamp(date_ts, UTC)
    if tz:
        dt = dt.astimezone(tz)
    # Compare years in the same time zone as the formatted date.
    current_year = datetime.datetime.now(dt.tzinfo).year
    age = date_prefix + dt.strftime(short_date_fmt if dt.year == current_year else date_fmt)
    if days < 14:
        return age + f'{date_sep}{days} day{plural_s(days)} {suffix}'
    weeks = round(days / 7)
    if weeks <= 8:
        return age + f'{date_sep}{weeks} week{plural_s(weeks)} {suffix}'
    months = round(days / (365 / 12))  # average month length
    if months < 12:
        return age + f'{date_sep}{months} month{plural_s(months)} {suffix}'
    years = round(days / 365)
    return age + f'{date_sep}{years} year{plural_s(years)} {suffix}'


def ago_text(ts, suffix='ago', now='Now', tz=None):
    """Describe how long ago the Unix timestamp `ts` was (never negative)."""
    sec = max(0, int(time.time()) - ts)
    return time_delta_text(sec, ts, suffix, now, tz=tz)


def atom_timestamp(ts):
    """Format the Unix timestamp `ts` as a UTC `YYYY-MM-DDTHH:MM:SSZ` string."""
    return datetime.datetime.fromtimestamp(ts, UTC).strftime("%Y-%m-%dT%H:%M:%SZ")


def atom_escaped(text):
    """Escape `&`, `<`, `>`, `'` and `"` as XML character entities."""
    # `&` must be replaced first so the other entities are not double-escaped.
    return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').\
        replace("'", '&apos;').replace('"', '&quot;')


def gemtext_to_html(src):
    """Render Gemtext as a simple HTML fragment.

    Headings, bullets, quotes, links, preformatted blocks and plain text
    lines are converted line by line; all text and link URLs are escaped.
    Any list, quote or preformatted block still open at the end is closed.
    """
    # NOTE(review): confirm the exact tag choices (link wrapper, quote lines)
    # against what the consumers of this HTML expect.
    out = []
    in_list = False
    in_quote = False
    in_pre = False
    for line in src.rstrip().split('\n'):
        rend = None
        is_bullet = False
        is_angle = False
        if in_pre:
            if line.startswith('```'):
                in_pre = False
                rend = '</pre>'
            else:
                rend = atom_escaped(line)
        else:
            if line.startswith('###'):
                rend = f'<h3>{atom_escaped(line[3:].strip())}</h3>'
            elif line.startswith('##'):
                rend = f'<h2>{atom_escaped(line[2:].strip())}</h2>'
            elif line.startswith('#'):
                rend = f'<h1>{atom_escaped(line[1:].strip())}</h1>'
            elif line.startswith('>'):
                is_angle = True
                rend = f'{atom_escaped(line[1:])}'
            elif line.startswith('*'):
                is_bullet = True
                rend = f'<li>{atom_escaped(line[1:].strip())}</li>'
            elif line.startswith('=>'):
                link = re.match(r'=>\s*([^\s]+)(\s+.*)?', line)
                if not link:
                    continue
                url = link.group(1)
                # Fall back to the URL when the label is missing or blank.
                label = (link.group(2) or '').strip() or url
                rend = f'<p><a href="{atom_escaped(url)}">{atom_escaped(label)}</a></p>'
            elif line.startswith('```'):
                in_pre = True
                rend = '<pre>'
            else:
                rend = f'<p>{atom_escaped(line)}</p>'

        if rend is not None:
            # Close or open list and quote containers around the rendered line.
            if not is_bullet and in_list:
                out.append('</ul>')
                in_list = False
            if not is_angle and in_quote:
                out.append('</blockquote>')
                in_quote = False
            if is_angle and not in_quote:
                out.append('<blockquote>')
                in_quote = True
            if is_bullet and not in_list:
                out.append('<ul>')
                in_list = True
            out.append(rend)

    # Balance whatever is still open at the end of the document.
    if in_list:
        out.append('</ul>')
    if in_quote:
        out.append('</blockquote>')
    if in_pre:
        out.append('</pre>')
    return '\n'.join(out)


def is_empty_query(req):
    """Return True if `req.query` is None or empty."""
    return req.query is None or len(req.query) == 0


def clean_query(req):
    """Return the URL-unquoted, cleaned and stripped `req.query` ('' if None)."""
    if req.query is None:
        return ''
    return clean_text(urlparse.unquote(req.query)).strip()


def nonzero(value):
    """Return 1 if `value` is truthy, else 0."""
    return 1 if value else 0


def is_zero(value):
    """Return 0 if `value` is truthy, else 1."""
    return 0 if value else 1


def parse_link_segment(content, unquote=True) -> tuple:
    """Parse `[=>] URL [label]` into `(url, label)`.

    A URL without `://` gets a `gemini://` prefix; the label is passed
    through `clean_title` and is '' when absent.

    Raises:
        GeminiError: code 59 if the syntax or the URL is invalid.
    """
    if unquote:
        content = urlparse.unquote(content)
    q = content.replace('\n', ' ')
    found = re.match(r'^\s*(=>)?\s*([^\s]+)(\s+(.+))?\s*$', q)
    if not found:
        raise GeminiError(59, 'Invalid link syntax (enter URL followed by label, separated with space)')
    seg_url = found.group(2)
    if '://' not in seg_url:
        seg_url = 'gemini://' + seg_url
    parsed = urlparse.urlparse(seg_url)
    if not parsed.scheme or not parsed.netloc:
        raise GeminiError(59, 'Invalid URL')
    seg_text = clean_title(found[4]) if found[4] else ''
    return seg_url, seg_text


def parse_link_segment_query(req) -> tuple:
    """Parse `req.query` with `parse_link_segment`; `('', '')` if it is None."""
    if req.query is None:
        return '', ''
    return parse_link_segment(req.query)


def form_link(url_label: tuple):
    """Join a `(url, label)` pair with a space; '' if only a label is present."""
    url, label = url_label
    if len(url) and len(label):
        return url + ' ' + label
    if len(label) == 0:
        return url
    return ''


def absolute_url(base, relative):
    """Resolve `relative` against `base` and drop an explicit default port 1965."""
    # This is straight from Solderpunk's gemini-demo.py.
    if "://" not in relative:
        # Python's URL tools somehow only work with known schemes?
        base = base.replace("gemini://", "http://")
        relative = urlparse.urljoin(base, relative)
        relative = relative.replace("http://", "gemini://")
    # Remove the default port.
    port_pos = relative.find(':1965/')
    if port_pos >= 10:
        relative = relative[:port_pos] + relative[port_pos + 5:]
    return relative


def _parse_mime_header(value):
    """Split a MIME header value into `(type, params)` like `cgi.parse_header`."""
    if cgi is not None:
        return cgi.parse_header(value)
    # Replacement suggested by PEP 594 for Python versions without `cgi`.
    msg = email.message.Message()
    msg['content-type'] = value
    params = msg.get_params()
    return params[0][0], {key.lower(): val for key, val in params[1:]}


def gemini_fetch(url, redirect_count=0, max_data=None) -> tuple:
    """Returns tuple: (mime, mime_params, body).

    Fetches `url` over Gemini without verifying the server certificate and
    follows redirects. `body` is decoded to str for `text/*` responses and
    left as bytes otherwise. Reading stops once more than `max_data` body
    bytes have arrived (if given). On a non-success status or any error the
    problem is printed and `(None, None, None)` is returned.

    Raises:
        Exception: if `url` is not a gemini URL, or `redirect_count` is 5.
    """
    if redirect_count == 5:
        raise Exception("Too many redirects")
    parts = urlparse.urlparse(url)
    if parts.scheme != 'gemini':
        raise Exception("Only Gemini URLs allowed")
    try:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be disabled before verify_mode can be CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        address = (parts.hostname, parts.port if parts.port else 1965)
        redirect_to = None
        got_header = False
        incoming = bytes()
        mime = 'application/octet-stream'
        mime_opts = {}
        # Context managers close the connection on every exit path.
        with socket.create_connection(address, timeout=10) as raw:
            with context.wrap_socket(raw, server_hostname=parts.hostname) as s:
                s.sendall((url + '\r\n').encode("UTF-8"))
                while True:
                    data = s.recv(4096 if not max_data else max(max_data, 1024))
                    incoming += data
                    if got_header and max_data and len(incoming) > max_data:
                        break
                    if not got_header:
                        header_end = incoming.find(b'\r\n')
                        if header_end > 0:
                            got_header = True
                            header = incoming[:header_end].decode("UTF-8").strip()
                            incoming = incoming[header_end + 2:]
                            fields = header.split()
                            status = fields[0]
                            meta = ''.join(fields[1:])
                            # Follow redirects.
                            if status.startswith('3'):
                                redirect_to = absolute_url(url, meta)
                                break
                            elif status.startswith('2'):
                                mime, mime_opts = _parse_mime_header(meta)
                            else:
                                print(url, 'gemini_fetch error:', header)
                                return None, None, None
                    if len(data) == 0:
                        break
        if redirect_to is not None:
            # The size limit applies to the redirect target as well.
            return gemini_fetch(redirect_to, redirect_count=redirect_count + 1,
                                max_data=max_data)
        if not got_header:
            # Connection closed before a complete header line arrived.
            return None, None, None
        if mime.startswith('text/'):
            return mime, mime_opts, incoming.decode(mime_opts.get("charset", "UTF-8"))
        return mime, mime_opts, incoming
    except Exception as er:
        print(str(er), '-- failed:', url)
        return None, None, None


def _require_crypto():
    """Return the pyOpenSSL `crypto` module or raise ImportError if it is missing."""
    if crypto is None:
        raise ImportError('pyOpenSSL is required for certificate fingerprints')
    return crypto


def certificate_sha256(cert):
    """Return the hex SHA-256 digest of the DER encoding of `cert`."""
    der = _require_crypto().dump_certificate(crypto.FILETYPE_ASN1, cert)
    return hashlib.sha256(der).hexdigest()


def pubkey_sha256(cert):
    """Return the hex SHA-256 digest of the DER encoding of `cert`'s public key."""
    pubkey = _require_crypto().dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
    return hashlib.sha256(pubkey).hexdigest()