Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 6.8 kB View raw
1"""Phase 0: fetch PR diffs (patchBlobs) into pull_requests.diff_text. 2 3The sh.tangled.repo.pull record carries its diff as a gzipped blob CID 4(rounds[-1].patchBlob.ref.$link), NOT inline. This resolves each pull's blob from 5the author's PDS, gunzips it to unified-diff text, and stores it -- the single 6highest-leverage unblock: it lights up the content head, Claude review, AND the 7slop-kNN, all of which are dead without diffs. 8 9Reuses backfill's _pds plumbing; the network fetch fans out over a thread pool, 10DB writes go in chunks (DuckDB is single-writer). Idempotent/resumable: only 11fetches pulls whose diff_text IS NULL, so re-running just resumes. Pause 12ingest/api/backfill first or the writes crawl on the single-writer lock. 13""" 14 15from __future__ import annotations 16 17import argparse 18import gzip 19import json 20import urllib.parse 21import urllib.request 22from concurrent.futures import ThreadPoolExecutor 23 24from .backfill import _pds 25from .db import connection, ensure_schema 26 27MAX_DIFF_CHARS = 50_000 # cap stored text; embeddings/Claude truncate well below this anyway 28MAX_BLOB_BYTES = 5_000_000 # a patch blob is normally < 100 KB; skip absurd ones, never OOM 29 30 31def _cid_for(record_json: str) -> str | None: 32 """rounds[-1].patchBlob.ref.$link from an archived sh.tangled.repo.pull record. 33 The LAST round is the final proposed change -- embed/review/store that one.""" 34 try: 35 rounds = (json.loads(record_json) or {}).get("rounds") or [] 36 pb = (rounds[-1].get("patchBlob") if rounds else None) or {} 37 return (pb.get("ref") or {}).get("$link") 38 except Exception: 39 return None 40 41 42def _get_blob(pds: str, did: str, cid: str) -> bytes | None: 43 """com.atproto.sync.getBlob -> raw bytes (the gzipped patch). None on any error 44 (a dead PDS / missing blob / 4xx must never abort the run).""" 45 q = urllib.parse.urlencode({"did": did, "cid": cid}) 46 url = f"{pds}/xrpc/com.atproto.sync.getBlob?{q}" 47 try: 48 req = urllib.request.Request(url, headers={"User-Agent": "trust-diffs"}) 49 with urllib.request.urlopen(req, timeout=30) as r: 50 return r.read(MAX_BLOB_BYTES + 1) # +1 so an oversized blob is detectable, not silently capped 51 except Exception: 52 return None 53 54 55def _diff_text(blob: bytes | None) -> str | None: 56 """Gunzip a patch blob to unified-diff text, capped. None if empty, oversized, 57 or not gzip/decodable (skip gracefully).""" 58 if not blob or len(blob) > MAX_BLOB_BYTES: 59 return None 60 try: 61 text = gzip.decompress(blob).decode("utf-8", "replace") 62 except Exception: # bad magic (BadGzipFile/OSError), truncation (EOFError), or a corrupt deflate 63 return None # body (zlib.error -- NOT an OSError) all skip this one blob, never abort the run 64 return text[:MAX_DIFF_CHARS] or None 65 66 67def _fetch_one(work: tuple[str, str, str]) -> tuple[str, str] | None: 68 """(pr_id, author_did, cid) -> (pr_id, diff_text), or None. Network only, so 69 WORKERS of these run concurrently without touching the single DB writer.""" 70 pr_id, did, cid = work 71 pds = _pds(did) 72 if not pds: 73 return None 74 text = _diff_text(_get_blob(pds, did, cid)) 75 return (pr_id, text) if text else None 76 77 78def _store(buf: list[tuple[str, str]]) -> None: 79 """Write a chunk of (pr_id, diff_text) under one short-lived write lock.""" 80 if not buf: 81 return 82 with connection(read_only=False) as con: 83 con.executemany("UPDATE pull_requests SET diff_text=? WHERE pr_id=?", 84 [(text, pr_id) for pr_id, text in buf]) 85 86 87def fetch_diffs(limit: int | None = None, workers: int = 12, chunk: int = 200) -> dict: 88 """Fetch+store diffs for every pull still missing one. The CID lives in the 89 archived events.record (joined back to the pull by the derive() pr_id convention 90 did||/||collection||/||rkey).""" 91 ensure_schema() 92 with connection(read_only=True) as con: 93 rows = con.execute( 94 "SELECT p.pr_id, p.author_did, e.record FROM pull_requests p " 95 "JOIN events e ON p.pr_id = e.did || '/' || e.collection || '/' || e.rkey " 96 "WHERE p.diff_text IS NULL AND e.collection = 'sh.tangled.repo.pull'" 97 + (f" LIMIT {int(limit)}" if limit else "") 98 ).fetchall() 99 work = [(pr_id, did, cid) for pr_id, did, rec in rows if (cid := _cid_for(rec))] 100 fetched = skipped = 0 101 buf: list[tuple[str, str]] = [] 102 with ThreadPoolExecutor(max_workers=workers) as ex: 103 for i, res in enumerate(ex.map(_fetch_one, work), 1): 104 if res is None: 105 skipped += 1 106 else: 107 buf.append(res) 108 fetched += 1 109 if len(buf) >= chunk: 110 _store(buf) 111 buf = [] 112 if i % 500 == 0: 113 print(f"[diffs] {i}/{len(work)} pulls, {fetched} stored ({skipped} skipped)", flush=True) 114 _store(buf) 115 out = {"candidates": len(rows), "with_cid": len(work), "stored": fetched, "skipped": skipped} 116 print(f"[diffs] DONE: {out}", flush=True) 117 return out 118 119 120def demo() -> None: 121 """Offline self-check: a gzipped unified diff round-trips through _diff_text (and 122 a non-gzip blob is skipped, not crashed), and _cid_for pulls the CID from a pull 123 record. No network -- the live fetch path is exercised by `python -m trust.diffs`.""" 124 sample = "diff --git a/x.py b/x.py\n@@ -1 +1 @@\n-old\n+new\n" 125 assert _diff_text(gzip.compress(sample.encode())) == sample, "gunzip round-trip failed" 126 assert "@@" in sample and "diff" in sample 127 assert _diff_text(b"not gzip at all") is None, "non-gzip blob must be skipped, not crash" 128 assert _diff_text(b"") is None 129 # valid gzip magic + garbage deflate body -> zlib.error (which is NOT an OSError); must skip, not crash. 130 assert _diff_text(b"\x1f\x8b\x08\x00" + bytes(20)) is None, "corrupt deflate body must be skipped, not abort the run" 131 rec = json.dumps({"rounds": [{"patchBlob": {"ref": {"$link": "bafyCID"}, "mimeType": "application/gzip"}}]}) 132 assert _cid_for(rec) == "bafyCID", "CID extraction from pull record failed" 133 assert _cid_for("{}") is None and _cid_for("not json") is None 134 print("gunzip round-trip + CID parse ok") 135 136 137def main() -> None: 138 ap = argparse.ArgumentParser(description="Fetch PR diff patchBlobs into pull_requests.diff_text") 139 ap.add_argument("--limit", type=int, default=None, help="cap pulls fetched this run (smoke test)") 140 ap.add_argument("--workers", type=int, default=12, help="concurrent blob fetchers (default 12)") 141 ap.add_argument("--demo", action="store_true", help="run the offline self-check and exit") 142 args = ap.parse_args() 143 if args.demo: 144 demo() 145 return 146 fetch_diffs(limit=args.limit, workers=args.workers) 147 148 149if __name__ == "__main__": 150 main()