Sunstead trust scoring project
1"""Tower B (content risk): a calibrated head on FROZEN diff embeddings (PRD Tier 1).
2
3Identity-blind by construction (PRD constraint 1): the only inputs are the diff
4embedding (diff_vectors, from the frozen Qwen3 transformer) and PR-intrinsic
5scalars (additions, deletions, files_touched, discussion_len). NO author
6DID/handle/history/aggregate ever enters this model -- leakage-free because the
7features simply do not contain identity.
8
9Trained on the clean_merge label with a time split; calibrated so the output is a
10real P(content safe). content_risk = 1 - P. Served only if it BEATS its baselines
11(majority + slop-kNN) on a time split AND a repo holdout -- same beat-the-baseline
12guardrail the GNN follows. fusion consults the verdict via load_if_winner().
13
14Model: L2-normalize the embedding -> L2-regularized logistic regression (a linear
15probe, the correct choice for frozen embeddings at low data; a tree on the raw
162560-dim vector overfits) -> isotonic calibration on a held-out fold.
17
18Optional: needs `uv pip install -e '.[learned]'` (scikit-learn). fusion imports
19this lazily and treats an ImportError / unfit model as "signal unavailable".
20"""
21
22from __future__ import annotations
23
24import json
25import pickle
26
27import numpy as np
28
29from .config import MODEL_DIR
30from .db import connection
31from .learned import _reliability # reuse the PRD 6.8 reliability curve (predicted vs actual)
32
# Artifacts written by training and read back at serving time.
MODEL_PATH = MODEL_DIR / "content.pkl"
VERDICT = MODEL_DIR / "content_verdict.json"

# Size/shape-of-the-CHANGE scalars that follow the embedding in every feature row.
# They describe the PR itself, never its author. The order is load-bearing: the
# training query and the serving lookup both rely on it.
SCALAR_COLS = [
    "additions",
    "deletions",
    "files_touched",
    "discussion_len",
]
MIN_ROWS = 8  # hard floor: with fewer rows a linear probe only fits noise
40
41
42# --- feature construction (leakage-free: scaler fit on TRAIN only) ----------
43
def _l2(E: np.ndarray) -> np.ndarray:
    """Scale every row of ``E`` to unit Euclidean length.

    The per-row norm is floored at 1e-9 before dividing, so an all-zero row
    comes back as zeros instead of NaN.
    """
    row_norms = np.linalg.norm(E, axis=1, keepdims=True)
    safe_norms = np.clip(row_norms, 1e-9, None)
    return E / safe_norms
47
48
def _featurize_fit(emb, scal):
    """Build the design matrix for these rows and learn the scalar standardizer.

    Embeddings are L2-normalized per row. The size scalars are clamped at 0,
    log1p-compressed (they are heavy-tailed) and z-scored with the mean/std of
    THESE rows only. Returns ``(X, mean, std)`` so the same scaling can be
    replayed on other rows.
    """
    unit_emb = _l2(np.asarray(emb, dtype=float))
    log_scal = np.log1p(np.asarray(scal, dtype=float).clip(min=0))
    mean = log_scal.mean(0)
    std = np.clip(log_scal.std(0), 1e-9, None)  # constant column -> no divide-by-zero
    standardized = (log_scal - mean) / std
    return np.hstack([unit_emb, standardized]), mean, std
55
56
def _featurize_apply(emb, scal, mean, std):
    """Featurize rows with an ALREADY-fitted scalar standardizer.

    Same transform as the fit path (unit-length embedding, clamped + log1p
    scalars), but ``mean`` / ``std`` are supplied by the caller rather than
    estimated from these rows.
    """
    unit_emb = _l2(np.asarray(emb, dtype=float))
    clamped = np.asarray(scal, dtype=float).clip(min=0)
    standardized = (np.log1p(clamped) - mean) / std
    return np.hstack([unit_emb, standardized])
