···1616export ANTHROPIC_API_KEY="sk-ant-..."
1717export CLAUDE_MODEL="claude-sonnet-4-6"
18181919+# Featherless embeddings (diff/slop-similarity path). EMBED_DIMENSIONS optional (MRL).
2020+export FEATHERLESS_API_KEY="..."
2121+export EMBED_MODEL="Qwen/Qwen3-Embedding-4B"
2222+1923# Create the venv ON the drive, not in the repo:
2024# python -m venv "$DATA_ROOT/venv" && source "$DATA_ROOT/venv/bin/activate"
+16-7
README.md
···99with isotonic calibration; GraphSAGE trained offline and compared (not served — it doesn't
1010beat M5 on this sparse graph, and the PRD says ship it only if it does); the
1111attestation-gated sensitive-repo tier (6.13); AT-Proto writeback of assessments as records
1212-(6.11); and the Tangled browser overlay (7.4). Only the ElevenLabs voice briefing is left
1313-as a seam — see "What's skipped".
1212+(6.11); the diff-embedding slop signal (6.12); a spoken `/brief` (ElevenLabs); and the
1313+Tangled browser overlay (7.4).
14141515## Layout
1616···147147- **Browser overlay (7.4).** `extension/` is a minimal MV3 content script that injects a
148148 trust hat onto tangled.org from the same `/score` API. Load unpacked; see `extension/README.md`.
149149 Confirm the DID selector against the real DOM (the UI analog of confirming NSIDs).
150150+- **Diff-embedding slop signal (6.12).** `trust-embed --build` embeds **every** scraped PR diff
151151+ (Featherless / `Qwen3-Embedding-4B`) into the `diff_vectors` table — idempotent and resumable
152152+ (`pr_id NOT IN diff_vectors`), so re-run it as `trust.backfill` keeps filling `pull_requests`,
153153+ or leave `trust-embed --build --watch` running to keep pace. Scoring then cosine-k-NNs each new
154154+ diff against the embeddings of *currently* known-bad PRs (`slop_score` joins `pr_labels`
155155+ `clean_merge=0`, so re-labelling never needs a re-embed) and hands the max similarity to Claude
156156+ as a `machine_findings` hint (advisory — surfaces in the explanation, never flips the gate).
157157+ Vector search stays inside DuckDB; no key → nothing embedded and the signal is just absent.
158158+- **Spoken briefing (M7).** `GET /brief/{did}` composes a speakable summary of the decision
159159+ (no DIDs read aloud) and returns `audio/mpeg` when `ELEVENLABS_API_KEY` is set, JSON text
160160+ otherwise. `trust.voice.brief_text` is the composer; reused by the API.
150161151162## What's skipped (and when to add it)
152163153153-- **ElevenLabs voice briefing (M7).** A thin wrapper: the explanation `summary` is already
154154- "suitable to read aloud" (6.6), so a `/brief/{did}` endpoint that pipes it to TTS is the
155155- whole job — add when you have an ElevenLabs key.
156164- **Per-PR writeback subject.** `sh.tangled.trust.score` currently keys on the contributor
157165 DID; carry `pr_id` on the `scores` table to reference a specific PR's `at://` URI.
158166- **SvelteKit frontend.** The three surfaces ship as built-in static pages (the PRD blesses
159167 this for the dashboard); swap to SvelteKit if you need the richer UI kit / native overlay.
160160-- **External signals (6.12): OSV/secret-scan/SAST.** `review_pr` already accepts
161161- `machine_findings` as structured input — wire the scanners into that arg.
168168+- **More external signals (6.12): OSV/secret-scan/SAST.** `review_pr` already accepts
169169+ `machine_findings` (the slop similarity is the first one wired in) — add the scanners' output
170170+ to that same dict.
+228
docs/content-tower-plan.md
···11+# Content Tower (Tier 1: frozen embeddings + calibrated head) — build plan
22+33+Self-contained build doc. You can clear the conversation and hand this to a fresh
44+agent. It captures the plan **and** the facts discovered while exploring the live data,
55+so nothing here depends on chat history.
66+77+---
88+99+## 0. What this is
1010+1111+The PRD fuses two independent signals through a **monotone gate** (not an average):
1212+1313+- **Tower A — identity trust** (per-DID, sybil-resistant, **load-bearing**): EigenTrust
1414+ over the vouch graph. Already built (`eigentrust.py`).
1515+- **Tower B — content risk** (per-PR, **identity-blind**): how risky *this diff* is,
1616+ judged with no knowledge of the author. Today this is only Claude (`review.py`) at the
1717+ gate, plus an advisory slop-kNN. **This doc builds the learned Tower B.**
1818+1919+Tier 1 = run each diff through the **already-wired embedding transformer**
2020+(Featherless Qwen3-Embedding) and train a small **calibrated head** on `clean_merge`,
2121+using only the diff. Transformer representation power, no fine-tuning, leakage-free by
2222+construction (the model never sees identity).
2323+2424+Tier 2 (fine-tuning a code transformer) is **deferred** until there are ~10³–10⁴ labeled
2525+diffs — below that it loses to frozen-embeddings + a linear head. Not in scope here.
2626+2727+### Non-negotiable constraints (PRD) — must hold for every phase
2828+- Content models judge **content, never identity**: no author handle/DID/history/aggregates
2929+ feed Tower B. Diff + PR-intrinsic stats (size, files, discussion length) only.
3030+- Structural signal (Tower A / EigenTrust) stays load-bearing and sybil-resistant.
3131+- The gate is **not an average**: content can only **penalize**, never lift an untrusted DID
3232+ into the fast lane.
3333+- Calibrated + explainable. Serve a learned model only if it **beats its baseline** on a
3434+ proper holdout (same rule the GNN already follows).
3535+- Serviceless: single DuckDB file, large artifacts under `DATA_ROOT`.
3636+3737+---
3838+3939+## 1. Critical path
4040+4141+```
4242+Phase 0: fetch diffs (patchBlobs) ─┐
4343+ ├─► Phase 2 embed ─► Phase 3 head ─► Phase 4 fuse ─► Phase 5 eval-gate
4444+Phase 1: merged labels ────────────┘
4545+```
4646+4747+**Phase 0 + Phase 1 are prerequisites for ANY content model** (transformer or not) and for
4848+Claude review and slop-kNN — all three are dead without diffs. Start at Phase 0.
4949+5050+---
5151+5252+## 2. Live data facts (as observed)
5353+5454+- **Live DB**: `/Volumes/spectrofi-rec/tangled-data/duckdb/trust.duckdb`
5555+ (`DATA_ROOT=/Volumes/spectrofi-rec/tangled-data`). The repo-local `.data/…` is a stale dev DB — ignore it.
5656+- Backfill is rich but the derived/label layer is **stale** (it ran before some `derive()`
5757+ branches existed). Snapshot:
5858+ - events 83,991 · contributors 10,848 · **vouches 2,029 (+) / 37 (−)** · pulls 5,768
5959+ - `seeds` = **0**, `pull_status` = **0** (collection was not in the old backfill — not even archived),
6060+ `stars` = 0 (14,409 `feed.star` events archived but not re-derived), `diff_text` = **0**
6161+ (patchBlobs never fetched), **0 positive `clean_merge` labels**, no trained model.
6262+- **Read the live DB read-only with retry** (single-writer; a held lock blocks every open):
+  ```python
+  import duckdb, time
+  con = None
+  for _ in range(80):  # up to ~24 s of retries
+      try:
+          con = duckdb.connect("/Volumes/spectrofi-rec/tangled-data/duckdb/trust.duckdb", read_only=True)
+          break
+      except duckdb.IOException:  # lock held by the single writer
+          time.sleep(0.3)
+  if con is None:
+      raise RuntimeError("trust.duckdb still locked after 80 attempts")
+  ```
7070+ Pause `ingest`/`api`/`backfill` before writing, or writes crawl on the lock.
7171+7272+### Record shapes you'll need (confirmed from the network)
7373+7474+`sh.tangled.repo.pull` (the diff is a gzipped blob, NOT inline):
7575+```json
7676+{ "rounds": [ { "createdAt": "...",
7777+ "patchBlob": { "$type": "blob", "ref": { "$link": "<CID>" },
7878+ "mimeType": "application/gzip", "size": 49502 } } ],
7979+ "source": { "branch": "..." },
8080+ "target": { "branch": "...", "repo": "did:plc:…", "repoDid": "did:plc:…" } }
8181+```
8282+- `pr_id` convention (set in `ingest.derive`): `f"{author_did}/{collection}/{rkey}"`,
8383+ e.g. `did:plc:X/sh.tangled.repo.pull/3mp…`.
8484+- The **latest round** (`rounds[-1]`) is the final proposed change — embed/review that.
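The two conventions above (the `pr_id` format and "use the latest round") can be sketched as follows. The helper names `make_pr_id` and `latest_patch_cid` are hypothetical, not functions from the repo, and the sample CIDs are placeholders; only the record shape and the `pr_id` format come from the notes above.

```python
from typing import Optional

PULL_COLLECTION = "sh.tangled.repo.pull"


def make_pr_id(author_did: str, rkey: str, collection: str = PULL_COLLECTION) -> str:
    """Build a pr_id in the `{author_did}/{collection}/{rkey}` form (hypothetical helper)."""
    return f"{author_did}/{collection}/{rkey}"


def latest_patch_cid(record: dict) -> Optional[str]:
    """Return the CID of the last round's patchBlob, or None when it is absent."""
    rounds = record.get("rounds") or []
    if not rounds:
        return None
    blob = rounds[-1].get("patchBlob") or {}
    return (blob.get("ref") or {}).get("$link")


# Placeholder record with two rounds; only the last one should be used.
record = {"rounds": [
    {"createdAt": "...", "patchBlob": {"ref": {"$link": "cid-round-1"}}},
    {"createdAt": "...", "patchBlob": {"ref": {"$link": "cid-round-2"}}},
]}
cid = latest_patch_cid(record)
```

Returning `None` instead of raising keeps a bulk run going when a pull has no rounds or no blob.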
8585+8686+`sh.tangled.repo.pull.status` (authoritative outcome, public; sparse):
+```json
+{ "pull": "at://did:plc:X/sh.tangled.repo.pull/<rkey>",
+  "status": "sh.tangled.repo.pull.status.merged" }
+```
+- `status` ends in one of `.merged` / `.closed` / `.open`.
9191+- Status author may differ from the pull owner — parse `pr_id` from the `pull` field
9292+ (`uri[len("at://"):]`), never from the status record's own did/rkey. (`derive()` already does this.)
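A minimal sketch of that parsing rule, assuming the status record shape shown above. `pr_id_from_status` is a hypothetical name; per the note, `derive()` already implements the real logic.

```python
from typing import Tuple


def pr_id_from_status(status_record: dict) -> Tuple[str, str]:
    """Return (pr_id, outcome) from a pull.status record.

    The pr_id comes from the `pull` at:// URI, never from the status
    record's own did/rkey, because the status author may not own the pull.
    """
    uri = status_record["pull"]
    if not uri.startswith("at://"):
        raise ValueError(f"not an at:// URI: {uri!r}")
    pr_id = uri[len("at://"):]
    # "sh.tangled.repo.pull.status.merged" -> "merged"
    outcome = status_record["status"].rsplit(".", 1)[-1]
    return pr_id, outcome


example = {
    "pull": "at://did:plc:X/sh.tangled.repo.pull/3abc",
    "status": "sh.tangled.repo.pull.status.merged",
}
pr_id, outcome = pr_id_from_status(example)
```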
9393+9494+Knot git clone URL (for the Phase-1 label backstop, git-on-knots): `https://{knot}/{owner_did}/{repo}`
9595+(https, no auth for public repos; `git ls-remote` returns `refs/heads/main`).
9696+9797+### Existing code to build on
9898+- `src/trust/embed.py` — `index_diffs(con, limit=256)` already embeds every `pull_requests.diff_text`
9999+ into `diff_vectors(pr_id, label, embedding DOUBLE[])`, idempotent/resumable; `embed()` returns
100100+ `None` without `FEATHERLESS_API_KEY`; `slop_score()` cosine-kNN vs `clean_merge=0`.
101101+- `src/trust/backfill.py` — reuse `_pds(did)`, `_get(url)`, `_records(pds,did,coll)`, the
102102+ `ThreadPoolExecutor` fan-out pattern, `_archive_and_derive`.
103103+- `src/trust/db.py` — `pull_requests.diff_text`, `pull_status`, `diff_vectors`, `pr_labels`
104104+ view (`clean_merge`), `connection(read_only=…)`, `ensure_schema()`.
105105+- `src/trust/ingest.py` — `derive()` (pull / pull_status / star branches).
106106+- `src/trust/learned.py` — copy its shape: `FEATURE_COLS`, `_vec`, `train(split)`,
107107+ `LearnedScorer`, isotonic calibration, `_reliability`, `MODEL_PATH = MODEL_DIR/…`.
108108+- `src/trust/fusion.py` — `score_pr`, `decide`, `should_review`, `_features_for`.
109109+- `src/trust/config.py` — `CFG.embed` (Featherless), `CFG.review`, `MODEL_DIR`.
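For orientation, the cosine-kNN idea behind `slop_score()` can be sketched in pure Python. This is an illustration under assumptions, not the code in `embed.py`: the function names, the `k` default, and the in-memory lists are hypothetical, and the real search is described as running inside DuckDB.

```python
import math
from typing import Optional, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; 0.0 when either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    if na == 0.0 or nb == 0.0:
        return 0.0
    return dot / (na * nb)


def max_bad_similarity(query: Sequence[float],
                       known_bad: Sequence[Sequence[float]],
                       k: int = 5) -> Optional[float]:
    """Take the k nearest known-bad embeddings and return the highest similarity.

    Returns None when there is no known-bad corpus, mirroring the
    "signal is just absent" behaviour described in the README.
    """
    if not known_bad:
        return None
    top_k = sorted((cosine(query, v) for v in known_bad), reverse=True)[:k]
    return top_k[0]


hint = max_bad_similarity([1.0, 0.0], [[1.0, 0.0], [0.0, 1.0]])
```

The result is only an advisory hint for the reviewer; nothing in this sketch touches the gate.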
110110+111111+---
112112+113113+## 3. Phases
114114+115115+### Phase 0 — Fetch the diffs (new `src/trust/diffs.py`)
116116+The highest-leverage unblock: lights up the content head, Claude review, **and** slop-kNN.
117117+118118+Steps:
+1. Select pulls needing a diff: `SELECT pr_id, author_did FROM pull_requests WHERE diff_text IS NULL`.
+   The CID is not stored on `pull_requests`; it lives in the archived `events.record` JSON
+   (`rounds[-1].patchBlob.ref.$link`), so join `events` on `(did, collection, rkey)` or re-read the record.
122122+2. For each: resolve `_pds(author_did)`, then
123123+ `GET {pds}/xrpc/com.atproto.sync.getBlob?did={author_did}&cid={cid}` → bytes.
124124+3. `gzip.decompress(bytes).decode("utf-8", "replace")` → unified-diff text. Cap stored length
125125+ (~50 KB; embeddings/Claude truncate anyway). `UPDATE pull_requests SET diff_text=? WHERE pr_id=?`.
126126+4. Parallelize like `backfill`: network fetch in a 12-thread pool, DB writes in chunks (single writer).
127127+ Skip missing/oversized blobs gracefully (never abort the run).
128128+129129+Deliverable: `pull_requests.diff_text` populated for ~5,768 PRs (minutes of network).
130130+Self-check: a `demo()` that fetches one known blob and asserts it gunzips to text containing `diff`/`@@`.
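Steps 2 and 3 can be sketched without touching the network. `blob_url`, `decode_patch`, and `MAX_DIFF_CHARS` are hypothetical names; the 50,000-character cap is one reading of "~50 KB", and the actual HTTP fetch (the repo's `_get`) is left out.

```python
import gzip
import zlib
from typing import Optional
from urllib.parse import urlencode

MAX_DIFF_CHARS = 50_000  # assumption: "~50 KB" applied as a character cap


def blob_url(pds: str, did: str, cid: str) -> str:
    """Build the com.atproto.sync.getBlob URL for one patch blob."""
    query = urlencode({"did": did, "cid": cid})
    return f"{pds.rstrip('/')}/xrpc/com.atproto.sync.getBlob?{query}"


def decode_patch(raw: bytes, cap: int = MAX_DIFF_CHARS) -> Optional[str]:
    """Gunzip a patch blob to text, truncated to `cap` characters.

    Returns None for corrupt or non-gzip payloads so one bad blob
    never aborts the whole run.
    """
    try:
        text = gzip.decompress(raw).decode("utf-8", "replace")
    except (OSError, EOFError, zlib.error):
        return None
    return text[:cap]


# Self-check in the spirit of demo(): an in-memory blob must gunzip to diff text.
sample = gzip.compress(b"diff --git a/x b/x\n@@ -1 +1 @@\n-a\n+b\n")
url = blob_url("https://pds.example", "did:plc:abc", "bafy-placeholder")
text = decode_patch(sample)
```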
131131+132132+### Phase 1 — Merged labels (you need a positive class)
133133+- Targeted scrape of `sh.tangled.repo.pull.status` (already mapped in `COLLECTION_KINDS` and handled
134134+ in `derive()`): `python -m trust.backfill --collection sh.tangled.repo.pull.status` (capped first
135135+ with `--max-repos`).
136136+- **Measure positives** before building the head:
137137+ `SELECT clean_merge, count(*) FROM pr_labels GROUP BY 1`.
138138+- **Risk:** pull.status is sparse. If positives are only tens, the head is data-starved too.
139139+ Backstop = **git-on-knots `merged` detection** (clone default branch via the knot URL above,
140140+ check whether each pull's patch landed) for broad `merged` coverage + `reverted`/`re-patched`.
141141+ Only build the backstop if pull.status coverage proves insufficient.
+
+Deliverable: `pr_labels.clean_merge` with a real positive class: ideally a few hundred
+positives; the trainer's hard floor is ≥4 rows spanning both classes.
145145+146146+### Phase 2 — Embed the diffs (frozen transformer)
+- Set `FEATHERLESS_API_KEY`. Run `index_diffs` until it has caught up (loop while it returns > 0):
148148+ ```python
149149+ from trust.db import connection, ensure_schema
150150+ from trust import embed
151151+ ensure_schema()
152152+ with connection(read_only=False) as con:
153153+ while embed.index_diffs(con, limit=256): pass
154154+ ```
155155+- Optional GPU: self-host Qwen3-Embedding-4B (fits one GPU) to embed ~6k diffs locally for free
156156+ instead of the API. The head itself is CPU-trivial.
157157+158158+Deliverable: `diff_vectors` filled for every PR with a diff.
159159+160160+### Phase 3 — The calibrated head (new `src/trust/content.py`, Tower B)
161161+- `_xy(con)`: `X` = `diff_vectors.embedding` for PRs that have a non-NULL `clean_merge`
162162+ (join `pr_labels`); `y` = `clean_merge`. Optionally concat **PR-intrinsic** scalars
163163+ (`additions, deletions, files_touched, discussion_len`) and the slop-kNN similarity.
164164+ **Never** identity/author features.
165165+- **Model: L2-normalize the embedding → logistic regression (linear probe, L2-reg) → isotonic
166166+ or Platt calibration.** Linear probe is correct for frozen embeddings at low data; LightGBM on
167167+ raw 2560-dim embeddings overfits — keep it only as an alt.
168168+- Time-split train/val (order by `opened_at`). Save `content.pkl` under `MODEL_DIR`.
169169+- `ContentScorer.prob(pr_id) -> P(content safe)`; expose `content_risk = 1 - P`.
170170+- Self-check `demo()`: on held-out PRs, a known-bad diff scores higher risk than a clean one;
171171+ print the reliability curve.
172172+173173+Deliverable: a calibrated content risk for **every** PR (cheap, no API), not just reviewed ones.
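Two of the Phase 3 preprocessing steps, L2 normalization and the time-ordered split, are easy to get subtly wrong, so here is a pure-Python sketch. The row layout `(opened_at, embedding, clean_merge)` and the function names are assumptions for illustration; the logistic head and calibration are not shown.

```python
import math
from typing import List, Sequence, Tuple

Row = Tuple[int, List[float], int]  # (opened_at, embedding, clean_merge), assumed layout


def l2_normalize(vec: Sequence[float]) -> List[float]:
    """Scale a vector to unit length; a zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)


def time_split(rows: Sequence[Row], val_fraction: float = 0.2) -> Tuple[List[Row], List[Row]]:
    """Oldest rows train, newest rows validate, so validation never leaks the future."""
    ordered = sorted(rows, key=lambda r: r[0])
    n_val = max(1, round(len(ordered) * val_fraction))
    cut = len(ordered) - n_val
    return ordered[:cut], ordered[cut:]


rows = [(t, l2_normalize([float(t), 1.0]), t % 2) for t in (5, 1, 4, 2, 3)]
train, val = time_split(rows)
```

A random shuffle would be simpler, but it lets the head train on PRs opened after the ones it is evaluated on, which overstates how well it will do on new diffs.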
174174+175175+### Phase 4 — Fuse into the gate (monotone, unchanged)
176176+- In `fusion.score_pr`: the head supplies `content_risk` for all PRs; Claude (`review_pr`, gated by
177177+ `should_review`) refines ambiguous/sensitive ones. Combine conservatively:
178178+ `content_risk = max(model_risk, claude_risk)` so content still only **penalizes**.
179179+- Win: every PR gets a content signal; today only the Claude-reviewed subset does.
180180+- Keep `decide()` and its thresholds; surface the head's risk in the explanation
181181+ (`build_reason`) like the other factors.
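The conservative combine rule above fits in a few lines. `combine_content_risk` is a hypothetical helper, not the body of `fusion.score_pr`; it assumes either risk may be missing (no trained head, or no Claude review for this PR).

```python
from typing import Optional


def combine_content_risk(model_risk: Optional[float],
                         claude_risk: Optional[float]) -> Optional[float]:
    """Take the max of the available risks, or None when neither exists.

    Using max means a second opinion can raise the risk but never
    lower it, so content can only penalize.
    """
    risks = [r for r in (model_risk, claude_risk) if r is not None]
    return max(risks) if risks else None


head_only = combine_content_risk(0.2, None)   # PR not reviewed by Claude
both = combine_content_risk(0.2, 0.7)         # Claude saw something worse
claude_lower = combine_content_risk(0.6, 0.1) # a lower review does not reduce risk
```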
182182+183183+### Phase 5 — Eval + beat-the-baseline gate
184184+- Calibration: reliability curve (reuse `learned._reliability`). Ranking: AUC / average precision.
185185+- **Serve only if it beats**: (a) majority-class, (b) Claude-alone risk where available,
186186+ (c) slop-kNN alone — on a **time-split AND a repo-holdout** (generalize to unseen repos).
187187+- Write a verdict (like `gnn` does); `fusion` consults it before using the head.
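A sketch of the serve-only-if-it-beats check, using a pairwise AUC so it needs no dependencies. The names `auc` and `should_serve`, the `margin` parameter, and the example numbers are illustrative assumptions; the real verdict format (the one `gnn` writes) is not reproduced here.

```python
from typing import Dict, Sequence


def auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """Pairwise AUC: the probability that a positive outranks a negative (ties count 0.5)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs both classes in the holdout")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def should_serve(head_auc: float, baseline_aucs: Dict[str, float], margin: float = 0.0) -> bool:
    """Serve the head only if it beats every baseline by more than `margin`."""
    return all(head_auc > b + margin for b in baseline_aucs.values())


# Toy holdout: 1 = clean_merge, scores = P(content safe).
head = auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
# A constant (majority-class) scorer ties every pair, so its AUC is 0.5.
majority = auc([0, 0, 1, 1], [1.0, 1.0, 1.0, 1.0])
verdict = should_serve(head, {"majority": majority, "slop_knn": 0.7})
```

Run the same check once on the time-split holdout and once on the repo-holdout, and serve only if both pass.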
188188+189189+---
190190+191191+## 4. Effort & runtime
192192+193193+| Phase | Build | Runtime |
194194+|---|---|---|
195195+| 0 diffs (`diffs.py`) | ~1 hr | few min (network) |
196196+| 1 labels (scrape) | wired | ~10 min capped |
197197+| 2 embed (`index_diffs`) | done | few min (API) |
198198+| 3 head (`content.py`) | ~1 hr | seconds |
199199+| 4 fuse (`fusion.py`) | ~30 min | — |
200200+| 5 eval-gate | ~30 min | seconds |
201201+202202+≈ half a day of build + minutes of runtime, given `FEATHERLESS_API_KEY` and enough Phase-1 positives.
203203+204204+## 5. GPU guidance
205205+- **Tier 1 needs no GPU** — embedding runs on Featherless (remote); the head is CPU-trivial.
206206+- Use a GPU now only to **self-host Qwen3-Embedding-4B** for free bulk embedding of ~6k diffs
207207+ (skip API cost/limits).
208208+- Save the GPU for **Tier 2** (fine-tuning CodeBERT/StarEncoder) — deferred until ~10³–10⁴
209209+ labeled diffs exist.
210210+211211+## 6. Definition of done
212212+- `diffs.py` populates `diff_text`; `pr_labels` has a positive class; `diff_vectors` filled.
213213+- `content.py` trains a calibrated head, identity-blind, with a reliability curve.
214214+- It **beats** majority / Claude-alone / slop-kNN on a time + repo holdout, else it doesn't serve.
215215+- `fusion` consumes it monotonically (content only penalizes); explanation shows the content factor.
216216+- Smoke test added (mirror `tests/test_smoke.py` style: `importorskip` the embedding path; assert a
217217+ bad diff out-risks a clean one).
218218+219219+## 7. Parallel unblock (not this tower, but the other gating item)
220220+Structural scoring is still blocked by **`seeds = 0`** + stale derives. Independent of Tower B:
221221+1. `--rederive` from archived `events` (no network) → repopulates `stars` (and any archived
222222+ collections) through the current `derive()`.
223223+2. Seed real maintainer DIDs — top vouch-receivers are the anchors:
224224+ `did:plc:onu3oqfahfubgbetlr4giknc` (141 in), `did:plc:wshs7t2adsemcrrd4snkeqli` (89),
225225+ `did:plc:qfpnj4og54vl56wngdriaxug` (56)… → `INSERT INTO seeds …`.
226226+3. `trust-train` once labels (Phase 1) exist.
227227+EigenTrust (Tower A) and the content head (Tower B) can be built in either order; the gate needs both.
228228+```
+9-1
mprocs.yaml
···1515 autostart: true
16161717 api:
1818- shell: uv run trust-api # http://127.0.0.1:8000 (triage/dashboard/leaderboard)
1818+ shell: uv run trust-api # http://127.0.0.1:8003 (JSON: triage/dashboard/leaderboard/graph)
1919 autostart: true
2020+2121+ web:
2222+ shell: cd web && bun run dev # http://127.0.0.1:5173 (SvelteKit UI -> proxies /api to :8003)
2323+ autostart: true # set API_BASE to override the :8003 proxy default
2424+2525+ embed:
2626+ shell: uv run trust-embed --build # (re)build the known-bad slop corpus (6.12)
2727+ autostart: false # one-shot; needs FEATHERLESS_API_KEY, else indexes 0
20282129 ingest:
2230 shell: uv run python -m trust.ingest # live firehose -> DuckDB
···11+# CyberCred scorer: thin read-API over the trust DuckDB (no torch/lightgbm needed —
22+# fusion._gnn_winner()/_scorer() degrade to EigenTrust + the precomputed scores table).
33+FROM python:3.12-slim
44+WORKDIR /app
55+RUN pip install --no-cache-dir \
66+ "duckdb>=1.1" "numpy>=1.26" "scipy>=1.11" "fastapi>=0.115" \
77+ "uvicorn>=0.30" "httpx>=0.27" "pydantic>=2.7" "anthropic>=0.40" "websockets>=12"
88+COPY src/ /app/src/
99+ENV PYTHONPATH=/app/src
1010+ENV DATA_ROOT=/data
1111+ENV DUCKDB_PATH=/data/trust.duckdb
1212+EXPOSE 8000
1313+# single worker: DuckDB is single-writer and ensure_schema() opens RW once at startup
1414+CMD ["uvicorn", "trust.api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
+220-29
src/trust/api.py
···1010from __future__ import annotations
11111212import json
1313+import urllib.request
1314from contextlib import asynccontextmanager
1515+from functools import lru_cache
1416from pathlib import Path
15171618from fastapi import Depends, FastAPI, HTTPException
1719from fastapi.middleware.cors import CORSMiddleware
1818-from fastapi.responses import FileResponse
2020+from fastapi.responses import FileResponse, Response
1921from fastapi.staticfiles import StaticFiles
2022from pydantic import BaseModel
21232224from .config import CFG
2325from .db import connection, ensure_schema
2424-from . import eigentrust, fusion, review as review_mod
2626+from . import eigentrust, fusion, review as review_mod, voice
252726282729@asynccontextmanager
···4850 return eigentrust.compute(con)
495150525151-@app.get("/score/{did}")
5252-def score(did: str, con=Depends(get_con)):
5353+def _gate_probs(con, er):
5454+ """did -> the SAME calibrated P(clean) the gate uses (winning GNN -> M5 -> raw
5555+ EigenTrust), so the dashboard mirrors real decisions instead of raw structural trust.
5656+ M5 blends the non-vouch signals (age/merged-history/stars) that lift active-but-unvouched
5757+ contributors; reading raw er.trust here hides that. Loads the scorer + features ONCE
5858+ (per-did SQL x10k would crawl)."""
5959+ gnn = fusion._gnn_winner()
6060+ scorer = None if gnn else fusion._scorer()
6161+ if gnn is None and scorer is None:
6262+ return dict(er.trust)
6363+ cols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
6464+ di = cols.index("did")
6565+ feats = {r[di]: dict(zip(cols, r)) for r in con.execute("SELECT * FROM features").fetchall()}
6666+ return {did: (gnn.prob(did) if gnn else scorer.prob(did, feats.get(did) or {}, er))
6767+ for did in er.trust}
6868+6969+7070+def _profiles(con) -> dict:
7171+ """did -> declared profile bits from the latest sh.tangled.actor.profile per did:
7272+ display name (preferredHandle), avatar blob CID, declared links, location/pronouns/
7373+ description. One query, in-memory join onto the list endpoints (per-row SQL would
7474+ crawl). Avatars resolve client-side via the bsky CDN thumbnail from (did + cid);
7575+ PRD 6.10 wants avatar+handle on each row."""
7676+ clean = lambda s: (s or "").strip() or None
7777+ out = {}
7878+ for did, rec in con.execute(
7979+ "SELECT did, record FROM events WHERE collection='sh.tangled.actor.profile' "
8080+ "QUALIFY row_number() OVER (PARTITION BY did ORDER BY time_us DESC) = 1"
8181+ ).fetchall():
8282+ try:
8383+ d = json.loads(rec)
8484+ except (TypeError, ValueError):
8585+ continue
8686+ av = d.get("avatar")
8787+ out[did] = {
8888+ "name": clean(d.get("preferredHandle")),
8989+ "avatar_cid": av.get("ref", {}).get("$link") if isinstance(av, dict) else None,
9090+ "links": [l for l in (d.get("links") or []) if isinstance(l, str) and l.strip()],
9191+ "location": clean(d.get("location")),
9292+ "pronouns": clean(d.get("pronouns")),
9393+ "description": clean(d.get("description")),
9494+ }
9595+ return out
9696+9797+9898+def _assess(con, did: str) -> dict:
9999+ """Shared assessment: latest written score, else a fresh structural compute."""
53100 er = _eigen(con)
54101 feats = fusion._features_for(con, did)
55102 structural, model_factors = fusion.structural_for(did, er, feats) # M5 calibrated, or raw EigenTrust
···62109 reason = json.loads(latest[3]) if latest else fusion.build_reason(
63110 did, structural, None, er, feats, model_factors)
64111 prob = latest[1] if latest else structural
6565- return {"did": did, "structural_trust": structural, "content_risk": content_risk,
112112+ handle = con.execute("SELECT handle FROM contributors WHERE did=?", [did]).fetchone()
113113+ return {"did": did, "handle": handle[0] if handle else None,
114114+ "structural_trust": structural, "content_risk": content_risk,
66115 "calibrated_prob": prob, "decision": decision, "explanation": reason,
67116 "top_factors": reason.get("top_factors", [])}
6811769118119119+@app.get("/score/{did}")
120120+def score(did: str, con=Depends(get_con)):
121121+ return _assess(con, did)
122122+123123+124124+@lru_cache(maxsize=4096) # ponytail: in-process cache, cleared on restart. Batch-backfill if you want it persisted.
125125+def _resolve_identity(did: str) -> dict:
126126+ """did -> {handle, pds} via the PLC directory (did:plc) or .well-known (did:web).
127127+ The atproto handle IS the human identity/domain (e.g. alice.bsky.social)."""
128128+ try:
129129+ if did.startswith("did:plc:"):
130130+ doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}", timeout=5).read())
131131+ elif did.startswith("did:web:"):
132132+ domain = did[len("did:web:"):].replace(":", "/")
133133+ doc = json.loads(urllib.request.urlopen(f"https://{domain}/.well-known/did.json", timeout=5).read())
134134+ else:
135135+ return {"handle": None, "pds": None}
136136+ handle = next((a[len("at://"):] for a in (doc.get("alsoKnownAs") or []) if a.startswith("at://")), None)
137137+ pds = next((s.get("serviceEndpoint") for s in (doc.get("service") or [])
138138+ if s.get("type") == "AtprotoPersonalDataServer"), None)
139139+ return {"handle": handle, "pds": pds}
140140+ except Exception:
141141+ return {"handle": None, "pds": None} # offline / unknown did -> caller falls back to the DID
142142+143143+144144+@app.get("/identity/{did}")
145145+def identity(did: str, con=Depends(get_con)):
+    # Gate to known contributors: bounds resolution to our own DIDs (no SSRF via arbitrary did:web).
+    row = con.execute("SELECT handle FROM contributors WHERE did=?", [did]).fetchone()
+    if row is None:
+        raise HTTPException(404, "unknown did")
+    info = _resolve_identity(did)
+    return {"did": did, "handle": row[0] or info["handle"], "pds": info["pds"]}
152152+153153+154154+@app.get("/brief/{did}")
155155+def brief(did: str, text: bool = False, con=Depends(get_con)):
156156+ """Spoken briefing of an assessment (PRD M7, ElevenLabs). Returns audio/mpeg when
157157+ ELEVENLABS_API_KEY is set, else the brief text as JSON (so it's demoable keyless)."""
158158+ script = voice.brief_text(_assess(con, did))
159159+ audio = None if text else voice.synthesize(script)
160160+ if audio is None:
161161+ return {"did": did, "brief": script, "audio": False}
162162+ return Response(content=audio, media_type="audio/mpeg")
163163+164164+70165class ReviewBody(BaseModel):
71166 diff: str
72167 title: str = ""
···86181def leaderboard(limit: int = 50, con=Depends(get_con)):
87182 er = _eigen(con)
88183 handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
184184+ profiles = _profiles(con)
185185+ vouches = dict(con.execute( # positive vouches received (the in-degree that earns trust)
186186+ "SELECT subject_did, count(*) FROM vouches WHERE COALESCE(polarity,1) > 0 GROUP BY subject_did"
187187+ ).fetchall())
89188 ranked = sorted(er.trust.items(), key=lambda kv: kv[1], reverse=True)[:limit]
90189 return [{"did": d, "handle": handles.get(d), "calibrated_prob": round(t, 4),
9191- "decision": fusion.decide(t, None)} for d, t in ranked]
190190+ "decision": fusion.decide(t, None), "profile": profiles.get(d),
191191+ "vouches": vouches.get(d, 0)} for d, t in ranked]
192192+193193+194194+@app.get("/scores")
195195+def scores_all(con=Depends(get_con)):
196196+ """Compact did -> {p, d, h} for every contributor (CyberCred badge-cache source).
197197+ One EigenTrust compute; stored calibrated score wins, else structural + gate decision."""
198198+ er = _eigen(con)
199199+ handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
200200+ latest = {r[0]: (r[1], r[2]) for r in con.execute(
201201+ "SELECT s.did, s.calibrated_prob, s.decision FROM scores s "
202202+ "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
203203+ "ON s.did=l.did AND s.as_of=l.m").fetchall()}
204204+ out = {}
205205+ for did in set(er.trust) | set(latest) | set(handles):
206206+ if did in latest:
207207+ p, d = latest[did]
208208+ else:
209209+ p = er.trust.get(did, 0.0)
210210+ d = fusion.decide(p, None)
211211+ out[did] = {"p": round(float(p), 4), "d": d, "h": handles.get(did)}
212212+ return out
213213+214214+215215+@app.get("/graph")
216216+def graph(connected: bool = False, con=Depends(get_con)):
217217+ """Vouch graph as {nodes, links} for the Obsidian-style force view (7.x).
218218+ Every contributor is a node (trust drives size/color); vouches are the links.
+    `?connected=1` drops contributors with no vouch edges, leaving only the relationship core (plus seeds)."""
220220+ er = _eigen(con)
221221+ handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
222222+ seeds = set(er.seeds)
223223+ links = [{"source": v, "target": s, "polarity": p}
224224+ for v, s, p in con.execute(
225225+ "SELECT voucher_did, subject_did, polarity FROM vouches").fetchall()
226226+ if v in er.trust and s in er.trust] # drop dangling edges the layout can't place
227227+ dids = ({d for l in links for d in (l["source"], l["target"])} | seeds
228228+ if connected else er.trust.keys()) # all known dids unless asked to trim
229229+ nodes = [{"id": did, "handle": handles.get(did) or did[:14],
230230+ "trust": round(er.trust[did], 4), "decision": fusion.decide(er.trust[did], None),
231231+ "seed": did in seeds}
232232+ for did in dids]
233233+ return {"nodes": nodes, "links": links}
922349323594236@app.get("/triage")
95237def triage(con=Depends(get_con)):
9696- """Open PRs grouped by decision, with the explanation breakdown (section 7.1)."""
238238+ """Open PRs grouped by decision, with the explanation breakdown (section 7.1).
239239+ One EigenTrust compute + one bulk scores query (was N per-PR queries + recomputes,
240240+ which timed out at ~5.7k open PRs)."""
97241 er = _eigen(con)
98242 handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
243243+ profiles = _profiles(con)
244244+ # latest stored score per did in a single pass (prob, decision, reason already computed)
245245+ scored = {r[0]: (r[1], r[2], r[3]) for r in con.execute(
246246+ "SELECT s.did, s.calibrated_prob, s.decision, s.explanation_json FROM scores s "
247247+ "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
248248+ "ON s.did=l.did AND s.as_of=l.m").fetchall()}
99249 rows = con.execute(
100100- "SELECT pr_id, author_did, repo FROM pull_requests WHERE NOT merged AND NOT closed_unmerged"
+        # merged/closed_unmerged are NULL on backfilled PRs; `NOT NULL` is NULL (drops the
+        # row), so use IS NOT TRUE to treat unknowns as FALSE -- otherwise every open PR vanishes.
252252+ "SELECT pr_id, author_did, repo FROM pull_requests "
253253+ "WHERE merged IS NOT TRUE AND closed_unmerged IS NOT TRUE"
101254 ).fetchall()
102255 out = []
103256 for pr_id, did, repo in rows:
104104- structural = er.trust.get(did, 0.0)
105105- latest = con.execute(
106106- "SELECT calibrated_prob, decision, explanation_json FROM scores WHERE did=? "
107107- "ORDER BY as_of DESC LIMIT 1", [did]
108108- ).fetchone()
109109- prob = latest[0] if latest else structural
110110- decision = latest[1] if latest else fusion.decide(structural, None)
111111- reason = json.loads(latest[2]) if latest else fusion.build_reason(
112112- did, structural, None, er, fusion._features_for(con, did))
257257+ if did in scored:
258258+ prob, decision, reason_json = scored[did]
259259+ reason = json.loads(reason_json or "{}")
260260+ else: # author never scored (rare): structural fallback, no per-PR recompute
261261+ prob = er.trust.get(did, 0.0)
262262+ decision = fusion.decide(prob, None)
263263+ reason = {"top_factors": ["no stored score; raw structural trust"]}
113264 out.append({"pr_id": pr_id, "repo": repo, "handle": handles.get(did), "did": did,
114114- "calibrated_prob": round(prob, 4), "decision": decision, "explanation": reason})
265265+ "calibrated_prob": round(prob, 4), "decision": decision, "explanation": reason,
266266+ "profile": profiles.get(did)})
115267 return out
116268117269···119271def metrics(con=Depends(get_con)):
120272 """Aggregates for the dashboard (PRD 6.10 / 7.2). JSON only; the UI renders it."""
121273 er = _eigen(con)
274274+ probs = _gate_probs(con, er) # the gate's real P(clean) per did (M5 if trained), not raw trust
122275 dist = [0] * 10
123123- for t in er.trust.values():
124124- dist[min(int(t * 10), 9)] += 1
276276+ for p in probs.values():
277277+ dist[min(int(p * 10), 9)] += 1
125278126279 decisions = {"fast_lane": 0, "normal_queue": 0, "needs_human": 0}
127127- for t in er.trust.values():
128128- decisions[fusion.decide(t, None)] += 1
280280+ for p in probs.values():
281281+ decisions[fusion.decide(p, None)] += 1
129282 total = max(sum(decisions.values()), 1)
130283131131- # False-approval backtest: of historical PRs whose author is fast-lane-eligible
132132- # (structural >= T_HIGH), what fraction were NOT clean_merge? (PRD 6.8)
284284+ # False-approval backtest (PRD 6.8): of historical PRs whose author is fast-lane-eligible
285285+ # (gate prob >= T_HIGH), what fraction were NOT clean merges? Counted PER PR (not per
286286+ # author-with-any-blemish, which over-counts prolific authors). This is an UPPER BOUND:
287287+ # merge detection only confirmed 801/5768 merges, so most clean_merge=0 are merges we
288288+ # couldn't CONFIRM (82% have no fetched diff), not bad PRs -- the true rate is lower.
289289+ # Widen merge coverage (git-on-knots: more knots + Change-Id squash matching) to tighten it.
133290 fa_total = fa_bad = 0
134134- for did, clean_rate in con.execute(
135135- "SELECT did, clean_merge_rate FROM features WHERE clean_merge_rate IS NOT NULL"
291291+ for author_did, clean_merge in con.execute(
292292+ "SELECT author_did, clean_merge FROM pr_labels WHERE clean_merge IS NOT NULL"
136293 ).fetchall():
137137- if er.trust.get(did, 0.0) >= CFG.gate.T_HIGH:
294294+ if probs.get(author_did, er.trust.get(author_did, 0.0)) >= CFG.gate.T_HIGH:
138295 fa_total += 1
139139- if clean_rate < 1.0:
296296+ if clean_merge == 0:
140297 fa_bad += 1
141298 cur = con.execute("SELECT last_time_us FROM ingest_state WHERE stream='jetstream'").fetchone()
142299 return {
···153310 }
154311155312313313+@app.get("/backfill/status")
314314+def backfill_status(con=Depends(get_con)):
315315+ """Live scrape progress: per-collection record counts in the raw `events` mirror,
316316+    plus what derive() has typed-extracted. The page polls this every couple of seconds."""
317317+ from .backfill import COLLECTIONS as expected
318318+319319+ counts = dict(con.execute(
320320+ "SELECT collection, count(*) FROM events GROUP BY collection"
321321+ ).fetchall())
322322+ # Show every planned collection (0 until reached), then any extras actually seen.
323323+ collections = {c: counts.get(c, 0) for c in expected}
324324+ for c, n in counts.items():
325325+ collections.setdefault(c, n)
326326+ return {
327327+ "collections": collections,
328328+ "total": sum(counts.values()),
329329+ "derived": {
330330+ "contributors": con.execute("SELECT count(*) FROM contributors").fetchone()[0],
331331+ "vouches": con.execute("SELECT count(*) FROM vouches").fetchone()[0],
332332+ "pull_requests": con.execute("SELECT count(*) FROM pull_requests").fetchone()[0],
333333+ },
334334+ }
335335+336336+156337# --- static surfaces -------------------------------------------------------
157338@app.get("/")
158339def root():
159340 return FileResponse(STATIC / "triage.html")
341341+342342+343343+@app.get("/backfill")
344344+def backfill_page():
345345+ return FileResponse(STATIC / "backfill.html")
160346161347162348@app.get("/dashboard")
···169355 return FileResponse(STATIC / "leaderboard.html")
170356171357358358+@app.get("/graph.html")
359359+def graph_page():
360360+ return FileResponse(STATIC / "graph.html")
361361+362362+172363app.mount("/static", StaticFiles(directory=STATIC), name="static")
173364174365175366def main() -> None:
176367 import uvicorn
177368178178- uvicorn.run(app, host="127.0.0.1", port=8000)
369369+ uvicorn.run(app, host="127.0.0.1", port=8003)
179370180371181372if __name__ == "__main__":
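
Reviewer sketch: the per-PR false-approval count that `metrics()` now performs can be illustrated in isolation. This is a minimal version under stated assumptions, not the served code: `T_HIGH = 0.8` is a hypothetical threshold (the real one lives in `CFG.gate`), the inputs are plain dicts and tuples instead of DuckDB rows, and the final rate formula is assumed because the returned dict is elided in this hunk.

```python
# Hypothetical stand-in: the real threshold comes from CFG.gate.T_HIGH.
T_HIGH = 0.8


def false_approval_rate(labels, probs, fallback_trust, t_high=T_HIGH):
    """Count false approvals per PR, not per author.

    labels: iterable of (author_did, clean_merge) with clean_merge in {0, 1}.
    probs: gate probability per DID; fallback_trust: raw structural trust per DID.
    Returns (fa_bad, fa_total, rate).
    """
    fa_total = fa_bad = 0
    for author_did, clean_merge in labels:
        # Prefer the gate's probability, fall back to raw trust, then to 0.0.
        p = probs.get(author_did, fallback_trust.get(author_did, 0.0))
        if p >= t_high:
            fa_total += 1
            if clean_merge == 0:
                fa_bad += 1
    # Assumed formula: share of fast-lane-eligible PRs that were not clean merges.
    rate = fa_bad / fa_total if fa_total else 0.0
    return fa_bad, fa_total, rate


labels = [("did:a", 1), ("did:a", 1), ("did:a", 0), ("did:b", 0), ("did:c", 1)]
probs = {"did:a": 0.9, "did:b": 0.3}
trust = {"did:c": 0.85}
result = false_approval_rate(labels, probs, trust)
print(result)
```

Counting per PR means a prolific author with one blemish contributes one bad PR out of many, rather than marking the whole author as a false approval.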
+75-31
src/trust/backfill.py
···3838RELAY = "https://relay1.us-west.bsky.network"
3939PLC = "https://plc.directory"
40404141-# Collections we know how to derive (must hit a COLLECTION_KINDS substring in config).
4242-COLLECTIONS = ["sh.tangled.repo.pull", "sh.tangled.graph.vouch", "sh.tangled.graph.follow"]
4141+# The WHOLE sh.tangled.* lexicon that actually holds records (live census via
4242+# listReposByCollection; git.refUpdate and bobbin came back empty, so they're omitted).
4343+# Every record is archived raw to `events`; derive() typed-extracts the subset it
4444+# knows (pull / vouch / follow), the rest just lives in the raw mirror.
4545+COLLECTIONS = [
4646+ "sh.tangled.repo.pull", "sh.tangled.repo.pull.status", "sh.tangled.graph.vouch",
4747+ "sh.tangled.graph.follow", "sh.tangled.repo.issue", "sh.tangled.repo",
4848+ "sh.tangled.actor.profile", "sh.tangled.feed.star", "sh.tangled.knot",
4949+ "sh.tangled.knot.member", "sh.tangled.repo.collaborator", "sh.tangled.spindle.member",
5050+ "sh.tangled.repo.artifact", "sh.tangled.pipeline",
5151+]
5252+5353+_PDS_CACHE: dict[str, str | None] = {} # DIDs recur across collections; resolve each once.
435444554556def _get(url: str, tries: int = 4) -> dict:
···586959706071def _pds(did: str) -> str | None:
6161- """DID -> PDS endpoint. Handles did:plc via PLC directory and did:web inline."""
6262- if did.startswith("did:web:"):
6363- doc = _get(f"https://{did[len('did:web:'):]}/.well-known/did.json")
6464- else:
6565- doc = _get(f"{PLC}/{urllib.parse.quote(did)}")
6666- for s in doc.get("service", []):
6767- if s.get("type") == "AtprotoPersonalDataServer":
6868- return s["serviceEndpoint"].rstrip("/")
6969- return None
7272+ """DID -> PDS endpoint (cached). Handles did:plc via PLC directory and did:web inline."""
7373+ if did in _PDS_CACHE:
7474+ return _PDS_CACHE[did]
7575+ endpoint = None
7676+ try:
7777+ if did.startswith("did:web:"):
7878+ doc = _get(f"https://{did[len('did:web:'):]}/.well-known/did.json")
7979+ else:
8080+ doc = _get(f"{PLC}/{urllib.parse.quote(did)}")
8181+ for s in doc.get("service", []):
8282+ if s.get("type") == "AtprotoPersonalDataServer":
8383+ endpoint = s["serviceEndpoint"].rstrip("/")
8484+ break
8585+ except Exception:
8686+ endpoint = None
8787+ _PDS_CACHE[did] = endpoint
8888+ return endpoint
708971907291def _repos(collection: str):
···112131 derive(con, buf)
113132114133115115-def backfill(collections=COLLECTIONS, max_repos: int | None = None) -> dict:
134134+def _fetch_repo(col: str, did: str) -> list[tuple] | None:
135135+ """Network only (thread-safe): all of one repo's records as event tuples, or None on error.
136136+ No DB access here, so WORKERS of these run concurrently without touching the single writer."""
137137+ try:
138138+ pds = _pds(did)
139139+ if not pds:
140140+ return None
141141+ out = []
142142+ for rec in _records(pds, did, col):
143143+ # time_us=0: listRecords has no firehose seq; archive key is (did, collection, rkey).
144144+ rkey = rec["uri"].rsplit("/", 1)[-1]
145145+ out.append((did, 0, "create", col, rkey, json.dumps(rec["value"])))
146146+ return out
147147+ except Exception: # a dead PDS / 400 must not abort the run
148148+ return None
149149+150150+151151+def backfill(collections=COLLECTIONS, max_repos: int | None = None, workers: int = 12) -> dict:
152152+ """Parallel scrape: a thread pool fetches repos concurrently (the slow network part),
153153+ the main thread writes in chunks (DuckDB is single-writer; chunking also frees the file
154154+ between flushes so the dashboard's read-only polls interleave). ponytail: ThreadPoolExecutor
155155+ over a 12-wide pool, not asyncio — the work is I/O-bound and the GIL releases on socket waits."""
156156+ from concurrent.futures import ThreadPoolExecutor
157157+116158 ensure_schema()
117159 counts: dict[str, int] = {}
118160 for col in collections:
119119- repos = records = 0
120120- for did in _repos(col):
121121- if max_repos and repos >= max_repos:
122122- break
123123- repos += 1
124124- pds = _pds(did)
125125- if not pds:
126126- print(f"[backfill] {col} {did}: no PDS, skip")
127127- continue
128128- buf = []
129129- for rec in _records(pds, did, col):
130130- # time_us=0: listRecords has no firehose seq; the archive key is the (did, collection, rkey).
131131- rkey = rec["uri"].rsplit("/", 1)[-1]
132132- buf.append((did, 0, "create", col, rkey, json.dumps(rec["value"])))
133133- _archive_and_derive(buf)
134134- records += len(buf)
135135- print(f"[backfill] {col} {did} -> {len(buf)} records")
161161+ dids = list(_repos(col))
162162+ if max_repos:
163163+ dids = dids[:max_repos]
164164+ records = errors = 0
165165+ buf: list[tuple] = []
166166+ with ThreadPoolExecutor(max_workers=workers) as ex:
167167+ for i, res in enumerate(ex.map(lambda d: _fetch_repo(col, d), dids), 1):
168168+ if res is None:
169169+ errors += 1
170170+ else:
171171+ buf.extend(res)
172172+ records += len(res)
173173+ if len(buf) >= 1000: # flush in chunks: one connection per ~1k records, not per repo
174174+ _archive_and_derive(buf)
175175+ buf = []
176176+ if i % 200 == 0:
177177+ print(f"[backfill] {col}: {i}/{len(dids)} repos, {records} records ({errors} skipped)", flush=True)
178178+ _archive_and_derive(buf)
136179 counts[col] = records
137137- print(f"[backfill] {col}: {records} records from {repos} repos")
180180+ print(f"[backfill] DONE {col}: {records} records from {len(dids)} repos ({errors} skipped)", flush=True)
138181 return counts
139182140183···157200 ap.add_argument("--sample", action="store_true", help="print real records to confirm field shapes, write nothing")
158201 ap.add_argument("--collection", default=None, help="restrict to one NSID (default: all known)")
159202 ap.add_argument("--max-repos", type=int, default=None, help="cap repos per collection (smoke test)")
203203+ ap.add_argument("--workers", type=int, default=12, help="concurrent repo fetchers (default 12)")
160204 args = ap.parse_args()
161205 if args.sample:
162206 sample(args.collection or COLLECTIONS[0])
163207 return
164208 cols = [args.collection] if args.collection else COLLECTIONS
165165- c = backfill(cols, max_repos=args.max_repos)
209209+ c = backfill(cols, max_repos=args.max_repos, workers=args.workers)
166210 print(f"[backfill] done: {c}")
167211168212
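
Reviewer sketch: the fetch-in-parallel, write-in-one-thread shape of `backfill()` can be shown without any network or database. Everything below is a stand-in: `scrape`, `fake_fetch` and the `flush` callback are hypothetical names, and the chunk size of 3 is only there to make the flushing visible (the PR uses about 1000). Unlike the PR, this version skips the final flush when the buffer is empty.

```python
from concurrent.futures import ThreadPoolExecutor


def scrape(dids, fetch, flush, workers=4, chunk=3):
    """fetch(did) returns a list of records, or None on error.
    flush(records) writes one chunk; only the calling thread ever invokes it,
    which mirrors a single-writer database."""
    records = errors = 0
    buf = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # ex.map yields results in input order while fetches overlap.
        for res in ex.map(fetch, dids):
            if res is None:
                errors += 1
                continue
            buf.extend(res)
            records += len(res)
            if len(buf) >= chunk:
                flush(buf)
                buf = []  # start a new list so the flushed chunk is not mutated
    if buf:
        flush(buf)
    return records, errors


def fake_fetch(did):
    # Hypothetical: "did:dead" models an unreachable PDS.
    return None if did == "did:dead" else [(did, i) for i in range(2)]


written = []
stats = scrape(["did:a", "did:dead", "did:b", "did:c"], fake_fetch, written.append)
print(stats, [len(c) for c in written])
```

Keeping all writes on the main thread means the worker count can be tuned freely (`--workers`) without ever contending for the database lock.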
+21-1
src/trust/config.py
···5757COLLECTION_KINDS: dict[str, str] = {
5858 "tangled.pull": "pull_request",
5959 "tangled.repo.pull": "pull_request",
6060+ # authoritative merge outcome (.merged/.closed/.open). MUST out-specific the pull rule
6161+ # above; _kind() takes the longest matching needle so this wins over "tangled.repo.pull".
6262+ "tangled.repo.pull.status": "pull_status",
6063 "tangled.vouch": "vouch",
6164 "tangled.graph.vouch": "vouch",
6265 "tangled.denounce": "denounce",
6366 "tangled.pipeline": "ci",
6467 "tangled.spindle": "ci",
6568 "tangled.issue": "issue",
6666- "tangled.star": "star",
6969+ "tangled.feed.star": "star", # real NSID is sh.tangled.feed.star (".feed." breaks "tangled.star")
6770 "tangled.attestation": "attestation", # jurisdiction attestation (6.13); CONFIRM NSID
6871 "tangled.jurisdiction": "attestation",
6972 "bsky.graph.follow": "follow",
···99102100103101104@dataclass
105105+class EmbedConfig:
106106+ """Featherless (OpenAI-compatible) embeddings for the diff/slop-similarity path.
107107+ Model + base_url are env-overridable so a renamed model never needs a code edit."""
108108+109109+ model: str = os.environ.get("EMBED_MODEL", "Qwen/Qwen3-Embedding-4B")
110110+ base_url: str = os.environ.get("FEATHERLESS_BASE_URL", "https://api.featherless.ai/v1")
111111+ api_key_env: str = "FEATHERLESS_API_KEY"
112112+ # MRL truncation: None -> model-native dim (Qwen3-Embedding-4B = 2560). Set to
113113+ # store smaller vectors in DuckDB. Server ignores it if unsupported.
114114+ dimensions: int | None = int(os.environ["EMBED_DIMENSIONS"]) if os.environ.get("EMBED_DIMENSIONS") else None
115115+ batch: int = 32 # inputs per request
116116+ max_chars: int = 24_000 # truncate giant diffs (matches the review token budget)
117117+ timeout: float = 60.0
118118+119119+120120+@dataclass
102121class Config:
103122 gate: GateConfig = field(default_factory=GateConfig)
104123 eigen: EigenConfig = field(default_factory=EigenConfig)
105124 review: ReviewConfig = field(default_factory=ReviewConfig)
125125+ embed: EmbedConfig = field(default_factory=EmbedConfig)
106126 clean_merge_window_days: int = 14 # PRD 6.3 label-mining N
107127108128
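
Reviewer sketch: the comment on `tangled.repo.pull.status` relies on `_kind()` picking the longest matching needle. `_kind()` itself is not part of this diff, so the function below (`kind_of`, a hypothetical name) is only an assumed illustration of that rule, using a three-entry subset of `COLLECTION_KINDS`.

```python
# Subset of COLLECTION_KINDS from config.py, enough to show the overlap.
COLLECTION_KINDS = {
    "tangled.repo.pull": "pull_request",
    "tangled.repo.pull.status": "pull_status",
    "tangled.feed.star": "star",
}


def kind_of(collection, kinds=COLLECTION_KINDS):
    """Return the kind of the longest needle that is a substring of `collection`,
    or None when no needle matches."""
    matches = [needle for needle in kinds if needle in collection]
    if not matches:
        return None
    return kinds[max(matches, key=len)]


print(kind_of("sh.tangled.repo.pull.status"))
print(kind_of("sh.tangled.repo.pull"))
```

With first-match semantics the status NSID would be classified as a plain pull request, because `"tangled.repo.pull"` is also a substring of it; longest-match removes the dependence on dict order.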
+352
src/trust/content.py
···11+"""Tower B (content risk): a calibrated head on FROZEN diff embeddings (PRD Tier 1).
22+33+Identity-blind by construction (PRD constraint 1): the only inputs are the diff
44+embedding (diff_vectors, from the frozen Qwen3 transformer) and PR-intrinsic
55+scalars (additions, deletions, files_touched, discussion_len). NO author
66+DID/handle/history/aggregate ever enters this model -- leakage-free because the
77+features simply do not contain identity.
88+99+Trained on the clean_merge label with a time split; calibrated so the output is a
1010+real P(content safe). content_risk = 1 - P. Served only if it BEATS its baselines
1111+(majority + slop-kNN) on a time split AND a repo holdout -- same beat-the-baseline
1212+guardrail the GNN follows. fusion consults the verdict via load_if_winner().
1313+1414+Model: L2-normalize the embedding -> L2-regularized logistic regression (a linear
1515+probe, the correct choice for frozen embeddings at low data; a tree on the raw
1616+2560-dim vector overfits) -> isotonic calibration on a held-out fold.
1717+1818+Optional: needs `uv pip install -e '.[learned]'` (scikit-learn). fusion imports
1919+this lazily and treats an ImportError / unfit model as "signal unavailable".
2020+"""
2121+2222+from __future__ import annotations
2323+2424+import json
2525+import pickle
2626+2727+import numpy as np
2828+2929+from .config import MODEL_DIR
3030+from .db import connection
3131+from .learned import _reliability # reuse the PRD 6.8 reliability curve (predicted vs actual)
3232+3333+MODEL_PATH = MODEL_DIR / "content.pkl"
3434+VERDICT = MODEL_DIR / "content_verdict.json"
3535+3636+# PR-intrinsic scalars appended after the embedding. Size/shape of the CHANGE only --
3737+# never identity. Order is fixed; it is the contract between _rows and serving.
3838+SCALAR_COLS = ["additions", "deletions", "files_touched", "discussion_len"]
3939+MIN_ROWS = 8 # hard floor: below this a linear probe is noise
4040+4141+4242+# --- feature construction (leakage-free: scaler fit on TRAIN only) ----------
4343+4444+def _l2(E: np.ndarray) -> np.ndarray:
4545+ """Row-wise L2-normalize; zero rows stay zero (clamped denominator)."""
4646+ return E / np.clip(np.linalg.norm(E, axis=1, keepdims=True), 1e-9, None)
4747+4848+4949+def _featurize_fit(emb, scal):
5050+ """Fit the scalar standardizer on these rows; return (X, mean, std)."""
5151+ E = _l2(np.asarray(emb, dtype=float))
5252+ S = np.log1p(np.asarray(scal, dtype=float).clip(min=0)) # size scalars: tame the heavy tail
5353+ mean, std = S.mean(0), np.clip(S.std(0), 1e-9, None)
5454+ return np.hstack([E, (S - mean) / std]), mean, std
5555+5656+5757+def _featurize_apply(emb, scal, mean, std):
5858+ E = _l2(np.asarray(emb, dtype=float))
5959+ S = np.log1p(np.asarray(scal, dtype=float).clip(min=0))
6060+ return np.hstack([E, (S - mean) / std])
6161+6262+6363+# --- data access ------------------------------------------------------------
6464+6565+def _rows(con):
6666+ """(pr_id, embedding, additions, deletions, files_touched, discussion_len,
6767+ clean_merge, opened_at, repo) for every PR with BOTH an embedding and a non-NULL
6868+ clean_merge label, ordered by opened_at so [:k]/[k:] is a leakage-free time split."""
6969+ return con.execute(
7070+ "SELECT v.pr_id, v.embedding, p.additions, p.deletions, p.files_touched, "
7171+ " p.discussion_len, l.clean_merge, p.opened_at, p.repo "
7272+ "FROM diff_vectors v "
7373+ "JOIN pr_labels l USING (pr_id) "
7474+ "JOIN pull_requests p ON p.pr_id = v.pr_id "
7575+ "WHERE l.clean_merge IS NOT NULL "
7676+ "ORDER BY p.opened_at"
7777+ ).fetchall()
7878+7979+8080+def _matrix(rows):
8181+ """rows -> (emb, scal, y, repos). y=1 is clean_merge (content safe)."""
8282+ emb = [r[1] for r in rows]
8383+ scal = [[r[2] or 0, r[3] or 0, r[4] or 0, r[5] or 0] for r in rows]
8484+ y = np.array([int(r[6]) for r in rows], dtype=int)
8585+ repos = [r[8] or "" for r in rows]
8686+ return emb, scal, y, repos
8787+8888+8989+def _require(rows) -> None:
9090+ classes = {int(r[6]) for r in rows}
9191+ if len(rows) < MIN_ROWS or len(classes) < 2:
9292+ raise SystemExit(
9393+ f"content head needs >={MIN_ROWS} embedded+labelled PRs spanning both classes; "
9494+ f"got {len(rows)} rows, classes={classes}. Run Phase 0 (trust.diffs), Phase 1 "
9595+ f"(backfill sh.tangled.repo.pull.status for a positive class), Phase 2 "
9696+ f"(trust.embed --build) first.")
9797+9898+9999+# --- the scorer -------------------------------------------------------------
100100+101101+class ContentScorer:
102102+ """Serves P(content safe) / content_risk for a PR, identity-blind."""
103103+104104+ def __init__(self, clf, iso, mean, std, emb_dim):
105105+ self.clf, self.iso, self.mean, self.std, self.emb_dim = clf, iso, mean, std, emb_dim
106106+107107+ def _prob_safe(self, emb, scalars) -> float | None:
108108+ if emb is None or len(emb) != self.emb_dim: # dim mismatch (different EMBED_DIMENSIONS) -> unavailable
109109+ return None
110110+ X = _featurize_apply([emb], [scalars], self.mean, self.std)
111111+ raw = float(self.clf.predict_proba(X)[0, 1])
112112+ return float(self.iso.predict([raw])[0]) if self.iso is not None else raw
113113+114114+ def risk_for(self, emb, scalars) -> float | None:
115115+ """content_risk from an embedding + scalars directly. None if dim mismatch."""
116116+ p = self._prob_safe(emb, scalars)
117117+ return None if p is None else 1.0 - p
118118+119119+ def risk(self, con, pr_id: str | None = None, diff: str | None = None,
120120+ scalars: list | None = None) -> float | None:
121121+ """content_risk for a PR: prefer the stored embedding (cheap, every PR), else
122122+ embed the diff live (serving a brand-new PR). None if neither is available
123123+ (no embedding + no API key), so the gate treats content as simply absent."""
124124+ emb = None
125125+ if pr_id is not None:
126126+ row = con.execute("SELECT embedding FROM diff_vectors WHERE pr_id=?", [pr_id]).fetchone()
127127+ emb = row[0] if row else None
128128+ if emb is None and diff:
129129+ from . import embed as embed_mod
130130+ v = embed_mod.embed(diff) # None without FEATHERLESS_API_KEY
131131+ emb = v[0] if v else None
132132+ if emb is None:
133133+ return None
134134+ if scalars is None and pr_id is not None:
135135+ srow = con.execute(
136136+ f"SELECT {', '.join(SCALAR_COLS)} FROM pull_requests WHERE pr_id=?", [pr_id]).fetchone()
137137+ scalars = [s or 0 for s in srow] if srow else [0, 0, 0, 0]
138138+ return self.risk_for(emb, scalars or [0, 0, 0, 0])
139139+140140+ def dump(self) -> dict:
141141+ return {"clf": self.clf, "iso": self.iso, "mean": self.mean, "std": self.std,
142142+ "emb_dim": self.emb_dim}
143143+144144+145145+# --- fit / train ------------------------------------------------------------
146146+147147+def _fit(emb, scal, y, split: float = 0.7):
148148+ """Time-ordered emb/scalars/labels -> (ContentScorer, val_stats). Pure, no DB/IO.
149149+ Scaler + model fit on the train fold only; isotonic calibrated on the val fold."""
150150+ from sklearn.isotonic import IsotonicRegression
151151+ from sklearn.linear_model import LogisticRegression
152152+153153+ k = max(2, int(len(emb) * split))
154154+ Xtr, mean, std = _featurize_fit(emb[:k], scal[:k])
155155+ Xval = _featurize_apply(emb[k:], scal[k:], mean, std)
156156+ ytr, yval = y[:k], y[k:]
157157+ if len(set(ytr.tolist())) < 2:
158158+ raise SystemExit("time-split train fold has a single class; need more history spanning both.")
159159+ clf = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000).fit(Xtr, ytr)
160160+ raw_val = clf.predict_proba(Xval)[:, 1]
161161+ iso = (IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval)
162162+ if len(set(yval.tolist())) > 1 else None) # isotonic needs both classes in the holdout
163163+ cal_val = iso.predict(raw_val) if iso is not None else raw_val
164164+ scorer = ContentScorer(clf, iso, mean, std, len(emb[0]))
165165+ return scorer, {"cal_val": np.asarray(cal_val), "yval": yval, "n_train": k, "n_val": len(yval)}
166166+167167+168168+def _save(scorer: ContentScorer) -> None:
169169+ MODEL_DIR.mkdir(parents=True, exist_ok=True)
170170+ MODEL_PATH.write_bytes(pickle.dumps(scorer.dump()))
171171+ global _loaded, _cache
172172+ _loaded, _cache = False, None # force reload of the fresh model
173173+174174+175175+def train(split: float = 0.7) -> dict:
176176+ """Fit + calibrate + save content.pkl (Phase 3). Returns the reliability curve."""
177177+ with connection(read_only=True) as con:
178178+ rows = _rows(con)
179179+ _require(rows)
180180+ emb, scal, y, _ = _matrix(rows)
181181+ scorer, st = _fit(emb, scal, y, split)
182182+ _save(scorer)
183183+ return {"rows": len(rows), "train": st["n_train"], "val": st["n_val"], "emb_dim": scorer.emb_dim,
184184+ "calibrated": scorer.iso is not None,
185185+ "reliability": _reliability(st["cal_val"], st["yval"]), "model": str(MODEL_PATH)}
186186+187187+188188+# --- eval + beat-the-baseline gate (Phase 5) --------------------------------
189189+190190+def _auc(p_safe, y) -> float | None:
191191+ """ROC-AUC of P(safe) vs the clean label. None if the fold has a single class."""
192192+ from sklearn.metrics import roc_auc_score
193193+ if len(set(np.asarray(y).tolist())) < 2:
194194+ return None
195195+ return float(roc_auc_score(y, p_safe))
196196+197197+198198+def _ap(p_safe, y) -> float | None:
199199+ from sklearn.metrics import average_precision_score
200200+ if len(set(np.asarray(y).tolist())) < 2:
201201+ return None
202202+ return float(average_precision_score(y, p_safe))
203203+204204+205205+def _slop_baseline(emb_val, emb_train, y_train):
206206+ """slop-kNN as a P(safe) proxy: 1 - nearest-cosine to a TRAIN known-bad diff
207207+ (clean_merge=0). Leakage-free (train corpus only), no API (stored vectors)."""
208208+ bad = [e for e, yy in zip(emb_train, y_train) if int(yy) == 0]
209209+ if not bad:
210210+ return None
211211+ B = _l2(np.asarray(bad, dtype=float))
212212+ V = _l2(np.asarray(emb_val, dtype=float))
213213+ sim = (V @ B.T).max(axis=1)
214214+ return 1.0 - np.clip(sim, 0.0, 1.0)
215215+216216+217217+def _eval_fold(train_rows, val_rows) -> dict | None:
218218+ """Fit on train_rows, score val_rows. Returns model + slop-kNN AUC/AP on the val
219219+ fold, or None if either fold lacks both classes (AUC undefined -> can't claim a win)."""
220220+ emb_tr, scal_tr, ytr, _ = _matrix(train_rows)
221221+ emb_va, scal_va, yva, _ = _matrix(val_rows)
222222+ if len(set(ytr.tolist())) < 2 or len(set(yva.tolist())) < 2:
223223+ return None
224224+ from sklearn.linear_model import LogisticRegression
225225+ Xtr, mean, std = _featurize_fit(emb_tr, scal_tr)
226226+ clf = LogisticRegression(C=1.0, class_weight="balanced", max_iter=1000).fit(Xtr, ytr)
227227+ p_safe = clf.predict_proba(_featurize_apply(emb_va, scal_va, mean, std))[:, 1]
228228+ slop = _slop_baseline(emb_va, emb_tr, ytr)
229229+ return {"auc": _auc(p_safe, yva), "ap": _ap(p_safe, yva),
230230+ "slop_auc": (_auc(slop, yva) if slop is not None else None),
231231+ "n_val": len(yva), "pos_val": int(yva.sum())}
232232+233233+234234+def _repo_split(rows, frac: float = 0.3):
235235+ """Hold out whole repos (generalize to UNSEEN repos). Deterministic: the first
236236+ ceil(frac*R) repos by name, so the holdout never overlaps the training repos."""
237237+ repos = sorted({r[8] or "" for r in rows})
238238+ held = set(repos[: max(1, int(len(repos) * frac + 0.999))])
239239+ train = [r for r in rows if (r[8] or "") not in held]
240240+ val = [r for r in rows if (r[8] or "") in held]
241241+ return train, val, held
242242+243243+244244+def _wins(ev) -> bool:
245245+ """A fold is a win if the head beats majority (AUC>0.5) and slop-kNN (where the
246246+ slop baseline is computable). A missing/single-class fold is NOT a win (conservative)."""
247247+ if not ev or ev["auc"] is None:
248248+ return False
249249+ beats_majority = ev["auc"] > 0.5
250250+ beats_slop = ev["slop_auc"] is None or ev["auc"] > ev["slop_auc"]
251251+ return bool(beats_majority and beats_slop)
252252+253253+254254+def _jsonable(o):
255255+ if isinstance(o, (np.floating, np.integer)):
256256+        return o.item()  # native int/float, so integer counts are not serialized as floats
257257+ return str(o)
258258+259259+260260+def train_and_compare(split: float = 0.7) -> dict:
261261+ """Phase 5: fit + save the head, then write a verdict. content_wins is True only
262262+ if it beats majority + slop-kNN on BOTH a time split and a repo holdout. fusion
263263+ serves the head only when content_wins (load_if_winner)."""
264264+ with connection(read_only=True) as con:
265265+ rows = _rows(con)
266266+ _require(rows)
267267+ emb, scal, y, _ = _matrix(rows)
268268+ scorer, st = _fit(emb, scal, y, split)
269269+ _save(scorer)
270270+271271+ k = max(2, int(len(rows) * split))
272272+ time_eval = _eval_fold(rows[:k], rows[k:])
273273+ repo_tr, repo_va, held = _repo_split(rows)
274274+ repo_eval = _eval_fold(repo_tr, repo_va)
275275+ content_wins = bool(_wins(time_eval) and _wins(repo_eval))
276276+277277+ verdict = {"rows": len(rows), "time_split": time_eval, "repo_holdout": repo_eval,
278278+ "held_repos": len(held), "content_wins": content_wins,
279279+ "reliability": _reliability(st["cal_val"], st["yval"])}
280280+ MODEL_DIR.mkdir(parents=True, exist_ok=True)
281281+ VERDICT.write_text(json.dumps(verdict, indent=2, default=_jsonable))
282282+ return verdict
283283+284284+285285+# --- serving (winner-gated, like gnn.load_if_winner) ------------------------
286286+287287+_cache: ContentScorer | None = None
288288+_loaded = False
289289+290290+291291+def load() -> ContentScorer | None:
292292+ global _cache, _loaded
293293+ if not _loaded:
294294+ _loaded = True
295295+ if MODEL_PATH.exists():
296296+ d = pickle.loads(MODEL_PATH.read_bytes())
297297+ _cache = ContentScorer(d["clf"], d["iso"], d["mean"], d["std"], d["emb_dim"])
298298+ return _cache
299299+300300+301301+def load_if_winner() -> ContentScorer | None:
302302+ """Serving hook used by fusion: the head ONLY if it beat its baselines (else None,
303303+ and the gate keeps Claude-only content -- never serve a model that didn't beat baseline)."""
304304+ if not (VERDICT.exists() and MODEL_PATH.exists()):
305305+ return None
306306+ if not json.loads(VERDICT.read_text()).get("content_wins"):
307307+ return None
308308+ return load()
309309+310310+311311+def main() -> None:
312312+ v = train_and_compare()
313313+ ts, rh = v["time_split"] or {}, v["repo_holdout"] or {}
314314+ print(f"[content] {v['rows']} labelled+embedded PRs")
315315+ print(f"[content] time-split: AUC={ts.get('auc')} slop_auc={ts.get('slop_auc')} "
316316+ f"n_val={ts.get('n_val')} pos={ts.get('pos_val')}")
317317+ print(f"[content] repo-holdout ({v['held_repos']} repos): AUC={rh.get('auc')} "
318318+ f"slop_auc={rh.get('slop_auc')} n_val={rh.get('n_val')} pos={rh.get('pos_val')}")
319319+ print(f"[content] content_wins={v['content_wins']} -> "
320320+ + ("SERVED (beats majority + slop-kNN on time AND repo holdout)" if v["content_wins"]
321321+ else "NOT served; gate keeps Claude-only content (beat-the-baseline guardrail)"))
322322+ print("[content] reliability (predicted vs actual P(safe)):")
323323+ for b in v["reliability"]:
324324+ print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")
325325+326326+327327+def demo() -> None:
328328+ """Self-check (no DB/API): synthetic embeddings separable by label -> a held-out
329329+ bad diff out-risks a clean one; print the reliability curve."""
330330+    rng = np.random.RandomState(0)  # fixed seed so the synthetic demo is deterministic
331331+ D = 16
332332+ emb, scal, y = [], [], []
333333+ for i in range(60): # time-ordered, classes alternate so both land in each fold
334334+ clean = i % 2
335335+ base = np.zeros(D); base[0] = 1.0 if clean else -1.0
336336+ emb.append((base + rng.normal(0, 0.3, D)).tolist())
337337+ scal.append([rng.randint(1, 200), rng.randint(0, 100), rng.randint(1, 10), rng.randint(0, 500)])
338338+ y.append(clean)
339339+ scorer, st = _fit(emb, scal, np.array(y), split=0.7)
340340+ r_clean = scorer.risk_for([1.0] + [0.0] * (D - 1), [50, 10, 3, 100])
341341+ r_bad = scorer.risk_for([-1.0] + [0.0] * (D - 1), [50, 10, 3, 100])
342342+ print(f"content_risk: bad={r_bad:.3f} clean={r_clean:.3f}")
343343+ for b in _reliability(st["cal_val"], st["yval"]):
344344+ print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")
345345+ assert scorer.risk_for([1.0] * D, [0, 0, 0, 0]) is not None
346346+ assert scorer.risk_for([1.0] * (D + 1), [0, 0, 0, 0]) is None, "dim mismatch must be 'unavailable', not a crash"
347347+ assert r_bad > r_clean, "content head must score a known-bad diff riskier than a clean one"
348348+ print("ok")
349349+350350+351351+if __name__ == "__main__":
352352+ demo()
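
Reviewer sketch: the slop-kNN baseline in `_slop_baseline` is "one minus the highest cosine similarity to any known-bad training diff". The version below restates that in pure Python so it can be read without NumPy. The function names and toy 2-d vectors are hypothetical, and it is an illustration of the technique rather than the module's code.

```python
import math


def l2_normalize(v):
    """Scale a vector to unit length; a zero vector stays zero (clamped norm)."""
    n = max(math.sqrt(sum(x * x for x in v)), 1e-9)
    return [x / n for x in v]


def slop_p_safe(val_vecs, train_vecs, train_labels):
    """P(safe) proxy per validation vector: 1 - max cosine similarity to a
    training vector labelled 0 (not a clean merge). None if there are no bad rows."""
    bad = [l2_normalize(v) for v, y in zip(train_vecs, train_labels) if y == 0]
    if not bad:
        return None
    out = []
    for v in val_vecs:
        u = l2_normalize(v)
        sim = max(sum(a * b for a, b in zip(u, b_vec)) for b_vec in bad)
        out.append(1.0 - min(max(sim, 0.0), 1.0))  # clip similarity to [0, 1]
    return out


train_vecs = [[1, 0], [0, 1]]
train_labels = [0, 1]  # the first training diff is known-bad
scores = slop_p_safe([[2, 0], [0, 3], [-1, 0]], train_vecs, train_labels)
print(scores)
```

Only training-fold vectors enter the bad set, which is what keeps the baseline leakage-free when it is compared against the learned head on the validation fold.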
+39-8
src/trust/db.py
···3838 pr_id VARCHAR PRIMARY KEY, reverted BOOLEAN DEFAULT FALSE,
3939 patched_same_lines_within_n_days BOOLEAN DEFAULT FALSE
4040);
4141+-- authoritative pull outcome from sh.tangled.repo.pull.status (public record). Separate
4242+-- table so a status arriving before its pull record (ordering not guaranteed) is never lost.
4343+CREATE TABLE IF NOT EXISTS pull_status (
4444+ pr_id VARCHAR PRIMARY KEY, status VARCHAR, updated_at TIMESTAMP DEFAULT now()
4545+);
4146CREATE TABLE IF NOT EXISTS scores (
4247 did VARCHAR, as_of TIMESTAMP DEFAULT now(), structural_trust DOUBLE,
4348 content_risk DOUBLE, calibrated_prob DOUBLE, decision VARCHAR, explanation_json JSON
···4752CREATE TABLE IF NOT EXISTS seeds (did VARCHAR PRIMARY KEY);
4853-- repo tiering (PRD 6.13): sensitive/dual-use repos gate fast-lane on an attestation
4954CREATE TABLE IF NOT EXISTS repo_tiers (repo VARCHAR PRIMARY KEY, tier VARCHAR DEFAULT 'public');
5555+-- star graph (sh.tangled.feed.star): starrer -> repo owner. NOT sybil-resistant on its
5656+-- own (a star is cheap), so it's a model FEATURE, never a trust-graph edge. Keyed by
5757+-- (starrer, owner) so one DID endorsing an owner counts once, not once per repo.
5858+CREATE TABLE IF NOT EXISTS stars (
5959+ starrer_did VARCHAR, owner_did VARCHAR, created_at TIMESTAMP,
6060+ PRIMARY KEY (starrer_did, owner_did)
6161+);
5062-- contributor-issued jurisdiction attestations (signed records); declared, never inferred
5163CREATE TABLE IF NOT EXISTS attestations (
5264 did VARCHAR, jurisdiction VARCHAR, method VARCHAR, created_at TIMESTAMP,
···5668CREATE TABLE IF NOT EXISTS published_records (
5769 did VARCHAR, as_of TIMESTAMP, uri VARCHAR, PRIMARY KEY (did, as_of)
5870);
7171+-- diff-embedding corpus (PRD 6.12 / section 4): near-duplicate detection of known-bad
7272+-- patterns. Vector search stays in DuckDB (list_cosine_similarity) -- no separate engine.
7373+CREATE TABLE IF NOT EXISTS diff_vectors (pr_id VARCHAR PRIMARY KEY, label VARCHAR, embedding DOUBLE[]);
5974"""
60756176# Per-DID feature view (PRD 6.3/6.5). eigentrust_score + bsky_* are joined in
···6580WITH pr AS (
6681 SELECT p.*, COALESCE(f.reverted, FALSE) AS reverted,
6782 COALESCE(f.patched_same_lines_within_n_days, FALSE) AS patched_quick,
6868- -- clean_merge label (PRD 6.3); NULL when too recent to have elapsed the N-day window
8383+ -- merge outcome: pull_status (authoritative, public record) overrides the PDS
8484+ -- record's merged field, which is always NULL on real sh.tangled.repo.pull.
8585+ COALESCE(ps.status LIKE '%.merged', p.merged, FALSE) AS is_merged,
8686+ COALESCE(ps.status LIKE '%.closed', p.closed_unmerged, FALSE) AS is_closed,
8787+ -- clean_merge label (PRD 6.3); NULL when too recent to have elapsed the N-day window.
8888+ -- CI relaxed: pass/fail isn't a public Tangled record, and a `merged` PR already
8989+ -- cleared the merge-gate (which runs CI). So merged-and-not-CI-failed counts; only an
9090+ -- explicit ci_status='failed' disqualifies. Tighten if a CI verdict source is wired.
6991 CASE
7092 WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
7171- WHEN p.merged AND p.ci_status = 'passed'
9393+ WHEN COALESCE(ps.status LIKE '%.merged', p.merged, FALSE)
9494+ AND COALESCE(p.ci_status, 'passed') <> 'failed'
7295 AND NOT COALESCE(f.reverted, FALSE)
7396 AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
7497 ELSE 0
7598 END AS clean_merge
7676- FROM pull_requests p LEFT JOIN pr_followups f USING (pr_id)
9999+ FROM pull_requests p
100100+ LEFT JOIN pr_followups f USING (pr_id)
101101+ LEFT JOIN pull_status ps USING (pr_id)
77102)
78103SELECT
79104 c.did,
80105 date_diff('day', c.did_created_at, now()) AS did_age_days,
8181- COUNT(*) FILTER (WHERE pr.merged) AS merged_pr_count,
106106+ COUNT(*) FILTER (WHERE pr.is_merged) AS merged_pr_count,
82107 COALESCE(AVG(CASE WHEN pr.reverted THEN 1.0 ELSE 0.0 END), 0) AS revert_rate,
83108 COALESCE(AVG(CASE WHEN pr.ci_status='passed' THEN 1.0 ELSE 0.0 END), 0) AS ci_pass_rate,
8484- COALESCE(AVG(CASE WHEN pr.closed_unmerged THEN 1.0 ELSE 0.0 END), 0) AS close_without_merge_ratio,
109109+ COALESCE(AVG(CASE WHEN pr.is_closed THEN 1.0 ELSE 0.0 END), 0) AS close_without_merge_ratio,
85110 COALESCE(AVG(pr.additions + pr.deletions), 0) AS mean_diff_size,
86111 COALESCE(AVG(pr.files_touched), 0) AS mean_files_touched,
87112 COALESCE(SUM(pr.additions + pr.deletions), 0) AS churn,
88113 COALESCE(AVG(pr.discussion_len), 0) AS mean_discussion_len,
89114 (SELECT COUNT(*) FROM vouches v WHERE v.subject_did = c.did AND v.polarity < 0) AS denounce_count,
115115+ -- raw star count (advisory feature, gameable); the sybil-resistant trust-weighted
116116+ -- version (stars_trust) is computed in Python and rides on the EigenResult.
117117+ (SELECT COUNT(*) FROM stars st WHERE st.owner_did = c.did) AS stars_received,
90118 AVG(pr.clean_merge) AS clean_merge_rate
91119FROM contributors c
92120LEFT JOIN pr ON pr.author_did = c.did
···100128SELECT p.pr_id, p.author_did, p.opened_at,
101129 CASE
102130 WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
103103- WHEN p.merged AND p.ci_status = 'passed'
131131+ WHEN COALESCE(ps.status LIKE '%.merged', p.merged, FALSE)
132132+ AND COALESCE(p.ci_status, 'passed') <> 'failed'
104133 AND NOT COALESCE(f.reverted, FALSE)
105134 AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
106135 ELSE 0
107136 END AS clean_merge
108108-FROM pull_requests p LEFT JOIN pr_followups f USING (pr_id);
137137+FROM pull_requests p
138138+LEFT JOIN pr_followups f USING (pr_id)
139139+LEFT JOIN pull_status ps USING (pr_id);
109140"""
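Review note: the label rule in the two `CASE` expressions above can also be stated as a small pure function, which makes it unit-testable without a database. This is a minimal sketch, not code from this PR; `clean_merge_label` and its argument names are hypothetical, and only the decision logic is meant to mirror the SQL.

```python
from datetime import datetime, timedelta
from typing import Optional


def clean_merge_label(opened_at: datetime, now: datetime, window_days: int,
                      status: Optional[str], merged: Optional[bool],
                      ci_status: Optional[str], reverted: Optional[bool],
                      patched_quick: Optional[bool]) -> Optional[int]:
    """Hypothetical mirror of the CASE expression; None means 'too recent to label'."""
    if opened_at > now - timedelta(days=window_days):
        return None
    # COALESCE(ps.status LIKE '%.merged', p.merged, FALSE): a pull_status row, when
    # present, decides; the record's own merged flag is only the fallback.
    is_merged = status.endswith(".merged") if status is not None else bool(merged)
    # COALESCE(p.ci_status, 'passed') <> 'failed': unknown CI is not disqualifying.
    ci_ok = (ci_status if ci_status is not None else "passed") != "failed"
    # COALESCE(f.reverted, FALSE) and COALESCE(f.patched_..., FALSE): None counts as False.
    return int(is_merged and ci_ok and not reverted and not patched_quick)
```

One consequence worth noticing: a `pull_status` row that says closed overrides `merged=TRUE` on the pull record, so any backstop that writes `pull_requests.merged` only takes effect for pulls with no status row.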
110141111142···124155125156126157@contextmanager
127127-def connection(read_only: bool = False, attempts: int = 40, delay: float = 0.25):
158158+def connection(read_only: bool = False, attempts: int = 500, delay: float = 0.02):
128159 """Short-lived connection with retry on DuckDB's cross-process file lock.
129160130161 DuckDB allows only one read-write process; a held lock blocks every other
+150
src/trust/diffs.py
···11+"""Phase 0: fetch PR diffs (patchBlobs) into pull_requests.diff_text.
22+33+The sh.tangled.repo.pull record carries its diff as a gzipped blob CID
44+(rounds[-1].patchBlob.ref.$link), NOT inline. This resolves each pull's blob from
55+the author's PDS, gunzips it to unified-diff text, and stores it -- the single
66+highest-leverage unblock: it lights up the content head, Claude review, AND the
77+slop-kNN, all of which are dead without diffs.
88+99+Reuses backfill's _pds plumbing; the network fetch fans out over a thread pool,
1010+DB writes go in chunks (DuckDB is single-writer). Idempotent/resumable: only
1111+fetches pulls whose diff_text IS NULL, so re-running just resumes. Pause
1212+ingest/api/backfill first or the writes crawl on the single-writer lock.
1313+"""
1414+1515+from __future__ import annotations
1616+1717+import argparse
1818+import gzip
1919+import json
2020+import urllib.parse
2121+import urllib.request
2222+from concurrent.futures import ThreadPoolExecutor
2323+2424+from .backfill import _pds
2525+from .db import connection, ensure_schema
2626+2727+MAX_DIFF_CHARS = 50_000 # cap stored text; embeddings/Claude truncate well below this anyway
2828+MAX_BLOB_BYTES = 5_000_000 # a patch blob is normally < 100 KB; skip absurd ones (caps the compressed bytes read, not the decompressed size)
2929+3030+3131+def _cid_for(record_json: str) -> str | None:
3232+ """rounds[-1].patchBlob.ref.$link from an archived sh.tangled.repo.pull record.
3333+ The LAST round is the final proposed change -- embed/review/store that one."""
3434+ try:
3535+ rounds = (json.loads(record_json) or {}).get("rounds") or []
3636+ pb = (rounds[-1].get("patchBlob") if rounds else None) or {}
3737+ return (pb.get("ref") or {}).get("$link")
3838+ except Exception:
3939+ return None
4040+4141+4242+def _get_blob(pds: str, did: str, cid: str) -> bytes | None:
4343+ """com.atproto.sync.getBlob -> raw bytes (the gzipped patch). None on any error
4444+ (a dead PDS / missing blob / 4xx must never abort the run)."""
4545+ q = urllib.parse.urlencode({"did": did, "cid": cid})
4646+ url = f"{pds}/xrpc/com.atproto.sync.getBlob?{q}"
4747+ try:
4848+ req = urllib.request.Request(url, headers={"User-Agent": "trust-diffs"})
4949+ with urllib.request.urlopen(req, timeout=30) as r:
5050+ return r.read(MAX_BLOB_BYTES + 1) # +1 so an oversized blob is detectable, not silently capped
5151+ except Exception:
5252+ return None
5353+5454+5555+def _diff_text(blob: bytes | None) -> str | None:
5656+ """Gunzip a patch blob to unified-diff text, capped. None if empty, oversized,
5757+ or not gzip/decodable (skip gracefully)."""
5858+ if not blob or len(blob) > MAX_BLOB_BYTES:
5959+ return None
6060+ try:
6161+ text = gzip.decompress(blob).decode("utf-8", "replace")
6262+ except Exception: # bad magic (BadGzipFile/OSError), truncation (EOFError), or a corrupt deflate
6363+ return None # body (zlib.error -- NOT an OSError) all skip this one blob, never abort the run
6464+ return text[:MAX_DIFF_CHARS] or None
6565+6666+6767+def _fetch_one(work: tuple[str, str, str]) -> tuple[str, str] | None:
6868+ """(pr_id, author_did, cid) -> (pr_id, diff_text), or None. Network only, so
6969+ WORKERS of these run concurrently without touching the single DB writer."""
7070+ pr_id, did, cid = work
7171+ pds = _pds(did)
7272+ if not pds:
7373+ return None
7474+ text = _diff_text(_get_blob(pds, did, cid))
7575+ return (pr_id, text) if text else None
7676+7777+7878+def _store(buf: list[tuple[str, str]]) -> None:
7979+ """Write a chunk of (pr_id, diff_text) under one short-lived write lock."""
8080+ if not buf:
8181+ return
8282+ with connection(read_only=False) as con:
8383+ con.executemany("UPDATE pull_requests SET diff_text=? WHERE pr_id=?",
8484+ [(text, pr_id) for pr_id, text in buf])
8585+8686+8787+def fetch_diffs(limit: int | None = None, workers: int = 12, chunk: int = 200) -> dict:
8888+ """Fetch+store diffs for every pull still missing one. The CID lives in the
8989+ archived events.record (joined back to the pull by the derive() pr_id convention
9090+ did || '/' || collection || '/' || rkey)."""
9191+ ensure_schema()
9292+ with connection(read_only=True) as con:
9393+ rows = con.execute(
9494+ "SELECT p.pr_id, p.author_did, e.record FROM pull_requests p "
9595+ "JOIN events e ON p.pr_id = e.did || '/' || e.collection || '/' || e.rkey "
9696+ "WHERE p.diff_text IS NULL AND e.collection = 'sh.tangled.repo.pull'"
9797+ + (f" LIMIT {int(limit)}" if limit else "")
9898+ ).fetchall()
9999+ work = [(pr_id, did, cid) for pr_id, did, rec in rows if (cid := _cid_for(rec))]
100100+ fetched = skipped = 0
101101+ buf: list[tuple[str, str]] = []
102102+ with ThreadPoolExecutor(max_workers=workers) as ex:
103103+ for i, res in enumerate(ex.map(_fetch_one, work), 1):
104104+ if res is None:
105105+ skipped += 1
106106+ else:
107107+ buf.append(res)
108108+ fetched += 1
109109+ if len(buf) >= chunk:
110110+ _store(buf)
111111+ buf = []
112112+ if i % 500 == 0:
113113+ print(f"[diffs] {i}/{len(work)} pulls, {fetched} stored ({skipped} skipped)", flush=True)
114114+ _store(buf)
115115+ out = {"candidates": len(rows), "with_cid": len(work), "stored": fetched, "skipped": skipped}
116116+ print(f"[diffs] DONE: {out}", flush=True)
117117+ return out
118118+119119+120120+def demo() -> None:
121121+ """Offline self-check: a gzipped unified diff round-trips through _diff_text (and
122122+ a non-gzip blob is skipped, not crashed), and _cid_for pulls the CID from a pull
123123+ record. No network -- the live fetch path is exercised by `python -m trust.diffs`."""
124124+ sample = "diff --git a/x.py b/x.py\n@@ -1 +1 @@\n-old\n+new\n"
125125+ assert _diff_text(gzip.compress(sample.encode())) == sample, "gunzip round-trip failed"
126126+ assert "@@" in sample and "diff" in sample
127127+ assert _diff_text(b"not gzip at all") is None, "non-gzip blob must be skipped, not crash"
128128+ assert _diff_text(b"") is None
129129+ # valid gzip magic + garbage deflate body -> zlib.error (which is NOT an OSError); must skip, not crash.
130130+ assert _diff_text(b"\x1f\x8b\x08\x00" + bytes(20)) is None, "corrupt deflate body must be skipped, not abort the run"
131131+ rec = json.dumps({"rounds": [{"patchBlob": {"ref": {"$link": "bafyCID"}, "mimeType": "application/gzip"}}]})
132132+ assert _cid_for(rec) == "bafyCID", "CID extraction from pull record failed"
133133+ assert _cid_for("{}") is None and _cid_for("not json") is None
134134+ print("gunzip round-trip + CID parse ok")
135135+136136+137137+def main() -> None:
138138+ ap = argparse.ArgumentParser(description="Fetch PR diff patchBlobs into pull_requests.diff_text")
139139+ ap.add_argument("--limit", type=int, default=None, help="cap pulls fetched this run (smoke test)")
140140+ ap.add_argument("--workers", type=int, default=12, help="concurrent blob fetchers (default 12)")
141141+ ap.add_argument("--demo", action="store_true", help="run the offline self-check and exit")
142142+ args = ap.parse_args()
143143+ if args.demo:
144144+ demo()
145145+ return
146146+ fetch_diffs(limit=args.limit, workers=args.workers)
147147+148148+149149+if __name__ == "__main__":
150150+ main()
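Review note: `MAX_BLOB_BYTES` bounds the compressed download, while `gzip.decompress` inflates the whole payload before `MAX_DIFF_CHARS` is applied. If highly compressible blobs ever become a concern, one option is to bound the output during decompression. The following is a minimal sketch using the standard `zlib` streaming API; `bounded_gunzip` and `max_out` are hypothetical names, not part of this PR.

```python
import gzip
import zlib
from typing import Optional

MAX_DIFF_CHARS = 50_000  # same cap as the module constant in diffs.py


def bounded_gunzip(blob: bytes, max_out: int = 4 * MAX_DIFF_CHARS) -> Optional[str]:
    """Hypothetical helper: gunzip at most max_out bytes of output, then cap the text.

    Unlike gzip.decompress, a truncated stream yields the partial text decoded so
    far instead of raising; corrupt or non-gzip input still returns None.
    """
    try:
        d = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)  # 16 + MAX_WBITS = gzip container
        raw = d.decompress(blob, max_out)  # second argument is max_length
    except zlib.error:
        return None
    text = raw.decode("utf-8", "replace")
    return text[:MAX_DIFF_CHARS] or None


sample = "diff --git a/x.py b/x.py\n@@ -1 +1 @@\n-old\n+new\n"
round_trip = bounded_gunzip(gzip.compress(sample.encode()))
```

The `4 * MAX_DIFF_CHARS` default leaves headroom for multi-byte UTF-8 characters; it is an arbitrary choice for the sketch.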
+14-2
src/trust/eigentrust.py
···991010import math
1111from collections import deque
1212-from dataclasses import dataclass
1212+from dataclasses import dataclass, field
13131414import numpy as np
1515from scipy import sparse
···2323 index: dict[str, int]
2424 seeds: list[str]
2525 _adj: dict[str, list[str]] # positive-edge adjacency for BFS paths
2626+ stars_trust: dict[str, float] = field(default_factory=dict) # owner -> Σ trust[starrer] (sybil-resistant stars)
26272728 def path_from_seed(self, did: str, max_hops: int = 4) -> list[str]:
2829 """Shortest positive-vouch path seed -> did, for the explanation (PRD 6.4)."""
···112113113114 hi = t.max() or 1.0
114115 trust = {d: float(t[i] / hi) for d, i in index.items()} # max-normalize to [0,1]
115115- return EigenResult(trust, index, seeds, adj)
116116+117117+ # Trust-weighted stars: a star counts only as much as the starrer is itself trusted,
118118+ # so sybil star-farms (trust ~0) contribute ~nothing. Turns a gameable popularity count
119119+ # into a sybil-resistant reputation feature, same philosophy as the vouch graph.
120120+ stars_trust: dict[str, float] = {}
121121+ try:
122122+ for owner, starrer in con.execute("SELECT owner_did, starrer_did FROM stars").fetchall():
123123+ stars_trust[owner] = stars_trust.get(owner, 0.0) + trust.get(starrer, 0.0)
124124+ except Exception:
125125+ pass # stars table absent on a pre-stars DB -> feature stays 0 until schema upgrades
126126+127127+ return EigenResult(trust, index, seeds, adj, stars_trust)
116128117129118130def demo() -> None:
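Review note: the trust-weighted star sum added above is easy to check in isolation. A minimal sketch with made-up DIDs and trust values (`trust_weighted_stars` is a hypothetical standalone name; the PR computes the same sum inline from the `stars` table):

```python
def trust_weighted_stars(stars, trust):
    """stars: iterable of (owner_did, starrer_did); trust: did -> score in [0, 1].

    Each star contributes the starrer's own trust, so unknown or zero-trust
    starrers add nothing.
    """
    out = {}
    for owner, starrer in stars:
        out[owner] = out.get(owner, 0.0) + trust.get(starrer, 0.0)
    return out


# Made-up data: 100 zero-trust sybils star "did:farm"; two trusted DIDs star "did:real".
trust = {"did:a": 0.9, "did:b": 0.8}
stars = [("did:farm", f"did:sybil{i}") for i in range(100)]
stars += [("did:real", "did:a"), ("did:real", "did:b")]

weighted = trust_weighted_stars(stars, trust)
raw = {}
for owner, _starrer in stars:
    raw[owner] = raw.get(owner, 0) + 1  # the gameable raw count, for comparison
```

Here the raw count ranks the farm far above the real repo owner, while the weighted sum ranks it at zero, which is the behaviour the comment in the diff describes.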
+211
src/trust/embed.py
···11+"""Embeddings via the Featherless API (OpenAI-compatible) using Qwen3-Embedding-4B.
33+Feeds the diff-embedding / slop-similarity path (PRD 6.12): embed a PR diff, then
44+cosine-k-NN it against known-bad diffs. Returns None when no FEATHERLESS_API_KEY
55+is set -- exactly like review.py, so the caller treats it as "signal unavailable"
66+rather than crashing.
77+88+OpenAI-compatible, so this is a thin POST to /embeddings; no openai SDK needed.
99+"""
1010+1111+from __future__ import annotations
1212+1313+import math
1414+import os
1515+1616+from .config import CFG
1717+1818+1919+def _key() -> str | None:
2020+ return os.environ.get(CFG.embed.api_key_env)
2121+2222+2323+def embed(texts: str | list[str], model: str | None = None) -> list[list[float]] | None:
2424+ """Embed text(s) -> list of float vectors, input order preserved.
2525+2626+ Returns None if no API key is configured. Long inputs are truncated to
2727+ CFG.embed.max_chars; sent in batches of CFG.embed.batch.
2828+ """
2929+ if _key() is None:
3030+ return None
3131+ if isinstance(texts, str):
3232+ texts = [texts]
3333+ if not texts:
3434+ return []
3535+3636+ import httpx
3737+3838+ model = model or CFG.embed.model
3939+ out: list[list[float]] = []
4040+ with httpx.Client(base_url=CFG.embed.base_url, timeout=CFG.embed.timeout,
4141+ headers={"Authorization": f"Bearer {_key()}"}) as client:
4242+ for i in range(0, len(texts), CFG.embed.batch):
4343+ chunk = [t[: CFG.embed.max_chars] for t in texts[i : i + CFG.embed.batch]]
4444+ body: dict = {"model": model, "input": chunk}
4545+ if CFG.embed.dimensions:
4646+ body["dimensions"] = CFG.embed.dimensions # MRL truncation, if the server honors it
4747+ r = client.post("/embeddings", json=body)
4848+ r.raise_for_status()
4949+ # /embeddings does not guarantee order; sort by the returned index.
5050+ data = sorted(r.json()["data"], key=lambda d: d["index"])
5151+ out.extend(d["embedding"] for d in data)
5252+ return out
5353+5454+5555+def cosine(a: list[float], b: list[float]) -> float:
5656+ """Cosine similarity. 0.0 if either vector is zero-length."""
5757+ dot = sum(x * y for x, y in zip(a, b))
5858+ na = math.sqrt(sum(x * x for x in a))
5959+ nb = math.sqrt(sum(y * y for y in b))
6060+ return dot / (na * nb) if na and nb else 0.0
6161+6262+6363+def _max_cosine(qv: list[float], vecs: list[list[float]]) -> float | None:
6464+ """Nearest-neighbour similarity of qv to a corpus, clamped to [0,1]. None if empty."""
6565+ m = max((cosine(qv, v) for v in vecs), default=None)
6666+ return None if m is None else max(0.0, min(1.0, m))
6767+6868+6969+# --- diff/slop-similarity path (PRD 6.12) ---------------------------------
7070+# Embed every scraped PR diff once into diff_vectors; "known-bad" is decided at
7171+# query time by joining pr_labels (clean_merge=0), so re-labelling never needs a
7272+# re-embed. Search is a cosine scan in Python. ponytail: linear scan -- swap to
7373+# DuckDB's list_cosine_similarity / VSS HNSW index if the corpus grows large.
7474+7575+7676+def index_diffs(limit: int = 256) -> int:
7777+ """Embed up to `limit` PR diffs not yet in diff_vectors (one pass). Idempotent
7878+ and resumable: pr_id NOT IN diff_vectors means a re-run only embeds new diffs,
7979+ so call it repeatedly while the scraper fills pull_requests. Returns the count
8080+ embedded; 0 when caught up or no API key (signal stays absent, like review.py).
8181+8282+ Opens its own short-lived connections: read the batch, release, embed off-lock
8383+ (the network call is slow), then take the write lock only for the insert -- so a
8484+ concurrently-running scraper is never blocked while we wait on Featherless."""
8585+ if _key() is None:
8686+ return 0
8787+ from .db import connection
8888+8989+ with connection(read_only=True) as con:
9090+ rows = con.execute(
9191+ "SELECT pr_id, diff_text FROM pull_requests "
9292+ "WHERE diff_text IS NOT NULL AND length(diff_text) > 0 "
9393+ "AND pr_id NOT IN (SELECT pr_id FROM diff_vectors) LIMIT ?",
9494+ [limit],
9595+ ).fetchall()
9696+ if not rows:
9797+ return 0
9898+ vecs = embed([d for _, d in rows]) # network call, NO db lock held
9999+ if vecs is None:
100100+ return 0
101101+ with connection(read_only=False) as con:
102102+ con.executemany(
103103+ "INSERT INTO diff_vectors (pr_id, label, embedding) VALUES (?, 'pr', ?) "
104104+ "ON CONFLICT (pr_id) DO UPDATE SET embedding = excluded.embedding",
105105+ [[pr_id, v] for (pr_id, _), v in zip(rows, vecs)],
106106+ )
107107+ return len(rows)
108108+109109+110110+def slop_score(con, diff: str, exclude_pr_id: str | None = None) -> float | None:
111111+ """Similarity of `diff` to the nearest *currently* known-bad diff (clean_merge=0),
112112+ in [0,1]. None if the diff is empty, no key is set, or nothing bad is embedded yet.
113113+ Advisory only -- fed to Claude as a machine finding; never decides a PR on its own."""
114114+ if not diff:
115115+ return None
116116+ q = embed(diff)
117117+ if not q: # None (no key) or empty
118118+ return None
119119+ sql = ("SELECT d.embedding FROM diff_vectors d JOIN pr_labels l USING (pr_id) "
120120+ "WHERE l.clean_merge = 0")
121121+ params: list = []
122122+ if exclude_pr_id: # a PR must not match itself
123123+ sql += " AND d.pr_id <> ?"
124124+ params.append(exclude_pr_id)
125125+ vecs = [r[0] for r in con.execute(sql, params).fetchall()]
126126+ return _max_cosine(q[0], vecs)
127127+128128+129129+def demo() -> None:
130130+ """Offline: cosine identities. Live (key set): near-duplicate code embeds
131131+ closer than unrelated prose."""
132132+ v = [1.0, 2.0, 3.0]
133133+ assert abs(cosine(v, v) - 1.0) < 1e-9, "cosine(v,v) must be 1"
134134+ assert abs(cosine([1.0, 0.0], [0.0, 1.0])) < 1e-9, "orthogonal -> 0"
135135+136136+ # slop-path ranking: a near-duplicate of a corpus vector outranks an unrelated one.
137137+ corpus = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]
138138+ assert _max_cosine([1.0, 0.05, 0.0], corpus) > 0.99, "near-dup -> ~1"
139139+ assert _max_cosine([0.0, 0.0, 1.0], corpus) < 0.2, "unrelated -> low"
140140+ assert _max_cosine([1.0], []) is None, "empty corpus -> None"
141141+142142+ if _key() is None:
143143+ print(f"cosine ok; no {CFG.embed.api_key_env} -> live embedding skipped")
144144+ return
145145+ a, b, c = embed([
146146+ "def add(x, y): return x + y",
147147+ "def sum_two(p, q): return p + q",
148148+ "the cat sat quietly on the warm windowsill",
149149+ ])
150150+ assert len(a) > 0, "empty embedding"
151151+ assert cosine(a, b) > cosine(a, c), "near-duplicate code should embed closer than prose"
152152+ print(f"dim={len(a)} sim(code,code)={cosine(a, b):.3f} > sim(code,prose)={cosine(a, c):.3f} ok")
153153+154154+155155+def main() -> None:
156156+ import argparse
157157+ import sys
158158+159159+ ap = argparse.ArgumentParser(description="Featherless/Qwen embeddings of scraped Tangled diffs")
160160+ ap.add_argument("text", nargs="*", help="strings to embed; runs the self-check if omitted")
161161+ ap.add_argument("--build", action="store_true",
162162+ help="embed all PR diffs into diff_vectors (idempotent; safe to re-run as the scrape fills)")
163163+ ap.add_argument("--chunk", type=int, default=256, help="diffs per pass / write-lock window")
164164+ ap.add_argument("--limit", type=int, default=None, help="stop after this many this run (quick test)")
165165+ ap.add_argument("--watch", action="store_true",
166166+ help="keep embedding new diffs as the scraper adds them (sleep when caught up)")
167167+ ap.add_argument("--interval", type=float, default=10.0, help="--watch poll seconds")
168168+ args = ap.parse_args()
169169+ if args.build:
170170+ import time
171171+172172+ from .db import connection, ensure_schema
173173+174174+ ensure_schema()
175175+ if _key() is None:
176176+ print(f"[embed] no {CFG.embed.api_key_env} -> nothing embedded (slop signal stays absent)")
177177+ return
178178+ total = 0
179179+ while True:
180180+ # index_diffs manages its own short-lived connections (read -> embed off-lock -> write)
181181+ n = index_diffs(limit=args.chunk)
182182+ if n:
183183+ total += n
184184+ print(f"[embed] {total} diffs embedded", flush=True)
185185+ if args.limit and total >= args.limit:
186186+ break
187187+ continue
188188+ if args.watch: # caught up; wait for the scraper to add more
189189+ time.sleep(args.interval)
190190+ continue
191191+ break
192192+ with connection(read_only=True) as con:
193193+ done, remaining = con.execute(
194194+ "SELECT (SELECT count(*) FROM diff_vectors), "
195195+ "(SELECT count(*) FROM pull_requests WHERE diff_text IS NOT NULL "
196196+ " AND length(diff_text) > 0 AND pr_id NOT IN (SELECT pr_id FROM diff_vectors))"
197197+ ).fetchone()
198198+ print(f"[embed] done: +{total} this run; {done} embedded, {remaining} remaining ({CFG.embed.model})")
199199+ return
200200+ if not args.text:
201201+ demo()
202202+ return
203203+ vecs = embed(args.text)
204204+ if vecs is None:
205205+ sys.exit(f"set {CFG.embed.api_key_env} to embed")
206206+ for t, v in zip(args.text, vecs):
207207+ print(f"[{len(v)}d] {v[:4]}... :: {t[:60]}")
208208+209209+210210+if __name__ == "__main__":
211211+ main()
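Review note: the batching loop in `embed()` truncates each input, posts one chunk at a time, and re-sorts every response by its `index` field. A minimal offline sketch of that pattern follows; `embed_batched` and `fake_post` are hypothetical stand-ins (no HTTP, no Featherless), and the fake server deliberately returns items in reverse order to exercise the sort.

```python
from typing import Callable


def embed_batched(texts: list[str], post: Callable[[dict], dict],
                  batch: int = 2, max_chars: int = 8) -> list[list[float]]:
    """Truncate, send in chunks, and restore input order within each chunk."""
    out: list[list[float]] = []
    for i in range(0, len(texts), batch):
        chunk = [t[:max_chars] for t in texts[i:i + batch]]
        resp = post({"input": chunk})
        data = sorted(resp["data"], key=lambda d: d["index"])  # index is per-request
        out.extend(d["embedding"] for d in data)
    return out


def fake_post(body: dict) -> dict:
    """Stand-in server: 1-d 'embedding' = input length, items returned in reverse order."""
    items = [{"index": i, "embedding": [float(len(t))]} for i, t in enumerate(body["input"])]
    return {"data": items[::-1]}


vectors = embed_batched(["a", "bbb", "cc", "dddddddddddd"], fake_post)
```

Because chunks are appended in submission order and each chunk is sorted by its own `index`, the output lines up with the input list even when the server reorders items.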
+73-2
src/trust/fusion.py
···99import json
10101111from .config import CFG
1212-from . import eigentrust, review as review_mod
1212+from . import eigentrust, review as review_mod, vouchsafe
131314141515def decide(structural_trust: float, content: dict | None, cfg=CFG.gate, *,
···5353 return structural_trust * (1.0 - content["content_risk"])
545455555656+def _fold_content(content: dict | None, model_risk: float | None) -> dict | None:
5757+ """Phase 4: fold the Tower B head risk into the content signal MONOTONICALLY.
5858+5959+ Combine model + Claude as max(model_risk, claude_risk): can raise risk, never lower
6060+ it, so content still only penalizes (never lifts an untrusted DID). When Claude was
6161+ skipped (content is None), the head alone synthesizes the content signal so the gate
6262+ still sees content for this PR -- the head covers EVERY PR cheaply, unlike Claude."""
6363+ if model_risk is None:
6464+ return content
6565+ if content is None:
6666+ return {"content_risk": model_risk, "review_recommended": False, "flags": [],
6767+ "summary": f"content-head risk {model_risk:.2f} (no Claude review)"}
6868+ return {**content, "content_risk": max(content["content_risk"], model_risk)}
6969+7070+5671def _scorer():
5772 """Load the M5 LightGBM scorer if trained AND lightgbm is installed; else None."""
5873 try:
···7186 return gnn.load_if_winner()
728773888989+def _slop(con, diff, pr_id):
9090+ """Best-effort diff/slop similarity (6.12). Optional path -- never fail a score on it
9191+ (no key, empty corpus, or a network blip all collapse to None = signal unavailable)."""
9292+ if not diff:
9393+ return None
9494+ try:
9595+ from . import embed
9696+ return embed.slop_score(con, diff, exclude_pr_id=pr_id)
9797+ except Exception:
9898+ return None
9999+100100+101101+def _content_head(con, pr_id, diff):
102102+ """Tower B learned content risk for this PR (PRD Tier 1), identity-blind. Winner-gated:
103103+ None unless the head beat its baselines (content.load_if_winner) and an embedding (or a
104104+ live-embeddable diff) exists. Best-effort -- never fail a score on it. Unlike Claude, this
105105+ covers EVERY PR cheaply, so the gate gets a content signal even when review is skipped."""
106106+ try:
107107+ from . import content
108108+ scorer = content.load_if_winner()
109109+ if scorer is None:
110110+ return None
111111+ return scorer.risk(con, pr_id=pr_id, diff=diff)
112112+ except Exception:
113113+ return None
114114+115115+74116def structural_for(did, er: eigentrust.EigenResult, feats: dict | None):
75117 """Calibrated P(clean) for the gate. Precedence: winning GNN (M6) -> LightGBM (M5)
76118 -> raw EigenTrust (M3). The GNN is used only if it provably beat the baseline."""
···99141 top_factors.append(f"revert rate {feats['revert_rate']:.0%}")
100142 if feats.get("denounce_count"):
101143 top_factors.append(f"{int(feats['denounce_count'])} denounce(s)")
144144+ if feats.get("stars_received"): # advisory popularity; trust-weighted version lives in the model
145145+ top_factors.append(f"{int(feats['stars_received'])} star(s) received")
102146 if model_factors: # M5 LightGBM TreeSHAP contributions (6.9)
103147 for mf in model_factors:
104148 top_factors.append(f"{mf['feature']} ({mf['contribution'] + 0.0:+.3f})")
···139183 tier = repo_tier(con, repo) # 6.13 repo tiering
140184 attested = is_attested(con, did)
141185 sensitive = tier == "sensitive"
186186+ slop = _slop(con, diff, pr_id) # 6.12 diff/slop similarity to known-bad (advisory)
187187+ model_risk = _content_head(con, pr_id, diff) # Tower B head: content risk for ALL PRs (winner-gated)
188188+ scan = vouchsafe.scan_diff(diff) # 6.12 static secret/SAST findings (advisory, redacted)
189189+ machine = {"slop_similarity_to_known_bad": round(slop, 3)} if slop is not None else None
190190+ if model_risk is not None:
191191+ machine = (machine or {}) | {"content_head_risk": round(model_risk, 3)}
192192+ if scan:
193193+ machine = (machine or {}) | {"static_scan_findings": scan}
142194 content = None
143195 if run_review and should_review(structural, sensitive):
144144- content = review_mod.review_pr(diff or "", title=repo or "", discussion="")
196196+ content = review_mod.review_pr(diff or "", title=repo or "", discussion="",
197197+ machine_findings=machine)
198198+ content = _fold_content(content, model_risk) # Phase 4: monotone fold, content only penalizes
145199146200 decision = decide(structural, content, attestation_required=sensitive, attested=attested)
147201 prob = displayed_prob(structural, content)
148202 gate_note = ("sensitive-tier repo: a valid jurisdiction attestation is required before "
149203 "fast-lane/merge (6.13)") if sensitive and not attested else None
150204 reason = build_reason(did, structural, content, er, feats, model_factors, gate_note)
205205+ if model_risk is not None: # Tower B factor, surfaced like the others (Phase 4)
206206+ reason["content_head_risk"] = round(model_risk, 3)
207207+ reason["top_factors"].append(f"content-head risk {model_risk:.0%}")
208208+ if slop is not None:
209209+ reason["slop_similarity"] = round(slop, 3)
210210+ if slop >= 0.9: # advisory: surfaces for the human, never flips the gate (6.12)
211211+ reason["top_factors"].append(f"diff {slop:.0%} similar to a known-bad pattern")
212212+ if scan: # advisory: surfaces for the human even when review is skipped (6.12)
213213+ reason["static_scan_findings"] = scan
214214+ worst = min(scan, key=lambda f: ["critical", "high", "medium"].index(f["severity"]))
215215+ reason["top_factors"].append(
216216+ f"static scan: {worst['severity']} {worst['type']} in added lines (line {worst['line']})")
151217152218 con.execute(
153219 "INSERT INTO scores (did, structural_trust, content_risk, calibrated_prob, decision, explanation_json) "
···212278 assert decide(0.95, risky) == "needs_human", "high-severity flag forces human"
213279 assert decide(0.5, None) == "normal_queue"
214280 assert displayed_prob(0.9, risky) < 0.9, "content risk must penalize, never lift"
281281+ # Phase 4 fold: head risk can only raise the content signal, never lower it.
282282+ assert _fold_content(None, 0.3)["content_risk"] == 0.3, "Claude skipped -> head synthesizes content"
283283+ assert _fold_content(clean, 0.4)["content_risk"] == 0.4, "head raises a clean Claude verdict"
284284+ assert _fold_content(risky, 0.1)["content_risk"] == 0.9, "head never lowers a risky Claude verdict"
285285+ assert _fold_content(clean, None) is clean, "no head -> content untouched"
215286 # 6.13: a sensitive-tier repo with no attestation forces human even for a perfect score.
216287 assert decide(0.99, clean, attestation_required=True, attested=False) == "needs_human"
217288 assert decide(0.99, clean, attestation_required=True, attested=True) == "fast_lane"
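Review note: the "worst finding" selection above uses `list.index` on a fixed severity list, which raises `ValueError` for any severity outside that list. Whether `vouchsafe.scan_diff` can emit other severities is not visible in this diff, so the following is only a sketch under that assumption; `worst_finding` is a hypothetical helper, and the finding shape (`severity`, `type`, `line`) is taken from the keys used in the diff.

```python
SEVERITY_ORDER = ["critical", "high", "medium"]  # same ordering as in the diff above


def worst_finding(scan):
    """Hypothetical helper: most severe finding, or None for an empty scan.

    Severities not in SEVERITY_ORDER rank after all listed ones instead of raising.
    """
    def rank(finding):
        sev = finding.get("severity")
        return SEVERITY_ORDER.index(sev) if sev in SEVERITY_ORDER else len(SEVERITY_ORDER)

    return min(scan, key=rank) if scan else None


# Made-up findings for illustration only.
findings = [
    {"severity": "low", "type": "todo_comment", "line": 3},
    {"severity": "high", "type": "hardcoded_secret", "line": 7},
]
worst = worst_finding(findings)
```

This keeps the advisory path best-effort, in line with `_slop` and `_content_head`, which never fail a score.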
+48-7
src/trust/ingest.py
···232324242525def _kind(collection: str) -> str | None:
2626+ # Longest matching needle wins, so "tangled.repo.pull.status" maps to pull_status
2727+ # rather than being swallowed by the shorter "tangled.repo.pull" -> pull_request.
2828+ best, blen = None, -1
2629 for needle, kind in COLLECTION_KINDS.items():
2727- if needle in collection:
2828- return kind
2929- return None
3030+ if needle in collection and len(needle) > blen:
3131+ best, blen = kind, len(needle)
3232+ return best
303331343235def _url(con) -> str:
···5356 buf.clear()
545755585959+def _ts(v):
6060+ """createdAt -> a value DuckDB's TIMESTAMP accepts, or NULL. Real records sometimes
6161+ carry createdAt="" (or missing), which crashes the insert; coerce those to None."""
6262+ v = (v or "").strip() if isinstance(v, str) else v
6363+ return v or None
6464+6565+5666def derive(con, events: list[tuple]) -> None:
5757- """Raw event tuples -> contributors / vouches / pull_requests (PRD 6.1, 6.2)."""
6767+ """Raw event tuples -> contributors / vouches / pull_requests (PRD 6.1, 6.2).
6868+ Per-record try/except: one malformed record is skipped, never aborts the batch."""
5869 for did, time_us, op, collection, rkey, record_json in events:
7070+ try:
5971 kind = _kind(collection)
6072 rec = json.loads(record_json) if record_json else {}
6173 con.execute(
···7587 "VALUES (?,?,?,?,?,?,1.0) ON CONFLICT (voucher_did, subject_did) DO UPDATE SET "
7688 "polarity=excluded.polarity, reason=excluded.reason",
7789 [did, subject, polarity, rec.get("reason"), rec.get("evidence") or rec.get("uri"),
7878- rec.get("createdAt")],
9090+ _ts(rec.get("createdAt"))],
7991 )
9292+ elif kind == "pull_status" and op != "delete":
9393+ # sh.tangled.repo.pull.status: authoritative outcome (.merged/.closed/.open),
9494+ # the label signal absent from the pull record itself. It references the pull by
9595+ # AT-URI and may be authored by a DIFFERENT did than the pull owner, so the pr_id
9696+ # comes from the `pull` field (at://<did>/<coll>/<rkey> -> our pr_id), never from
9797+ # this record's own did/rkey. Side table so it's insertion-order independent.
9898+ pull_uri = rec.get("pull") or ""
9999+ pr_id = pull_uri[len("at://"):] if pull_uri.startswith("at://") else None
100100+ status = rec.get("status")
101101+ if pr_id and status:
102102+ con.execute(
103103+ "INSERT INTO pull_status (pr_id, status, updated_at) VALUES (?,?,now()) "
104104+ "ON CONFLICT (pr_id) DO UPDATE SET status=excluded.status, updated_at=now()",
105105+ [pr_id, status],
106106+ )
107107+ elif kind == "star" and op != "delete":
108108+ # sh.tangled.feed.star: subject.did is the starred repo's OWNER; the record
109109+ # author (did) is the starrer. A star is cheap -> feature only, never a trust
110110+ # edge. PK (starrer, owner) dedups multi-repo stars; skip self-stars.
111111+ subj = rec.get("subject") if isinstance(rec.get("subject"), dict) else {}
112112+ owner = subj.get("did")
113113+ if owner and owner != did:
114114+ con.execute(
115115+ "INSERT INTO stars (starrer_did, owner_did, created_at) VALUES (?,?,?) "
116116+ "ON CONFLICT (starrer_did, owner_did) DO NOTHING",
117117+ [did, owner, _ts(rec.get("createdAt"))],
118118+ )
80119 elif kind == "attestation" and op != "delete": # 6.13 jurisdiction attestation
81120 con.execute(
82121 "INSERT INTO attestations (did, jurisdiction, method, created_at) VALUES (?,?,?,?) "
83122 "ON CONFLICT (did, jurisdiction) DO NOTHING",
8484- [did, rec.get("jurisdiction"), rec.get("method", "signed_record"), rec.get("createdAt")],
123123+ [did, rec.get("jurisdiction"), rec.get("method", "signed_record"), _ts(rec.get("createdAt"))],
85124 )
86125 elif kind == "pull_request" and op != "delete":
87126 pr_id = f"{did}/{collection}/{rkey}"
···97136 "merged, closed_unmerged, additions, deletions, files_touched, diff_text, discussion_len) "
98137 "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (pr_id) DO NOTHING",
99138 [pr_id, did, tgt.get("repo") or rec.get("repo"), tgt.get("branch") or rec.get("target"),
100100- rec.get("createdAt"), rec.get("ciStatus"), rec.get("merged"), False,
139139+ _ts(rec.get("createdAt")), rec.get("ciStatus"), rec.get("merged"), False,
101140 rec.get("additions"), rec.get("deletions"), rec.get("filesTouched"),
102141 rec.get("diff"), len(json.dumps(rec.get("body", "")))],
103142 )
143143+ except Exception as e: # skip a single malformed record; never abort the batch
144144+ print(f"[derive] skip {collection} {rkey}: {type(e).__name__}", flush=True)
104145105146106147def _flush(buf: list[tuple]) -> None:
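Review note: two small rules in this hunk are worth checking offline: the longest-needle match in `_kind`, and the `at://` URI to `pr_id` mapping used for `pull_status`. A minimal sketch follows. The two-entry `COLLECTION_KINDS` is a hypothetical subset (the real mapping is not shown in this diff), and `kind_for` / `pr_id_from_uri` are illustrative names.

```python
from typing import Optional

# Hypothetical subset of COLLECTION_KINDS; the real mapping is not shown in this diff.
COLLECTION_KINDS = {
    "tangled.repo.pull": "pull_request",
    "tangled.repo.pull.status": "pull_status",
}


def kind_for(collection: str) -> Optional[str]:
    """Longest matching needle wins, so the shorter prefix cannot swallow the longer one."""
    best, blen = None, -1
    for needle, kind in COLLECTION_KINDS.items():
        if needle in collection and len(needle) > blen:
            best, blen = kind, len(needle)
    return best


def pr_id_from_uri(pull_uri: str) -> Optional[str]:
    """at://<did>/<collection>/<rkey> -> <did>/<collection>/<rkey>; None if not an AT-URI."""
    return pull_uri[len("at://"):] if pull_uri.startswith("at://") else None


# Made-up DID and rkey for illustration.
example_id = pr_id_from_uri("at://did:plc:abc/sh.tangled.repo.pull/3kexample")
```

The resulting id has the same `did/collection/rkey` shape that `derive()` builds for `pull_request` records, which is what lets the status row join back to the pull regardless of who authored the status record.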
+9-1
src/trust/learned.py
···
     "eigentrust_score", "did_age_days", "merged_pr_count", "revert_rate", "ci_pass_rate",
     "close_without_merge_ratio", "mean_diff_size", "mean_files_touched", "churn",
     "mean_discussion_len", "denounce_count",
+    "stars_received", "stars_trust",  # popularity (raw, gameable) + trust-weighted (sybil-resistant)
 ]
 MODEL_PATH = MODEL_DIR / "learned.pkl"

+# Features sourced from the EigenResult (Python), not the SQL features view.
+_FROM_ER = {
+    "eigentrust_score": lambda did, er: er.trust.get(did, 0.0),
+    "stars_trust": lambda did, er: er.stars_trust.get(did, 0.0),
+}
+

 def _vec(did: str, feats: dict, er: eigentrust.EigenResult) -> list[float]:
     out = []
     for c in FEATURE_COLS:
-        out.append(er.trust.get(did, 0.0) if c == "eigentrust_score" else float(feats.get(c) or 0.0))
+        src = _FROM_ER.get(c)
+        out.append(src(did, er) if src else float(feats.get(c) or 0.0))
     return out


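The `_vec` change above replaces a per-column `if` with a lookup table of feature sources. A minimal standalone sketch of that pattern follows; the `EigenResult` dataclass here is a hypothetical stand-in (the real class lives in `eigentrust` and is not shown in this diff), and `FEATURE_COLS` is shortened for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class EigenResult:
    """Hypothetical stand-in for eigentrust.EigenResult (assumed shape)."""
    trust: dict[str, float] = field(default_factory=dict)
    stars_trust: dict[str, float] = field(default_factory=dict)


# Shortened column list, for illustration only.
FEATURE_COLS = ["eigentrust_score", "merged_pr_count", "stars_received", "stars_trust"]

# Columns that come from the EigenResult rather than the SQL feature row.
_FROM_ER = {
    "eigentrust_score": lambda did, er: er.trust.get(did, 0.0),
    "stars_trust": lambda did, er: er.stars_trust.get(did, 0.0),
}


def vec(did: str, feats: dict, er: EigenResult) -> list[float]:
    out = []
    for c in FEATURE_COLS:
        src = _FROM_ER.get(c)
        # Dispatch to the EigenResult source if one is registered; otherwise
        # read the SQL feature, mapping missing/NULL values to 0.0.
        out.append(src(did, er) if src else float(feats.get(c) or 0.0))
    return out


er = EigenResult(trust={"did:plc:alice": 0.4}, stars_trust={"did:plc:alice": 0.1})
row = vec("did:plc:alice", {"merged_pr_count": 8, "stars_received": None}, er)
unknown = vec("did:plc:nobody", {}, er)
print(row)
```

Adding another graph-derived feature then only needs a new `_FROM_ER` entry plus its name in `FEATURE_COLS`; the loop itself stays unchanged.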
+235
src/trust/merged.py
···
+"""Phase 1 backstop: git-on-knots merge detection.
+
+`sh.tangled.repo.pull.status` is published for almost no PRs (41 network-wide), so
+merge truth lives on the knots. Each PR carries a `git format-patch` (the patchBlob);
+a PR is MERGED if its patch-id matches a commit on the target repo's default branch.
+So: clone each target repo bare once, patch-id every commit on HEAD, match all that
+repo's PRs against the set, store labels, delete the clone.
+
+Writes `pull_requests.merged=TRUE` for matches (the schema's intended backstop column;
+the real `merged` field is always NULL on sh.tangled.repo.pull). Also stores the
+decompressed patch into `diff_text` while it's in hand (the Phase-0 fetch, for free).
+
+ponytail: patch-id matching is rebase-tolerant but squash-blind -- a squash/heavily-edited
+merge is a false negative for patch-id alone. The patches carry a Gerrit-style `Change-Id:`,
+so `_process` also matches on that trailer (survives squash). Idempotent: re-running just
+recomputes; the UPDATEs are harmless to repeat.
+"""
+
+from __future__ import annotations
+
+import argparse
+import gzip
+import json
+import re
+import shutil
+import subprocess
+import tempfile
+from concurrent.futures import ThreadPoolExecutor
+
+from .backfill import _pds
+from .config import STAGING_DIR
+from .db import connection, ensure_schema
+from .diffs import MAX_BLOB_BYTES, MAX_DIFF_CHARS, _cid_for, _get_blob
+
+CLONE_TIMEOUT = 180  # seconds; skip a pathologically huge/slow repo rather than hang the run
+
+
+def _patch_id(patch: bytes) -> str | None:
+    """git patch-id --stable -> the leading hash, or None. Same algorithm both sides
+    (PR patch and branch commits) is all that matters for comparison."""
+    try:
+        out = subprocess.run(["git", "patch-id", "--stable"], input=patch,
+                             capture_output=True, timeout=30).stdout.decode().split()
+        return out[0] if out else None
+    except Exception:
+        return None
+
+
+_CHANGE_ID = re.compile(r"Change-Id:\s*(\S+)")
+
+
+def _change_id(patch: str) -> str | None:
+    """The Gerrit-style Change-Id in a format-patch's message, if any. Survives rebase
+    AND squash (git keeps the trailer), so it catches merges that patch-id misses."""
+    m = _CHANGE_ID.search(patch)
+    return m.group(1) if m else None
+
+
+def _branch_keys(bare: str) -> tuple[set[str], set[str]]:
+    """(patch_ids, change_ids) over every non-merge commit reachable from HEAD.
+    Two independent fingerprints: patch-id matches exact/rebased patches; Change-Id also
+    matches squashed/amended ones (where the diff changed but the trailer was preserved)."""
+    log = subprocess.run(["git", "-C", bare, "log", "-p", "--no-merges", "HEAD"],
+                         capture_output=True, timeout=CLONE_TIMEOUT).stdout
+    pid = subprocess.run(["git", "patch-id", "--stable"], input=log,
+                         capture_output=True, timeout=CLONE_TIMEOUT).stdout.decode()
+    patch_ids = {ln.split()[0] for ln in pid.splitlines() if ln.split()}
+    change_ids = set(_CHANGE_ID.findall(log.decode("utf-8", "replace")))
+    return patch_ids, change_ids
+
+
+def _full_patch(did: str, cid: str) -> str | None:
+    """Decompressed patch text, UNCAPPED (patch-id needs the whole diff to match)."""
+    pds = _pds(did)
+    if not pds:
+        return None
+    blob = _get_blob(pds, did, cid)
+    if not blob or len(blob) > MAX_BLOB_BYTES:
+        return None
+    try:
+        return gzip.decompress(blob).decode("utf-8", "replace") or None
+    except Exception:
+        return None
+
+
+def _candidates(con) -> dict[str, list[str]]:
+    """repoDid -> clone URLs `https://{knot}/{owner_did}/{name}` (non-localhost), deduped.
+    A repoDid can have several repo records (re-registrations/forks); try them in order."""
+    out: dict[str, list[str]] = {}
+    for did, rkey, rec in con.execute(
+            "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo'").fetchall():
+        v = json.loads(rec)
+        rd, knot = v.get("repoDid"), (v.get("knot") or "")
+        if not rd or knot.startswith("localhost"):
+            continue
+        url = f"https://{knot}/{did}/{rkey}"
+        out.setdefault(rd, [])
+        if url not in out[rd]:
+            out[rd].append(url)
+    return out
+
+
+def _targets(con) -> dict[str, list[tuple[str, str, str]]]:
+    """repoDid -> [(pr_id, author_did, patch_cid)] for every pull targeting it."""
+    out: dict[str, list[tuple[str, str, str]]] = {}
+    for did, rkey, rec in con.execute(
+            "SELECT did, rkey, record FROM events WHERE collection='sh.tangled.repo.pull'").fetchall():
+        v = json.loads(rec)
+        t = v.get("target") or {}
+        rd = t.get("repoDid") or t.get("repo")
+        cid = _cid_for(rec)
+        if not rd or not cid:
+            continue
+        pr_id = f"{did}/sh.tangled.repo.pull/{rkey}"
+        out.setdefault(rd, []).append((pr_id, did, cid))
+    return out
+
+
+def _clone(url: str, dest: str) -> bool:
+    import os  # local import; env= replaces the whole environment, so extend os.environ (PATH, HOME, proxies)
+    try:
+        subprocess.run(["git", "clone", "--bare", "--single-branch", url, dest], capture_output=True,
+                       timeout=CLONE_TIMEOUT, check=True, env={**os.environ, "GIT_TERMINAL_PROMPT": "0"})
+        return True
+    except Exception:
+        return False
+
+
+def _process(repodid: str, urls: list[str], prs: list[tuple[str, str, str]]) -> dict:
+    """Clone the repo, match each of its PRs, return {merged:set, diffs:[(pr_id,text)]}.
+    Always removes its clone before returning, so concurrent repos bound disk use."""
+    tmp = tempfile.mkdtemp(dir=str(STAGING_DIR), prefix="merged-")
+    bare = f"{tmp}/repo.git"
+    try:
+        if not any(_clone(u, bare) for u in urls):
+            return {"merged": set(), "diffs": [], "cloned": False}
+        branch_pids, branch_cids = _branch_keys(bare)
+        merged, diffs = set(), []
+        for pr_id, did, cid in prs:
+            patch = _full_patch(did, cid)
+            if not patch:
+                continue
+            diffs.append((pr_id, patch[:MAX_DIFF_CHARS]))  # Phase-0 freebie (capped)
+            pid = _patch_id(patch.encode())
+            chid = _change_id(patch)  # squash/rebase-proof fallback
+            if (pid and pid in branch_pids) or (chid and chid in branch_cids):
+                merged.add(pr_id)
+        return {"merged": merged, "diffs": diffs, "cloned": True}
+    except Exception:
+        return {"merged": set(), "diffs": [], "cloned": False}
+    finally:
+        shutil.rmtree(tmp, ignore_errors=True)
+
+
+def _write(merged: set[str], diffs: list[tuple[str, str]]) -> bool:
+    """One write lock: merge labels + (any still-missing) diff_text. Patient retry
+    (~120s) rides out a concurrent score-loop writer instead of crashing the run; if
+    it still can't get the lock, the chunk is dropped (logged) and an idempotent
+    re-run recovers it -- never abort 600 repos of work over one lock loss."""
+    if not merged and not diffs:
+        return True
+    try:
+        with connection(read_only=False, attempts=480, delay=0.25) as con:
+            if merged:
+                con.executemany("UPDATE pull_requests SET merged=TRUE WHERE pr_id=?",
+                                [(p,) for p in merged])
+            if diffs:
+                con.executemany(  # diffs is (pr_id, text); the UPDATE binds (text, pr_id)
+                    "UPDATE pull_requests SET diff_text=? WHERE pr_id=? AND diff_text IS NULL",
+                    [(text, pid) for pid, text in diffs])
+        return True
+    except Exception as e:
+        print(f"[merged] WRITE LOST to lock ({len(merged)} labels, {len(diffs)} diffs) -- "
+              f"re-run to recover: {e}", flush=True)
+        return False
+
+
+def detect(max_repos: int | None = None, workers: int = 6) -> dict:
+    """Clone every target repo, patch-id match its PRs, write merge labels.
+    Repos run concurrently (network+disk+git); DB writes funnel through the main thread."""
+    ensure_schema()
+    with connection(read_only=True) as con:
+        cands, targets = _candidates(con), _targets(con)
+    work = [(rd, cands[rd], prs) for rd, prs in targets.items() if rd in cands]
+    if max_repos:
+        work = work[:max_repos]
+    skipped_unresolved = sum(len(prs) for rd, prs in targets.items() if rd not in cands)
+    print(f"[merged] {len(work)} resolvable repos, "
+          f"{sum(len(p) for _,_,p in work)} PRs ({skipped_unresolved} PRs have no clone URL)", flush=True)
+
+    total_merged = total_diffs = repos_failed = writes_lost = 0
+    with ThreadPoolExecutor(max_workers=workers) as ex:
+        for i, res in enumerate(ex.map(lambda w: _process(*w), work), 1):
+            if not res["cloned"]:
+                repos_failed += 1
+            if _write(res["merged"], res["diffs"]):
+                total_merged += len(res["merged"])
+                total_diffs += len(res["diffs"])
+            else:
+                writes_lost += 1
+            if i % 25 == 0:
+                print(f"[merged] {i}/{len(work)} repos, {total_merged} merged labels, "
+                      f"{total_diffs} diffs ({repos_failed} unreachable, {writes_lost} writes lost)", flush=True)
+    out = {"repos": len(work), "merged": total_merged, "diffs_stored": total_diffs,
+           "repos_failed": repos_failed, "writes_lost": writes_lost, "prs_unresolved": skipped_unresolved}
+    print(f"[merged] DONE: {out}", flush=True)
+    return out
+
+
+def demo() -> None:
+    """Offline self-check: a patch-id is stable for the same diff and differs for a
+    changed diff, so set-membership matching is sound. No network/clone."""
+    d1 = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+b\n"
+    d2 = b"diff --git a/x b/x\n--- a/x\n+++ b/x\n@@ -1 +1 @@\n-a\n+c\n"
+    p1, p1b, p2 = _patch_id(d1), _patch_id(d1), _patch_id(d2)
+    assert p1 and p1 == p1b, "patch-id not stable for identical diff"
+    assert p1 != p2, "different diffs must yield different patch-ids"
+    assert p1 in {p1, "deadbeef"} and p2 not in {p1}, "membership logic wrong"
+    print("patch-id stable + discriminating ok")
+
+
+def main() -> None:
+    ap = argparse.ArgumentParser(description="git-on-knots merge detection -> pull_requests.merged")
+    ap.add_argument("--max-repos", type=int, default=None, help="cap repos processed (smoke test)")
+    ap.add_argument("--workers", type=int, default=6, help="concurrent repo clones (default 6)")
+    ap.add_argument("--demo", action="store_true", help="offline self-check, then exit")
+    args = ap.parse_args()
+    if args.demo:
+        demo()
+        return
+    detect(max_repos=args.max_repos, workers=args.workers)
+
+
+if __name__ == "__main__":
+    main()
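The merge check in `_process` accepts a PR if either of two fingerprints is found on the branch. A minimal offline sketch of that decision follows, under stated assumptions: `is_merged` is a hypothetical helper, and the patch-ids are made-up placeholder strings (real ones are the hashes printed by `git patch-id --stable`).

```python
import re

CHANGE_ID = re.compile(r"Change-Id:\s*(\S+)")


def is_merged(patch_id, patch_text, branch_patch_ids, branch_change_ids):
    """True if the patch-id OR the Change-Id trailer appears on the branch."""
    m = CHANGE_ID.search(patch_text)
    change_id = m.group(1) if m else None
    return bool((patch_id and patch_id in branch_patch_ids)
                or (change_id and change_id in branch_change_ids))


# Fingerprints collected from the target branch (placeholder values).
branch_pids = {"aaa111", "bbb222"}
branch_cids = {"I0123abc"}

rebased = "Subject: fix typo\n\nChange-Id: I9999zzz\n---\ndiff --git a/x b/x\n"
squashed = "Subject: add feature\n\nChange-Id: I0123abc\n---\ndiff --git a/y b/y\n"
unmerged = "Subject: wip\n---\ndiff --git a/z b/z\n"

results = {
    # Same diff content, so the patch-id still matches after a rebase.
    "rebased": is_merged("aaa111", rebased, branch_pids, branch_cids),
    # Diff changed (new patch-id), but the trailer was preserved.
    "squashed": is_merged("ccc333", squashed, branch_pids, branch_cids),
    # Neither fingerprint is on the branch.
    "unmerged": is_merged("ddd444", unmerged, branch_pids, branch_cids),
}
print(results)
```

Because the two sets are checked independently, a missing Change-Id trailer only removes the squash fallback; patch-id matching still works for that PR.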
···
+"""M7 voice briefing (PRD: an ElevenLabs voice briefing on the API).
+
+The explanation `summary` is already "suitable to read aloud" (6.6/6.9), so this
+just composes a short spoken script from an assessment and pipes it to ElevenLabs.
+No SDK — one POST with httpx.
+
+Env (only ELEVENLABS_API_KEY touches the network):
+  ELEVENLABS_API_KEY      your key; absent -> the API returns the brief text instead of audio
+  ELEVENLABS_VOICE_ID     voice (default: a public ElevenLabs voice)
+"""
+
+from __future__ import annotations
+
+import os
+
+URL = "https://api.elevenlabs.io/v1/text-to-speech/{voice}"
+DEFAULT_VOICE = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")  # "Rachel" (public)
+DECISION_PHRASE = {
+    "fast_lane": "safe to fast-lane",
+    "normal_queue": "for the normal review queue",
+    "needs_human": "routed to a human reviewer",
+}
+
+
+def brief_text(score: dict) -> str:
+    """2-3 sentence spoken script from an assessment. Skips raw DIDs (unspeakable)."""
+    who = score.get("handle") or score["did"]
+    pct = round((score.get("calibrated_prob") or 0) * 100)
+    decision = DECISION_PHRASE.get(score["decision"], score["decision"])
+    reason = score.get("explanation") or {}
+    out = [f"{who}: {decision}. Calibrated trust {pct} percent."]
+    if reason.get("compliance_block"):
+        out.append(reason["compliance_block"])
+    else:
+        factor = next((f for f in reason.get("top_factors", [])
+                       if "did:" not in f and "trust reaches" not in f), None)
+        if factor:
+            out.append(factor.rstrip(".") + ".")
+    if reason.get("content_summary"):
+        out.append("Claude notes: " + reason["content_summary"])
+    return " ".join(out)
+
+
+def synthesize(text: str) -> bytes | None:
+    """ElevenLabs TTS -> mp3 bytes, or None when no API key is configured."""
+    key = os.environ.get("ELEVENLABS_API_KEY")
+    if not key:
+        return None
+    import httpx
+
+    r = httpx.post(URL.format(voice=DEFAULT_VOICE),
+                   headers={"xi-api-key": key, "accept": "audio/mpeg"},
+                   json={"text": text, "model_id": "eleven_turbo_v2_5"}, timeout=60)
+    r.raise_for_status()
+    return r.content
+
+
+def demo() -> None:
+    """Self-check: build a sensible script; synth is a no-op without a key."""
+    script = brief_text({
+        "did": "did:plc:alice", "handle": "alice.dev", "calibrated_prob": 1.0,
+        "decision": "needs_human",
+        "explanation": {"compliance_block": "sensitive-tier repo: a valid jurisdiction "
+                                            "attestation is required before fast-lane/merge (6.13)",
+                        "top_factors": ["trust reaches did:plc:alice via maintainer", "8 merged PRs"]},
+    })
+    print(script)
+    assert "alice.dev" in script and "human reviewer" in script and "did:" not in script
+    assert (audio := synthesize(script)) is None or isinstance(audio, bytes)  # one call, not two, when a key is set
+    print("ok")
+
+
+if __name__ == "__main__":
+    demo()
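The docstring above says that without `ELEVENLABS_API_KEY` the API returns the brief text instead of audio. A minimal sketch of that fallback follows; `brief_response` is a hypothetical helper (the real `/brief` endpoint is not part of this diff), and the synthesizer is passed in so the example runs without a network call.

```python
from typing import Callable, Optional


def brief_response(text: str,
                   synthesize: Callable[[str], Optional[bytes]]) -> tuple[str, bytes]:
    """Return (media_type, body): mp3 audio if synthesis worked, else the plain text."""
    audio = synthesize(text)
    if audio is None:  # no key configured, so fall back to the script itself
        return "text/plain", text.encode("utf-8")
    return "audio/mpeg", audio


script = "alice.dev: routed to a human reviewer. Calibrated trust 100 percent."

# Stand-ins for voice.synthesize: one behaves as if no key is set, one returns fake bytes.
no_key = brief_response(script, lambda t: None)
with_key = brief_response(script, lambda t: b"fake-mp3-bytes")
print(no_key[0], with_key[0])
```

Calling the synthesizer exactly once per request matters here, since each successful call is a real request to the TTS service.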
+87
src/trust/vouchsafe.py
···
+"""Static secret/SAST scan of a PR diff -> advisory machine_findings (PRD 6.12).
+
+Regex patterns ported from VouchSafe (tangled.org/ivoine.tngl.sh/hackathon, MIT) --
+the "OSV/secret-scan/SAST" external signal the README lists as skipped. Two deliberate
+adaptations for this system:
+
+- Scan only ADDED lines of the unified diff: a removed secret is not a leak, and
+  context lines aren't the contribution under review.
+- REDACT every matched secret before returning. Findings ride into a Claude prompt
+  and into the published sh.tangled.trust.score record (6.11), so the raw value must
+  never echo back out -- a scanner that leaks the secret it found is worse than none.
+
+Advisory only, like the slop signal: it hints Claude (machine_findings) and adds a
+line to the explanation. It never flips the gate -- only the graph and attestation do
+(constraint 2). To make a critical leak force review, gate should_review on it; left
+out by intent so the gate contract stays in fusion.decide().
+"""
+
+from __future__ import annotations
+
+import re
+
+# (name, pattern, severity). Per-line scan over added lines, so no re.S/multiline needed.
+_PATTERNS = [
+    ("Exposed API Key", re.compile(r"(?:api[_-]?key|apikey|api[_-]?secret)\s*[=:]\s*['\"]([a-zA-Z0-9_\-]{20,})['\"]?", re.I), "critical"),
+    ("AWS Access Key", re.compile(r"(?:A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}"), "critical"),
+    ("Private Key", re.compile(r"-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----"), "critical"),
+    ("Hardcoded Password", re.compile(r"(?:password|passwd|pwd)\s*[=:]\s*['\"]([^'\"]{8,})['\"]?", re.I), "high"),
+    ("SQL Injection Risk", re.compile(r"(?:execute|query|prepare)\s*\(\s*['\"]\s*SELECT.*\+.*['\"]|(?:execute|query)\s*\(\s*['\"].*\$\{.*\}.*['\"]", re.I), "high"),
+    ("eval() Usage", re.compile(r"\beval\s*\("), "high"),
+    ("JWT Token", re.compile(r"eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}"), "medium"),
+    ("Generic Secret", re.compile(r"(?:secret|token|bearer)\s*[=:]\s*['\"]([a-zA-Z0-9_\-]{20,})['\"]?", re.I), "medium"),
+]
+
+_HUNK = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)")
+
+
+def _redact(line: str, m: re.Match) -> str:
+    """Mask the sensitive token (capture group if any, else the whole match) so a
+    finding can travel into a public prompt/record without leaking the secret."""
+    secret = m.group(1) if m.lastindex else m.group(0)
+    masked = secret[:4] + "...[redacted]" if len(secret) > 4 else "...[redacted]"
+    return line.replace(secret, masked).strip()[:120]
+
+
+def scan_diff(diff: str | None) -> list[dict]:
+    """Findings on ADDED lines of a unified diff: [{type, severity, line, snippet}].
+    `line` is the new-file line number; `snippet` has the secret redacted. Empty list
+    if nothing matched or no diff (so callers can `or None` it into machine_findings)."""
+    if not diff:
+        return []
+    findings: list[dict] = []
+    new_line = 0
+    for raw in diff.splitlines():
+        h = _HUNK.match(raw)
+        if h:
+            new_line = int(h.group(1))
+            continue
+        if raw.startswith(("+++", "---")):  # file headers, not content
+            continue
+        if raw.startswith("+"):
+            text = raw[1:]
+            for name, pat, sev in _PATTERNS:
+                for m in pat.finditer(text):
+                    findings.append({"type": name, "severity": sev,
+                                     "line": new_line, "snippet": _redact(text, m)})
+            new_line += 1
+        elif not raw.startswith(("-", "\\")):  # context advances the counter; "\ No newline..." markers don't
+            new_line += 1
+    return findings
+
+
+def demo() -> None:
+    """Self-check: flags an added secret, redacts it, ignores removals."""
+    leak = ('@@ -1,2 +1,3 @@\n context\n+api_key = "AKIAIOSFODNN7EXAMPLE12"\n-old = 1\n')
+    f = scan_diff(leak)
+    assert f and f[0]["type"] == "Exposed API Key", f
+    assert f[0]["line"] == 2, f  # hunk +1, context line 1, added line 2
+    assert "AKIAIOSFODNN7EXAMPLE12" not in f[0]["snippet"], "secret must be redacted"
+    # a secret only REMOVED is not a leak
+    assert scan_diff('@@ -1 +0,0 @@\n-password = "hunter2hunter2"\n') == []
+    assert scan_diff("") == []
+    print(f"scan_diff ok: {len(f)} finding(s); {f[0]['type']} @line {f[0]['line']} :: {f[0]['snippet']}")
+
+
+if __name__ == "__main__":
+    demo()
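The `line` value in each finding depends on how the scanner walks hunks: the `+N` in a hunk header resets the new-file counter, and context and added lines advance it while removed lines do not. A minimal sketch of just that bookkeeping follows, with a hypothetical `added_lines` helper and a made-up two-hunk diff. The sketch also treats `\ No newline at end of file` markers as non-content.

```python
import re

HUNK = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)")


def added_lines(diff: str) -> list[tuple[int, str]]:
    """(new-file line number, text) for every added line in a unified diff."""
    out, new_line = [], 0
    for raw in diff.splitlines():
        h = HUNK.match(raw)
        if h:
            new_line = int(h.group(1))  # hunk header resets the counter
            continue
        if raw.startswith(("+++", "---")):  # file headers, not content
            continue
        if raw.startswith("+"):
            out.append((new_line, raw[1:]))
            new_line += 1
        elif not raw.startswith(("-", "\\")):  # context line: exists in the new file
            new_line += 1
    return out


diff = (
    "--- a/app.py\n"
    "+++ b/app.py\n"
    "@@ -1,3 +1,4 @@\n"
    " import os\n"
    "+import sys\n"
    "-x = 1\n"
    " y = 2\n"
    "@@ -10,2 +11,3 @@\n"
    " def f():\n"
    "+    return 1\n"
    "\\ No newline at end of file\n"
)
lines = added_lines(diff)
print(lines)
```

The second added line reports line 12 because the second hunk starts the new file at line 11 and one context line precedes the addition.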
+78
tests/test_smoke.py
···
     review.demo()  # parses schema, or no-ops cleanly without an API key


+def test_static_scan_flags_added_secrets():
+    from trust import vouchsafe
+    vouchsafe.demo()  # flags an added secret, redacts the value, ignores removals
+
+
 def test_learned_ranks_trusted_above_sybil():
     pytest.importorskip("lightgbm")  # M5 is the .[learned] extra; skip if not installed
     from trust import learned
···
 def test_atproto_record_shape():
     from trust import atproto
     atproto.demo()  # builds a valid sh.tangled.trust.score record (no network)
+
+
+def test_voice_brief_text():
+    from trust import voice
+    voice.demo()  # composes a speakable brief; synth no-ops without a key
+
+
+def test_embed_slop_ranking():
+    from trust import embed
+    embed.demo()  # cosine identities + slop-corpus ranking; live embed only with a key
+
+
+def test_diffs_gunzip_roundtrip():
+    from trust import diffs
+    diffs.demo()  # patchBlob gunzip round-trip + CID extraction (no network)
+
+
+def test_content_head_outranks_bad_diff():
+    pytest.importorskip("sklearn")  # Tower B head is the .[learned] extra (scikit-learn)
+    from trust import content
+    content.demo()  # frozen-embedding linear probe: a bad diff out-risks a clean one
+
+
+def test_pull_status_drives_clean_merge():
+    """sh.tangled.repo.pull.status (public) -> merged/closed -> clean_merge label."""
+    import json
+    import duckdb
+    from trust import db, ingest
+
+    con = duckdb.connect(":memory:")
+    con.execute(db.SCHEMA); con.execute(db.FEATURES_VIEW); con.execute(db.PR_LABELS_VIEW)
+    did = "did:plc:x"
+    old = "2020-01-01T00:00:00Z"  # well past the N-day window, so not NULL'd
+    def ev(coll, rkey, rec): return (did, 0, "create", coll, rkey, json.dumps(rec))
+    ingest.derive(con, [
+        ev("sh.tangled.repo.pull", "r1", {"createdAt": old, "target": {"repo": "x", "branch": "main"}}),
+        ev("sh.tangled.repo.pull.status", "s1",
+           {"pull": f"at://{did}/sh.tangled.repo.pull/r1", "status": "sh.tangled.repo.pull.status.merged"}),
+        ev("sh.tangled.repo.pull", "r2", {"createdAt": old}),
+        ev("sh.tangled.repo.pull.status", "s2",
+           {"pull": f"at://{did}/sh.tangled.repo.pull/r2", "status": "sh.tangled.repo.pull.status.closed"}),
+    ])
+    lbl = dict(con.execute("SELECT pr_id, clean_merge FROM pr_labels").fetchall())
+    assert lbl[f"{did}/sh.tangled.repo.pull/r1"] == 1, "merged + old + no revert -> clean_merge=1"
+    assert lbl[f"{did}/sh.tangled.repo.pull/r2"] == 0, "closed unmerged -> clean_merge=0"
+
+
+def test_stars_trust_weights_by_starrer_trust():
+    """A star from a trusted DID must outweigh a star from an untrusted/sybil DID."""
+    import json
+    import duckdb
+    from trust import db, ingest, eigentrust
+
+    con = duckdb.connect(":memory:")
+    con.execute(db.SCHEMA); con.execute(db.FEATURES_VIEW); con.execute(db.PR_LABELS_VIEW)
+    # trusted chain seed -> alice; sybil is isolated (no incoming trust).
+    con.execute("INSERT INTO contributors (did) VALUES ('seed'),('alice'),('sybil')")
+    con.execute("INSERT INTO vouches (voucher_did, subject_did) VALUES ('seed','alice')")
+    con.execute("INSERT INTO seeds VALUES ('seed')")
+
+    def star(starrer, owner):  # derive() inserts the starrer as a contributor too
+        ingest.derive(con, [(starrer, 0, "create", "sh.tangled.feed.star", f"{starrer}-{owner}",
+                             json.dumps({"subject": {"did": owner, "$type": "sh.tangled.feed.star#repo"}}))])
+    star("alice", "did:plc:ownerA")  # endorsed by a trusted DID
+    star("sybil", "did:plc:ownerB")  # endorsed by an untrusted DID
+
+    er = eigentrust.compute(con)
+    a = er.stars_trust.get("did:plc:ownerA", 0.0)
+    b = er.stars_trust.get("did:plc:ownerB", 0.0)
+    assert a > b, f"trusted-starred owner ({a}) must outrank sybil-starred owner ({b})"
+    # raw count is blind to starrer trust: both owners show 1 star received.
+    raw = dict(con.execute("SELECT owner_did, COUNT(*) FROM stars GROUP BY owner_did").fetchall())
+    assert raw["did:plc:ownerA"] == raw["did:plc:ownerB"] == 1
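The test above only pins down the required property: a star counts for as much as its starrer is trusted. One simple weighting with that property is sketched below, under a loud assumption: summing each starrer's trust per owner is an illustrative choice, not necessarily what `eigentrust.compute` does (its implementation is not in this diff), and the trust values are made up.

```python
from collections import Counter, defaultdict


def stars_trust(stars: list[tuple[str, str]], trust: dict[str, float]) -> dict[str, float]:
    """Sum of starrer trust per repo owner (assumed weighting, for illustration)."""
    out: dict[str, float] = defaultdict(float)
    for starrer, owner in stars:
        out[owner] += trust.get(starrer, 0.0)  # unknown/isolated starrers add nothing
    return dict(out)


# Made-up trust scores: the sybil has no incoming vouches, so it has no entry.
trust = {"seed": 0.6, "alice": 0.4}
stars = [("alice", "did:plc:ownerA"), ("sybil", "did:plc:ownerB")]

weighted = stars_trust(stars, trust)
raw = Counter(owner for _, owner in stars)
print(weighted, dict(raw))
```

Under this weighting a sybil farm can raise `stars_received` as much as it likes while leaving the trust-weighted value at zero, which is the contrast the two feature columns are meant to expose.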
+2
uv.lock
···
     { name = "anthropic" },
     { name = "duckdb" },
     { name = "fastapi" },
+    { name = "httpx" },
     { name = "numpy", version = "2.4.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.12'" },
     { name = "numpy", version = "2.5.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" },
     { name = "pydantic" },
···
     { name = "anthropic", specifier = ">=0.40" },
     { name = "duckdb", specifier = ">=1.1" },
     { name = "fastapi", specifier = ">=0.115" },
+    { name = "httpx", specifier = ">=0.27" },
     { name = "numpy", specifier = ">=1.26" },
     { name = "pydantic", specifier = ">=2.7" },
     { name = "scipy", specifier = ">=1.11" },
···
+# sunstead-web
+
+SvelteKit (Svelte 5) UI for the Tangled trust scorer. Thin client over the FastAPI JSON
+endpoints (`trust-api`, :8003) — it never touches DuckDB. Surfaces:
+
+- `/` — **Triage queue**: every open PR, searchable/sortable/filterable, each row expands to the full provenance (trust path, top factors, TreeSHAP model factors, Claude flags, content/slop risk, compliance block).
+- `/dashboard` — fast-lane & false-approval rates, score histogram, decision donut, and a force-directed **vouch graph** (`/graph`).
+- `/leaderboard` — contributors ranked by calibrated trust.
+- `/backfill` — live scrape progress (polls `/backfill/status`).
+
+## Run
+
+```sh
+bun install
+bun run dev                                   # http://localhost:5173 ; assumes the scorer on :8003
+API_BASE=http://host:port bun run dev         # point at a different scorer
+```
+
+`mprocs` brings up the whole stack (scorer + this UI) from the repo root.
+
+`bun run build` → `node build` for the adapter-node production server (set
+`API_BASE` in its env).
+
+## Stack
+
+Svelte 5 runes · bits-ui · Lightning CSS (6-layer cascade in `app.css`) ·
+unplugin-icons/lucide · svelte-sonner · zod (validates every API response) ·
+better-result · adapter-node.
+
+`/api/*` is a same-origin proxy (`routes/api/[...path]`) to the scorer, so there's
+no CORS and dev/prod fetch the same way.
+
+Skipped from the house stack — not needed by a read-only dashboard, add when warranted:
+superforms/formsnap & sveltednd (no forms or DnD), paraglide (single locale),
+tanstack/table-core (`$derived` sort/filter/paginate covers thousands of rows;
+add table-core for column virtualization or faceting), layerchart (the published
+1.x is Tailwind-coupled and renders axes invisible on dark — the two charts are
+hand-rolled SVG; revisit if charts need axes/tooltips/zoom).
···
+import { SvelteMap } from 'svelte/reactivity';
+
+// Real ATProto handles aren't in the scorer DB (contributors.handle is empty), but
+// the API's /identity/{did} resolves + caches them via the PLC directory. We upgrade
+// DIDs -> handles lazily for whatever rows are on screen, bounded so we don't hammer
+// plc.directory. ponytail: client cache only; the server lru_caches the PLC hit too.
+const cache = new SvelteMap<string, string | null>(); // did -> handle, null = resolved/none
+const queued = new Set<string>();
+let active = 0;
+const MAX = 8;
+
+function pump() {
+  while (active < MAX && queued.size) {
+    const did = queued.values().next().value as string;
+    queued.delete(did);
+    active++;
+    fetch(`/api/identity/${encodeURIComponent(did)}`)
+      .then((r) => (r.ok ? r.json() : null))
+      .then((d) => cache.set(did, (d?.handle as string) ?? null))
+      .catch(() => cache.set(did, null))
+      .finally(() => {
+        active--;
+        pump();
+      });
+  }
+}
+
+/** Resolve real handles for these DIDs (deduped, cached, max 8 in flight). */
+export function resolveHandles(dids: string[]) {
+  for (const did of dids) if (!cache.has(did) && !queued.has(did)) queued.add(did);
+  pump();
+}
+
+/** Resolved handle, `null` if the DID has none, `undefined` while still pending. */
+export function handleFor(did: string): string | null | undefined {
+  return cache.get(did);
+}
···
+import { error } from '@sveltejs/kit';
+import { getJson, TriageList } from '$lib/api';
+import type { PageLoad } from './$types';
+
+export const load: PageLoad = async ({ fetch }) => {
+  const res = await getJson(fetch, '/triage', TriageList);
+  if (res.isErr()) throw error(502, res.error);
+  return { rows: res.value };
+};
+15
web/src/routes/api/[...path]/+server.ts
···
+import { env } from '$env/dynamic/private';
+import type { RequestHandler } from './$types';
+
+// Single proxy so dev and prod (adapter-node) both talk to the FastAPI scoring
+// service via same-origin /api/* — no CORS, no dev/prod config drift.
+// ponytail: GET-only; the UI is read-only. Add POST when /review/pr gets a form.
+const BASE = env.API_BASE ?? 'http://127.0.0.1:8003';
+
+export const GET: RequestHandler = async ({ params, url, fetch }) => {
+  const res = await fetch(`${BASE}/${params.path}${url.search}`);
+  return new Response(res.body, {
+    status: res.status,
+    headers: { 'content-type': res.headers.get('content-type') ?? 'application/json' },
+  });
+};