Sunstead trust scoring project
1"""Phase 1 backstop: git-on-knots merge detection.
2
3`sh.tangled.repo.pull.status` is published for almost no PRs (41 network-wide), so
4merge truth lives on the knots. Each PR carries a `git format-patch` (the patchBlob);
5a PR is MERGED if its patch-id matches a commit on the target repo's default branch.
6So: clone each target repo bare once, patch-id every commit on HEAD, match all that
7repo's PRs against the set, store labels, delete the clone.
8
9Writes `pull_requests.merged=TRUE` for matches (the schema's intended backstop column;
10the real `merged` field is always NULL on sh.tangled.repo.pull). Also stores the
11decompressed patch into `diff_text` while it's in hand (the Phase-0 fetch, for free).
12
ponytail: patch-id matching is rebase-tolerant but squash-blind -- on patch-id alone, a
squash/heavily-edited merge shows up as a false negative. The patches carry a Gerrit-style
`Change-Id:`, so a PR is also counted as merged when its Change-Id appears in the log of the
target branch (this catches a squash only if the trailer was kept in the squashed commit).
Idempotent: re-running just recomputes; the UPDATEs are harmless to repeat.
17"""
18
19from __future__ import annotations
20
21import argparse
22import gzip
23import json
24import shutil
25import re
26import subprocess
27import tempfile
28from concurrent.futures import ThreadPoolExecutor
29
30from .backfill import _pds
31from .config import STAGING_DIR
32from .db import connection, ensure_schema
33from .diffs import MAX_BLOB_BYTES, MAX_DIFF_CHARS, _cid_for, _get_blob
34
# Upper bound, in seconds, on each long-running git subprocess in this module: the
# `git clone`, and the `git log -p` / `git patch-id` passes over a cloned branch.
CLONE_TIMEOUT = 180  # seconds; skip a pathologically huge/slow repo rather than hang the run
36
37
38def _patch_id(patch: bytes) -> str | None:
39 """git patch-id --stable -> the leading hash, or None. Same algorithm both sides
40 (PR patch and branch commits) is all that matters for comparison."""
41 try:
42 out = subprocess.run(["git", "patch-id", "--stable"], input=patch,
43 capture_output=True, timeout=30).stdout.decode().split()
44 return out[0] if out else None
45 except Exception:
46 return None
47
48
49_CHANGE_ID = re.compile(r"Change-Id:\s*(\S+)")
50
51
52def _change_id(patch: str) -> str | None:
53 """The Gerrit-style Change-Id in a format-patch's message, if any. Survives rebase
54 AND squash (git keeps the trailer), so it catches merges that patch-id misses."""
55 m = _CHANGE_ID.search(patch)
56 return m.group(1) if m else None
57
58
def _branch_keys(bare: str) -> tuple[set[str], set[str]]:
    """Collect both merge fingerprints for the history reachable from HEAD.

    Runs `git log -p --no-merges HEAD` in the repository at `bare`, then pipes that
    output through `git patch-id --stable`. Each git call is bounded by CLONE_TIMEOUT;
    a timeout raises to the caller.

    Returns:
        (patch_ids, change_ids). patch_ids holds the first token of every non-blank
        line git patch-id printed. change_ids holds every `Change-Id:` value found
        anywhere in the log text, which is decoded as UTF-8 with replacement.
        Exit codes are not checked, so a failed `git log` yields two empty sets.
    """
    history = subprocess.run(
        ["git", "-C", bare, "log", "-p", "--no-merges", "HEAD"],
        capture_output=True, timeout=CLONE_TIMEOUT,
    ).stdout
    listing = subprocess.run(
        ["git", "patch-id", "--stable"],
        input=history, capture_output=True, timeout=CLONE_TIMEOUT,
    ).stdout.decode()

    patch_ids: set[str] = set()
    for line in listing.splitlines():
        fields = line.split()
        if fields:  # skip blank lines
            patch_ids.add(fields[0])

    log_text = history.decode("utf-8", "replace")
    change_ids = set(_CHANGE_ID.findall(log_text))
    return patch_ids, change_ids
70
71
def _full_patch(did: str, cid: str) -> str | None:
    """Fetch and gunzip a PR's patch blob, returning the full text without truncation.

    The text is deliberately not capped, because patch-id needs the complete diff.
    Only the compressed blob is size-checked, against MAX_BLOB_BYTES.

    Returns None when the author's PDS cannot be resolved, the blob is missing, empty
    or over the size limit, decompression fails, or the decoded text is empty.
    Bytes that are not valid UTF-8 are replaced rather than rejected.
    """
    host = _pds(did)
    raw = _get_blob(host, did, cid) if host else None
    if not raw:
        return None
    if len(raw) > MAX_BLOB_BYTES:
        return None
    try:
        text = gzip.decompress(raw).decode("utf-8", "replace")
    except Exception:
        # Not gzip, truncated, or otherwise unreadable: treat as "no patch".
        return None
    return text if text else None