61
62
63# --- data access ------------------------------------------------------------
64
def _rows(con):
    """Fetch every PR that has a diff_vectors row and a non-NULL clean_merge label.

    Each row is ``(pr_id, embedding, additions, deletions, files_touched,
    discussion_len, clean_merge, opened_at, repo)``. Rows come back ordered by
    opened_at, so slicing ``[:k]`` / ``[k:]`` gives an older-train /
    newer-validation time split.
    """
    query = (
        "SELECT v.pr_id, v.embedding, p.additions, p.deletions, p.files_touched, "
        " p.discussion_len, l.clean_merge, p.opened_at, p.repo "
        "FROM diff_vectors v "
        "JOIN pr_labels l USING (pr_id) "
        "JOIN pull_requests p ON p.pr_id = v.pr_id "
        "WHERE l.clean_merge IS NOT NULL "
        "ORDER BY p.opened_at"
    )
    cursor = con.execute(query)
    return cursor.fetchall()
78
79
def _matrix(rows):
    """Unpack ``_rows`` tuples into ``(emb, scal, y, repos)``.

    NULL (falsy) scalars become 0 and a NULL repo becomes "". ``y`` is an int
    array in which 1 means clean_merge, i.e. the content was safe.
    """
    emb, scal, labels, repos = [], [], [], []
    for row in rows:
        emb.append(row[1])
        scal.append([row[2] or 0, row[3] or 0, row[4] or 0, row[5] or 0])
        labels.append(int(row[6]))
        repos.append(row[8] or "")
    return emb, scal, np.array(labels, dtype=int), repos
87
88
def _require(rows) -> None:
    """Abort with SystemExit unless ``rows`` can support training.

    Training needs at least MIN_ROWS rows and both label values present; the
    exit message names the pipeline phases that produce the missing data.
    """
    classes = {int(row[6]) for row in rows}
    enough_rows = len(rows) >= MIN_ROWS
    both_classes = len(classes) >= 2
    if enough_rows and both_classes:
        return
    raise SystemExit(
        f"content head needs >={MIN_ROWS} embedded+labelled PRs spanning both classes; "
        f"got {len(rows)} rows, classes={classes}. Run Phase 0 (trust.diffs), Phase 1 "
        f"(backfill sh.tangled.repo.pull.status for a positive class), Phase 2 "
        f"(trust.embed --build) first.")
97
98
99# --- the scorer -------------------------------------------------------------
100
class ContentScorer:
    """Identity-blind scorer: maps a diff embedding plus PR-size scalars to
    P(content safe) and its complement, content_risk."""

    def __init__(self, clf, iso, mean, std, emb_dim):
        self.clf = clf          # fitted classifier exposing predict_proba
        self.iso = iso          # calibrator exposing predict, or None (uncalibrated)
        self.mean = mean        # scalar standardizer mean learned on the train fold
        self.std = std          # scalar standardizer std learned on the train fold
        self.emb_dim = emb_dim  # embedding length the model was trained on

    def _prob_safe(self, emb, scalars) -> float | None:
        """P(content safe) for one PR, or None when the embedding is missing or
        its length differs from the training dimension (signal unavailable)."""
        if emb is None:
            return None
        if len(emb) != self.emb_dim:  # e.g. a different EMBED_DIMENSIONS setting
            return None
        X = _featurize_apply([emb], [scalars], self.mean, self.std)
        raw = float(self.clf.predict_proba(X)[0, 1])
        if self.iso is None:
            return raw
        return float(self.iso.predict([raw])[0])

    def risk_for(self, emb, scalars) -> float | None:
        """content_risk (1 - P(safe)) straight from an embedding + scalars.
        None on a missing or wrong-sized embedding."""
        p_safe = self._prob_safe(emb, scalars)
        if p_safe is None:
            return None
        return 1.0 - p_safe

    def risk(self, con, pr_id: str | None = None, diff: str | None = None,
             scalars: list | None = None) -> float | None:
        """content_risk for a PR.

        The stored embedding for ``pr_id`` is tried first; if there is none and
        ``diff`` is given, the diff is embedded live. When neither yields an
        embedding the result is None, so the caller can treat the content
        signal as absent. Scalars default to the pull_requests row for
        ``pr_id`` (NULLs as 0), or to all zeros.
        """
        emb = None
        if pr_id is not None:
            hit = con.execute("SELECT embedding FROM diff_vectors WHERE pr_id=?", [pr_id]).fetchone()
            if hit:
                emb = hit[0]
        if emb is None and diff:
            from . import embed as embed_mod
            vecs = embed_mod.embed(diff)  # None without FEATHERLESS_API_KEY
            emb = vecs[0] if vecs else None
        if emb is None:
            return None
        if scalars is None and pr_id is not None:
            found = con.execute(
                f"SELECT {', '.join(SCALAR_COLS)} FROM pull_requests WHERE pr_id=?", [pr_id]).fetchone()
            scalars = [s or 0 for s in found] if found else [0, 0, 0, 0]
        return self.risk_for(emb, scalars or [0, 0, 0, 0])

    def dump(self) -> dict:
        """Plain-dict snapshot of the scorer state (what gets pickled)."""
        return {
            "clf": self.clf,
            "iso": self.iso,
            "mean": self.mean,
            "std": self.std,
            "emb_dim": self.emb_dim,
        }
