Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 13 kB View raw
"""M4 fusion gate + scoring worker (PRD 6.7, 6.9).

The gate is NOT an average: a low structural score can never be lifted into the
fast lane by clean-looking content (constraint 2). Content can only penalize.
"""

from __future__ import annotations

import json

from .config import CFG
from . import eigentrust, review as review_mod, vouchsafe

# Rank of static-scan severities, most severe first (lower rank = worse). A severity
# that is not listed ranks after every known one, so an unexpected label from the
# scanner can never abort a score (the scan is advisory, 6.12).
_SCAN_SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2}


def decide(structural_trust: float, content: dict | None, cfg=CFG.gate, *,
           attestation_required: bool = False, attested: bool = True) -> str:
    """PRD 6.7 gate. structural_trust is calibrated P(clean) in [0,1].

    6.13: a sensitive/dual-use repo requires a valid jurisdiction attestation; a
    missing one forces needs_human regardless of structural trust or content risk.
    This is the only control that overrides the score, so it is checked first.

    Args:
        structural_trust: structural score, compared against cfg.T_LOW / cfg.T_HIGH.
        content: content signal carrying "content_risk", "review_recommended" and
            "flags", or None when there is no content signal (treated as zero risk,
            no review recommendation, no flags).
        cfg: threshold holder exposing T_LOW, T_HIGH, R_LOW and R_HIGH.
        attestation_required: True when the repo demands an attestation.
        attested: whether the author has one.

    Returns:
        One of "needs_human", "fast_lane" or "normal_queue".
    """
    if attestation_required and not attested:
        return "needs_human"
    risk = 0.0 if content is None else content["content_risk"]
    review = False if content is None else content["review_recommended"]
    # NOTE(review): only the literal severity "high" escalates here -- confirm the
    # review flags never use a stronger label (e.g. "critical").
    high_flag = bool(content) and any(f["severity"] == "high" for f in content["flags"])

    # Any single bad signal is enough to demand a human; content is never averaged in.
    if structural_trust < cfg.T_LOW or risk >= cfg.R_HIGH or high_flag:
        return "needs_human"
    # The fast lane needs every signal to be good at once.
    if structural_trust >= cfg.T_HIGH and risk <= cfg.R_LOW and not review:
        return "fast_lane"
    return "normal_queue"


def repo_tier(con, repo: str | None) -> str:
    """'sensitive' if the repo is in the sensitive/dual-use tier (6.13), else 'public'.

    Returns the stored `tier` value from `repo_tiers` as-is; a falsy `repo` or a repo
    with no row defaults to "public".
    """
    if not repo:
        return "public"
    row = con.execute("SELECT tier FROM repo_tiers WHERE repo=?", [repo]).fetchone()
    return row[0] if row else "public"


def is_attested(con, did: str) -> bool:
    """True if the DID has a contributor-issued jurisdiction attestation (declared, not inferred).

    Only checks that at least one `attestations` row exists for the DID.
    """
    return con.execute("SELECT 1 FROM attestations WHERE did=? LIMIT 1", [did]).fetchone() is not None


def displayed_prob(structural_trust: float, content: dict | None) -> float:
    """Start from structural P(clean), penalize for content risk. Never lifts (6.7).

    Returns structural_trust unchanged when there is no content signal, otherwise
    structural_trust * (1 - content_risk).
    """
    if content is None:
        return structural_trust
    # NOTE(review): "never lifts" holds only while content_risk stays within [0, 1].
    return structural_trust * (1.0 - content["content_risk"])


def _fold_content(content: dict | None, model_risk: float | None) -> dict | None:
    """Phase 4: fold the Tower B head risk into the content signal MONOTONICALLY.

    Combine model + Claude as max(model_risk, claude_risk): can raise risk, never lower
    it, so content still only penalizes (never lifts an untrusted DID). When Claude was
    skipped (content is None), the head alone synthesizes the content signal so the gate
    still sees content for this PR -- the head covers EVERY PR cheaply, unlike Claude.

    Returns `content` itself (same object) when there is no head risk; otherwise a new
    dict, leaving the input unmodified.
    """
    if model_risk is None:
        return content
    if content is None:
        return {"content_risk": model_risk, "review_recommended": False, "flags": [],
                "summary": f"content-head risk {model_risk:.2f} (no Claude review)"}
    return {**content, "content_risk": max(content["content_risk"], model_risk)}


