#!/usr/bin/python3
"""Git remote helper that reads a repository served over Gemini.

The helper speaks the line-based remote-helper command protocol on
stdin/stdout (``capabilities``, ``list``, ``fetch``, ``connect``) and
retrieves the files of a repository (``HEAD``, ``info/refs``,
``objects/...``) with one Gemini request per file.  Server certificates
are pinned on first use under ``TOFU_PATH``.
"""

import socket
import ssl
from sys import argv, stderr, stdout, stdin, exit
import re
import subprocess
from subprocess import Popen, PIPE, DEVNULL
import zlib
import os
import cryptography
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from threading import Thread

# Directory holding one pinned PEM certificate per host name.
TOFU_PATH = os.path.expanduser("~/.git-remote-gemini-known-hosts")

# All protocol output is binary, so talk to the raw stdout buffer.
stdout = stdout.buffer

# Object types accepted in the header of a loose object.
_OBJECT_TYPES = (b"blob", b"tree", b"commit", b"tag")

# Header fields that name other objects, per object type.
_HEADER_KEYS = {"commit": ("parent", "tree"), "tag": ("object",)}

_URL_RE = re.compile(r"^[^:]*://([^/]*)(.*)$")


def _read_exact(src, count):
    """Read up to *count* bytes from *src*, stopping early only at EOF."""
    buf = bytearray()
    while len(buf) < count:
        piece = src.read(count - len(buf))
        if not piece:
            break
        buf += piece
    return bytes(buf)


def _read_until(src, delim):
    """Return the bytes of *src* before the single byte *delim*.

    The delimiter is consumed but not returned.  Returns None when the
    stream ends before the delimiter is seen, so callers cannot spin
    forever on a truncated stream.
    """
    buf = bytearray()
    while True:
        ch = src.read(1)
        if ch == delim:
            return bytes(buf)
        if not ch:
            return None
        buf += ch


def _read_all(src, chunk_size=1024):
    """Read *src* to EOF and return everything as one bytes object."""
    return b"".join(iter(lambda: src.read(chunk_size), b""))


class GeminiHost:
    """A Gemini server plus the base path of the repository on it."""

    def __init__(self, hostname, base_path="/", port=1965):
        """Set up the TLS context, pinning the certificate on first use.

        If no certificate is stored for *hostname* under TOFU_PATH, one
        unverified connection is made, the peer certificate is saved
        there, and it is then loaded as a trusted certificate.
        """
        self.hostname = hostname
        self.base_path = base_path
        self.port = port
        self.ctx = ssl.create_default_context()

        # TOFU
        try:
            os.mkdir(TOFU_PATH)
        except FileExistsError:
            pass
        tofu_fname = TOFU_PATH + "/" + self.hostname
        try:
            with open(tofu_fname, "r") as f:
                cert = f.read()
        except FileNotFoundError:
            cert = self._pin_certificate(tofu_fname)
        self.ctx.load_verify_locations(cadata=cert)

    def _pin_certificate(self, tofu_fname):
        """Fetch the peer certificate unverified, store it, return PEM."""
        # check_hostname must be off before verification can be disabled.
        self.ctx.check_hostname = False
        self.ctx.verify_mode = ssl.CERT_NONE
        try:
            sock = self.connect()
            try:
                der = sock.getpeercert(True)
            finally:
                sock.close()
        finally:
            # Every later connection is verified again.
            self.ctx.verify_mode = ssl.CERT_REQUIRED
            self.ctx.check_hostname = True
        pem = x509.load_der_x509_certificate(der).public_bytes(
            serialization.Encoding.PEM,
        ).decode("ascii")
        with open(tofu_fname, "w") as f:
            f.write(pem)
        return pem

    def connect(self):
        """Open a TLS connection to the host and return the socket."""
        sock = socket.create_connection((self.hostname, self.port))
        return self.ctx.wrap_socket(sock, server_hostname=self.hostname)

    def request(self, path, expect_mime=None):
        """Request *path* (relative to the base path) from the host.

        Returns the open socket, positioned at the start of the response
        body, when the status is ``20`` and, if *expect_mime* is given,
        the header's meta field equals it.  Returns None otherwise; the
        socket is closed in that case.
        """
        expected = expect_mime.encode() if expect_mime else None
        sock = self.connect()
        accepted = False
        try:
            sock.sendall(str.encode(
                "gemini://" + self.hostname + self.base_path + path + "\r\n"
            ))
            if _read_exact(sock, 3) == b"20 ":
                # None means the server hung up before ending the header.
                meta = _read_until(sock, b"\n")
                if meta is not None:
                    accepted = expected is None or expected == meta.strip()
        finally:
            if not accepted:
                sock.close()
        return sock if accepted else None


