Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 10 kB View raw
1"""Phase 1 backstop: git-on-knots merge detection. 2 3`sh.tangled.repo.pull.status` is published for almost no PRs (41 network-wide), so 4merge truth lives on the knots. Each PR carries a `git format-patch` (the patchBlob); 5a PR is MERGED if its patch-id matches a commit on the target repo's default branch. 6So: clone each target repo bare once, patch-id every commit on HEAD, match all that 7repo's PRs against the set, store labels, delete the clone. 8 9Writes `pull_requests.merged=TRUE` for matches (the schema's intended backstop column; 10the real `merged` field is always NULL on sh.tangled.repo.pull). Also stores the 11decompressed patch into `diff_text` while it's in hand (the Phase-0 fetch, for free). 12 13ponytail: patch-id matching is rebase-tolerant but squash-blind -- a squash/heavily-edited 14merge shows up as a false negative. The patches carry a Gerrit-style `Change-Id:`; if recall 15proves too low, match on that instead (survives squash). Idempotent: re-running just 16recomputes; the UPDATEs are harmless to repeat. 17""" 18 19from __future__ import annotations 20 21import argparse 22import gzip 23import json 24import shutil 25import re 26import subprocess 27import tempfile 28from concurrent.futures import ThreadPoolExecutor 29 30from .backfill import _pds 31from .config import STAGING_DIR 32from .db import connection, ensure_schema 33from .diffs import MAX_BLOB_BYTES, MAX_DIFF_CHARS, _cid_for, _get_blob 34 35CLONE_TIMEOUT = 180 # seconds; skip a pathologically huge/slow repo rather than hang the run 36 37 38def _patch_id(patch: bytes) -> str | None: 39 """git patch-id --stable -> the leading hash, or None. Same algorithm both sides 40 (PR patch and branch commits) is all that matters for comparison.""" 41 try: 42 out = subprocess.run(["git", "patch-id", "--stable"], input=patch, 43 capture_output=True, timeout=30).stdout.decode().split() 44 return out[0] if out else None 45 except Exception: 46 return None 47 48 49_CHANGE_ID = re.compile(r"Change-Id:\s*(\S+)") 50 51 52def _change_id(patch: str) -> str | None: 53 """The Gerrit-style Change-Id in a format-patch's message, if any. Survives rebase 54 AND squash (git keeps the trailer), so it catches merges that patch-id misses.""" 55 m = _CHANGE_ID.search(patch) 56 return m.group(1) if m else None 57 58 59def _branch_keys(bare: str) -> tuple[set[str], set[str]]: 60 """(patch_ids, change_ids) over every non-merge commit reachable from HEAD. 61 Two independent fingerprints: patch-id matches exact/rebased patches; Change-Id also 62 matches squashed/amended ones (where the diff changed but the trailer was preserved).""" 63 log = subprocess.run(["git", "-C", bare, "log", "-p", "--no-merges", "HEAD"], 64 capture_output=True, timeout=CLONE_TIMEOUT).stdout 65 pid = subprocess.run(["git", "patch-id", "--stable"], input=log, 66 capture_output=True, timeout=CLONE_TIMEOUT).stdout.decode() 67 patch_ids = {ln.split()[0] for ln in pid.splitlines() if ln.split()} 68 change_ids = set(_CHANGE_ID.findall(log.decode("utf-8", "replace"))) 69 return patch_ids, change_ids 70 71 72def _full_patch(did: str, cid: str) -> str | None: 73 """Decompressed patch text, UNCAPPED (patch-id needs the whole diff to match).""" 74 pds = _pds(did) 75 if not pds: 76 return None 77 blob = _get_blob(pds, did, cid) 78 if not blob or len(blob) > MAX_BLOB_BYTES: 79 return None 80 try: 81 return gzip.decompress(blob).decode("utf-8", "replace") or None 82 except Exception: 83 return None 84 85 86def _candidates(con) -> dict[str, list[str]]: 87 """repoDid -> clone URLs `https://{knot}/{owner_did}/{name}` (non-localhost), deduped. 88 A repoDid can have several repo records (re-registrations/forks); try them in order.""" 89 out: dict[str, list[str]] = {} 90 for did, rkey, rec in con.execute( 91 "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo'").fetchall(): 92 v = json.loads(rec) 93 rd, knot = v.get("repoDid"), (v.get("knot") or "") 94 if not rd or knot.startswith("localhost"): 95 continue 96 url = f"https://{knot}/{did}/{rkey}" 97 out.setdefault(rd, []) 98 if url not in out[rd]: 99 out[rd].append(url) 100 return out 101 102 103def _targets(con) -> dict[str, list[tuple[str, str, str]]]: 104 """repoDid -> [(pr_id, author_did, patch_cid)] for every pull targeting it.""" 105 out: dict[str, list[tuple[str, str, str]]] = {} 106 for did, rkey, rec in con.execute( 107 "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo.pull'").fetchall(): 108 v = json.loads(rec) 109 t = v.get("target") or {} 110 rd = t.get("repoDid") or t.get("repo") 111 cid = _cid_for(rec) 112 if not rd or not cid: 113 continue 114 pr_id = f"{did}/sh.tangled.repo.pull/{rkey}" 115 out.setdefault(rd, []).append((pr_id, did, cid)) 116 return out 117 118 119def _clone(url: str, dest: str) -> bool: 120 try: 121 subprocess.run(["git", "clone", "--bare", "--single-branch", url, dest], 122 capture_output=True, timeout=CLONE_TIMEOUT, 123 env={"GIT_TERMINAL_PROMPT": "0"}, check=True) 124 return True 125 except Exception: 126 return False 127 128 129def _process(repodid: str, urls: list[str], prs: list[tuple[str, str, str]]) -> dict: 130 """Clone the repo, match each of its PRs, return {merged:set, diffs:[(pr_id,text)]}. 131 Always removes its clone before returning, so concurrent repos bound disk use.""" 132 tmp = tempfile.mkdtemp(dir=str(STAGING_DIR), prefix="merged-") 133 bare = f"{tmp}/repo.git" 134 try: 135 if not any(_clone(u, bare) for u in urls): 136 return {"merged": set(), "diffs": [], "cloned": False} 137 branch_pids, branch_cids = _branch_keys(bare) 138 merged, diffs = set(), [] 139 for pr_id, did, cid in prs: 140 patch = _full_patch(did, cid) 141 if not patch: 142 continue 143 diffs.append((pr_id, patch[:MAX_DIFF_CHARS])) # Phase-0 freebie (capped) 144 pid = _patch_id(patch.encode()) 145 chid = _change_id(patch) # squash/rebase-proof fallback 146 if (pid and pid in branch_pids) or (chid and chid in branch_cids): 147 merged.add(pr_id) 148 return {"merged": merged, "diffs": diffs, "cloned": True} 149 except Exception: 150 return {"merged": set(), "diffs": [], "cloned": False} 151 finally: 152 shutil.rmtree(tmp, ignore_errors=True) 153 154 155def _write(merged: set[str], diffs: list[tuple[str, str]]) -> bool: 156 """One write lock: merge labels + (any still-missing) diff_text. Patient retry 157 (~120s) rides out a concurrent score-loop writer instead of crashing the run; if 158 it still can't get the lock, the chunk is dropped (logged) and an idempotent 159 re-run recovers it -- never abort 600 repos of work over one lock loss.""" 160 if not merged and not diffs: 161 return True 162 try: 163 with connection(read_only=False, attempts=480, delay=0.25) as con: 164 if merged: 165 con.executemany("UPDATE pull_requests SET merged=TRUE WHERE pr_id=?", 166 [(p,) for p in merged]) 167 if diffs: 168 con.executemany( # diffs is (pr_id, text); the UPDATE binds (text, pr_id) 169 "UPDATE pull_requests SET diff_text=? WHERE pr_id=? AND diff_text IS NULL", 170 [(text, pid) for pid, text in diffs]) 171 return True 172 except Exception as e: 173 print(f"[merged] WRITE LOST to lock ({len(merged)} labels, {len(diffs)} diffs) -- " 174 f"re-run to recover: {e}", flush=True) 175 return False 176 177 178def detect(max_repos: int | None = None, workers: int = 6) -> dict: 179 """Clone every target repo, patch-id match its PRs, write merge labels. 180 Repos run concurrently (network+disk+git); DB writes funnel through the main thread.""" 181 ensure_schema() 182 with connection(read_only=True) as con: 183 cands, targets = _candidates(con), _targets(con) 184 work = [(rd, cands[rd], prs) for rd, prs in targets.items() if rd in cands] 185 if max_repos: 186 work = work[:max_repos] 187 skipped_unresolved = sum(len(prs) for rd, prs in targets.items() if rd not in cands) 188 print(f"[merged] {len(work)} resolvable repos, " 189 f"{sum(len(p) for _,_,p in work)} PRs ({skipped_unresolved} PRs have no clone URL)", flush=True) 190 191 total_merged = total_diffs = repos_failed = writes_lost = 0 192 with ThreadPoolExecutor(max_workers=workers) as ex: 193 for i, res in enumerate(ex.map(lambda w: _process(*w), work), 1): 194 if not res["cloned"]: 195 repos_failed += 1 196 if _write(res["merged"], res["diffs"]): 197 total_merged += len(res["merged"]) 198 total_diffs += len(res["diffs"]) 199 else: 200 writes_lost += 1 201 if i % 25 == 0: 202 print(f"[merged] {i}/{len(work)} repos, {total_merged} merged labels, " 203 f"{total_diffs} diffs ({repos_failed} unreachable, {writes_lost} writes lost)", flush=True) 204 out = {"repos": len(work), "merged": total_merged, "diffs_stored": total_diffs, 205 "repos_failed": repos_failed, "writes_lost": writes_lost, "prs_unresolved": skipped_unresolved} 206 print(f"[merged] DONE: {out}", flush=True) 207 return out 208 209 210def demo() -> None: 211 """Offline self-check: a patch-id is stable for the same diff and differs for a 212 changed diff, so set-membership matching is sound. No network/clone.""" 213 d1 = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+b\n" 214 d2 = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+c\n" 215 p1, p1b, p2 = _patch_id(d1), _patch_id(d1), _patch_id(d2) 216 assert p1 and p1 == p1b, "patch-id not stable for identical diff" 217 assert p1 != p2, "different diffs must yield different patch-ids" 218 assert p1 in {p1, "deadbeef"} and p2 not in {p1}, "membership logic wrong" 219 print("patch-id stable + discriminating ok") 220 221 222def main() -> None: 223 ap = argparse.ArgumentParser(description="git-on-knots merge detection -> pull_requests.merged") 224 ap.add_argument("--max-repos", type=int, default=None, help="cap repos processed (smoke test)") 225 ap.add_argument("--workers", type=int, default=6, help="concurrent repo clones (default 6)") 226 ap.add_argument("--demo", action="store_true", help="offline self-check, then exit") 227 args = ap.parse_args() 228 if args.demo: 229 demo() 230 return 231 detect(max_repos=args.max_repos, workers=args.workers) 232 233 234if __name__ == "__main__": 235 main()