Sunstead trust scoring project
1"""M3/M4 API + built-in surfaces (PRD 6.10, section 7).
2
3The scoring service is the brain; the UIs are thin clients that read these JSON
4endpoints, never the DuckDB file. Serves the three section-7 surfaces as static
5pages (PRD blesses a built-in dashboard; same laziness applies to the others).
6ponytail: built-in HTML over a separate SvelteKit/Bun stack; swap if the native
7overlay (M7) or richer UI is needed.
8"""
9
10from __future__ import annotations
11
12import json
13import urllib.request
14from contextlib import asynccontextmanager
15from functools import lru_cache
16from pathlib import Path
17
18from fastapi import Depends, FastAPI, HTTPException
19from fastapi.middleware.cors import CORSMiddleware
20from fastapi.responses import FileResponse, Response
21from fastapi.staticfiles import StaticFiles
22from pydantic import BaseModel
23
24from .config import CFG
25from .db import connection, ensure_schema
26from . import eigentrust, fusion, review as review_mod, voice
27
28
@asynccontextmanager
async def lifespan(app):
    """Application lifespan hook: make sure the schema exists before serving.

    ``ensure_schema()`` runs once at startup (tables/view creation); the
    request handlers below open read-only connections. Nothing runs after
    the ``yield`` on shutdown.
    """
    ensure_schema()
    yield
33
34
# Directory holding the built-in HTML surfaces, next to this module.
STATIC = Path(__file__).parent / "static"

app = FastAPI(title="Tangled trust scoring", lifespan=lifespan)
# ponytail: permissive CORS on a local read-only service so the tangled.org overlay
# (7.4) can read /score client-side. Lock to the extension origin for a hosted demo.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
)
40
41
def get_con():
    """FastAPI dependency that yields a read-only DB connection.

    The connection is opened per request and closed when the request ends,
    keeping it short-lived so the score-loop writer can interleave.
    """
    with connection(read_only=True) as read_con:
        yield read_con
46
47
def _eigen(con):
    """Run EigenTrust against ``con`` and return its result object.

    ponytail: recomputed on every request; with a few-thousand-edge graph
    that is sub-millisecond. Add a cache if the graph grows.
    """
    result = eigentrust.compute(con)
    return result
51
52
def _gate_probs(con, er):
    """Map every did in ``er.trust`` to the calibrated P(clean) the gate uses.

    Preference order is winning GNN -> M5 scorer -> raw EigenTrust, so the
    dashboard mirrors real decisions rather than raw structural trust. M5
    blends the non-vouch signals (age/merged-history/stars) that lift
    active-but-unvouched contributors; raw ``er.trust`` would hide that.

    The scorer and the whole ``features`` table are loaded ONCE up front
    (per-did SQL x10k would crawl).
    """
    gnn = fusion._gnn_winner()
    scorer = fusion._scorer() if not gnn else None
    if gnn is None and scorer is None:
        # No trained model available: fall back to a copy of raw EigenTrust.
        return dict(er.trust)

    columns = [row[0] for row in con.execute("DESCRIBE features").fetchall()]
    did_idx = columns.index("did")
    features_by_did = {}
    for row in con.execute("SELECT * FROM features").fetchall():
        features_by_did[row[did_idx]] = dict(zip(columns, row))

    if gnn:
        return {did: gnn.prob(did) for did in er.trust}
    return {did: scorer.prob(did, features_by_did.get(did) or {}, er)
            for did in er.trust}
