Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 6.7 kB View raw
"""M5 learned signal: LightGBM on per-DID features, isotonic-calibrated (PRD 6.5/6.8).

Predicts clean_merge from the feature vector (eigentrust_score included as a
feature, so the model builds on the graph signal). Trained offline on a
time-based split, calibrated with isotonic regression so the output is a real
P(clean). Explanations use LightGBM's native TreeSHAP (`pred_contrib`) — no
separate shap/numba dependency.

Stretch milestone: needs `uv pip install -e '.[learned]'`. The system runs
without it (fusion falls back to raw EigenTrust).
"""

from __future__ import annotations

import pickle

import numpy as np

from .config import MODEL_DIR
from .db import connection
from . import eigentrust

# PRD 6.5 feature list, restricted to what the features view currently produces.
# bsky_graph_degree / bsky_account_age join in once the app.bsky social graph is ingested.
# Order matters: it is the column order of the matrix the booster is trained on.
FEATURE_COLS = [
    "eigentrust_score", "did_age_days", "merged_pr_count", "revert_rate", "ci_pass_rate",
    "close_without_merge_ratio", "mean_diff_size", "mean_files_touched", "churn",
    "mean_discussion_len", "denounce_count",
    "stars_received", "stars_trust",  # popularity (raw, gameable) + trust-weighted (sybil-resistant)
]
# Where the trained booster + calibrator bundle is persisted (written by train(), read by load()).
MODEL_PATH = MODEL_DIR / "learned.pkl"

# Features sourced from the EigenResult (Python), not the SQL features view.
_FROM_ER = {
    # column name -> getter(did, er); both read per-DID dicts, defaulting to 0.0.
    "eigentrust_score": lambda did, er: er.trust.get(did, 0.0),
    "stars_trust": lambda did, er: er.stars_trust.get(did, 0.0),
}


def _vec(did: str, feats: dict, er: "eigentrust.EigenResult", cols=None) -> list[float]:
    """Build the model input row for one DID.

    Args:
        did: The DID to featurise.
        feats: That DID's row from the ``features`` view as ``{column: value}``;
            may be empty when the DID has no row.
        er: EigenTrust result; only ``er.trust`` and ``er.stars_trust`` are read.
        cols: Column names, in model order. Defaults to ``FEATURE_COLS``. A
            loaded scorer passes the columns it was trained with so the vector
            stays aligned with the booster even if ``FEATURE_COLS`` changes.

    Returns:
        One value per column. Missing or NULL/falsy view values become ``0.0``.
    """
    names = FEATURE_COLS if cols is None else cols
    out = []
    for c in names:
        src = _FROM_ER.get(c)
        out.append(src(did, er) if src else float(feats.get(c) or 0.0))
    return out


class LearnedScorer:
    """A trained booster plus optional isotonic calibrator and its column list."""

    def __init__(self, booster, iso, cols):
        # booster: object exposing LightGBM's Booster.predict API.
        # iso: fitted calibrator with .predict(), or None when uncalibrated.
        # cols: feature names in the order the booster was trained on.
        self.booster, self.iso, self.cols = booster, iso, cols

    def prob(self, did, feats, er) -> float:
        """Return P(clean) for ``did``: the calibrated score, or the raw one if ``iso`` is None."""
        row = np.array([_vec(did, feats, er, self.cols)])
        raw = float(self.booster.predict(row)[0])
        if self.iso is None:
            return raw
        return float(self.iso.predict([raw])[0])

    def contributions(self, did, feats, er, top: int = 3) -> list[dict]:
        """Return the ``top`` features by absolute contribution for ``did``.

        Each entry is ``{"feature": name, "contribution": value}`` with the value
        rounded to 3 decimals, ordered by decreasing magnitude.
        """
        row = np.array([_vec(did, feats, er, self.cols)])
        # pred_contrib output carries one extra trailing column (the expected
        # value / bias term per the LightGBM docs); drop it to keep per-feature values.
        c = self.booster.predict(row, pred_contrib=True)[0][:-1]
        idx = np.argsort(np.abs(c))[::-1][:top]
        return [{"feature": self.cols[i], "contribution": round(float(c[i]), 3)} for i in idx]


# Process-wide memo for load(); reset by train() after a new model is written.
_cache: "LearnedScorer | None" = None
_loaded = False


def load() -> "LearnedScorer | None":
    """Return the persisted scorer, or ``None`` when no model file exists.

    The result (including ``None``) is memoised until ``train()`` resets it.
    ``lightgbm`` is imported lazily so the module imports without the optional extra.
    """
    global _cache, _loaded
    if not _loaded:
        # NOTE(review): the flag is set before loading, so if unpickling or the
        # lightgbm import raises, later calls return None instead of retrying.
        _loaded = True
        if MODEL_PATH.exists():
            import lightgbm as lgb

            # SECURITY: pickle.loads executes arbitrary code embedded in the file.
            # MODEL_PATH must only ever hold files written by train() on a trusted host.
            d = pickle.loads(MODEL_PATH.read_bytes())
            _cache = LearnedScorer(lgb.Booster(model_str=d["booster"]), d["iso"], d["cols"])
    return _cache


