Sunstead trust scoring project
1"""Phase 0: fetch PR diffs (patchBlobs) into pull_requests.diff_text.
2
3The sh.tangled.repo.pull record carries its diff as a gzipped blob CID
4(rounds[-1].patchBlob.ref.$link), NOT inline. This resolves each pull's blob from
5the author's PDS, gunzips it to unified-diff text, and stores it -- the single
6highest-leverage unblock: it lights up the content head, Claude review, AND the
7slop-kNN, all of which are dead without diffs.
8
9Reuses backfill's _pds plumbing; the network fetch fans out over a thread pool,
10DB writes go in chunks (DuckDB is single-writer). Idempotent/resumable: only
11fetches pulls whose diff_text IS NULL, so re-running just resumes. Pause
12ingest/api/backfill first or the writes crawl on the single-writer lock.
13"""
14
15from __future__ import annotations
16
17import argparse
18import gzip
19import json
20import urllib.parse
21import urllib.request
22from concurrent.futures import ThreadPoolExecutor
23
24from .backfill import _pds
25from .db import connection, ensure_schema
26
# Size caps applied to each fetched patch blob and to the diff text kept from it.
MAX_DIFF_CHARS = 50_000  # characters of diff text stored per pull; embeddings/Claude truncate well below this anyway
MAX_BLOB_BYTES = 5_000_000  # compressed bytes accepted per blob (normally < 100 KB); bounds the download, not the inflated size
29
30
def _cid_for(record_json: str) -> str | None:
    """Return the patch-blob CID of the final round of an archived pull record.

    Reads ``rounds[-1].patchBlob.ref.$link`` from the JSON text of a
    sh.tangled.repo.pull record. The LAST round is the final proposed change, so
    that is the one to embed/review/store.

    Args:
        record_json: JSON text of the record. Non-text values (e.g. None) and
            malformed JSON are tolerated.

    Returns:
        The CID as a non-empty string, or None when the text is not valid JSON or
        any step of the path is missing or has an unexpected type (including a
        ``$link`` that is not a string).
    """
    try:
        record = json.loads(record_json)
    except (TypeError, ValueError, RecursionError):
        # TypeError: input is not str/bytes; ValueError: malformed JSON or
        # undecodable bytes; RecursionError: pathologically nested document.
        return None
    rounds = record.get("rounds") if isinstance(record, dict) else None
    node = rounds[-1] if isinstance(rounds, list) and rounds else None
    # Walk the remaining path, bailing out as soon as a level is not a mapping.
    for key in ("patchBlob", "ref", "$link"):
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    # Only a real, non-empty string is a usable CID for the blob request.
    return node if isinstance(node, str) and node else None
40
41
def _get_blob(pds: str, did: str, cid: str) -> bytes | None:
    """Download one blob via com.atproto.sync.getBlob.

    Args:
        pds: Base URL of the PDS to query; must be http(s). A trailing slash is
            tolerated.
        did: DID of the repo that owns the blob.
        cid: CID of the blob to fetch.

    Returns:
        At most MAX_BLOB_BYTES + 1 raw bytes (the gzipped patch), or None on any
        error -- a dead PDS / missing blob / 4xx / non-web endpoint must never
        abort the run.
    """
    try:
        # urlopen also serves file:, ftp: and data: URLs. The endpoint is resolved
        # outside this function (presumably from the author's DID document), so
        # refuse anything that is not a plain web URL rather than risk reading a
        # local file.
        if urllib.parse.urlsplit(pds).scheme not in ("http", "https"):
            return None
        q = urllib.parse.urlencode({"did": did, "cid": cid})
        url = f"{pds.rstrip('/')}/xrpc/com.atproto.sync.getBlob?{q}"
        req = urllib.request.Request(url, headers={"User-Agent": "trust-diffs"})
        with urllib.request.urlopen(req, timeout=30) as r:
            # +1 so an oversized blob is detectable by the caller, not silently capped.
            return r.read(MAX_BLOB_BYTES + 1)
    except Exception:
        # Deliberate best-effort: one bad endpoint/blob is skipped, never fatal.
        return None
53
54
def _diff_text(blob: bytes | None) -> str | None:
    """Gunzip a patch blob to unified-diff text, capped at MAX_DIFF_CHARS.

    Only as much of the stream is inflated as can contribute to the stored text,
    so a small blob that expands enormously (a decompression bomb) cannot exhaust
    memory. Streams that fit inside that budget are still read to their end, so
    truncation or a bad checksum in them is detected.

    Args:
        blob: Raw gzip bytes, or None when the fetch failed.

    Returns:
        The decoded text (invalid UTF-8 replaced), or None if the blob is empty,
        larger than MAX_BLOB_BYTES, not gzip, corrupt, or inflates to nothing.
    """
    import io  # function-scope: the module does not import io at top level

    if not blob or len(blob) > MAX_BLOB_BYTES:
        return None
    # A UTF-8 character occupies at most 4 bytes, so this many inflated bytes
    # always covers the first MAX_DIFF_CHARS characters.
    budget = MAX_DIFF_CHARS * 4
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(blob)) as gz:
            raw = gz.read(budget)
    except Exception:
        # Bad magic (BadGzipFile/OSError), truncation (EOFError), or a corrupt
        # deflate body (zlib.error -- NOT an OSError) all skip this one blob,
        # never abort the run.
        return None
    return raw.decode("utf-8", "replace")[:MAX_DIFF_CHARS] or None