84
85
86def _candidates(con) -> dict[str, list[str]]:
87 """repoDid -> clone URLs `https://{knot}/{owner_did}/{name}` (non-localhost), deduped.
88 A repoDid can have several repo records (re-registrations/forks); try them in order."""
89 out: dict[str, list[str]] = {}
90 for did, rkey, rec in con.execute(
91 "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo'").fetchall():
92 v = json.loads(rec)
93 rd, knot = v.get("repoDid"), (v.get("knot") or "")
94 if not rd or knot.startswith("localhost"):
95 continue
96 url = f"https://{knot}/{did}/{rkey}"
97 out.setdefault(rd, [])
98 if url not in out[rd]:
99 out[rd].append(url)
100 return out
101
102
103def _targets(con) -> dict[str, list[tuple[str, str, str]]]:
104 """repoDid -> [(pr_id, author_did, patch_cid)] for every pull targeting it."""
105 out: dict[str, list[tuple[str, str, str]]] = {}
106 for did, rkey, rec in con.execute(
107 "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo.pull'").fetchall():
108 v = json.loads(rec)
109 t = v.get("target") or {}
110 rd = t.get("repoDid") or t.get("repo")
111 cid = _cid_for(rec)
112 if not rd or not cid:
113 continue
114 pr_id = f"{did}/sh.tangled.repo.pull/{rkey}"
115 out.setdefault(rd, []).append((pr_id, did, cid))
116 return out
117
118
119def _clone(url: str, dest: str) -> bool:
120 try:
121 subprocess.run(["git", "clone", "--bare", "--single-branch", url, dest],
122 capture_output=True, timeout=CLONE_TIMEOUT,
123 env={"GIT_TERMINAL_PROMPT": "0"}, check=True)
124 return True
125 except Exception:
126 return False
127
128
def _process(repodid: str, urls: list[str], prs: list[tuple[str, str, str]]) -> dict:
    """Clone one target repo and decide which of its PRs were merged.

    The candidate URLs are tried in order until one clones. The branch fingerprints
    are computed once, then each PR's patch is fetched and compared. A PR counts as
    merged if its patch-id is among the branch patch-ids, or its Change-Id is among
    the branch Change-Ids.

    Args:
        repodid: the target repo's DID (not used in the body).
        urls: clone URLs to try, in order.
        prs: (pr_id, author_did, patch_cid) for each PR targeting the repo.

    Returns:
        {"merged": set of pr_ids, "diffs": [(pr_id, text capped at MAX_DIFF_CHARS)],
        "cloned": bool}. If no URL clones, or anything raises, the result is empty
        with cloned=False. The scratch directory is always deleted before returning,
        so disk use stays bounded when repos run concurrently.
    """
    workdir = tempfile.mkdtemp(dir=str(STAGING_DIR), prefix="merged-")
    bare = f"{workdir}/repo.git"
    try:
        for url in urls:
            if _clone(url, bare):
                break
        else:
            # No candidate URL could be cloned (or there were none).
            return {"merged": set(), "diffs": [], "cloned": False}

        branch_pids, branch_cids = _branch_keys(bare)
        merged: set = set()
        diffs: list = []
        for pr_id, author, cid in prs:
            patch = _full_patch(author, cid)
            if not patch:
                continue
            # Phase-0 freebie: keep the (capped) diff text while it is in hand.
            diffs.append((pr_id, patch[:MAX_DIFF_CHARS]))
            pid = _patch_id(patch.encode())
            chid = _change_id(patch)  # squash/rebase-proof fallback
            by_patch = bool(pid) and pid in branch_pids
            by_trailer = bool(chid) and chid in branch_cids
            if by_patch or by_trailer:
                merged.add(pr_id)
        return {"merged": merged, "diffs": diffs, "cloned": True}
    except Exception:
        return {"merged": set(), "diffs": [], "cloned": False}
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
153
154
155def _write(merged: set[str], diffs: list[tuple[str, str]]) -> bool:
156 """One write lock: merge labels + (any still-missing) diff_text. Patient retry
157 (~120s) rides out a concurrent score-loop writer instead of crashing the run; if
158 it still can't get the lock, the chunk is dropped (logged) and an idempotent
159 re-run recovers it -- never abort 600 repos of work over one lock loss."""
160 if not merged and not diffs:
161 return True
162 try:
163 with connection(read_only=False, attempts=480, delay=0.25) as con:
164 if merged:
165 con.executemany("UPDATE pull_requests SET merged=TRUE WHERE pr_id=?",
166 [(p,) for p in merged])
167 if diffs:
168 con.executemany( # diffs is (pr_id, text); the UPDATE binds (text, pr_id)
169 "UPDATE pull_requests SET diff_text=? WHERE pr_id=? AND diff_text IS NULL",
170 [(text, pid) for pid, text in diffs])
171 return True
172 except Exception as e:
173 print(f"[merged] WRITE LOST to lock ({len(merged)} labels, {len(diffs)} diffs) -- "
174 f"re-run to recover: {e}", flush=True)
175 return False
176
177
def detect(max_repos: int | None = None, workers: int = 6) -> dict:
    """Run merge detection over every resolvable target repo and store the labels.

    Repos are processed concurrently in a thread pool, since the work is network,
    disk and git bound. Results are consumed in submission order and every DB write
    happens on the calling thread.

    Args:
        max_repos: if truthy, process only the first that many repos.
        workers: size of the thread pool.

    Returns:
        Summary dict with keys repos, merged, diffs_stored, repos_failed,
        writes_lost and prs_unresolved (PRs whose target repo has no clone URL).
    """
    ensure_schema()
    with connection(read_only=True) as con:
        cands, targets = _candidates(con), _targets(con)

    # Split targets into repos we can clone and PRs we cannot resolve at all.
    work = []
    skipped_unresolved = 0
    for rd, prs in targets.items():
        if rd in cands:
            work.append((rd, cands[rd], prs))
        else:
            skipped_unresolved += len(prs)
    if max_repos:
        work = work[:max_repos]

    n_repos = len(work)
    n_prs = sum(len(job[2]) for job in work)
    print(f"[merged] {n_repos} resolvable repos, "
          f"{n_prs} PRs ({skipped_unresolved} PRs have no clone URL)", flush=True)

    def run_job(job):
        return _process(*job)

    total_merged = 0
    total_diffs = 0
    repos_failed = 0
    writes_lost = 0
    with ThreadPoolExecutor(max_workers=workers) as ex:
        for done, res in enumerate(ex.map(run_job, work), start=1):
            if not res["cloned"]:
                repos_failed += 1
            if _write(res["merged"], res["diffs"]):
                total_merged += len(res["merged"])
                total_diffs += len(res["diffs"])
            else:
                writes_lost += 1
            if done % 25 == 0:  # progress line every 25 repos
                print(f"[merged] {done}/{n_repos} repos, {total_merged} merged labels, "
                      f"{total_diffs} diffs ({repos_failed} unreachable, {writes_lost} writes lost)",
                      flush=True)

    out = {
        "repos": n_repos,
        "merged": total_merged,
        "diffs_stored": total_diffs,
        "repos_failed": repos_failed,
        "writes_lost": writes_lost,
        "prs_unresolved": skipped_unresolved,
    }
    print(f"[merged] DONE: {out}", flush=True)
    return out
