Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 16 kB View raw
"""Tower B (content risk): a calibrated head on FROZEN diff embeddings (PRD Tier 1).

Identity-blind by construction (PRD constraint 1): the only inputs are the diff
embedding (diff_vectors, from the frozen Qwen3 transformer) and PR-intrinsic
scalars (additions, deletions, files_touched, discussion_len). NO author
DID/handle/history/aggregate ever enters this model -- leakage-free because the
features simply do not contain identity.

Trained on the clean_merge label with a time split; calibrated so the output is a
real P(content safe). content_risk = 1 - P. Served only if it BEATS its baselines
(majority + slop-kNN) on a time split AND a repo holdout -- same beat-the-baseline
guardrail the GNN follows. fusion consults the verdict via load_if_winner().

Model: L2-normalize the embedding -> L2-regularized logistic regression (a linear
probe, the correct choice for frozen embeddings at low data; a tree on the raw
2560-dim vector overfits) -> isotonic calibration on a held-out fold.

Optional: needs `uv pip install -e '.[learned]'` (scikit-learn). fusion imports
this lazily and treats an ImportError / unfit model as "signal unavailable".
"""

from __future__ import annotations

import json
import pickle

import numpy as np

from .config import MODEL_DIR
from .db import connection
from .learned import _reliability  # reuse the PRD 6.8 reliability curve (predicted vs actual)

# Pickled dict of the fitted head's parts; written by _save(), read back by load().
MODEL_PATH = MODEL_DIR / "content.pkl"
# JSON verdict written by train_and_compare(); load_if_winner() reads its "content_wins" key.
VERDICT = MODEL_DIR / "content_verdict.json"