143
144
145# --- fit / train ------------------------------------------------------------
146
def _fit(emb, scal, y, split: float = 0.7):
    """Fit the linear probe on the oldest rows and calibrate it on the newest.

    Pure: no DB or file I/O. Inputs must already be time-ordered. The scalar
    standardizer and the classifier see the train fold only; the isotonic map
    is fit on the validation fold.

    Args:
        emb: sequence of embedding vectors, one per PR.
        scal: sequence of scalar rows aligned with ``emb``.
        y: numpy array of labels aligned with ``emb`` (1 = clean merge).
        split: fraction of rows, taken from the start, used for training. The
            train cut is never below 2 rows.

    Returns:
        ``(ContentScorer, stats)``; stats holds ``cal_val`` (calibrated P(safe)
        on the validation fold), ``yval``, ``n_train`` and ``n_val``.

    Raises:
        SystemExit: the train fold contains a single class.

    NOTE: ``cal_val`` is predicted on the very fold the isotonic map was fit
    on, so a reliability curve built from these stats is in-sample for the
    calibrator and therefore optimistic.
    """
    from sklearn.isotonic import IsotonicRegression
    from sklearn.linear_model import LogisticRegression

    k = max(2, int(len(emb) * split))
    Xtr, mean, std = _featurize_fit(emb[:k], scal[:k])
    ytr, yval = y[:k], y[k:]
    if len(set(ytr.tolist())) < 2:
        raise SystemExit("time-split train fold has a single class; need more history spanning both.")
    clf = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000).fit(Xtr, ytr)

    if len(yval) == 0:
        # Nothing left to validate on (e.g. split=1.0). An empty fold cannot be
        # featurized (np.asarray([]) is 1-D, so the row-wise norm fails), so
        # skip scoring and return an uncalibrated model instead of crashing.
        raw_val = np.asarray([], dtype=float)
    else:
        Xval = _featurize_apply(emb[k:], scal[k:], mean, std)
        raw_val = clf.predict_proba(Xval)[:, 1]

    iso = None
    if len(set(yval.tolist())) > 1:  # isotonic needs both classes in the holdout
        iso = IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval)
    cal_val = iso.predict(raw_val) if iso is not None else raw_val

    scorer = ContentScorer(clf, iso, mean, std, len(emb[0]))
    return scorer, {"cal_val": np.asarray(cal_val), "yval": yval, "n_train": k, "n_val": len(yval)}
166
167
def _save(scorer: ContentScorer) -> None:
    """Persist ``scorer`` to MODEL_PATH and invalidate everything tied to the old model.

    Side effects: creates MODEL_DIR if needed, deletes any existing verdict
    file, overwrites the pickle, and resets the in-process load() cache.
    """
    global _loaded, _cache
    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    # Invalidate first: a verdict left over from an earlier run describes the OLD
    # weights. Without this, load_if_winner() would serve the new, not-yet-evaluated
    # model on the strength of a stale content_wins flag. The model stays unserved
    # until a fresh verdict is written.
    VERDICT.unlink(missing_ok=True)
    MODEL_PATH.write_bytes(pickle.dumps(scorer.dump()))
    _loaded, _cache = False, None  # force load() to re-read the fresh model