68
69
70def _profiles(con) -> dict:
71 """did -> declared profile bits from the latest sh.tangled.actor.profile per did:
72 display name (preferredHandle), avatar blob CID, declared links, location/pronouns/
73 description. One query, in-memory join onto the list endpoints (per-row SQL would
74 crawl). Avatars resolve client-side via the bsky CDN thumbnail from (did + cid);
75 PRD 6.10 wants avatar+handle on each row."""
76 clean = lambda s: (s or "").strip() or None
77 out = {}
78 for did, rec in con.execute(
79 "SELECT did, record FROM events WHERE collection='sh.tangled.actor.profile' "
80 "QUALIFY row_number() OVER (PARTITION BY did ORDER BY time_us DESC) = 1"
81 ).fetchall():
82 try:
83 d = json.loads(rec)
84 except (TypeError, ValueError):
85 continue
86 av = d.get("avatar")
87 out[did] = {
88 "name": clean(d.get("preferredHandle")),
89 "avatar_cid": av.get("ref", {}).get("$link") if isinstance(av, dict) else None,
90 "links": [l for l in (d.get("links") or []) if isinstance(l, str) and l.strip()],
91 "location": clean(d.get("location")),
92 "pronouns": clean(d.get("pronouns")),
93 "description": clean(d.get("description")),
94 }
95 return out
96
97
def _assess(con, did: str) -> dict:
    """Shared assessment: latest written score, else a fresh structural compute.

    Structural trust is always computed (M5 calibrated, or raw EigenTrust).
    When a row exists in ``scores`` for ``did``, its content risk, calibrated
    probability, decision and explanation win; otherwise the decision and
    explanation are derived from the structural value alone.

    Returns a dict with ``did``, ``handle`` (None when the did is not in
    ``contributors``), ``structural_trust``, ``content_risk``,
    ``calibrated_prob``, ``decision``, ``explanation`` and ``top_factors``.
    """
    er = _eigen(con)
    feats = fusion._features_for(con, did)
    structural, model_factors = fusion.structural_for(did, er, feats)
    latest = con.execute(
        "SELECT content_risk, calibrated_prob, decision, explanation_json FROM scores "
        "WHERE did=? ORDER BY as_of DESC LIMIT 1", [did]
    ).fetchone()
    if latest:
        content_risk, prob, decision, reason_json = latest
        # A NULL explanation must not 500 the endpoint; treat it as an empty
        # explanation, the same way the /triage handler does.
        reason = json.loads(reason_json or "{}")
    else:
        content_risk = None
        prob = structural
        decision = fusion.decide(structural, None)
        reason = fusion.build_reason(did, structural, None, er, feats, model_factors)
    handle = con.execute("SELECT handle FROM contributors WHERE did=?", [did]).fetchone()
    return {"did": did, "handle": handle[0] if handle else None,
            "structural_trust": structural, "content_risk": content_risk,
            "calibrated_prob": prob, "decision": decision, "explanation": reason,
            "top_factors": reason.get("top_factors", [])}
117
118
@app.get("/score/{did}")
def score(did: str, con=Depends(get_con)):
    # Thin route: the shared assessment for one did, serialized as JSON.
    assessment = _assess(con, did)
    return assessment
122
123
124@lru_cache(maxsize=4096) # ponytail: in-process cache, cleared on restart. Batch-backfill if you want it persisted.
125def _resolve_identity(did: str) -> dict:
126 """did -> {handle, pds} via the PLC directory (did:plc) or .well-known (did:web).
127 The atproto handle IS the human identity/domain (e.g. alice.bsky.social)."""
128 try:
129 if did.startswith("did:plc:"):
130 doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}", timeout=5).read())
131 elif did.startswith("did:web:"):
132 domain = did[len("did:web:"):].replace(":", "/")
133 doc = json.loads(urllib.request.urlopen(f"https://{domain}/.well-known/did.json", timeout=5).read())
134 else:
135 return {"handle": None, "pds": None}
136 handle = next((a[len("at://"):] for a in (doc.get("alsoKnownAs") or []) if a.startswith("at://")), None)
137 pds = next((s.get("serviceEndpoint") for s in (doc.get("service") or [])
138 if s.get("type") == "AtprotoPersonalDataServer"), None)
139 return {"handle": handle, "pds": pds}
140 except Exception:
141 return {"handle": None, "pds": None} # offline / unknown did -> caller falls back to the DID
142
143
@app.get("/identity/{did}")
def identity(did: str, con=Depends(get_con)):
    """Return ``{did, handle, pds}`` for a known contributor.

    The stored handle wins; otherwise the handle resolved from the DID
    document is used. ``pds`` always comes from resolution.

    Raises:
        HTTPException: 404 when ``did`` is not in ``contributors``.
    """
    # Gate to known contributors: bounds resolution to our own DIDs (no SSRF via arbitrary did:web).
    # One lookup serves as both the existence check and the stored handle;
    # a row with a NULL handle is still a non-empty tuple, hence truthy.
    stored = con.execute("SELECT handle FROM contributors WHERE did=?", [did]).fetchone()
    if not stored:
        raise HTTPException(404, "unknown did")
    info = _resolve_identity(did)
    return {"did": did, "handle": stored[0] or info["handle"], "pds": info["pds"]}