def _scorer():
    """Load the M5 LightGBM scorer if trained AND lightgbm is installed; else None."""
    try:
        from . import learned
    except ImportError:
        return None
    return learned.load()


def _gnn_winner():
    """M6 GraphSAGE scorer ONLY if it beat M5 on the holdout (PRD guardrail); else None."""
    try:
        from . import gnn
    except ImportError:
        return None
    return gnn.load_if_winner()


def _slop(con, diff, pr_id):
    """Best-effort diff/slop similarity (6.12). Optional path -- never fail a score on it
    (no key, empty corpus, or a network blip all collapse to None = signal unavailable)."""
    if not diff:
        return None
    try:
        from . import embed
        return embed.slop_score(con, diff, exclude_pr_id=pr_id)
    except Exception:  # deliberate: any failure means "signal unavailable"
        return None


def _content_head(con, pr_id, diff):
    """Tower B learned content risk for this PR (PRD Tier 1), identity-blind. Winner-gated:
    None unless the head beat its baselines (content.load_if_winner) and an embedding (or a
    live-embeddable diff) exists. Best-effort -- never fail a score on it. Unlike Claude, this
    covers EVERY PR cheaply, so the gate gets a content signal even when review is skipped."""
    try:
        from . import content
        scorer = content.load_if_winner()
        if scorer is None:
            return None
        return scorer.risk(con, pr_id=pr_id, diff=diff)
    except Exception:  # deliberate: any failure means "signal unavailable"
        return None


def structural_for(did, er: eigentrust.EigenResult, feats: dict | None):
    """Calibrated P(clean) for the gate. Precedence: winning GNN (M6) -> LightGBM (M5)
    -> raw EigenTrust (M3). The GNN is used only if it provably beat the baseline.

    Returns:
        (probability, model_factors); model_factors is the scorer's contributions on
        the LightGBM path and None on the other two.
    """
    g = _gnn_winner()
    if g is not None:
        return g.prob(did), None  # GNN explanations are weak; reason falls back to path/SHAP
    scorer = _scorer()
    if scorer is not None:
        return scorer.prob(did, feats or {}, er), scorer.contributions(did, feats or {}, er)
    # No learned model available: raw EigenTrust, with 0.0 for a DID it does not know.
    return er.trust.get(did, 0.0), None


def build_reason(did, structural_trust, content, er: eigentrust.EigenResult, feats: dict | None,
                 model_factors: list | None = None, gate_note: str | None = None):
    """Structured explanation (6.9): EigenTrust path + top factors + Claude's rationale.

    Returns a dict with "structural_trust", "trust_path", "top_factors",
    "model_factors" and "compliance_block"; when `content` is given it also carries
    "content_summary", "flags" and "content_risk".
    """
    path = er.path_from_seed(did)
    top_factors = []
    if gate_note:  # 6.13 compliance override, surfaced first so the human sees why
        top_factors.append(gate_note)
    if path:
        top_factors.append(f"trust reaches {did} via {' -> '.join(path)}")
    if feats:
        if feats.get("merged_pr_count"):
            top_factors.append(f"{int(feats['merged_pr_count'])} merged PRs")
        # `is not None` rather than truthiness: a revert rate of 0 is still reported.
        if feats.get("revert_rate") is not None:
            top_factors.append(f"revert rate {feats['revert_rate']:.0%}")
        if feats.get("denounce_count"):
            top_factors.append(f"{int(feats['denounce_count'])} denounce(s)")
        if feats.get("stars_received"):  # advisory popularity; trust-weighted version lives in the model
            top_factors.append(f"{int(feats['stars_received'])} star(s) received")
    if model_factors:  # M5 LightGBM TreeSHAP contributions (6.9)
        for mf in model_factors:
            # "+ 0.0" turns a negative zero into 0.0 so it never prints as "-0.000".
            top_factors.append(f"{mf['feature']} ({mf['contribution'] + 0.0:+.3f})")
    reason = {
        "structural_trust": round(structural_trust, 4),
        "trust_path": path,
        "top_factors": top_factors,
        "model_factors": model_factors or [],
        "compliance_block": gate_note,
    }
    if content is not None:
        reason["content_summary"] = content["summary"]
        reason["flags"] = content["flags"]
        reason["content_risk"] = content["content_risk"]
    return reason