173
174
def train(split: float = 0.7) -> dict:
    """Phase 3: fit, calibrate and persist content.pkl.

    Returns a summary dict: row counts, embedding dimension, whether isotonic
    calibration was applied, the reliability curve and the model path.
    Exits via SystemExit when the data cannot support training.
    """
    with connection(read_only=True) as con:
        rows = _rows(con)
    _require(rows)

    emb, scal, y, _ = _matrix(rows)
    scorer, stats = _fit(emb, scal, y, split)
    _save(scorer)

    summary = {
        "rows": len(rows),
        "train": stats["n_train"],
        "val": stats["n_val"],
        "emb_dim": scorer.emb_dim,
        "calibrated": scorer.iso is not None,
        "reliability": _reliability(stats["cal_val"], stats["yval"]),
        "model": str(MODEL_PATH),
    }
    return summary
186
187
188# --- eval + beat-the-baseline gate (Phase 5) --------------------------------
189
def _auc(p_safe, y) -> float | None:
    """ROC-AUC of the P(safe) scores against the clean label.

    Returns None when ``y`` holds fewer than two distinct classes, where AUC
    is undefined.
    """
    from sklearn.metrics import roc_auc_score

    distinct = set(np.asarray(y).tolist())
    if len(distinct) >= 2:
        return float(roc_auc_score(y, p_safe))
    return None
196
197
def _ap(p_safe, y) -> float | None:
    """Average precision of the P(safe) scores against the clean label.

    Returns None when ``y`` holds fewer than two distinct classes.
    """
    from sklearn.metrics import average_precision_score

    distinct = set(np.asarray(y).tolist())
    if len(distinct) >= 2:
        return float(average_precision_score(y, p_safe))
    return None
203
204
def _slop_baseline(emb_val, emb_train, y_train):
    """slop-kNN baseline expressed as a P(safe) proxy.

    For every validation embedding, take the highest cosine similarity to any
    TRAIN embedding labelled 0 (known bad), clip it to [0, 1] and return
    ``1 - similarity``. Only train-fold vectors are consulted. Returns None
    when the train fold has no known-bad example.
    """
    known_bad = [vec for vec, label in zip(emb_train, y_train) if int(label) == 0]
    if len(known_bad) == 0:
        return None
    bad_unit = _l2(np.asarray(known_bad, dtype=float))
    val_unit = _l2(np.asarray(emb_val, dtype=float))
    cosine = val_unit @ bad_unit.T
    nearest = cosine.max(axis=1)
    return 1.0 - np.clip(nearest, 0.0, 1.0)
215
216
def _eval_fold(train_rows, val_rows) -> dict | None:
    """Fit a fresh probe on ``train_rows`` and score ``val_rows``.

    Returns a dict with the model's ``auc`` and ``ap``, the slop-kNN
    ``slop_auc`` (None when that baseline is not computable), ``n_val`` and
    ``pos_val``. Returns None when either fold lacks both classes: AUC is
    undefined there, so no win can be claimed.
    """
    emb_tr, scal_tr, ytr, _ = _matrix(train_rows)
    emb_va, scal_va, yva, _ = _matrix(val_rows)
    for labels in (ytr, yva):
        if len(set(labels.tolist())) < 2:
            return None

    from sklearn.linear_model import LogisticRegression

    Xtr, mean, std = _featurize_fit(emb_tr, scal_tr)
    probe = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000)
    probe.fit(Xtr, ytr)
    Xva = _featurize_apply(emb_va, scal_va, mean, std)
    p_safe = probe.predict_proba(Xva)[:, 1]

    slop = _slop_baseline(emb_va, emb_tr, ytr)
    slop_auc = None if slop is None else _auc(slop, yva)
    return {
        "auc": _auc(p_safe, yva),
        "ap": _ap(p_safe, yva),
        "slop_auc": slop_auc,
        "n_val": len(yva),
        "pos_val": int(yva.sum()),
    }