152
153
@app.get("/brief/{did}")
def brief(did: str, text: bool = False, con=Depends(get_con)):
    """Spoken briefing of an assessment (PRD M7, ElevenLabs).

    Responds with audio/mpeg when synthesis yields audio (ELEVENLABS_API_KEY
    set); otherwise, or when ``text`` is requested, returns the brief text as
    JSON so the endpoint stays demoable without a key.
    """
    script = voice.brief_text(_assess(con, did))
    if not text:
        audio = voice.synthesize(script)
        if audio is not None:
            return Response(content=audio, media_type="audio/mpeg")
    return {"did": did, "brief": script, "audio": False}
163
164
class ReviewBody(BaseModel):
    # Request payload for POST /review/pr; the four fields are passed through
    # to review_mod.review_pr in this order.
    diff: str  # required
    title: str = ""  # optional, defaults to empty
    description: str = ""  # optional, defaults to empty
    discussion: str = ""  # optional, defaults to empty
170
171
@app.post("/review/pr")
def review(body: ReviewBody):
    # review_pr returns None when review is unavailable; surface that as a 503
    # naming the env var to set.
    result = review_mod.review_pr(body.diff, body.title, body.description, body.discussion)
    if result is not None:
        return result
    raise HTTPException(503, f"set {CFG.review.api_key_env} to enable Claude review")
178
179
@app.get("/leaderboard")
def leaderboard(limit: int = 50, con=Depends(get_con)):
    """Top contributors by EigenTrust score, highest first.

    Args:
        limit: maximum number of rows. Values below zero are treated as zero;
            a raw negative slice bound would instead drop rows from the
            bottom of the ranking and return nearly everyone.

    Each row carries did, handle, the trust value rounded to 4 places (under
    the ``calibrated_prob`` key), the gate decision for that value, the
    declared profile, and the count of positive vouches received.
    """
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    profiles = _profiles(con)
    vouches = dict(con.execute(  # positive vouches received (the in-degree that earns trust)
        "SELECT subject_did, count(*) FROM vouches WHERE COALESCE(polarity,1) > 0 GROUP BY subject_did"
    ).fetchall())
    ranked = sorted(er.trust.items(), key=lambda kv: kv[1], reverse=True)[:max(limit, 0)]
    return [{"did": d, "handle": handles.get(d), "calibrated_prob": round(t, 4),
             "decision": fusion.decide(t, None), "profile": profiles.get(d),
             "vouches": vouches.get(d, 0)} for d, t in ranked]
192
193
@app.get("/scores")
def scores_all(con=Depends(get_con)):
    """Compact did -> {p, d, h} map over every known did (CyberCred badge-cache source).

    EigenTrust is computed once. A did with a stored score reports that
    calibrated probability and decision; any other did reports its structural
    trust (0.0 if absent) and the gate decision for it.
    """
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    latest = {}
    for row in con.execute(
            "SELECT s.did, s.calibrated_prob, s.decision FROM scores s "
            "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
            "ON s.did=l.did AND s.as_of=l.m").fetchall():
        latest[row[0]] = (row[1], row[2])

    out = {}
    for did in set(er.trust) | set(latest) | set(handles):
        stored = latest.get(did)
        if stored is None:
            p = er.trust.get(did, 0.0)
            d = fusion.decide(p, None)
        else:
            p, d = stored
        out[did] = {"p": round(float(p), 4), "d": d, "h": handles.get(did)}
    return out