def should_review(structural_trust: float, security_sensitive: bool, cfg=CFG.gate) -> bool:
    """Cost gate (6.6): skip Sonnet for clearly-trusted unless security-sensitive."""
    if structural_trust >= cfg.T_HIGH:
        return security_sensitive
    return True  # ambiguous band and below: review earns its keep / attaches a reason


def score_pr(con, pr_id: str, run_review: bool = True) -> dict:
    """Full hybrid score for one PR: EigenTrust + (gated) Claude -> decision + write.

    Reads the PR row, computes the structural score, gathers the advisory signals
    (slop similarity, content head, static scan), optionally runs the content review,
    decides the lane, and inserts one row into `scores`.

    Args:
        con: database connection exposing `execute(sql, params)`.
        pr_id: id of the pull request to score.
        run_review: when False the content review is never called.

    Returns:
        Dict with "did", "structural_trust", "calibrated_prob", "decision" and
        "explanation".

    Raises:
        ValueError: if no pull request has this pr_id.
    """
    row = con.execute(
        "SELECT author_did, diff_text, repo, target FROM pull_requests WHERE pr_id=?", [pr_id]
    ).fetchone()
    if not row:
        raise ValueError(f"unknown pr {pr_id}")
    did, diff, repo, _target = row  # `target` is selected but not used here

    er = eigentrust.compute(con)
    feats = _features_for(con, did)
    structural, model_factors = structural_for(did, er, feats)

    tier = repo_tier(con, repo)  # 6.13 repo tiering
    attested = is_attested(con, did)
    sensitive = tier == "sensitive"
    slop = _slop(con, diff, pr_id)  # 6.12 diff/slop similarity to known-bad (advisory)
    model_risk = _content_head(con, pr_id, diff)  # Tower B head: content risk for ALL PRs (winner-gated)
    # NOTE(review): `diff` may be None here; confirm scan_diff tolerates that.
    scan = vouchsafe.scan_diff(diff)  # 6.12 static secret/SAST findings (advisory, redacted)
    # `machine` stays None unless at least one machine signal is available.
    machine = {"slop_similarity_to_known_bad": round(slop, 3)} if slop is not None else None
    if model_risk is not None:
        machine = (machine or {}) | {"content_head_risk": round(model_risk, 3)}
    if scan:
        machine = (machine or {}) | {"static_scan_findings": scan}
    content = None
    if run_review and should_review(structural, sensitive):
        content = review_mod.review_pr(diff or "", title=repo or "", discussion="",
                                       machine_findings=machine)
    content = _fold_content(content, model_risk)  # Phase 4: monotone fold, content only penalizes

    decision = decide(structural, content, attestation_required=sensitive, attested=attested)
    prob = displayed_prob(structural, content)
    gate_note = ("sensitive-tier repo: a valid jurisdiction attestation is required before "
                 "fast-lane/merge (6.13)") if sensitive and not attested else None
    reason = build_reason(did, structural, content, er, feats, model_factors, gate_note)
    if model_risk is not None:  # Tower B factor, surfaced like the others (Phase 4)
        reason["content_head_risk"] = round(model_risk, 3)
        reason["top_factors"].append(f"content-head risk {model_risk:.0%}")
    if slop is not None:
        reason["slop_similarity"] = round(slop, 3)
        if slop >= 0.9:  # advisory: surfaces for the human, never flips the gate (6.12)
            reason["top_factors"].append(f"diff {slop:.0%} similar to a known-bad pattern")
    if scan:  # advisory: surfaces for the human even when review is skipped (6.12)
        reason["static_scan_findings"] = scan
        # Most severe finding wins; ties keep the first one. An unrecognised severity
        # sorts last rather than raising, so an advisory signal cannot fail the score.
        unknown_rank = len(_SCAN_SEVERITY_RANK)
        worst = min(scan, key=lambda f: _SCAN_SEVERITY_RANK.get(f["severity"], unknown_rank))
        reason["top_factors"].append(
            f"static scan: {worst['severity']} {worst['type']} in added lines (line {worst['line']})")

    con.execute(
        "INSERT INTO scores (did, structural_trust, content_risk, calibrated_prob, decision, explanation_json) "
        "VALUES (?,?,?,?,?,?)",
        [did, structural, (content or {}).get("content_risk"), prob, decision, json.dumps(reason)],
    )
    return {"did": did, "structural_trust": structural, "calibrated_prob": prob,
            "decision": decision, "explanation": reason}