232
233
def _repo_split(rows, frac: float = 0.3):
    """Split by holding out WHOLE repos, to test generalization to unseen repos.

    Deterministic: repo names are sorted and the first ~ceil(frac * R) of them
    (at least one) are held out, so no repo appears on both sides. A NULL repo
    is treated as "". Returns ``(train_rows, val_rows, held_repo_names)``.
    """
    names = sorted({row[8] or "" for row in rows})
    n_held = max(1, int(len(names) * frac + 0.999))
    held = set(names[:n_held])

    train, val = [], []
    for row in rows:
        bucket = val if (row[8] or "") in held else train
        bucket.append(row)
    return train, val, held
242
243
def _wins(ev) -> bool:
    """Decide whether one evaluated fold counts as a win for the head.

    A win needs AUC above 0.5 (the majority baseline) and, when the slop-kNN
    AUC is available, AUC strictly above it. A missing fold or an undefined
    AUC is never a win (conservative).
    """
    if not ev:
        return False
    auc = ev["auc"]
    if auc is None:
        return False
    slop_auc = ev["slop_auc"]
    if not auc > 0.5:
        return False
    return bool(slop_auc is None or auc > slop_auc)
252
253
def _jsonable(o):
    """``json.dumps(default=...)`` hook for values the encoder cannot handle natively.

    numpy scalars are mapped to the matching Python type so they round-trip
    with the right meaning: ``np.bool_`` -> bool (stringifying it would turn
    False into the truthy "False"), ``np.integer`` -> int, ``np.floating`` ->
    float. Arrays become nested lists. Anything else falls back to ``str(o)``.
    """
    if isinstance(o, np.bool_):
        return bool(o)
    if isinstance(o, np.integer):
        return int(o)
    if isinstance(o, np.floating):
        return float(o)
    if isinstance(o, np.ndarray):
        return o.tolist()
    return str(o)
258
259
def train_and_compare(split: float = 0.7) -> dict:
    """Phase 5: fit the head, evaluate it against its baselines, then persist
    the model together with its verdict.

    ``content_wins`` is True only if the head beats majority + slop-kNN on BOTH
    the time split and the repo holdout; load_if_winner() serves the head only
    in that case.

    Args:
        split: fraction of the time-ordered rows used for training.

    Returns:
        The verdict dict that is also written to VERDICT as JSON.

    Raises:
        SystemExit: the data cannot support training (too few rows or a
            single class).
    """
    with connection(read_only=True) as con:
        rows = _rows(con)
    _require(rows)
    emb, scal, y, _ = _matrix(rows)
    scorer, st = _fit(emb, scal, y, split)

    # Evaluate BEFORE touching disk. If anything below raises, the previous model
    # and its verdict stay on disk as a consistent pair, rather than new weights
    # sitting next to a verdict that was earned by the old ones.
    k = st["n_train"]  # evaluate the exact cut the served model was fit on
    time_eval = _eval_fold(rows[:k], rows[k:])
    repo_tr, repo_va, held = _repo_split(rows)
    repo_eval = _eval_fold(repo_tr, repo_va)
    content_wins = bool(_wins(time_eval) and _wins(repo_eval))

    verdict = {"rows": len(rows), "time_split": time_eval, "repo_holdout": repo_eval,
               "held_repos": len(held), "content_wins": content_wins,
               "reliability": _reliability(st["cal_val"], st["yval"])}

    # Write model and verdict back-to-back so they always describe the same fit.
    _save(scorer)
    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    VERDICT.write_text(json.dumps(verdict, indent=2, default=_jsonable))
    return verdict
283
284
285# --- serving (winner-gated, like gnn.load_if_winner) ------------------------
286
_cache: ContentScorer | None = None  # memoized scorer; None also means "no model found"
_loaded = False  # True once load() has attempted the read, whatever the outcome


def load() -> ContentScorer | None:
    """Return the pickled scorer, reading MODEL_PATH at most once per process.

    Returns None when no model file exists. The attempt flag is set before the
    read, so if unpickling raises, later calls return the cached None instead
    of retrying (until ``_save`` resets the cache).
    """
    global _cache, _loaded
    if _loaded:
        return _cache
    _loaded = True
    if MODEL_PATH.exists():
        # NOTE: unpickling can execute arbitrary code -- MODEL_PATH must only
        # ever point at a file this project wrote itself.
        state = pickle.loads(MODEL_PATH.read_bytes())
        _cache = ContentScorer(
            state["clf"], state["iso"], state["mean"], state["std"], state["emb_dim"])
    return _cache
