Sunstead trust scoring project
1"""M5 learned signal: LightGBM on per-DID features, isotonic-calibrated (PRD 6.5/6.8).
2
3Predicts clean_merge from the feature vector (eigentrust_score included as a
4feature, so the model builds on the graph signal). Trained offline on a
5time-based split, calibrated with isotonic regression so the output is a real
6P(clean). Explanations use LightGBM's native TreeSHAP (`pred_contrib`) — no
7separate shap/numba dependency.
8
9Stretch milestone: needs `uv pip install -e '.[learned]'`. The system runs
10without it (fusion falls back to raw EigenTrust).
11"""
12
13from __future__ import annotations
14
15import pickle
16
17import numpy as np
18
19from .config import MODEL_DIR
20from .db import connection
21from . import eigentrust
22
# Model inputs, in the column order used for training and prediction: the
# PRD 6.5 feature list narrowed to what the features view produces today.
# bsky_graph_degree / bsky_account_age join in once the app.bsky social graph
# is ingested.
FEATURE_COLS = [
    "eigentrust_score",
    "did_age_days",
    "merged_pr_count",
    "revert_rate",
    "ci_pass_rate",
    "close_without_merge_ratio",
    "mean_diff_size",
    "mean_files_touched",
    "churn",
    "mean_discussion_len",
    "denounce_count",
    "stars_received",  # popularity (raw, gameable)
    "stars_trust",  # trust-weighted (sybil-resistant)
]
# Pickle holding the serialized booster, the calibrator and the column list.
MODEL_PATH = MODEL_DIR / "learned.pkl"
32
def _er_trust(did, er):
    """Return the DID's EigenTrust score from ``er.trust`` (0.0 when absent)."""
    return er.trust.get(did, 0.0)


def _er_stars_trust(did, er):
    """Return the DID's trust-weighted stars from ``er.stars_trust`` (0.0 when absent)."""
    return er.stars_trust.get(did, 0.0)


# Features sourced from the EigenResult (Python), not the SQL features view.
# Maps feature name -> callable(did, er) producing that feature's value.
_FROM_ER = {
    "eigentrust_score": _er_trust,
    "stars_trust": _er_stars_trust,
}
38
39
def _vec(did: str, feats: dict, er: eigentrust.EigenResult) -> list[float]:
    """Build the model input row for one DID, ordered like ``FEATURE_COLS``.

    Columns registered in ``_FROM_ER`` are computed from ``er``; every other
    column is read from ``feats``, where a missing, ``None`` or otherwise
    falsy value becomes ``0.0``.
    """

    def value_of(column):
        getter = _FROM_ER.get(column)
        if getter:
            return getter(did, er)
        return float(feats.get(column) or 0.0)

    return [value_of(column) for column in FEATURE_COLS]
46
47
class LearnedScorer:
    """Scores a DID with a trained booster and an optional isotonic calibrator.

    Attributes:
        booster: model exposing ``predict(X, pred_contrib=...)``; ``load()``
            passes a ``lightgbm.Booster``.
        iso: calibrator exposing ``predict``, or ``None`` when the model was
            saved without calibration.
        cols: feature names in the column order the booster was trained on.
    """

    def __init__(self, booster, iso, cols):
        self.booster, self.iso, self.cols = booster, iso, cols

    def _row(self, did, feats, er) -> np.ndarray:
        """Return the 1 x len(cols) input matrix for ``did``.

        The row follows ``self.cols`` (the columns persisted with the model)
        rather than the module's current feature list, so a model saved under
        an older feature list still receives the layout it was trained on and
        ``contributions`` labels line up with the values.
        """
        values = []
        for col in self.cols:
            from_er = _FROM_ER.get(col)
            # Graph-derived features come from ``er``; the rest from ``feats``,
            # with missing/None values treated as 0.0.
            values.append(from_er(did, er) if from_er else float(feats.get(col) or 0.0))
        return np.array([values])

    def prob(self, did, feats, er) -> float:
        """Return P(clean) for ``did``: calibrated when ``iso`` is set, else the raw booster output."""
        raw = float(self.booster.predict(self._row(did, feats, er))[0])
        return float(self.iso.predict([raw])[0]) if self.iso is not None else raw

    def contributions(self, did, feats, er, top: int = 3) -> list[dict]:
        """Return the ``top`` features with the largest absolute contribution.

        Each entry is ``{"feature": name, "contribution": value}`` with the
        value rounded to 3 decimals, ordered by descending magnitude.
        """
        # pred_contrib=True yields one value per feature plus a trailing
        # expected-value (bias) term, which is dropped here.
        c = self.booster.predict(self._row(did, feats, er), pred_contrib=True)[0][:-1]
        idx = np.argsort(np.abs(c))[::-1][:top]
        return [{"feature": self.cols[i], "contribution": round(float(c[i]), 3)} for i in idx]