class GitCheckFile:
    """Long-lived ``git cat-file --batch-check`` for object lookups."""

    def __init__(self):
        self.proc = subprocess.Popen([
            "git", "cat-file", "--batch-check"],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
        self.stdin = self.proc.stdin
        self.stdout = self.proc.stdout

    def close(self):
        """Shut the helper process down and release its pipes."""
        self.stdin.close()
        self.proc.wait()
        self.stdout.close()

    def call(self, name):
        """Return the type of object *name*, or None if it is missing.

        Raises RuntimeError when the helper process gives no answer.
        """
        self.stdin.write(name + "\n")
        self.stdin.flush()
        fields = self.stdout.readline().split()
        if len(fields) < 2:
            raise RuntimeError("git cat-file --batch-check gave no answer")
        if fields[1] == "missing":
            return None
        return fields[1]


def read_line_b(f):
    """Read one line of bytes from *f*, one byte at a time.

    The trailing newline is kept.  At EOF the bytes read so far are
    returned without a newline, so an empty result means end of stream.
    """
    line = bytearray()
    while True:
        ch = f.read(1)
        line += ch
        if ch == b"\n" or ch == b"":
            return bytes(line)


def load_pack_index(gem):
    """Map every packed object name to the pack file that contains it.

    Reads ``objects/info/packs`` and feeds each pack's ``.idx`` file to
    ``git show-index``.  Returns an empty dict when the pack list cannot
    be fetched; a pack whose index cannot be fetched is skipped with a
    warning.
    """
    pmap = {}
    findex = gem.request("objects/info/packs")
    if findex is None:
        return pmap

    packs = set()
    try:
        while line := read_line_b(findex):
            fields = line.split()
            if len(fields) >= 2:
                packs.add(fields[1].decode("ascii"))
    finally:
        findex.close()

    for pack in packs:
        idx = pack.replace(".pack", ".idx")
        req = gem.request("objects/pack/" + idx)
        if req is None:
            stderr.write(f"warning: failed to fetch pack index {idx}\n")
            continue
        try:
            idx_data = _read_all(req)
        finally:
            req.close()
        # run() feeds stdin and drains stdout together, so neither pipe
        # can fill up and stall the other side.
        listing = subprocess.run(
            ["git", "show-index"], input=idx_data, stdout=PIPE).stdout
        for entry in listing.splitlines():
            fields = entry.split()
            if len(fields) >= 2:
                pmap[fields[1].decode("ascii")] = pack
    return pmap


class DecompressFile:
    """File-like reader that zlib-decompresses another file-like object."""

    def __init__(self, source):
        self.source = source
        self.z = zlib.decompressobj()

    def close(self):
        """Close the underlying source."""
        self.source.close()

    def read(self, n):
        """Return at most *n* decompressed bytes, or b"" at end of data."""
        if self.z.eof:
            return b""
        data = b""
        while data == b"" and not self.z.eof:
            new_data = self.source.read(n)
            pending = self.z.unconsumed_tail
            # Decompress even when there is no new input: output held
            # back by an earlier size-limited call is returned here.
            data = self.z.decompress(pending + new_data, n)
            if new_data == b"" and pending == b"":
                break
        return data


def do_list(gem):
    """Answer the ``list`` command with the remote's refs.

    Writes a ``HEAD`` line when the remote has a HEAD file, followed by
    the contents of ``info/refs`` with tabs turned into spaces and a
    terminating blank line.  Exits with an error if ``info/refs`` cannot
    be fetched.
    """
    sock = gem.request("HEAD")
    if sock:
        try:
            fields = sock.read(1024).split()
        finally:
            sock.close()
        if len(fields) >= 2 and fields[0] == b"ref:":
            # Symbolic HEAD: report the ref it points at.
            stdout.write(b"@" + fields[1] + b" HEAD\n")
        elif len(fields) == 1:
            # Detached HEAD: the file holds a bare object name.
            stdout.write(fields[0] + b" HEAD\n")

    sock = gem.request("info/refs")
    if not sock:
        stderr.write("\n\nError: failed to fetch file info/refs.\n")
        stderr.write("Not a valid git repository.\n")
        exit(-1)
    try:
        while (data := sock.read(1024)):
            stdout.write(data.replace(b"\t", b" "))
    finally:
        sock.close()
    stdout.write(b"\n")


def fetch_loose_obj(gem, name):
    """Download loose object *name*, store it locally, return its type.

    The object is decompressed, its ``<type> <size>\\0`` header is
    stripped, and the payload is written with ``git hash-object -w``.
    Exits with an error when the object cannot be fetched, its header is
    malformed, or the stored object's name differs from *name*.
    """
    obj_name = f"objects/{name[:2]}/{name[2:]}"
    res = gem.request(obj_name)
    if not res:
        stderr.write(f"\n\nError failed to fetch object {name}\n")
        exit(-1)
    res = DecompressFile(res)
    try:
        ftype = _read_until(res, b" ")
        size = _read_until(res, b"\x00")
        # The type comes from the server; only pass known types to git.
        if ftype not in _OBJECT_TYPES or size is None:
            stderr.write(f"\n\nError: malformed object {name}\n")
            exit(-1)
        ftype = ftype.decode("ascii")

        # start a git hash-object process
        proc = subprocess.Popen(
            ["git", "hash-object", "-w", "-t", ftype, "--stdin"],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        while chunk := res.read(1024):
            proc.stdin.write(chunk)
        proc.stdin.close()
        answer = proc.stdout.read().split()
        proc.stdout.close()
        proc.wait()
    finally:
        res.close()

    # The name git computed must be the one that was asked for.
    if not answer or answer[0].decode(encoding="ascii") != name:
        stderr.write(f"\n\nError: object {name} failed verification\n")
        exit(-1)
    return ftype


def fetch_pack(gem, pack):
    """Download *pack* and add it to the repository via ``git index-pack``.

    Exits with an error when the pack cannot be fetched; a failing
    ``git index-pack`` is reported as a warning.
    """
    req = gem.request(f"objects/pack/{pack}")
    if not req:
        stderr.write(f"\n\nError failed to fetch pack {pack}\n")
        exit(-1)
    try:
        proc = Popen(["git", "index-pack", "--stdin"],
                     stdin=PIPE, stdout=DEVNULL)
        while chunk := req.read(1024):
            proc.stdin.write(chunk)
        proc.stdin.close()
        status = proc.wait()
    finally:
        req.close()
    if status != 0:
        stderr.write(
            f"warning: git index-pack exited with status {status}"
            f" for {pack}\n")


def get_obj_deps(ftype, name):
    """Return the set of object names that object *name* refers to.

    *ftype* is the object's type.  Commits yield their tree and parents,
    tags yield the tagged object, and trees yield their entries except
    submodule links, which are not objects of this repository.  Blobs
    and unknown types yield an empty set.
    """
    deps = set()
    if ftype == "blob":
        return deps

    contents = subprocess.Popen(
        ["git", "cat-file", "-p", name],
        stdout=subprocess.PIPE, text=True, errors="replace")
    try:
        if ftype in _HEADER_KEYS:
            wanted = _HEADER_KEYS[ftype]
            for line in contents.stdout:
                if line.startswith(" "):
                    # Continuation of a multi-line header field.
                    continue
                fields = line.split()
                if not fields:
                    # A blank line ends the header; the rest is message.
                    break
                if fields[0] in wanted and len(fields) >= 2:
                    deps.add(fields[1])
        elif ftype == "tree":
            # Entry lines read "<mode> <type> <name-of-object>\t<path>".
            for line in contents.stdout:
                fields = line.split()
                if len(fields) >= 3 and fields[1] != "commit":
                    deps.add(fields[2])
    finally:
        contents.stdout.close()
        contents.wait()
    return deps


def do_fetch(gem, name, packs):
    """Fetch object *name* and every missing object it leads to.

    *packs* maps object names to the pack that contains them; a missing
    object found there is fetched with its whole pack, any other missing
    object is fetched as a loose object.  Objects already present are
    not walked further, which is also what keeps the walk finite.
    """
    queue = {name}
    check_file = GitCheckFile()
    try:
        while queue:
            next_obj = queue.pop()
            if check_file.call(next_obj) is not None:
                continue
            pack = packs.get(next_obj)
            if pack is not None:
                stderr.write(f"fetching {pack}\n")
                fetch_pack(gem, pack)
                next_obj_type = check_file.call(next_obj)
                if next_obj_type is None:
                    stderr.write(
                        f"\n\nError: object {next_obj} missing after"
                        f" fetching {pack}\n")
                    exit(-1)
            else:
                stderr.write(f"fetching object {next_obj}\n")
                next_obj_type = fetch_loose_obj(gem, next_obj)
            queue.update(get_obj_deps(next_obj_type, next_obj))
    finally:
        check_file.close()


def do_connect(gem, service):
    """Answer ``connect`` by bridging stdin/stdout to the server.

    Requests ``?<service>`` and expects an ``application/x-git``
    response.  Writes ``fallback`` and returns when that fails;
    otherwise relays data both ways until both ends are done and then
    exits the process.
    """
    def move_data(src, dst):
        try:
            while chunk := src.read(1024):
                dst.write(chunk)
                dst.flush()
        except (ValueError, OSError):
            # SSLSocket.shutdown() drops the TLS layer, so a read issued
            # after the sending side is done raises; treat it as EOF.
            pass

    con = gem.request("?" + service, "application/x-git")
    if not con:
        stdout.write(b"fallback\n")
        return

    stdout.write(b"\n")
    stdout.flush()

    thread = Thread(target=move_data, args=(con, stdout))
    thread.start()

    try:
        while chunk := stdin.buffer.read1(1024):
            con.sendall(chunk)
        con.shutdown(socket.SHUT_WR)
        thread.join()
    finally:
        con.close()
    exit(0)


def _split_url(url):
    """Split *url* into (hostname, base path ending in a slash).

    Raises Exception("malformed url") when *url* has no ``scheme://``
    part.
    """
    match = _URL_RE.match(url)
    if match is None:
        raise Exception("malformed url")
    hostname, base_path = match[1], match[2]
    # endswith() also covers a URL with no path at all.
    if not base_path.endswith("/"):
        base_path += "/"
    return hostname, base_path


def main():
    """Run the command loop: read commands on stdin, answer on stdout."""
    if len(argv) < 3:
        stderr.write(f"usage: {argv[0]} <remote> <url>\n")
        exit(1)

    hostname, base_path = _split_url(argv[2])
    gem = GeminiHost(hostname, base_path)
    packs = None

    while line := stdin.readline():
        cmd = line.split()
        if not cmd:
            # blank line is sent after a batch of fetch commands
            stdout.write(b"\n")
        elif cmd[0] == "capabilities":
            stdout.write(b"fetch\n")
            stdout.write(b"connect\n")
            stdout.write(b"\n")
        elif cmd[0] == "list":
            do_list(gem)
        elif cmd[0] == "fetch":
            if packs is None:
                # The pack index is only needed once a fetch is requested.
                packs = load_pack_index(gem)
            do_fetch(gem, cmd[1], packs)
        elif cmd[0] == "connect":
            do_connect(gem, cmd[1])
        else:
            raise Exception("unknown command " + cmd[0])

        # The caller waits for each answer, so flush after every command.
        try:
            stdout.flush()
        except BrokenPipeError:
            pass


if __name__ == "__main__":
    main()