299
300
def load_if_winner() -> ContentScorer | None:
    """Serving hook used by fusion: the head ONLY if it beat its baselines.

    Fails closed. Returns None when the model file is missing, when the verdict
    is missing, unreadable or not valid JSON (e.g. caught mid-write), or when
    ``content_wins`` is anything other than a literal JSON ``true``. In all of
    those cases the gate keeps Claude-only content -- never serve a model that
    did not demonstrably beat its baselines.
    """
    if not MODEL_PATH.exists():
        return None
    try:
        verdict = json.loads(VERDICT.read_text())
    except (OSError, ValueError):  # missing/unreadable file, or malformed JSON
        return None
    # `is True` rather than truthiness: a stringified "False" must not pass the gate.
    if not isinstance(verdict, dict) or verdict.get("content_wins") is not True:
        return None
    return load()
309
310
def main() -> None:
    """Run train_and_compare() and print a human-readable report: row count,
    both evaluation folds, the serve / don't-serve outcome and the reliability curve."""
    v = train_and_compare()
    ts = v["time_split"] or {}
    rh = v["repo_holdout"] or {}

    print(f"[content] {v['rows']} labelled+embedded PRs")
    print(f"[content] time-split: AUC={ts.get('auc')} slop_auc={ts.get('slop_auc')} "
          f"n_val={ts.get('n_val')} pos={ts.get('pos_val')}")
    print(f"[content] repo-holdout ({v['held_repos']} repos): AUC={rh.get('auc')} "
          f"slop_auc={rh.get('slop_auc')} n_val={rh.get('n_val')} pos={rh.get('pos_val')}")

    if v["content_wins"]:
        outcome = "SERVED (beats majority + slop-kNN on time AND repo holdout)"
    else:
        outcome = "NOT served; gate keeps Claude-only content (beat-the-baseline guardrail)"
    print(f"[content] content_wins={v['content_wins']} -> " + outcome)

    print("[content] reliability (predicted vs actual P(safe)):")
    for b in v["reliability"]:
        print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")
325
326
def demo() -> None:
    """Self-check that needs neither DB nor API.

    Builds 60 synthetic, time-ordered embeddings whose first coordinate encodes
    the label, fits the head, prints the risk of a prototypical bad vs clean
    diff plus the reliability curve, and asserts the basic contracts.
    """
    rng = np.random.RandomState(0)  # fixed seed -> reproducible self-check
    D = 16
    emb, scal, y = [], [], []
    for i in range(60):  # labels alternate, so both classes land in each fold
        clean = i % 2
        center = np.zeros(D)
        center[0] = 1.0 if clean else -1.0
        noisy = center + rng.normal(0, 0.3, D)
        emb.append(noisy.tolist())
        scal.append([rng.randint(1, 200), rng.randint(0, 100), rng.randint(1, 10), rng.randint(0, 500)])
        y.append(clean)

    scorer, st = _fit(emb, scal, np.array(y), split=0.7)

    typical_scalars = [50, 10, 3, 100]
    rest = [0.0] * (D - 1)
    r_clean = scorer.risk_for([1.0] + rest, typical_scalars)
    r_bad = scorer.risk_for([-1.0] + rest, typical_scalars)
    print(f"content_risk: bad={r_bad:.3f} clean={r_clean:.3f}")
    for b in _reliability(st["cal_val"], st["yval"]):
        print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")

    zero_scalars = [0, 0, 0, 0]
    assert scorer.risk_for([1.0] * D, zero_scalars) is not None
    assert scorer.risk_for([1.0] * (D + 1), zero_scalars) is None, "dim mismatch must be 'unavailable', not a crash"
    assert r_bad > r_clean, "content head must score a known-bad diff riskier than a clean one"
    print("ok")


if __name__ == "__main__":
    demo()