213
214
@app.get("/graph")
def graph(connected: bool = False, con=Depends(get_con)):
    """Vouch graph as {nodes, links} for the Obsidian-style force view (7.x).

    Nodes are contributors (trust drives size/color) and links are vouches.
    With `?connected=1` the node set is trimmed to vouch endpoints plus
    seeds, i.e. just the relationship core; otherwise every did with a trust
    value is a node.
    """
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    seeds = set(er.seeds)

    links = []
    for voucher, subject, polarity in con.execute(
            "SELECT voucher_did, subject_did, polarity FROM vouches").fetchall():
        # Skip dangling edges the layout can't place.
        if voucher in er.trust and subject in er.trust:
            links.append({"source": voucher, "target": subject, "polarity": polarity})

    if connected:
        dids = {d for l in links for d in (l["source"], l["target"])} | seeds
    else:
        dids = er.trust.keys()  # all known dids unless asked to trim

    nodes = []
    for did in dids:
        trust = er.trust[did]
        nodes.append({"id": did, "handle": handles.get(did) or did[:14],
                      "trust": round(trust, 4), "decision": fusion.decide(trust, None),
                      "seed": did in seeds})
    return {"nodes": nodes, "links": links}
234
235
@app.get("/triage")
def triage(con=Depends(get_con)):
    """Open PRs with their author's decision and explanation breakdown (section 7.1).

    Uses one EigenTrust compute plus one bulk scores query (previously N
    per-PR queries + recomputes, which timed out at ~5.7k open PRs).
    """
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    profiles = _profiles(con)

    # Latest stored score per did in a single pass (prob, decision, reason already computed).
    scored = {}
    for row in con.execute(
            "SELECT s.did, s.calibrated_prob, s.decision, s.explanation_json FROM scores s "
            "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
            "ON s.did=l.did AND s.as_of=l.m").fetchall():
        scored[row[0]] = (row[1], row[2], row[3])

    # merged/closed_unmerged are NULL on backfilled PRs. `IS NOT TRUE` keeps
    # those NULL rows (a plain `NOT col` would evaluate to NULL and drop them),
    # so unknown-state PRs still count as open.
    rows = con.execute(
        "SELECT pr_id, author_did, repo FROM pull_requests "
        "WHERE merged IS NOT TRUE AND closed_unmerged IS NOT TRUE"
    ).fetchall()

    out = []
    for pr_id, did, repo in rows:
        stored = scored.get(did)
        if stored is None:
            # Author never scored (rare): structural fallback, no per-PR recompute.
            prob = er.trust.get(did, 0.0)
            decision = fusion.decide(prob, None)
            reason = {"top_factors": ["no stored score; raw structural trust"]}
        else:
            prob, decision, reason_json = stored
            reason = json.loads(reason_json or "{}")
        out.append({"pr_id": pr_id, "repo": repo, "handle": handles.get(did), "did": did,
                    "calibrated_prob": round(prob, 4), "decision": decision,
                    "explanation": reason, "profile": profiles.get(did)})
    return out