def _load_features(con) -> dict:
    """Read the ``features`` view into ``{first_column_value: {column: value}}``.

    presumably the first column is the DID — callers look rows up by DID.
    """
    fcols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
    return {r[0]: dict(zip(fcols, r)) for r in con.execute("SELECT * FROM features").fetchall()}


def _matrix(con, er):
    """Return ``(X, y)`` for every labelled PR, ordered by ``opened_at``.

    ``X`` has one row per PR built from its author's features (so an author with
    several PRs contributes identical rows); ``y`` is the ``clean_merge`` label as int.
    """
    rows = con.execute(
        "SELECT author_did, opened_at, clean_merge FROM pr_labels WHERE clean_merge IS NOT NULL "
        "ORDER BY opened_at"
    ).fetchall()
    feats = _load_features(con)
    X = np.array([_vec(did, feats.get(did, {}), er) for did, _, _ in rows], dtype=float)
    y = np.array([int(lbl) for _, _, lbl in rows], dtype=int)
    return X, y


def _reliability(p, y, bins=5):
    """Reliability curve (PRD 6.8): predicted vs empirical P(clean) per bin.

    Bins are equal-width over [0, 1]; each is half-open ``[lo, hi)`` except the
    last, which also includes 1.0. Empty bins are omitted.
    """
    edges = np.linspace(0, 1, bins + 1)
    out = []
    for lo, hi in zip(edges, edges[1:]):
        m = (p >= lo) & (p <= hi if hi == 1 else p < hi)
        if m.any():
            out.append({"bin": f"{lo:.1f}-{hi:.1f}", "predicted": round(float(p[m].mean()), 3),
                        "actual": round(float(y[m].mean()), 3), "n": int(m.sum())})
    return out


def train(split: float = 0.7) -> dict:
    """Train, calibrate and persist the learned scorer.

    Args:
        split: Fraction of the time-ordered rows used for training; the
            remainder is the holdout used for calibration.

    Returns:
        Summary dict with keys ``rows``, ``train``, ``val``, ``calibrated``,
        ``reliability`` and ``model`` (path of the written file).

    Raises:
        SystemExit: With fewer than 4 labelled PRs or only one label class.
    """
    global _loaded, _cache
    import lightgbm as lgb
    from sklearn.isotonic import IsotonicRegression

    with connection(read_only=True) as con:
        er = eigentrust.compute(con)
        X, y = _matrix(con, er)
    if len(X) < 4 or len(set(y.tolist())) < 2:
        raise SystemExit(f"need >=4 labelled PRs spanning both classes; got {len(X)} rows, "
                         f"classes={set(y.tolist())}. Seed/ingest more history first.")

    # Rows are ordered by opened_at, so slicing gives a time-based split.
    k = max(2, int(len(X) * split))
    Xtr, ytr, Xval, yval = X[:k], y[:k], X[k:], y[k:]
    # min_data_in_leaf / min_data_in_bin of 1 — presumably so tiny seed datasets still train.
    params = dict(objective="binary", num_leaves=15, min_data_in_leaf=1, min_data_in_bin=1,
                  learning_rate=0.1, verbose=-1, feature_pre_filter=False)
    booster = lgb.train(params, lgb.Dataset(Xtr, label=ytr, feature_name=FEATURE_COLS),
                        num_boost_round=60)

    raw_val = booster.predict(Xval)
    iso = None
    if len(set(yval.tolist())) > 1:  # isotonic needs both classes in the holdout
        iso = IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval)
    cal_val = iso.predict(raw_val) if iso is not None else raw_val

    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    MODEL_PATH.write_bytes(pickle.dumps(
        {"booster": booster.model_to_string(), "iso": iso, "cols": FEATURE_COLS}))
    _loaded, _cache = False, None  # force reload of the fresh model

    # NOTE(review): the calibrator is fit and evaluated on the same holdout, so
    # this curve is in-sample for the calibration step.
    rel = _reliability(np.asarray(cal_val), yval)
    return {"rows": len(X), "train": k, "val": len(Xval),
            "calibrated": iso is not None, "reliability": rel, "model": str(MODEL_PATH)}


def main() -> None:
    """Train with the default split and print the summary and reliability curve."""
    r = train()
    print(f"[train] {r['rows']} labelled PRs (train={r['train']} / val={r['val']}), "
          f"calibrated={r['calibrated']} -> {r['model']}")
    print("[train] reliability curve (predicted vs actual P(clean)):")
    for b in r["reliability"]:
        print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")


def demo() -> None:
    """Self-check: after training, a trusted DID scores above a sybil (graph + history)."""
    from .db import connection as conn, init_db
    from .seed import seed as load_seed

    with conn(read_only=False) as con:
        init_db(con)  # schema + features + pr_labels views
        load_seed(con)
    train()
    s = load()
    with conn(read_only=True) as con:
        er = eigentrust.compute(con)
        feats = _load_features(con)
    trusted = s.prob("did:plc:carol", feats.get("did:plc:carol", {}), er)
    sybil = s.prob("did:plc:sybil2", feats.get("did:plc:sybil2", {}), er)
    print(f"calibrated P(clean): carol={trusted:.3f} sybil2={sybil:.3f}")
    assert trusted > sybil, "learned score must rank the trusted DID above the sybil"
    print("ok")


if __name__ == "__main__":
    demo()