60
61
# Process-wide memo for load(): the scorer (or None when no model file exists)
# and whether a load attempt has already completed.
_cache: LearnedScorer | None = None
_loaded = False


def load() -> LearnedScorer | None:
    """Return the trained scorer, reading ``MODEL_PATH`` on first use.

    The result is memoized, including the "no model file" case, until
    ``train()`` resets the memo. A load that raises (lightgbm not installed,
    unreadable pickle) is not memoized, so the error resurfaces on the next
    call instead of being silently turned into ``None``.

    Returns:
        The ``LearnedScorer``, or ``None`` when ``MODEL_PATH`` does not exist.
    """
    global _cache, _loaded
    if _loaded:
        return _cache

    scorer = None
    if MODEL_PATH.exists():
        # Imported lazily so the module stays importable without the
        # optional learned-model dependencies.
        import lightgbm as lgb

        # NOTE(security): pickle.loads executes code embedded in the file.
        # MODEL_PATH must only ever contain models written by train().
        d = pickle.loads(MODEL_PATH.read_bytes())
        scorer = LearnedScorer(lgb.Booster(model_str=d["booster"]), d["iso"], d["cols"])

    # Mark as loaded only once the attempt has succeeded.
    _cache, _loaded = scorer, True
    return _cache
76
77
def _matrix(con, er):
    """Assemble the training arrays ``(X, y)`` from labelled PRs, oldest first.

    One row per PR in ``pr_labels`` with a non-NULL ``clean_merge``: ``X``
    holds the author's feature vector (authors without a features row get an
    empty feature dict) and ``y`` the integer label.
    """
    labelled = con.execute(
        "SELECT author_did, opened_at, clean_merge FROM pr_labels WHERE clean_merge IS NOT NULL "
        "ORDER BY opened_at"
    ).fetchall()
    names = [column[0] for column in con.execute("DESCRIBE features").fetchall()]

    # Index the features view by its first column (the lookup key for DIDs).
    by_did = {}
    for record in con.execute("SELECT * FROM features").fetchall():
        by_did[record[0]] = dict(zip(names, record))

    vectors, labels = [], []
    for did, _opened_at, label in labelled:
        vectors.append(_vec(did, by_did.get(did, {}), er))
        labels.append(int(label))
    return np.array(vectors, dtype=float), np.array(labels, dtype=int)
88
89
def _reliability(p, y, bins=5):
    """Reliability curve (PRD 6.8): mean predicted vs observed P(clean) per bin.

    Splits [0, 1] into ``bins`` equal-width intervals and returns one dict per
    non-empty interval with keys ``bin``, ``predicted``, ``actual`` and ``n``.
    """
    boundaries = np.linspace(0, 1, bins + 1)
    curve = []
    for lower, upper in zip(boundaries[:-1], boundaries[1:]):
        # Intervals are half-open, except the last one which includes 1.0.
        under_upper = (p <= upper) if upper == 1 else (p < upper)
        in_bin = (p >= lower) & under_upper
        if not in_bin.any():
            continue
        curve.append({
            "bin": f"{lower:.1f}-{upper:.1f}",
            "predicted": round(float(p[in_bin].mean()), 3),
            "actual": round(float(y[in_bin].mean()), 3),
            "n": int(in_bin.sum()),
        })
    return curve