def _features_for(con, did: str) -> dict | None:
    """Return the `features` row for `did` as a column-name -> value dict, or None if absent."""
    cols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
    row = con.execute("SELECT * FROM features WHERE did=?", [did]).fetchone()
    return dict(zip(cols, row)) if row else None


def _process_pending(con) -> int:
    """Score every pending PR, print one line per decision, and return how many were pending.

    NOTE(review): "pending" is keyed on the author DID, not the PR -- once an author
    has any row in `scores`, their later PRs are no longer selected. Confirm that is
    intended.
    """
    pending = con.execute(
        "SELECT pr_id FROM pull_requests WHERE author_did NOT IN (SELECT did FROM scores)"
    ).fetchall()
    for (pr_id,) in pending:
        r = score_pr(con, pr_id)
        print(f"{r['decision']:<13} {r['calibrated_prob']:.3f} {pr_id}", flush=True)
    return len(pending)


def main() -> None:
    """Scoring worker: score PRs that have no score yet, write decisions (6.10).

    Default is one-shot; --loop polls forever (a long-lived pane under mprocs),
    opening a short-lived read-write connection per cycle so the API can read
    between cycles.
    """
    import argparse
    import time

    from .db import connection, ensure_schema

    ap = argparse.ArgumentParser(description="trust scoring worker")
    ap.add_argument("--loop", action="store_true", help="poll forever instead of one pass")
    ap.add_argument("--interval", type=float, default=5.0, help="seconds between polls")
    args = ap.parse_args()

    ensure_schema()
    while True:
        # The connection is closed before sleeping, so it is never held between cycles.
        with connection(read_only=False) as con:
            n = _process_pending(con)
        if not args.loop:
            print(f"[score] processed {n} PRs")
            return
        time.sleep(args.interval)


def demo() -> None:
    """Self-check: gate never fast-lanes a low-trust DID, even on clean content (constraint 2)."""
    clean = {"content_risk": 0.0, "review_recommended": False, "flags": [], "summary": "ok"}
    risky = {"content_risk": 0.9, "review_recommended": True,
             "flags": [{"severity": "high", "type": "subtle_bug", "location": "x", "explanation": "y"}],
             "summary": "bad"}
    assert decide(0.1, clean) == "needs_human", "low trust + clean content must NOT fast-lane"
    assert decide(0.95, clean) == "fast_lane"
    assert decide(0.95, risky) == "needs_human", "high-severity flag forces human"
    assert decide(0.5, None) == "normal_queue"
    assert displayed_prob(0.9, risky) < 0.9, "content risk must penalize, never lift"
    # Phase 4 fold: head risk can only raise the content signal, never lower it.
    assert _fold_content(None, 0.3)["content_risk"] == 0.3, "Claude skipped -> head synthesizes content"
    assert _fold_content(clean, 0.4)["content_risk"] == 0.4, "head raises a clean Claude verdict"
    assert _fold_content(risky, 0.1)["content_risk"] == 0.9, "head never lowers a risky Claude verdict"
    assert _fold_content(clean, None) is clean, "no head -> content untouched"
    # 6.13: a sensitive-tier repo with no attestation forces human even for a perfect score.
    assert decide(0.99, clean, attestation_required=True, attested=False) == "needs_human"
    assert decide(0.99, clean, attestation_required=True, attested=True) == "fast_lane"
    print("ok")


if __name__ == "__main__":
    demo()