GmCapsule [main]
Refactoring to isolate workers better (trying `multiprocessing`)
28d6cb61487854a146a5573523712ebe77cb6e9e
[1mdiff --git a/gmcapsule/__init__.py b/gmcapsule/__init__.py[m
[1mindex ce52024..decc1d8 100644[m
[1m--- a/gmcapsule/__init__.py[m
[1m+++ b/gmcapsule/__init__.py[m
[36m@@ -485,9 +485,9 @@[m [mfrom .gemini import Server, Cache[m
from .markdown import to_gemtext as markdown_to_gemtext[m
[m
[m
[31m-__version__ = '0.4.1'[m
[32m+[m[32m__version__ = '0.5.0'[m
__all__ = [[m
[31m- 'Config', 'Capsule', 'Cache',[m
[32m+[m[32m 'Config', 'Cache',[m
'get_mime_type', 'markdown_to_gemtext'[m
][m
[m
[36m@@ -508,7 +508,6 @@[m [mclass Config:[m
"""[m
[m
def __init__(self, config_path):[m
[31m- self.debug_memtrace = False[m
self.ini = configparser.ConfigParser()[m
if os.path.exists(config_path):[m
self.ini.read(config_path)[m
[36m@@ -556,6 +555,9 @@[m [mclass Config:[m
def max_upload_size(self):[m
return self.ini.getint('titan', 'upload_limit', fallback=10 * 1024 * 1024)[m
[m
[32m+[m[32m def require_upload_identity(self):[m
[32m+[m[32m return self.ini.getboolean('titan', 'require_identity', fallback=True)[m
[32m+[m
def section(self, name):[m
"""[m
Find a section in the config INI file.[m
[36m@@ -607,137 +609,16 @@[m [mclass Capsule:[m
cfg (Config): Server configuration.[m
"""[m
[m
[31m- _capsule = None[m
[31m-[m
def __init__(self, cfg):[m
[31m- Capsule._capsule = self[m
self.cfg = cfg[m
[31m- self.sv = Server([m
[31m- cfg.hostnames(),[m
[31m- cfg.certs_dir() / 'cert.pem',[m
[31m- cfg.certs_dir() / 'key.pem',[m
[31m- address=cfg.address(),[m
[31m- port=cfg.port(),[m
[31m- session_id=f'GmCapsule:{cfg.port()}'.encode('utf-8'),[m
[31m- max_upload_size=cfg.max_upload_size(),[m
[31m- num_threads=cfg.num_threads()[m
[31m- )[m
[31m- # Modules define the entrypoints.[m
[31m- self.load_modules()[m
[31m-[m
[31m- @staticmethod[m
[31m- def config():[m
[31m- """[m
[31m- Returns:[m
[31m- Config: Server configuration.[m
[31m- """[m
[31m- return Capsule._capsule.cfg[m
[31m-[m
[31m- def add(self, path, entrypoint, hostname=None, protocol='gemini'):[m
[31m- """[m
[31m- Register a URL entry point.[m
[31m-[m
[31m- Extension modules must call this to become visible in the server's[m
[31m- path hierarchy. Entry points are looked up in the order the modules[m
[31m- were loaded, with earlier modules getting precedence.[m
[31m-[m
[31m- Args:[m
[31m- path (str): URL path. Must begin with a slash (``/``). Asterisk[m
[31m- wildcards (``*``) are supported. Note that if the path[m
[31m- ``/*`` is registered, it will match any requested URL.[m
[31m- entrypoint (callable): Function or other callable object that[m
[31m- gets called when a request is processed with a matching[m
[31m- URL path. A :class:`~gmcapsule.gemini.Request` is passed in as the[m
[31m- only argument.[m
[31m- hostname (str): Hostname for the entry point. If omitted,[m
[31m- the entry point applies to all configured hostnames.[m
[31m- protocol (str): Protocol for the entry point.[m
[31m- """[m
[31m- if hostname:[m
[31m- self.sv.add_entrypoint(protocol, hostname, path, entrypoint)[m
[31m- else:[m
[31m- for hostname in self.cfg.hostnames():[m
[31m- if not hostname:[m
[31m- raise Exception(f'invalid hostname: "{hostname}"')[m
[31m- self.sv.add_entrypoint(protocol, hostname, path, entrypoint)[m
[31m-[m
[31m- def add_cache(self, cache):[m
[31m- """[m
[31m- Install a cache.[m
[31m-[m
[31m- All installed caches will attempt to save and load content until one[m
[31m- succeeds. The caches installed first get precedence.[m
[31m-[m
[31m- Args:[m
[31m- cache (Cache): Cache instance.[m
[31m- """[m
[31m- self.sv.add_cache(cache)[m
[31m-[m
[31m- def load_modules(self):[m
[31m- # The configuration can override default priorities.[m
[31m- mod_priority = {}[m
[31m- if 'priority' in self.cfg.ini:[m
[31m- for name, priority in self.cfg.section('priority').items():[m
[31m- mod_priority[name] = int(priority)[m
[31m-[m
[31m- # We will load all recognized modules.[m
[31m- name_pattern = re.compile(r'([0-9][0-9])_(.*)\.py')[m
[31m- dirs = [][m
[31m- for user_dir in self.cfg.mod_dirs():[m
[31m- if user_dir not in dirs:[m
[31m- dirs.append(user_dir)[m
[31m- dirs += [Path(__file__).parent.resolve() / 'modules'][m
[31m- mods = [][m
[31m- for mdir in dirs:[m
[31m- for mod_file in sorted(os.listdir(mdir)):[m
[31m- m = name_pattern.match(mod_file)[m
[31m- if m:[m
[31m- path = (mdir / mod_file).resolve()[m
[31m- name = m.group(2)[m
[31m- loader = importlib.machinery.SourceFileLoader(name, str(path))[m
[31m- spec = importlib.util.spec_from_loader(name, loader)[m
[31m- mod = importlib.util.module_from_spec(spec)[m
[31m- loader.exec_module(mod)[m
[31m- if name in mod_priority:[m
[31m- priority = mod_priority[name][m
[31m- else:[m
[31m- priority = int(m.group(1))[m
[31m- mods.append((priority, name, mod))[m
[31m-[m
[31m- # Initialize in priority order.[m
[31m- for _, _, mod in sorted(mods):[m
[31m- print(f'Init:', mod.__doc__)[m
[31m- mod.init(self)[m
[31m-[m
[31m- def shutdown_event(self):[m
[31m- """[m
[31m- Returns:[m
[31m- threading.Event: Event that is set when the server is[m
[31m- shutting down. Background workers must wait on this and stop[m
[31m- when the event is set.[m
[31m- """[m
[31m- return self.sv.shutdown_event[m
[31m-[m
[31m- def call_entrypoint(self, request):[m
[31m- """[m
[31m- Calls the registered entry point for a request.[m
[31m-[m
[31m- Args:[m
[31m- request (Request): Request object.[m
[31m-[m
[31m- Returns:[m
[31m- Tuple with (response, cache). The response can be binary data, text,[m
[31m- tuple with status and meta string, or tuple with status, meta, and body.[m
[31m- The cache is None if the data was not read from a cache.[m
[31m- """[m
[31m- return self.sv.call_entrypoint(request)[m
[32m+[m[32m self.sv = Server(cfg)[m
[m
def run(self):[m
"""[m
Start worker threads and begin accepting incoming connections. The[m
server will run until stopped with a KeyboardInterrupt (^C).[m
"""[m
[31m- self.sv.run(memtrace=self.cfg.debug_memtrace)[m
[32m+[m[32m self.sv.run()[m
[m
[m
def get_mime_type(path):[m
[36m@@ -795,8 +676,6 @@[m [mdef run_server():[m
args = argp.parse_args()[m
[m
cfg = Config(args.config_file)[m
[31m- cfg.debug_memtrace = args.trace_malloc[m
[31m-[m
try:[m
capsule = Capsule(cfg)[m
capsule.run()[m
[1mdiff --git a/gmcapsule/gemini.py b/gmcapsule/gemini.py[m
[1mindex 26ded98..dcb17d9 100644[m
[1m--- a/gmcapsule/gemini.py[m
[1m+++ b/gmcapsule/gemini.py[m
[36m@@ -2,14 +2,15 @@[m
# License: BSD-2-Clause[m
[m
import fnmatch[m
[31m-import gc[m
import hashlib[m
[31m-import queue[m
[32m+[m[32mimport importlib[m
import os.path[m
import select[m
import socket[m
[31m-import threading[m
[32m+[m[32mimport multiprocessing as mp[m
[32m+[m[32mimport re[m
import time[m
[32m+[m[32mfrom pathlib import Path[m
from urllib.parse import urlparse[m
[m
import OpenSSL.crypto[m
[36m@@ -144,56 +145,6 @@[m [mdef report_error(stream, code, msg):[m
safe_close(stream)[m
[m
[m
[31m-memtrace_lock = threading.Lock()[m
[31m-[m
[31m-[m
[31m-def display_memtop(snapshot, prev_snapshot, key_type='lineno', limit=1000):[m
[31m- import tracemalloc[m
[31m- import linecache[m
[31m- filters = ([m
[31m- tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),[m
[31m- tracemalloc.Filter(False, "<unknown>"),[m
[31m- tracemalloc.Filter(False, "*/linecache.py"),[m
[31m- tracemalloc.Filter(False, "*/tracemalloc.py")[m
[31m- )[m
[31m- snapshot = snapshot.filter_traces(filters)[m
[31m- if prev_snapshot:[m
[31m- prev_snapshot = prev_snapshot.filter_traces(filters)[m
[31m- top_stats = snapshot.compare_to(prev_snapshot, key_type)[m
[31m- top_type = 'delta'[m
[31m- limit = 200[m
[31m- else:[m
[31m- top_stats = snapshot.statistics('traceback') #key_type)[m
[31m- top_type = 'malloc'[m
[31m-[m
[31m- with memtrace_lock:[m
[31m- print("\n\nTop %s %s" % (limit, top_type))[m
[31m- for index, stat in enumerate(top_stats[:limit], 1):[m
[31m- if prev_snapshot:[m
[31m- frame = stat.traceback[0][m
[31m- if stat.size_diff <= 0:[m
[31m- continue[m
[31m- print("#%s: \x1b[1m%.1f\x1b[0m KiB (%+.1f KiB) count=%d (%+d)"[m
[31m- % (index,[m
[31m- stat.size / 1024, stat.size_diff / 1024, stat.count, stat.count_diff))[m
[31m- else:[m
[31m- print("#%s: \x1b[1m%.1f\x1b[0m KiB count=%d"[m
[31m- % (index, stat.size / 1024, stat.count))[m
[31m- for frame in stat.traceback:[m
[31m- line = linecache.getline(frame.filename, frame.lineno).strip()[m
[31m- if 'python3.' in frame.filename: continue[m
[31m- if line:[m
[31m- print('\x1b[0;31m %35s:%-5s ' % (frame.filename[-35:], str(frame.lineno) + ':'))[m
[31m- print('\x1b[0;36m %s\x1b[0m' % line)[m
[31m-[m
[31m- other = top_stats[limit:][m
[31m- if other:[m
[31m- size = sum(stat.size for stat in other)[m
[31m- print("%s other: %.1f KiB" % (len(other), size / 1024))[m
[31m- total = sum(stat.size for stat in top_stats)[m
[31m- print("Total size: %.1f KiB\n\n" % (total / 1024))[m
[31m-[m
[31m-[m
class Identity:[m
"""[m
Client certificate.[m
[36m@@ -339,39 +290,204 @@[m [mclass Cache:[m
return None, None[m
[m
[m
[31m-class Worker(threading.Thread):[m
[31m- """Thread that processes incoming requests from clients."""[m
[32m+[m[32mclass WorkerContext:[m
[32m+[m[32m def __init__(self, cfg, shutdown_event):[m
[32m+[m[32m self.cfg = cfg[m
[32m+[m[32m self.shutdown = shutdown_event[m
[32m+[m[32m self.hostnames = cfg.hostnames()[m
[32m+[m[32m self.entrypoints = {'gemini': {}, 'titan': {}}[m
[32m+[m[32m for proto in ['gemini', 'titan']:[m
[32m+[m[32m self.entrypoints[proto] = {}[m
[32m+[m[32m for hostname in self.hostnames:[m
[32m+[m[32m self.entrypoints[proto][hostname] = [][m
[32m+[m[32m self.caches = [][m
[32m+[m[32m self.is_quiet = False[m
[32m+[m
[32m+[m[32m def set_quiet(self, is_quiet):[m
[32m+[m[32m self.is_quiet = is_quiet[m
[m
[31m- def __init__(self, id, server):[m
[31m- super().__init__()[m
[31m- self.id = id[m
[31m- self.server = server[m
[31m- self.jobs = server.work_queue[m
[32m+[m[32m def config(self):[m
[32m+[m[32m return self.cfg[m
[m
[31m- def run(self):[m
[31m- while True:[m
[31m- stream, from_addr = self.jobs.get()[m
[31m- if stream is None:[m
[31m- break[m
[32m+[m[32m def print(self, *args):[m
[32m+[m[32m if not self.is_quiet:[m
[32m+[m[32m print(*args)[m
[m
[31m- try:[m
[31m- self.process_request(stream, from_addr)[m
[31m- except OpenSSL.SSL.SysCallError as error:[m
[31m- self.log(f'OpenSSL error: ' + str(error))[m
[31m- except AbortedIOError as error:[m
[31m- self.log(f'Send aborted: ' + str(error))[m
[31m- except Exception as error:[m
[31m- self.log(f'Problem: ' + str(error))[m
[31m- # Some unexpected problem...[m
[31m- #import traceback[m
[31m- #traceback.print_exc()[m
[31m- # try:[m
[31m- # report_error(stream, 42, str(error))[m
[31m- # except:[m
[31m- # pass[m
[32m+[m[32m def add_entrypoint(self, protocol, hostname, path_pattern, entrypoint):[m
[32m+[m[32m self.entrypoints[protocol][hostname].append((path_pattern, entrypoint))[m
[32m+[m
[32m+[m[32m def __setitem__(self, key, value):[m
[32m+[m[32m for hostname in self.hostnames:[m
[32m+[m[32m self.add_entrypoint('gemini', hostname, key, value)[m
[32m+[m
[32m+[m[32m def add_cache(self, cache):[m
[32m+[m[32m """[m
[32m+[m[32m Install a cache.[m
[32m+[m
[32m+[m[32m All installed caches will attempt to save and load content until one[m
[32m+[m[32m succeeds. The caches installed first get precedence.[m
[32m+[m
[32m+[m[32m Args:[m
[32m+[m[32m cache (Cache): Cache instance.[m
[32m+[m[32m """[m
[32m+[m[32m self.caches.append(cache)[m
[32m+[m
[32m+[m[32m def add(self, path, entrypoint, hostname=None, protocol='gemini'):[m
[32m+[m[32m """[m
[32m+[m[32m Register a URL entry point.[m
[32m+[m
[32m+[m[32m Extension modules must call this to become visible in the server's[m
[32m+[m[32m path hierarchy. Entry points are looked up in the order the modules[m
[32m+[m[32m were loaded, with earlier modules getting precedence.[m
[32m+[m
[32m+[m[32m Args:[m
[32m+[m[32m path (str): URL path. Must begin with a slash (``/``). Asterisk[m
[32m+[m[32m wildcards (``*``) are supported. Note that if the path[m
[32m+[m[32m ``/*`` is registered, it will match any requested URL.[m
[32m+[m[32m entrypoint (callable): Function or other callable object that[m
[32m+[m[32m gets called when a request is processed with a matching[m
[32m+[m[32m URL path. A :class:`~gmcapsule.gemini.Request` is passed in as the[m
[32m+[m[32m only argument.[m
[32m+[m[32m hostname (str): Hostname for the entry point. If omitted,[m
[32m+[m[32m the entry point applies to all configured hostnames.[m
[32m+[m[32m protocol (str): Protocol for the entry point.[m
[32m+[m[32m """[m
[32m+[m[32m if hostname:[m
[32m+[m[32m self.add_entrypoint(protocol, hostname, path, entrypoint)[m
[32m+[m[32m else:[m
[32m+[m[32m for hostname in self.cfg.hostnames():[m
[32m+[m[32m if not hostname:[m
[32m+[m[32m raise Exception(f'invalid hostname: "{hostname}"')[m
[32m+[m[32m self.add_entrypoint(protocol, hostname, path, entrypoint)[m
[32m+[m
[32m+[m[32m def load_modules(self):[m
[32m+[m[32m # The configuration can override default priorities.[m
[32m+[m[32m mod_priority = {}[m
[32m+[m[32m if 'priority' in self.cfg.ini:[m
[32m+[m[32m for name, priority in self.cfg.section('priority').items():[m
[32m+[m[32m mod_priority[name] = int(priority)[m
[32m+[m
[32m+[m[32m # We will load all recognized modules.[m
[32m+[m[32m name_pattern = re.compile(r'([0-9][0-9])_(.*)\.py')[m
[32m+[m[32m dirs = [][m
[32m+[m[32m for user_dir in self.cfg.mod_dirs():[m
[32m+[m[32m if user_dir not in dirs:[m
[32m+[m[32m dirs.append(user_dir)[m
[32m+[m[32m dirs += [Path(__file__).parent.resolve() / 'modules'][m
[32m+[m[32m mods = [][m
[32m+[m[32m for mdir in dirs:[m
[32m+[m[32m for mod_file in sorted(os.listdir(mdir)):[m
[32m+[m[32m m = name_pattern.match(mod_file)[m
[32m+[m[32m if m:[m
[32m+[m[32m path = (mdir / mod_file).resolve()[m
[32m+[m[32m name = m.group(2)[m
[32m+[m[32m loader = importlib.machinery.SourceFileLoader(name, str(path))[m
[32m+[m[32m spec = importlib.util.spec_from_loader(name, loader)[m
[32m+[m[32m mod = importlib.util.module_from_spec(spec)[m
[32m+[m[32m loader.exec_module(mod)[m
[32m+[m[32m if name in mod_priority:[m
[32m+[m[32m priority = mod_priority[name][m
[32m+[m[32m else:[m
[32m+[m[32m priority = int(m.group(1))[m
[32m+[m[32m mods.append((priority, name, mod))[m
[32m+[m
[32m+[m[32m # Initialize in priority order.[m
[32m+[m[32m for _, name, mod in sorted(mods):[m
[32m+[m[32m self.print(f'Init:', mod.__doc__ if mod.__doc__ else name)[m
[32m+[m[32m mod.init(self)[m
[32m+[m
[32m+[m[32m def shutdown_event(self):[m
[32m+[m[32m """[m
[32m+[m[32m Returns:[m
[32m+[m[32m threading.Event: Event that is set when the server is[m
[32m+[m[32m shutting down. Background workers must wait on this and stop[m
[32m+[m[32m when the event is set.[m
[32m+[m[32m """[m
[32m+[m[32m return self.shutdown[m
[32m+[m
[32m+[m[32m def call_entrypoint(self, request):[m
[32m+[m[32m """[m
[32m+[m[32m Calls the registered entry point for a request.[m
[32m+[m
[32m+[m[32m Args:[m
[32m+[m[32m request (Request): Request object.[m
[32m+[m
[32m+[m[32m Returns:[m
[32m+[m[32m Tuple with (response, cache). The response can be binary data, text,[m
[32m+[m[32m tuple with status and meta string, or tuple with status, meta, and body.[m
[32m+[m[32m The cache is None if the data was not read from a cache.[m
[32m+[m[32m """[m
[32m+[m[32m entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)[m
[32m+[m
[32m+[m[32m caches = self.caches if (request.scheme == 'gemini' and[m
[32m+[m[32m not request.identity and[m
[32m+[m[32m not request.query) else [][m
[32m+[m[32m from_cache = None[m
[32m+[m
[32m+[m[32m if entrypoint:[m
[32m+[m[32m # Check the caches first.[m
[32m+[m[32m for cache in caches:[m
[32m+[m[32m media, content = cache.try_load(request.hostname + request.path)[m
[32m+[m[32m if not media is None:[m
[32m+[m[32m response = 20, media, content[m
[32m+[m[32m if hasattr(content, '__len__'):[m
[32m+[m[32m self.print('%d bytes from cache, %s' % (len(content), media))[m
[32m+[m[32m else:[m
[32m+[m[32m self.print('stream from cache,', media)[m
[32m+[m[32m return response, cache[m
[m
[31m- safe_close(stream)[m
[31m- stream, from_addr = None, None[m
[32m+[m[32m # Process the request normally if there is nothing cached.[m
[32m+[m[32m if not from_cache:[m
[32m+[m[32m try:[m
[32m+[m[32m return entrypoint(request), None[m
[32m+[m[32m except Exception as x:[m
[32m+[m[32m import traceback[m
[32m+[m[32m traceback.print_exception(x)[m
[32m+[m[32m raise GeminiError(40, 'Temporary failure')[m
[32m+[m
[32m+[m[32m raise GeminiError(50, 'Permanent failure')[m
[32m+[m
[32m+[m
[32m+[m[32mclass Worker(mp.Process):[m
[32m+[m[32m """Process that handles incoming requests from clients."""[m
[32m+[m
[32m+[m[32m def __init__(self, id, cfg, work_queue, shutdown_event):[m
[32m+[m[32m super().__init__(target=Worker._run, args=(self,))[m
[32m+[m[32m self.id = id[m
[32m+[m[32m self.cfg = cfg[m
[32m+[m[32m self.port = cfg.port()[m
[32m+[m[32m self.context = WorkerContext(self.cfg, shutdown_event)[m
[32m+[m[32m self.context.set_quiet(id > 0)[m
[32m+[m[32m self.jobs = work_queue[m
[32m+[m
[32m+[m[32m def _run(self):[m
[32m+[m[32m try:[m
[32m+[m[32m # Extensions are initialized in the worker process.[m
[32m+[m[32m self.context.load_modules()[m
[32m+[m[32m self.context.set_quiet(False)[m
[32m+[m[32m while True:[m
[32m+[m[32m stream, from_addr = self.jobs.get()[m
[32m+[m[32m if stream is None:[m
[32m+[m[32m break[m
[32m+[m[32m try:[m
[32m+[m[32m self.process_request(stream, from_addr)[m
[32m+[m[32m except OpenSSL.SSL.SysCallError as error:[m
[32m+[m[32m self.log(f'OpenSSL error: ' + str(error))[m
[32m+[m[32m except AbortedIOError as error:[m
[32m+[m[32m self.log(f'Send aborted: ' + str(error))[m
[32m+[m[32m except Exception as error:[m
[32m+[m[32m self.log(f'Problem: ' + str(error))[m
[32m+[m[32m # Some unexpected problem...[m
[32m+[m[32m #import traceback[m
[32m+[m[32m #traceback.print_exc()[m
[32m+[m[32m # try:[m
[32m+[m[32m # report_error(stream, 42, str(error))[m
[32m+[m[32m # except:[m
[32m+[m[32m # pass[m
[32m+[m[32m safe_close(stream)[m
[32m+[m[32m stream, from_addr = None, None[m
[32m+[m[32m except KeyboardInterrupt:[m
[32m+[m[32m pass[m
[m
def log(self, *args):[m
print(time.strftime('%Y-%m-%d %H:%M:%S'), f'[{self.id}]', '--', *args)[m
[36m@@ -418,7 +534,7 @@[m [mclass Worker(threading.Thread):[m
identity = Identity(cl_cert) if cl_cert else None[m
[m
if request.startswith('titan:'):[m
[31m- if identity is None and self.server.require_upload_identity:[m
[32m+[m[32m if identity is None and self.cfg.require_upload_identity():[m
report_error(stream, 60, "Client certificate required for upload")[m
return[m
[m
[36m@@ -433,7 +549,8 @@[m [mclass Worker(threading.Thread):[m
elif p.startswith('mime='):[m
req_mime = p[5:][m
self.log(f'Receiving Titan content: {expected_size}')[m
[31m- if expected_size > self.server.max_upload_size and self.server.max_upload_size > 0:[m
[32m+[m[32m max_upload_size = self.cfg.max_upload_size()[m
[32m+[m[32m if expected_size > max_upload_size and max_upload_size > 0:[m
report_error(stream, 59, "Maximum content length exceeded")[m
return[m
while len(data) < expected_size:[m
[36m@@ -458,7 +575,7 @@[m [mclass Worker(threading.Thread):[m
path = '/'[m
hostname = url.hostname[m
[m
[31m- if url.port != None and url.port != self.server.port:[m
[32m+[m[32m if url.port != None and url.port != self.port:[m
report_error(stream, 59, "Invalid port number")[m
return[m
if not stream.get_servername():[m
[36m@@ -481,7 +598,7 @@[m [mclass Worker(threading.Thread):[m
content_mime=req_mime,[m
content=data if len(data) else None[m
)[m
[31m- response, from_cache = self.server.call_entrypoint(request)[m
[32m+[m[32m response, from_cache = self.context.call_entrypoint(request)[m
[m
# Determine status code, meta line, and body content.[m
if type(response) == tuple:[m
[36m@@ -521,24 +638,25 @@[m [mclass Worker(threading.Thread):[m
[m
[m
class Server:[m
[31m- def __init__(self, hostname_or_hostnames, cert_path, key_path,[m
[31m- address='localhost', port=1965,[m
[31m- cache=None, session_id=None, max_upload_size=0, num_threads=1,[m
[31m- require_upload_identity=True):[m
[32m+[m[32m def __init__(self, cfg):[m
[32m+[m[32m mp.set_start_method('spawn')[m
[32m+[m
[32m+[m[32m hostname_or_hostnames = cfg.hostnames()[m
[32m+[m[32m cert_path = cfg.certs_dir() / 'cert.pem'[m
[32m+[m[32m key_path = cfg.certs_dir() / 'key.pem'[m
[32m+[m[32m address = cfg.address()[m
[32m+[m[32m port = cfg.port()[m
[32m+[m[32m session_id = f'GmCapsule:{cfg.port()}'.encode('utf-8')[m
[32m+[m[32m num_threads = cfg.num_threads()[m
[32m+[m
self.hostnames = [hostname_or_hostnames] \[m
if type(hostname_or_hostnames) == str else hostname_or_hostnames[m
self.address = address[m
self.port = port[m
[31m- self.entrypoints = {'gemini': {}, 'titan': {}}[m
[31m- for proto in ['gemini', 'titan']:[m
[31m- self.entrypoints[proto] = {}[m
[31m- for hostname in self.hostnames:[m
[31m- self.entrypoints[proto][hostname] = [][m
[31m- self.caches = [][m
[31m- if cache:[m
[31m- self.caches.append(cache)[m
[31m- self.max_upload_size = max_upload_size[m
[31m- self.require_upload_identity = require_upload_identity[m
[32m+[m[32m #if cache:[m
[32m+[m[32m # self.caches.append(cache)[m
[32m+[m[32m #self.max_upload_size = max_upload_size[m
[32m+[m[32m #self.require_upload_identity = require_upload_identity[m
[m
if not os.path.exists(cert_path):[m
raise Exception("certificate file not found: " + str(cert_path))[m
[36m@@ -555,32 +673,17 @@[m [mclass Server:[m
self.context.set_session_id(session_id)[m
[m
# Spawn the worker threads.[m
[31m- self.shutdown_event = threading.Event()[m
[32m+[m[32m self.shutdown_event = mp.Event()[m
self.workers = [][m
[31m- self.work_queue = queue.Queue()[m
[32m+[m[32m self.work_queue = mp.Queue()[m
for worker_id in range(max(num_threads, 1)):[m
[31m- worker = Worker(worker_id, self)[m
[32m+[m[32m worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)[m
self.workers.append(worker)[m
[m
self.sock = None[m
self.sv_conn = None[m
[m
[31m- def add_cache(self, cache):[m
[31m- self.caches.append(cache)[m
[31m-[m
[31m- def add_entrypoint(self, protocol, hostname, path_pattern, entrypoint):[m
[31m- self.entrypoints[protocol][hostname].append((path_pattern, entrypoint))[m
[31m-[m
[31m- def __setitem__(self, key, value):[m
[31m- for hostname in self.hostnames:[m
[31m- self.add_entrypoint('gemini', hostname, key, value)[m
[31m-[m
[31m- def run(self, memtrace=False):[m
[31m- self.memtrace = memtrace[m
[31m- if self.memtrace:[m
[31m- import tracemalloc[m
[31m- tracemalloc.start(10)[m
[31m-[m
[32m+[m[32m def run(self):[m
attempts = 60[m
print(f'Opening port {self.port}...')[m
while True:[m
[36m@@ -599,9 +702,9 @@[m [mclass Server:[m
print('...')[m
print(f'Server started on port {self.port}')[m
[m
[31m- MULTITHREAD = True[m
[32m+[m[32m MULTIPROCESS = True[m
[m
[31m- if MULTITHREAD:[m
[32m+[m[32m if MULTIPROCESS:[m
for worker in self.workers:[m
worker.start()[m
print(len(self.workers), 'worker(s) started')[m
[36m@@ -615,7 +718,7 @@[m [mclass Server:[m
stream._socket.settimeout(10)[m
self.work_queue.put((stream, from_addr))[m
[m
[31m- if not MULTITHREAD:[m
[32m+[m[32m if not MULTIPROCESS:[m
self.work_queue.put((None, None)) # single iteration only[m
self.workers[0].run()[m
[m
[36m@@ -629,22 +732,6 @@[m [mclass Server:[m
#traceback.print_exc()[m
print(ex)[m
[m
[31m- if self.memtrace:[m
[31m- old_snapshot = snapshot[m
[31m- gc.collect()[m
[31m- snapshot = tracemalloc.take_snapshot()[m
[31m- filters = ([m
[31m- tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),[m
[31m- tracemalloc.Filter(False, "<unknown>"),[m
[31m- tracemalloc.Filter(False, "*/linecache.py"),[m
[31m- tracemalloc.Filter(False, "*/tracemalloc.py"),[m
[31m- tracemalloc.Filter(False, "*/mimetypes.py"),[m
[31m- tracemalloc.Filter(False, "*/fnmatch.py")[m
[31m- )[m
[31m- snapshot = snapshot.filter_traces(filters)[m
[31m- top_stats = snapshot.statistics('lineno')[m
[31m- display_memtop(snapshot, None) #old_snapshot)[m
[31m-[m
# Close the server socket.[m
self.sv_conn = None[m
self.sock.close()[m
[36m@@ -652,7 +739,7 @@[m [mclass Server:[m
[m
# Stop all workers.[m
self.shutdown_event.set()[m
[31m- if MULTITHREAD:[m
[32m+[m[32m if MULTIPROCESS:[m
for i in range(len(self.workers)):[m
self.work_queue.put((None, None))[m
for worker in self.workers:[m
[36m@@ -678,34 +765,3 @@[m [mclass Server:[m
return None[m
[m
return None[m
[31m-[m
[31m- def call_entrypoint(self, request):[m
[31m- entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)[m
[31m-[m
[31m- caches = self.caches if (request.scheme == 'gemini' and[m
[31m- not request.identity and[m
[31m- not request.query) else [][m
[31m- from_cache = None[m
[31m-[m
[31m- if entrypoint:[m
[31m- # Check the caches first.[m
[31m- for cache in caches:[m
[31m- media, content = cache.try_load(request.hostname + request.path)[m
[31m- if not media is None:[m
[31m- response = 20, media, content[m
[31m- if hasattr(content, '__len__'):[m
[31m- print('%d bytes from cache, %s' % (len(content), media))[m
[31m- else:[m
[31m- print('stream from cache,', media)[m
[31m- return response, cache[m
[31m-[m
[31m- # Process the request normally if there is nothing cached.[m
[31m- if not from_cache:[m
[31m- try:[m
[31m- return entrypoint(request), None[m
[31m- except Exception as x:[m
[31m- import traceback[m
[31m- traceback.print_exception(x)[m
[31m- raise GeminiError(40, 'Temporary failure')[m
[31m-[m
[31m- raise GeminiError(50, 'Permanent failure')[m
[1mdiff --git a/gmcapsule/modules/10_rewrite.py b/gmcapsule/modules/10_rewrite.py[m
[1mindex b0ce05d..313c6e8 100644[m
[1m--- a/gmcapsule/modules/10_rewrite.py[m
[1m+++ b/gmcapsule/modules/10_rewrite.py[m
[36m@@ -7,8 +7,8 @@[m [mimport re[m
[m
[m
class PathRewriteHandler:[m
[31m- def __init__(self, capsule, rewritten_path):[m
[31m- self.capsule = capsule[m
[32m+[m[32m def __init__(self, context, rewritten_path):[m
[32m+[m[32m self.context = context[m
self.rewritten_path = rewritten_path[m
[m
def __call__(self, req):[m
[36m@@ -23,8 +23,8 @@[m [mclass PathRewriteHandler:[m
if req.num_rewrites == 100:[m
return 40, "Stuck in rewrite loop: " + req.url()[m
[m
[31m- print("[rewrite]", old_path, "->", req.path)[m
[31m- return self.capsule.call_entrypoint(req)[0][m
[32m+[m[32m self.context.print("[rewrite]", old_path, "->", req.path)[m
[32m+[m[32m return self.context.call_entrypoint(req)[0][m
[m
[m
class Responder:[m
[36m@@ -38,8 +38,8 @@[m [mclass Responder:[m
[m
[m
class Rewriter:[m
[31m- def __init__(self, capsule, protocol, host, src_path, dst_path, status):[m
[31m- self.capsule = capsule[m
[32m+[m[32m def __init__(self, context, protocol, host, src_path, dst_path, status):[m
[32m+[m[32m self.context = context[m
self.protocol = protocol[m
self.host = host[m
self.src_path = src_path[m
[36m@@ -52,7 +52,7 @@[m [mclass Rewriter:[m
if self.dst_path:[m
new_path = self.src_path.sub(self.dst_path, path)[m
if new_path != path:[m
[31m- return PathRewriteHandler(self.capsule, new_path)[m
[32m+[m[32m return PathRewriteHandler(self.context, new_path)[m
[m
elif self.status:[m
m = self.src_path.match(path)[m
[36m@@ -63,14 +63,14 @@[m [mclass Rewriter:[m
if cap:[m
status = status.replace(f'\\{i}', cap)[m
code, meta = status.split()[m
[31m- print("[rewrite]", code, meta)[m
[32m+[m[32m self.context.print("[rewrite]", code, meta)[m
return Responder(int(code), meta)[m
[m
return None[m
[m
[m
[31m-def init(capsule):[m
[31m- cfg = capsule.config()[m
[32m+[m[32mdef init(context):[m
[32m+[m[32m cfg = context.config()[m
for section in cfg.prefixed_sections('rewrite.').values():[m
protocol = section.get('protocol', None)[m
host = section.get('host', cfg.hostnames()[0])[m
[36m@@ -78,7 +78,7 @@[m [mdef init(capsule):[m
dst_path = section.get('repl', None)[m
status = section.get('status', None)[m
for proto in [protocol] if protocol else ['gemini', 'titan']:[m
[31m- capsule.add(Rewriter(capsule, proto, host, src_path, dst_path, status),[m
[32m+[m[32m context.add(Rewriter(context, proto, host, src_path, dst_path, status),[m
None, # `Rewriter` will return a suitable handler callback.[m
host,[m
proto)[m
[1mdiff --git a/gmcapsule/modules/80_gitview.py b/gmcapsule/modules/80_gitview.py[m
[1mindex fc22de0..d3b20f1 100755[m
[1m--- a/gmcapsule/modules/80_gitview.py[m
[1m+++ b/gmcapsule/modules/80_gitview.py[m
[36m@@ -14,7 +14,7 @@[m [mimport time[m
import urllib[m
from pathlib import Path[m
[m
[31m-from gmcapsule import Cache, Capsule, markdown_to_gemtext[m
[32m+[m[32mfrom gmcapsule import Cache, markdown_to_gemtext[m
[m
pjoin = os.path.join[m
[m
[36m@@ -69,6 +69,7 @@[m [mclass GitViewCache(Cache):[m
return True[m
[m
[m
[32m+[m[32mCONFIG = None[m
GIT = '/usr/bin/git'[m
HOSTNAME = 'localhost'[m
NUM_COMMITS_FRONT = 8[m
[36m@@ -95,7 +96,7 @@[m [mdef preformat(raw, alt_text=''):[m
[m
def repositories():[m
roots = [][m
[31m- for name, cfg in Capsule.config().prefixed_sections('gitview.').items():[m
[32m+[m[32m for name, cfg in CONFIG.prefixed_sections('gitview.').items():[m
url = cfg['url_root'][m
if not url.startswith('/'): url = '/' + url[m
if not url.endswith('/'): url += '/'[m
[36m@@ -342,7 +343,7 @@[m [mdef handle_request(gemini_request):[m
email_subject = urllib.parse.quote(f"{req.cfg['title']} commit {hash}")[m
email_body = urllib.parse.quote("=> gemini://%s:%d%scommits/%s" %[m
(HOSTNAME,[m
[31m- Capsule.config().port(),[m
[32m+[m[32m CONFIG.port(),[m
req.url_root + req.ubranch,[m
full_hash)[m
)[m
[36m@@ -465,8 +466,12 @@[m [mdef main_page(req):[m
return page[m
[m
[m
[31m-def init(capsule):[m
[31m- cfg = capsule.config()[m
[32m+[m[32mdef init(context):[m
[32m+[m[32m cfg = context.config()[m
[32m+[m
[32m+[m[32m global CONFIG[m
[32m+[m[32m CONFIG = cfg[m
[32m+[m
try:[m
mod_cfg = cfg.section('gitview')[m
[m
[36m@@ -479,13 +484,13 @@[m [mdef init(capsule):[m
HOSTNAME = cfg.hostnames()[0][m
[m
if 'cache_path' in mod_cfg:[m
[31m- capsule.add_cache(GitViewCache(HOSTNAME, mod_cfg['cache_path']))[m
[32m+[m[32m context.add_cache(GitViewCache(HOSTNAME, mod_cfg['cache_path']))[m
[m
for name, url_root, _ in repositories():[m
[31m- print(f' Adding repository "{name}"...')[m
[31m- capsule.add('/', main_page, hostname=HOSTNAME)[m
[31m- capsule.add(url_root[:-1], redirect_to_default, hostname=HOSTNAME)[m
[31m- capsule.add(url_root + '*', handle_request, hostname=HOSTNAME)[m
[32m+[m[32m context.print(f' Adding repository "{name}"...')[m
[32m+[m[32m context.add('/', main_page, hostname=HOSTNAME)[m
[32m+[m[32m context.add(url_root[:-1], redirect_to_default, hostname=HOSTNAME)[m
[32m+[m[32m context.add(url_root + '*', handle_request, hostname=HOSTNAME)[m
[m
except KeyError:[m
# GitView not configured.[m
[1mdiff --git a/gmcapsule/modules/90_cgi.py b/gmcapsule/modules/90_cgi.py[m
[1mindex a533163..9839c18 100644[m
[1m--- a/gmcapsule/modules/90_cgi.py[m
[1m+++ b/gmcapsule/modules/90_cgi.py[m
[36m@@ -10,11 +10,11 @@[m [mimport subprocess[m
import urllib.parse[m
[m
import gmcapsule[m
[31m-from gmcapsule import Capsule[m
[m
[m
class CgiContext:[m
[31m- def __init__(self, url_path, args, work_dir=None):[m
[32m+[m[32m def __init__(self, port, url_path, args, work_dir=None):[m
[32m+[m[32m self.port = port[m
self.args = args[m
self.base_path = url_path[m
if self.base_path.endswith('/*'):[m
[36m@@ -35,7 +35,7 @@[m [mclass CgiContext:[m
env_vars['SERVER_SOFTWARE'] = 'GmCapsule/' + gmcapsule.__version__[m
env_vars['SERVER_PROTOCOL'] = req.scheme.upper()[m
env_vars['SERVER_NAME'] = req.hostname[m
[31m- env_vars['SERVER_PORT'] = str(Capsule.config().port())[m
[32m+[m[32m env_vars['SERVER_PORT'] = str(self.port)[m
env_vars[req.scheme.upper() + '_URL'] = f"{req.scheme}://{req.hostname}{req.path}" + ([m
'?' + req.query if req.query != None else '')[m
env_vars[req.scheme.upper() + '_URL_PATH'] = req.path[m
[36m@@ -89,9 +89,10 @@[m [mclass CgiContext:[m
[m
[m
class CgiTreeMapper:[m
[31m- def __init__(self, protocol, host, root_dir):[m
[32m+[m[32m def __init__(self, protocol, host, port, root_dir):[m
self.protocol = protocol[m
self.host = host[m
[32m+[m[32m self.port = port[m
self.root_dir = pathlib.Path(root_dir)[m
[m
def __call__(self, url_path):[m
[36m@@ -103,18 +104,18 @@[m [mclass CgiTreeMapper:[m
if os.path.isdir(fn):[m
return None[m
if os.access(fn, os.X_OK):[m
[31m- return CgiContext(url_path, [fn], work_dir=os.path.dirname(fn))[m
[32m+[m[32m return CgiContext(self.port, url_path, [fn], work_dir=os.path.dirname(fn))[m
return None[m
[m
[m
# # NOTE: This require restarting the server when binaries are added/removed.[m
[31m-# def add_cgibin_entrypoints_recursively(capsule, host, base, cur_dir=None):[m
[32m+[m[32m# def add_cgibin_entrypoints_recursively(context, host, base, cur_dir=None):[m
# if cur_dir is None:[m
# cur_dir = base[m
# for name in os.listdir(cur_dir):[m
# fn = cur_dir / name[m
# if os.path.isdir(fn):[m
[31m-# add_cgibin_entrypoints_recursively(capsule, host, base, fn)[m
[32m+[m[32m# add_cgibin_entrypoints_recursively(context, host, base, fn)[m
# elif os.access(fn, os.X_OK):[m
# protocol = 'gemini'[m
# if name.endswith(',titan'):[m
[36m@@ -125,31 +126,31 @@[m [mclass CgiTreeMapper:[m
# if protocol == 'titan':[m
# path = path[:-6][m
# print(f' {protocol}://{host}{path} ->', args)[m
[31m-# capsule.add(path, CgiContext(path, args, work_dir=cur_dir), host, protocol)[m
[32m+[m[32m# context.add(path, CgiContext(path, args, work_dir=cur_dir), host, protocol)[m
[m
[m
[31m-def init(capsule):[m
[31m- cfg = Capsule.config()[m
[32m+[m[32mdef init(context):[m
[32m+[m[32m cfg = context.config()[m
default_host = cfg.hostnames()[0][m
[m
# Custom entrypoints for specific URLs.[m
[31m- for section in Capsule.config().prefixed_sections('cgi.').values():[m
[32m+[m[32m for section in cfg.prefixed_sections('cgi.').values():[m
protocol = section.get('protocol', fallback='gemini')[m
host = section.get('host', fallback=default_host)[m
work_dir = section.get('cwd', fallback=None)[m
args = shlex.split(section.get('command'))[m
for path in shlex.split(section.get('path', fallback='/*')):[m
[31m- print(f' {protocol}://{host}{path} ->', args)[m
[31m- capsule.add(path, CgiContext(path, args, work_dir),[m
[32m+[m[32m context.print(f' {protocol}://{host}{path} ->', args)[m
[32m+[m[32m context.add(path, CgiContext(cfg.port(), path, args, work_dir),[m
host, protocol)[m
[m
# Automatic entrypoints for all executables.[m
[31m- bin_root = Capsule.config().ini.get('cgi', 'bin_root', fallback=None)[m
[32m+[m[32m bin_root = cfg.ini.get('cgi', 'bin_root', fallback=None)[m
if bin_root != None:[m
bin_root = pathlib.Path(bin_root).resolve()[m
[31m- for host in Capsule.config().hostnames():[m
[32m+[m[32m for host in cfg.hostnames():[m
host_bin_root = bin_root / host[m
for protocol in ['gemini', 'titan']:[m
[31m- capsule.add([m
[31m- CgiTreeMapper(protocol, host, host_bin_root), None,[m
[32m+[m[32m context.add([m
[32m+[m[32m CgiTreeMapper(protocol, host, cfg.port(), host_bin_root), None,[m
host, protocol)[m
[1mdiff --git a/gmcapsule/modules/99_static.py b/gmcapsule/modules/99_static.py[m
[1mindex e30f332..05c74cf 100644[m
[1m--- a/gmcapsule/modules/99_static.py[m
[1m+++ b/gmcapsule/modules/99_static.py[m
[36m@@ -7,18 +7,18 @@[m [mimport fnmatch[m
import os.path[m
import string[m
[m
[31m-from gmcapsule import Capsule, get_mime_type[m
[32m+[m[32mfrom gmcapsule import get_mime_type[m
from pathlib import Path[m
[m
META = '.meta'[m
[32m+[m[32mCONFIG = None[m
[m
[m
def check_meta_rules(path, hostname):[m
[31m- cfg = Capsule.config()[m
[31m- root = cfg.root_dir() / hostname[m
[32m+[m[32m root = CONFIG.root_dir() / hostname[m
dir = Path(path).parent[m
while True:[m
[31m- if not str(dir).startswith(str(cfg.root_dir())):[m
[32m+[m[32m if not str(dir).startswith(str(CONFIG.root_dir())):[m
break[m
if (dir / META).exists():[m
for rule in open(dir / META, 'rt').readlines():[m
[36m@@ -41,7 +41,7 @@[m [mdef serve_file(req):[m
if req.scheme != 'gemini':[m
return 59, "Only Gemini requests allowed"[m
[m
[31m- cfg = Capsule.config()[m
[32m+[m[32m cfg = CONFIG[m
if req.path == '':[m
return 31, '/'[m
[m
[36m@@ -71,8 +71,10 @@[m [mdef serve_file(req):[m
return status, meta, (open(path, 'rb') if status == 20 else None)[m
[m
[m
[31m-def init(capsule):[m
[31m- cfg = capsule.config()[m
[32m+[m[32mdef init(context):[m
[32m+[m[32m cfg = context.config()[m
[32m+[m[32m global CONFIG[m
[32m+[m[32m CONFIG = cfg[m
if 'static' in cfg.ini and 'root' in cfg.section('static'):[m
[31m- print(' Content directory:', cfg.root_dir() / '{hostname}')[m
[31m- capsule.add('/*', serve_file)[m
[32m+[m[32m context.print(' Content directory:', cfg.root_dir() / '{hostname}')[m
[32m+[m[32m context.add('/*', serve_file)[m
[1mdiff --git a/gmcapsuled b/gmcapsuled[m
[1mindex 48f1b44..6f82d33 100755[m
[1m--- a/gmcapsuled[m
[1m+++ b/gmcapsuled[m
[36m@@ -5,25 +5,21 @@[m
# License: BSD-2-Clause[m
[m
import argparse[m
[31m-[m
import gmcapsule[m
from pathlib import Path[m
[m
VERSION = gmcapsule.__version__[m
[m
[31m-print(f"GmCapsule v{VERSION}")[m
[32m+[m[32mif __name__ == '__main__':[m
[32m+[m[32m print(f"GmCapsule v{VERSION}")[m
[m
[31m-argp = argparse.ArgumentParser(description='GmCapsule is an extensible server for Gemini and Titan.')[m
[31m-argp.add_argument('-c', '--config',[m
[31m- dest='config_file',[m
[31m- default=Path.home() / '.gmcapsulerc',[m
[31m- help='Configuration file to load at startup')[m
[31m-argp.add_argument('--trace-malloc',[m
[31m- action='store_true',[m
[31m- help='Enable memory allocation tracing (for debugging)')[m
[31m-args = argp.parse_args()[m
[32m+[m[32m argp = argparse.ArgumentParser(description='GmCapsule is an extensible server for Gemini and Titan.')[m
[32m+[m[32m argp.add_argument('-c', '--config',[m
[32m+[m[32m dest='config_file',[m
[32m+[m[32m default=Path.home() / '.gmcapsulerc',[m
[32m+[m[32m help='Configuration file to load at startup')[m
[32m+[m[32m args = argp.parse_args()[m
[m
[31m-cfg = gmcapsule.Config(args.config_file)[m
[31m-cfg.debug_memtrace = args.trace_malloc[m
[31m-capsule = gmcapsule.Capsule(cfg)[m
[31m-capsule.run()[m
[32m+[m[32m cfg = gmcapsule.Config(args.config_file)[m
[32m+[m[32m capsule = gmcapsule.Capsule(cfg)[m
[32m+[m[32m capsule.run()[m