65
66
def _fetch_one(work: tuple[str, str, str]) -> tuple[str, str] | None:
    """Resolve one work item to its diff text.

    Takes (pr_id, author_did, cid), looks up the author's PDS, downloads the blob
    and gunzips it. Returns (pr_id, diff_text), or None when the PDS is unknown or
    no usable text came back. Touches no database, so many of these can run in
    parallel on the thread pool.
    """
    pr_id, author_did, blob_cid = work
    endpoint = _pds(author_did)
    if endpoint:
        diff = _diff_text(_get_blob(endpoint, author_did, blob_cid))
        if diff:
            return (pr_id, diff)
    return None
76
77
def _store(buf: list[tuple[str, str]]) -> None:
    """Persist one chunk of (pr_id, diff_text) pairs.

    An empty chunk is a no-op and opens no connection; otherwise every row is
    written through a single writable connection in one executemany call.
    """
    if buf:
        with connection(read_only=False) as con:
            # Parameter order follows the placeholders: diff_text first, then pr_id.
            params = [(diff, pr_id) for pr_id, diff in buf]
            con.executemany("UPDATE pull_requests SET diff_text=? WHERE pr_id=?", params)
85
86
def fetch_diffs(limit: int | None = None, workers: int = 12, chunk: int = 200) -> dict:
    """Fetch and store diffs for every pull still missing one.

    The blob CID lives in the archived events.record, joined back to the pull by
    the pr_id convention did || '/' || collection || '/' || rkey. Blob downloads
    fan out over a thread pool; results are written back in chunks.

    Args:
        limit: Maximum number of candidate rows to select this run. None means no
            cap; 0 selects nothing.
        workers: Number of concurrent blob fetchers.
        chunk: Number of fetched diffs to buffer before each database write.

    Returns:
        Counters for the run: ``candidates`` (rows selected), ``with_cid`` (rows
        whose record yielded a CID), ``stored`` (diffs written) and ``skipped``
        (work items that produced no diff).
    """
    ensure_schema()
    sql = (
        "SELECT p.pr_id, p.author_did, e.record FROM pull_requests p "
        "JOIN events e ON p.pr_id = e.did || '/' || e.collection || '/' || e.rkey "
        "WHERE p.diff_text IS NULL AND e.collection = 'sh.tangled.repo.pull'"
    )
    # Test against None, not truthiness: limit=0 must mean "select nothing",
    # not silently fall through to an uncapped run.
    if limit is not None:
        sql += f" LIMIT {int(limit)}"  # int() keeps the interpolated value numeric
    with connection(read_only=True) as con:
        rows = con.execute(sql).fetchall()
    # NOTE(review): if events can hold several rows for one (did, collection, rkey),
    # this join yields the same pr_id more than once and it is fetched repeatedly
    # -- confirm events is unique per record.
    work = [(pr_id, did, cid) for pr_id, did, rec in rows if (cid := _cid_for(rec))]
    fetched = skipped = 0
    buf: list[tuple[str, str]] = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # ex.map yields results in submission order while the workers run ahead.
        for i, res in enumerate(ex.map(_fetch_one, work), 1):
            if res is None:
                skipped += 1
            else:
                buf.append(res)
                fetched += 1
            if len(buf) >= chunk:
                _store(buf)
                buf = []
            if i % 500 == 0:
                print(f"[diffs] {i}/{len(work)} pulls, {fetched} stored ({skipped} skipped)", flush=True)
    _store(buf)  # flush the final partial chunk
    out = {"candidates": len(rows), "with_cid": len(work), "stored": fetched, "skipped": skipped}
    print(f"[diffs] DONE: {out}", flush=True)
    return out
118
119
def demo() -> None:
    """Offline self-check; uses no network.

    Verifies that a gzipped unified diff survives _diff_text unchanged, that empty,
    non-gzip and corrupt blobs are skipped rather than raising, and that _cid_for
    extracts the CID from a pull record. Prints a confirmation line on success.
    """
    patch = "diff --git a/x.py b/x.py\n@@ -1 +1 @@\n-old\n+new\n"
    packed = gzip.compress(patch.encode())
    assert _diff_text(packed) == patch, "gunzip round-trip failed"
    assert "@@" in patch and "diff" in patch

    plain_bytes = b"not gzip at all"
    assert _diff_text(plain_bytes) is None, "non-gzip blob must be skipped, not crash"
    assert _diff_text(b"") is None
    # Valid gzip magic followed by a garbage deflate body raises zlib.error
    # (which is NOT an OSError); it must be skipped, not crash.
    corrupt = b"\x1f\x8b\x08\x00" + bytes(20)
    assert _diff_text(corrupt) is None, "corrupt deflate body must be skipped, not abort the run"

    pull_record = {"rounds": [{"patchBlob": {"ref": {"$link": "bafyCID"}, "mimeType": "application/gzip"}}]}
    assert _cid_for(json.dumps(pull_record)) == "bafyCID", "CID extraction from pull record failed"
    assert _cid_for("{}") is None and _cid_for("not json") is None
    print("gunzip round-trip + CID parse ok")
135
136
def main() -> None:
    """Command-line entry point.

    Parses --limit, --workers and --demo; with --demo it runs the offline
    self-check and stops, otherwise it starts a fetch run with the given options.
    """
    parser = argparse.ArgumentParser(description="Fetch PR diff patchBlobs into pull_requests.diff_text")
    options = (
        ("--limit", {"type": int, "default": None, "help": "cap pulls fetched this run (smoke test)"}),
        ("--workers", {"type": int, "default": 12, "help": "concurrent blob fetchers (default 12)"}),
        ("--demo", {"action": "store_true", "help": "run the offline self-check and exit"}),
    )
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    opts = parser.parse_args()
    if opts.demo:
        demo()
    else:
        fetch_diffs(limit=opts.limit, workers=opts.workers)
147
148
# Run the CLI only when executed as a script/module, not when imported.
if __name__ == "__main__":
    main()