# PR-intrinsic scalars appended after the embedding. Size/shape of the CHANGE only --
# never identity. Order is fixed; it is the contract between _rows and serving.
38SCALAR_COLS = ["additions", "deletions", "files_touched", "discussion_len"] 39MIN_ROWS = 8 # hard floor: below this a linear probe is noise 40 41 42# --- feature construction (leakage-free: scaler fit on TRAIN only) ---------- 43 44def _l2(E: np.ndarray) -> np.ndarray: 45 """Row-wise L2-normalize; zero rows stay zero (clamped denominator).""" 46 return E / np.clip(np.linalg.norm(E, axis=1, keepdims=True), 1e-9, None) 47 48 49def _featurize_fit(emb, scal): 50 """Fit the scalar standardizer on these rows; return (X, mean, std).""" 51 E = _l2(np.asarray(emb, dtype=float)) 52 S = np.log1p(np.asarray(scal, dtype=float).clip(min=0)) # size scalars: tame the heavy tail 53 mean, std = S.mean(0), np.clip(S.std(0), 1e-9, None) 54 return np.hstack([E, (S - mean) / std]), mean, std 55 56 57def _featurize_apply(emb, scal, mean, std): 58 E = _l2(np.asarray(emb, dtype=float)) 59 S = np.log1p(np.asarray(scal, dtype=float).clip(min=0)) 60 return np.hstack([E, (S - mean) / std]) 61 62 63# --- data access ------------------------------------------------------------ 64 65def _rows(con): 66 """(pr_id, embedding, additions, deletions, files_touched, discussion_len, 67 clean_merge, opened_at, repo) for every PR with BOTH an embedding and a non-NULL 68 clean_merge label, ordered by opened_at so [:k]/[k:] is a leakage-free time split.""" 69 return con.execute( 70 "SELECT v.pr_id, v.embedding, p.additions, p.deletions, p.files_touched, " 71 " p.discussion_len, l.clean_merge, p.opened_at, p.repo " 72 "FROM diff_vectors v " 73 "JOIN pr_labels l USING (pr_id) " 74 "JOIN pull_requests p ON p.pr_id = v.pr_id " 75 "WHERE l.clean_merge IS NOT NULL " 76 "ORDER BY p.opened_at" 77 ).fetchall() 78 79 80def _matrix(rows): 81 """rows -> (emb, scal, y, repos). 
y=1 is clean_merge (content safe).""" 82 emb = [r[1] for r in rows] 83 scal = [[r[2] or 0, r[3] or 0, r[4] or 0, r[5] or 0] for r in rows] 84 y = np.array([int(r[6]) for r in rows], dtype=int) 85 repos = [r[8] or "" for r in rows] 86 return emb, scal, y, repos 87 88 89def _require(rows) -> None: 90 classes = {int(r[6]) for r in rows} 91 if len(rows) < MIN_ROWS or len(classes) < 2: 92 raise SystemExit( 93 f"content head needs >={MIN_ROWS} embedded+labelled PRs spanning both classes; " 94 f"got {len(rows)} rows, classes={classes}. Run Phase 0 (trust.diffs), Phase 1 " 95 f"(backfill sh.tangled.repo.pull.status for a positive class), Phase 2 " 96 f"(trust.embed --build) first.") 97 98 99# --- the scorer ------------------------------------------------------------- 100 101class ContentScorer: 102 """Serves P(content safe) / content_risk for a PR, identity-blind.""" 103 104 def __init__(self, clf, iso, mean, std, emb_dim): 105 self.clf, self.iso, self.mean, self.std, self.emb_dim = clf, iso, mean, std, emb_dim 106 107 def _prob_safe(self, emb, scalars) -> float | None: 108 if emb is None or len(emb) != self.emb_dim: # dim mismatch (different EMBED_DIMENSIONS) -> unavailable 109 return None 110 X = _featurize_apply([emb], [scalars], self.mean, self.std) 111 raw = float(self.clf.predict_proba(X)[0, 1]) 112 return float(self.iso.predict([raw])[0]) if self.iso is not None else raw 113 114 def risk_for(self, emb, scalars) -> float | None: 115 """content_risk from an embedding + scalars directly. None if dim mismatch.""" 116 p = self._prob_safe(emb, scalars) 117 return None if p is None else 1.0 - p 118 119 def risk(self, con, pr_id: str | None = None, diff: str | None = None, 120 scalars: list | None = None) -> float | None: 121 """content_risk for a PR: prefer the stored embedding (cheap, every PR), else 122 embed the diff live (serving a brand-new PR). 
None if neither is available 123 (no embedding + no API key), so the gate treats content as simply absent.""" 124 emb = None 125 if pr_id is not None: 126 row = con.execute("SELECT embedding FROM diff_vectors WHERE pr_id=?", [pr_id]).fetchone() 127 emb = row[0] if row else None 128 if emb is None and diff: 129 from . import embed as embed_mod 130 v = embed_mod.embed(diff) # None without FEATHERLESS_API_KEY 131 emb = v[0] if v else None 132 if emb is None: 133 return None 134 if scalars is None and pr_id is not None: 135 srow = con.execute( 136 f"SELECT {', '.join(SCALAR_COLS)} FROM pull_requests WHERE pr_id=?", [pr_id]).fetchone() 137 scalars = [s or 0 for s in srow] if srow else [0, 0, 0, 0] 138 return self.risk_for(emb, scalars or [0, 0, 0, 0]) 139 140 def dump(self) -> dict: 141 return {"clf": self.clf, "iso": self.iso, "mean": self.mean, "std": self.std, 142 "emb_dim": self.emb_dim} 143 144 145# --- fit / train ------------------------------------------------------------ 146 147def _fit(emb, scal, y, split: float = 0.7): 148 """Time-ordered emb/scalars/labels -> (ContentScorer, val_stats). Pure, no DB/IO. 
149 Scaler + model fit on the train fold only; isotonic calibrated on the val fold.""" 150 from sklearn.isotonic import IsotonicRegression 151 from sklearn.linear_model import LogisticRegression 152 153 k = max(2, int(len(emb) * split)) 154 Xtr, mean, std = _featurize_fit(emb[:k], scal[:k]) 155 Xval = _featurize_apply(emb[k:], scal[k:], mean, std) 156 ytr, yval = y[:k], y[k:] 157 if len(set(ytr.tolist())) < 2: 158 raise SystemExit("time-split train fold has a single class; need more history spanning both.") 159 clf = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000).fit(Xtr, ytr) 160 raw_val = clf.predict_proba(Xval)[:, 1] 161 iso = (IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval) 162 if len(set(yval.tolist())) > 1 else None) # isotonic needs both classes in the holdout 163 cal_val = iso.predict(raw_val) if iso is not None else raw_val 164 scorer = ContentScorer(clf, iso, mean, std, len(emb[0])) 165 return scorer, {"cal_val": np.asarray(cal_val), "yval": yval, "n_train": k, "n_val": len(yval)} 166 167 168def _save(scorer: ContentScorer) -> None: 169 MODEL_DIR.mkdir(parents=True, exist_ok=True) 170 MODEL_PATH.write_bytes(pickle.dumps(scorer.dump())) 171 global _loaded, _cache 172 _loaded, _cache = False, None # force reload of the fresh model 173 174 175def train(split: float = 0.7) -> dict: 176 """Fit + calibrate + save content.pkl (Phase 3). 
Returns the reliability curve.""" 177 with connection(read_only=True) as con: 178 rows = _rows(con) 179 _require(rows) 180 emb, scal, y, _ = _matrix(rows) 181 scorer, st = _fit(emb, scal, y, split) 182 _save(scorer) 183 return {"rows": len(rows), "train": st["n_train"], "val": st["n_val"], "emb_dim": scorer.emb_dim, 184 "calibrated": scorer.iso is not None, 185 "reliability": _reliability(st["cal_val"], st["yval"]), "model": str(MODEL_PATH)} 186 187 188# --- eval + beat-the-baseline gate (Phase 5) -------------------------------- 189 190def _auc(p_safe, y) -> float | None: 191 """ROC-AUC of P(safe) vs the clean label. None if the fold has a single class.""" 192 from sklearn.metrics import roc_auc_score 193 if len(set(np.asarray(y).tolist())) < 2: 194 return None 195 return float(roc_auc_score(y, p_safe)) 196 197 198def _ap(p_safe, y) -> float | None: 199 from sklearn.metrics import average_precision_score 200 if len(set(np.asarray(y).tolist())) < 2: 201 return None 202 return float(average_precision_score(y, p_safe)) 203 204 205def _slop_baseline(emb_val, emb_train, y_train): 206 """slop-kNN as a P(safe) proxy: 1 - nearest-cosine to a TRAIN known-bad diff 207 (clean_merge=0). Leakage-free (train corpus only), no API (stored vectors).""" 208 bad = [e for e, yy in zip(emb_train, y_train) if int(yy) == 0] 209 if not bad: 210 return None 211 B = _l2(np.asarray(bad, dtype=float)) 212 V = _l2(np.asarray(emb_val, dtype=float)) 213 sim = (V @ B.T).max(axis=1) 214 return 1.0 - np.clip(sim, 0.0, 1.0) 215 216 217def _eval_fold(train_rows, val_rows) -> dict | None: 218 """Fit on train_rows, score val_rows. 
Returns model + slop-kNN AUC/AP on the val 219 fold, or None if either fold lacks both classes (AUC undefined -> can't claim a win).""" 220 emb_tr, scal_tr, ytr, _ = _matrix(train_rows) 221 emb_va, scal_va, yva, _ = _matrix(val_rows) 222 if len(set(ytr.tolist())) < 2 or len(set(yva.tolist())) < 2: 223 return None 224 from sklearn.linear_model import LogisticRegression 225 Xtr, mean, std = _featurize_fit(emb_tr, scal_tr) 226 clf = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000).fit(Xtr, ytr) 227 p_safe = clf.predict_proba(_featurize_apply(emb_va, scal_va, mean, std))[:, 1] 228 slop = _slop_baseline(emb_va, emb_tr, ytr) 229 return {"auc": _auc(p_safe, yva), "ap": _ap(p_safe, yva), 230 "slop_auc": (_auc(slop, yva) if slop is not None else None), 231 "n_val": len(yva), "pos_val": int(yva.sum())} 232 233 234def _repo_split(rows, frac: float = 0.3): 235 """Hold out whole repos (generalize to UNSEEN repos). Deterministic: the first 236 ceil(frac*R) repos by name, so the holdout never overlaps the training repos.""" 237 repos = sorted({r[8] or "" for r in rows}) 238 held = set(repos[: max(1, int(len(repos) * frac + 0.999))]) 239 train = [r for r in rows if (r[8] or "") not in held] 240 val = [r for r in rows if (r[8] or "") in held] 241 return train, val, held 242 243 244def _wins(ev) -> bool: 245 """A fold is a win if the head beats majority (AUC>0.5) and slop-kNN (where the 246 slop baseline is computable). A missing/single-class fold is NOT a win (conservative).""" 247 if not ev or ev["auc"] is None: 248 return False 249 beats_majority = ev["auc"] > 0.5 250 beats_slop = ev["slop_auc"] is None or ev["auc"] > ev["slop_auc"] 251 return bool(beats_majority and beats_slop) 252 253 254def _jsonable(o): 255 if isinstance(o, (np.floating, np.integer)): 256 return float(o) 257 return str(o) 258 259 260def train_and_compare(split: float = 0.7) -> dict: 261 """Phase 5: fit + save the head, then write a verdict. 
content_wins is True only 262 if it beats majority + slop-kNN on BOTH a time split and a repo holdout. fusion 263 serves the head only when content_wins (load_if_winner).""" 264 with connection(read_only=True) as con: 265 rows = _rows(con) 266 _require(rows) 267 emb, scal, y, _ = _matrix(rows) 268 scorer, st = _fit(emb, scal, y, split) 269 _save(scorer) 270 271 k = max(2, int(len(rows) * split)) 272 time_eval = _eval_fold(rows[:k], rows[k:]) 273 repo_tr, repo_va, held = _repo_split(rows) 274 repo_eval = _eval_fold(repo_tr, repo_va) 275 content_wins = bool(_wins(time_eval) and _wins(repo_eval)) 276 277 verdict = {"rows": len(rows), "time_split": time_eval, "repo_holdout": repo_eval, 278 "held_repos": len(held), "content_wins": content_wins, 279 "reliability": _reliability(st["cal_val"], st["yval"])} 280 MODEL_DIR.mkdir(parents=True, exist_ok=True) 281 VERDICT.write_text(json.dumps(verdict, indent=2, default=_jsonable)) 282 return verdict 283 284 285# --- serving (winner-gated, like gnn.load_if_winner) ------------------------ 286 287_cache: ContentScorer | None = None 288_loaded = False 289 290 291def load() -> ContentScorer | None: 292 global _cache, _loaded 293 if not _loaded: 294 _loaded = True 295 if MODEL_PATH.exists(): 296 d = pickle.loads(MODEL_PATH.read_bytes()) 297 _cache = ContentScorer(d["clf"], d["iso"], d["mean"], d["std"], d["emb_dim"]) 298 return _cache 299 300 301def load_if_winner() -> ContentScorer | None: 302 """Serving hook used by fusion: the head ONLY if it beat its baselines (else None, 303 and the gate keeps Claude-only content -- never serve a model that didn't beat baseline).""" 304 if not (VERDICT.exists() and MODEL_PATH.exists()): 305 return None 306 if not json.loads(VERDICT.read_text()).get("content_wins"): 307 return None 308 return load() 309 310 311def main() -> None: 312 v = train_and_compare() 313 ts, rh = v["time_split"] or {}, v["repo_holdout"] or {} 314 print(f"[content] {v['rows']} labelled+embedded PRs") 315 print(f"[content] 
time-split: AUC={ts.get('auc')} slop_auc={ts.get('slop_auc')} " 316 f"n_val={ts.get('n_val')} pos={ts.get('pos_val')}") 317 print(f"[content] repo-holdout ({v['held_repos']} repos): AUC={rh.get('auc')} " 318 f"slop_auc={rh.get('slop_auc')} n_val={rh.get('n_val')} pos={rh.get('pos_val')}") 319 print(f"[content] content_wins={v['content_wins']} -> " 320 + ("SERVED (beats majority + slop-kNN on time AND repo holdout)" if v["content_wins"] 321 else "NOT served; gate keeps Claude-only content (beat-the-baseline guardrail)")) 322 print("[content] reliability (predicted vs actual P(safe)):") 323 for b in v["reliability"]: 324 print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}") 325 326 327def demo() -> None: 328 """Self-check (no DB/API): synthetic embeddings separable by label -> a held-out 329 bad diff out-risks a clean one; print the reliability curve.""" 330 rng = np.random.RandomState(0) # Math.random-free determinism; seeded numpy is fine 331 D = 16 332 emb, scal, y = [], [], [] 333 for i in range(60): # time-ordered, classes alternate so both land in each fold 334 clean = i % 2 335 base = np.zeros(D); base[0] = 1.0 if clean else -1.0 336 emb.append((base + rng.normal(0, 0.3, D)).tolist()) 337 scal.append([rng.randint(1, 200), rng.randint(0, 100), rng.randint(1, 10), rng.randint(0, 500)]) 338 y.append(clean) 339 scorer, st = _fit(emb, scal, np.array(y), split=0.7) 340 r_clean = scorer.risk_for([1.0] + [0.0] * (D - 1), [50, 10, 3, 100]) 341 r_bad = scorer.risk_for([-1.0] + [0.0] * (D - 1), [50, 10, 3, 100]) 342 print(f"content_risk: bad={r_bad:.3f} clean={r_clean:.3f}") 343 for b in _reliability(st["cal_val"], st["yval"]): 344 print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}") 345 assert scorer.risk_for([1.0] * D, [0, 0, 0, 0]) is not None 346 assert scorer.risk_for([1.0] * (D + 1), [0, 0, 0, 0]) is None, "dim mismatch must be 'unavailable', not a crash" 347 assert r_bad > r_clean, "content head must score a 
known-bad diff riskier than a clean one" 348 print("ok") 349 350 351if __name__ == "__main__": 352 demo()