208
209
def demo() -> None:
    """Offline sanity check of the patch-id fingerprint; needs no network or clone.

    Fingerprints one tiny diff twice and a slightly different diff once, then asserts
    that the identical diffs agree, the different diff disagrees, and set membership
    behaves accordingly. Prints a confirmation line on success and raises
    AssertionError otherwise.
    """
    same_diff = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+b\n"
    other_diff = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+c\n"
    first = _patch_id(same_diff)
    repeat = _patch_id(same_diff)
    changed = _patch_id(other_diff)
    assert first and first == repeat, "patch-id not stable for identical diff"
    assert first != changed, "different diffs must yield different patch-ids"
    known = {first, "deadbeef"}
    assert first in known and changed not in {first}, "membership logic wrong"
    print("patch-id stable + discriminating ok")
220
221
def main() -> None:
    """Command-line entry point.

    Flags: `--max-repos N` caps the number of repos processed, `--workers N` sets the
    clone concurrency (default 6), and `--demo` runs the offline self-check and exits
    without touching the network or database.
    """
    parser = argparse.ArgumentParser(
        description="git-on-knots merge detection -> pull_requests.merged")
    parser.add_argument("--max-repos", type=int, default=None,
                        help="cap repos processed (smoke test)")
    parser.add_argument("--workers", type=int, default=6,
                        help="concurrent repo clones (default 6)")
    parser.add_argument("--demo", action="store_true",
                        help="offline self-check, then exit")
    opts = parser.parse_args()

    if not opts.demo:
        detect(max_repos=opts.max_repos, workers=opts.workers)
        return
    demo()
232
233
# Run the CLI only when this file is executed as a script, not when it is imported.
# The module uses package-relative imports, so it presumably has to be launched as
# part of its package (e.g. with `python -m`) -- confirm against the project layout.
if __name__ == "__main__":
    main()