268
269
@app.get("/metrics")
def metrics(con=Depends(get_con)):
    """Dashboard aggregates (PRD 6.10 / 7.2). JSON only; the UI renders it."""
    er = _eigen(con)
    # The gate's real P(clean) per did (M5 if trained), not raw trust.
    probs = _gate_probs(con, er)

    # Ten equal-width histogram buckets; the top bucket also holds p == 1.0.
    dist = [0] * 10
    for p in probs.values():
        bucket = min(int(p * 10), 9)
        dist[bucket] += 1

    decisions = dict.fromkeys(("fast_lane", "normal_queue", "needs_human"), 0)
    for p in probs.values():
        label = fusion.decide(p, None)
        decisions[label] += 1
    total = max(sum(decisions.values()), 1)  # avoid dividing by zero on an empty graph

    # False-approval backtest (PRD 6.8): of historical PRs whose author is fast-lane-eligible
    # (gate prob >= T_HIGH), what fraction were NOT clean merges? Counted PER PR (not per
    # author-with-any-blemish, which over-counts prolific authors). This is an UPPER BOUND:
    # merge detection only confirmed 801/5768 merges, so most clean_merge=0 are merges we
    # couldn't CONFIRM (82% have no fetched diff), not bad PRs -- the true rate is lower.
    # Widen merge coverage (git-on-knots: more knots + Change-Id squash matching) to tighten it.
    fa_total = 0
    fa_bad = 0
    labelled = con.execute(
        "SELECT author_did, clean_merge FROM pr_labels WHERE clean_merge IS NOT NULL"
    ).fetchall()
    for author_did, clean_merge in labelled:
        author_prob = probs.get(author_did, er.trust.get(author_did, 0.0))
        if author_prob >= CFG.gate.T_HIGH:
            fa_total += 1
            fa_bad += 1 if clean_merge == 0 else 0

    cur = con.execute("SELECT last_time_us FROM ingest_state WHERE stream='jetstream'").fetchone()
    contributor_count = con.execute("SELECT count(*) FROM contributors").fetchone()[0]
    edge_count = con.execute("SELECT count(*) FROM vouches").fetchone()[0]

    false_approval_rate = (fa_bad / fa_total) if fa_total else None
    return {
        "score_distribution": dist,
        "decisions": decisions,
        "fast_lane_rate": decisions["fast_lane"] / total,
        "false_approval_rate": false_approval_rate,
        "vouch_graph": {
            "contributors": contributor_count,
            "edges": edge_count,
            "seeds": len(er.seeds),
        },
        "ingest_last_time_us": cur[0] if cur else None,
    }
311
312
@app.get("/backfill/status")
def backfill_status(con=Depends(get_con)):
    """Live scrape progress for the backfill page, which polls this endpoint.

    Reports per-collection record counts from the raw `events` mirror, their
    total, and row counts of the tables derive() has typed-extracted.
    """
    from .backfill import COLLECTIONS as expected

    counts = dict(con.execute(
        "SELECT collection, count(*) FROM events GROUP BY collection"
    ).fetchall())

    # Planned collections come first (0 until reached); overlaying the real
    # counts then fills them in and appends any extras actually seen.
    collections = dict.fromkeys(expected, 0)
    collections.update(counts)

    contributor_rows = con.execute("SELECT count(*) FROM contributors").fetchone()[0]
    vouch_rows = con.execute("SELECT count(*) FROM vouches").fetchone()[0]
    pr_rows = con.execute("SELECT count(*) FROM pull_requests").fetchone()[0]
    return {
        "collections": collections,
        "total": sum(counts.values()),
        "derived": {
            "contributors": contributor_rows,
            "vouches": vouch_rows,
            "pull_requests": pr_rows,
        },
    }
335
336
337# --- static surfaces -------------------------------------------------------
@app.get("/")
def root():
    # The landing page is the triage surface.
    page = STATIC / "triage.html"
    return FileResponse(page)
341
342
@app.get("/backfill")
def backfill_page():
    # Static backfill page.
    page = STATIC / "backfill.html"
    return FileResponse(page)
346
347
@app.get("/dashboard")
def dashboard():
    # Static dashboard page.
    page = STATIC / "dashboard.html"
    return FileResponse(page)
351
352
@app.get("/leaderboard.html")
def leaderboard_page():
    # Static leaderboard page.
    page = STATIC / "leaderboard.html"
    return FileResponse(page)
356
357
@app.get("/graph.html")
def graph_page():
    # Static graph page.
    page = STATIC / "graph.html"
    return FileResponse(page)
361
362
# Serve everything under the STATIC directory at /static.
app.mount("/static", StaticFiles(directory=STATIC), name="static")
364
365
def main() -> None:
    """Serve the app with uvicorn on 127.0.0.1:8003."""
    from uvicorn import run  # imported here, not at module top level

    run(app, host="127.0.0.1", port=8003)
370
371
# Script entry point: start the server only when this module is run directly.
if __name__ == "__main__":
    main()