100
101
def train(split: float = 0.7) -> dict:
    """Train the booster on a time-ordered split, calibrate it, and save it.

    The earliest ``split`` fraction of labelled PRs (``_matrix`` returns them
    ordered by ``opened_at``) trains the booster; the remaining rows form the
    holdout used to fit the isotonic calibrator and the reliability curve.

    Args:
        split: fraction of rows to train on. At least 2 rows are always used
            for training and the count is capped at the number of rows, so
            ``split >= 1`` trains on everything and leaves no holdout.

    Returns:
        Summary dict with keys ``rows``, ``train``, ``val``, ``calibrated``,
        ``reliability`` and ``model`` (the path of the saved pickle).

    Raises:
        SystemExit: fewer than 4 labelled PRs, or only one label class.
    """
    global _loaded, _cache
    import lightgbm as lgb
    from sklearn.isotonic import IsotonicRegression

    with connection(read_only=True) as con:
        er = eigentrust.compute(con)
        X, y = _matrix(con, er)
    if len(X) < 4 or len(set(y.tolist())) < 2:
        raise SystemExit(f"need >=4 labelled PRs spanning both classes; got {len(X)} rows, "
                         f"classes={set(y.tolist())}. Seed/ingest more history first.")

    # Clamp to the row count so the reported train size never exceeds the data.
    k = min(len(X), max(2, int(len(X) * split)))
    Xtr, ytr, Xval, yval = X[:k], y[:k], X[k:], y[k:]
    # min_data_in_leaf / min_data_in_bin of 1 let the trees split on very small datasets.
    params = dict(objective="binary", num_leaves=15, min_data_in_leaf=1, min_data_in_bin=1,
                  learning_rate=0.1, verbose=-1, feature_pre_filter=False)
    booster = lgb.train(params, lgb.Dataset(Xtr, label=ytr, feature_name=FEATURE_COLS),
                        num_boost_round=60)

    # With no holdout rows there is nothing to score; skip the predict call
    # rather than passing the booster a zero-row matrix.
    raw_val = booster.predict(Xval) if len(Xval) else np.empty(0)
    iso = None
    if len(set(yval.tolist())) > 1:  # isotonic needs both classes in the holdout
        iso = IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval)
    cal_val = iso.predict(raw_val) if iso is not None else raw_val

    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    MODEL_PATH.write_bytes(pickle.dumps(
        {"booster": booster.model_to_string(), "iso": iso, "cols": FEATURE_COLS}))
    _loaded, _cache = False, None  # force reload of the fresh model

    # NOTE(review): the curve is computed on the same holdout the calibrator
    # was fitted on, so it is an optimistic view of calibration quality.
    rel = _reliability(np.asarray(cal_val), yval)
    return {"rows": len(X), "train": k, "val": len(Xval),
            "calibrated": iso is not None, "reliability": rel, "model": str(MODEL_PATH)}
135
136
def main() -> None:
    """Train the model and print the run summary followed by the reliability curve."""
    summary = train()
    headline = (f"[train] {summary['rows']} labelled PRs "
                f"(train={summary['train']} / val={summary['val']}), "
                f"calibrated={summary['calibrated']} -> {summary['model']}")
    print(headline)
    print("[train] reliability curve (predicted vs actual P(clean)):")
    for row in summary["reliability"]:
        print(f" {row['bin']} predicted={row['predicted']} actual={row['actual']} n={row['n']}")
144
145
def demo() -> None:
    """Self-check: after training, a trusted DID scores above a sybil (graph + history).

    Initializes the schema, loads the seed data, trains and reloads the model,
    then compares the calibrated scores of ``did:plc:carol`` and
    ``did:plc:sybil2``.

    Raises:
        AssertionError: the trusted DID does not score strictly above the sybil.
    """
    from .db import connection as conn, init_db
    from .seed import seed as load_seed

    with conn(read_only=False) as con:
        init_db(con)  # schema + features + pr_labels views
        load_seed(con)
    train()
    s = load()
    with conn(read_only=True) as con:
        er = eigentrust.compute(con)
        fcols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
        feats = {r[0]: dict(zip(fcols, r)) for r in con.execute("SELECT * FROM features").fetchall()}
        trusted = s.prob("did:plc:carol", feats.get("did:plc:carol", {}), er)
        sybil = s.prob("did:plc:sybil2", feats.get("did:plc:sybil2", {}), er)
    print(f"calibrated P(clean): carol={trusted:.3f} sybil2={sybil:.3f}")
    # Raised explicitly instead of using `assert`, which `python -O` strips;
    # the check must still run there or "ok" would be printed unverified.
    if not trusted > sybil:
        raise AssertionError("learned score must rank the trusted DID above the sybil")
    print("ok")


if __name__ == "__main__":
    demo()