+# Source before running anything (PRD 4.1). Route all large artifacts to the drive.
+export DATA_ROOT="/Volumes/EXT/tangled-trust" # the external drive (Linux: /mnt/ext/tangled-trust)
+mkdir -p "$DATA_ROOT"/{venv,pip,hf,torch,pyg,staging,diffs,models,duckdb,logs}
+
+export PIP_CACHE_DIR="$DATA_ROOT/pip"
+export UV_CACHE_DIR="$DATA_ROOT/pip"
+export HF_HOME="$DATA_ROOT/hf"
+export TORCH_HOME="$DATA_ROOT/torch"
+
+export DUCKDB_PATH="$DATA_ROOT/duckdb/trust.duckdb"
+export STAGING_DIR="$DATA_ROOT/staging"
+export MODEL_DIR="$DATA_ROOT/models"
+export LOG_DIR="$DATA_ROOT/logs"
+
+# Claude review (the only secret that stays on the main disk, in .env):
+export ANTHROPIC_API_KEY="sk-ant-..."
+export CLAUDE_MODEL="claude-sonnet-4-6"
+
+# Create the venv ON the drive, not in the repo:
+# python -m venv "$DATA_ROOT/venv" && source "$DATA_ROOT/venv/bin/activate"
···11+# Tangled contributor trust scoring (EigenTrust + Claude)
22+33+Calibrated, explainable, sybil-resistant trust scores that auto-triage Tangled PRs
44+into **fast-lane / normal-queue / needs-human**. Two independent signals fused by a
55+gate (not an average): **structural trust** (EigenTrust over the vouch graph) and
66+**content review** (Claude reading the diff, blind to author identity).
77+88+Built per `prd.md` through **M7**: EigenTrust + Claude end to end; LightGBM learned score
99+with isotonic calibration; GraphSAGE trained offline and compared (not served — it doesn't
1010+beat M5 on this sparse graph, and the PRD says ship it only if it does); the
1111+attestation-gated sensitive-repo tier (6.13); AT-Proto writeback of assessments as records
1212+(6.11); and the Tangled browser overlay (7.4). Only the ElevenLabs voice briefing is left
1313+as a seam — see "What's skipped".
1414+1515+## Layout
1616+1717+```
1818+src/trust/
1919+ config.py env paths (DATA_ROOT fail-fast) + gate/eigen/review tuning
2020+ db.py DuckDB schema, feature view, clean_merge label SQL
2121+ ingest.py M1 Jetstream -> events -> derive typed tables (--probe confirms NSIDs)
2222+ eigentrust.py M3 SciPy power iteration + BFS path explanation (no graph DB)
2323+ review.py M4 Claude reviewer, verbatim 6.6 prompt, forced-schema tool use
2424+ fusion.py M4 gate decide() + scoring worker (score_pr); loads M5 model if present
2525+ learned.py M5 LightGBM + isotonic calibration + TreeSHAP (optional .[learned] extra)
2626+ gnn.py M6 GraphSAGE, trained offline + compared vs M5; served only if it wins (.[gnn])
2727+ atproto.py M7 writeback: assessments published as sh.tangled.trust.score records (6.11)
2828+ api.py M3/M4 FastAPI: /score /review /leaderboard /metrics /triage + pages
2929+src/trust/static/ triage / dashboard / leaderboard pages
3030+extension/ M7 Tangled browser overlay (7.4) — MV3 content script, UI only
3131+lexicons/ sh.tangled.trust.score lexicon for the writeback (6.11)
3232+ seed.py synthetic demo data (trusted core + sybil cluster)
3333+ static/ triage / dashboard / leaderboard pages (thin clients of the API)
3434+```
3535+3636+## Setup
3737+3838+```bash
3939+cp .envrc.example .envrc # point DATA_ROOT at the external drive; add ANTHROPIC_API_KEY
4040+source .envrc # in prod: fails fast if the drive is not mounted
4141+uv venv .venv && source .venv/bin/activate && uv pip install -e .
4242+```
4343+4444+`DATA_ROOT` unset → a repo-local `.data/` dev fallback (with a warning). All large
4545+artifacts route under `DATA_ROOT` (PRD 4.1).
4646+4747+## Demo (no live data or API key required)
4848+4949+One command brings up the whole stack (seed → score loop → API) in split panes:
5050+5151+```bash
5252+mprocs # reads mprocs.yaml; open http://127.0.0.1:8000
5353+```
5454+5555+Or run the panes by hand:
5656+5757+```bash
5858+python -m trust.seed # load the synthetic vouch graph + labelled PRs
5959+python -m trust.score --loop # poll + score PRs, write decisions (drop --loop for a one-shot pass)
6060+python -m trust.api # serve http://127.0.0.1:8000 (triage / dashboard / leaderboard)
6161+```
6262+6363+> DuckDB is single-writer and a held lock blocks every other open, so each process
6464+> opens the file briefly (open → work → close) with retry — that's what lets the
6565+> mprocs panes share one `trust.duckdb`. Don't run `ingest` and `score` as writers
6666+> at the same time.
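The open → work → close pattern with retry can be sketched roughly as below. This is a minimal illustration, not the project's `db.py`: the helper name `short_lived`, the attempt count, and the backoff values are all assumptions, and the `connect` callable is injected so the sketch runs without DuckDB installed.

```python
import time
from contextlib import contextmanager


@contextmanager
def short_lived(connect, attempts=5, base_delay=0.05, retry_on=(OSError,)):
    """Open a connection, hand it to the caller, and always close it.

    `connect` is any zero-argument callable returning an object with .close().
    A busy open (another process holds the lock) is retried with exponential
    backoff; the attempt count and delays here are illustrative guesses.
    """
    conn = None
    for attempt in range(attempts):
        try:
            conn = connect()
            break
        except retry_on:
            if attempt == attempts - 1:
                raise  # give up: the lock never cleared
            time.sleep(base_delay * 2 ** attempt)
    try:
        yield conn
    finally:
        conn.close()  # release the file lock as soon as the work is done


# Hypothetical usage with DuckDB (assumes the lock error is duckdb.IOException):
#   import duckdb
#   with short_lived(lambda: duckdb.connect(path),
#                    retry_on=(duckdb.IOException,)) as con:
#       con.execute("SELECT 1")
```

Keeping each connection this short is what lets several panes take turns on one file; it does not make two long-running writers safe to run together.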
6767+6868+## Learned score (M5, optional)
6969+7070+```bash
7171+uv pip install -e '.[learned]' # lightgbm + scikit-learn (no shap needed)
7272+python -m trust.seed
7373+trust-train # LightGBM on the features, isotonic-calibrated; prints a reliability curve
7474+python -m trust.score # the gate now uses calibrated P(clean), not raw EigenTrust
7575+```
7676+7777+`trust-train` predicts `clean_merge` from the per-DID features (with `eigentrust_score`
7878+**as a feature**, so the model builds on the graph), splits by time, and fits isotonic
7979+regression so the output is a real probability (PRD 6.5/6.8). The model is saved under
8080+`MODEL_DIR`; `fusion.structural_for` loads it automatically and falls back to raw
8181+EigenTrust when it's absent (so the base install still runs). Explanations gain the top
8282+LightGBM **TreeSHAP** contributions (`merged_pr_count (+1.40)`, …) via LightGBM's native
8383+`pred_contrib` — no `shap`/`numba` dependency.
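To make the calibration step concrete, here is a small pure-Python sketch of isotonic regression via pool-adjacent-violators. It is a stand-in for illustration only: the README's pipeline uses scikit-learn, which interpolates between points, whereas this sketch returns a step function and does not treat tied scores specially. The function names are hypothetical.

```python
def fit_isotonic(scores, labels):
    """Pool-adjacent-violators: fit a non-decreasing step map from score to P(clean).

    Returns (thresholds, values): a score <= thresholds[i] maps to values[i].
    """
    blocks = []  # each block: [sum_of_labels, count, max_score_in_block]
    for score, label in sorted(zip(scores, labels)):
        blocks.append([float(label), 1, score])
        # Merge backwards while the previous block's mean exceeds this one's,
        # which is what enforces monotonicity.
        while len(blocks) > 1 and (
            blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]
        ):
            total, count, top = blocks.pop()
            blocks[-1][0] += total
            blocks[-1][1] += count
            blocks[-1][2] = top
    return [b[2] for b in blocks], [b[0] / b[1] for b in blocks]


def calibrate(score, thresholds, values):
    """Map a raw model score to the observed clean-merge rate of its block."""
    for threshold, value in zip(thresholds, values):
        if score <= threshold:
            return value
    return values[-1]


# Toy holdout: raw scores and whether the PR turned out to be a clean merge.
thresholds, values = fit_isotonic(
    [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], [0, 1, 0, 1, 1, 1]
)
```

With so few points the fitted map has only a handful of steps, which is the same near-degenerate shape the note below describes for the synthetic data.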
8484+8585+> On the tiny synthetic data the model is near-degenerate (the reliability curve has two
8686+> bins; one revert sends a contributor to 0). That's expected at N≈22 — real history
8787+> smooths it. To use M5 in a running `mprocs` demo: `trust-train`, then restart the
8888+> `score` and `api` panes so they load the model.
8989+9090+What it shows (the PRD deliverable):
9191+9292+- `live/trusted-clean` — authored by **carol**, trust flows maintainer → alice → carol →
9393+ **fast-lane** on structural trust alone.
9494+- `live/sybil-buggy` — authored by a throwaway in an isolated mutual-vouch cluster,
9595+ starved to **0.000** → **needs_human**. A clean-looking diff could never lift it
9696+ (constraint 2). With `ANTHROPIC_API_KEY` set, Claude also attaches a concrete reason
9797+ (the diff swaps a constant-time compare for `==`).
9898+- Dashboard: score distribution, fast-lane rate, **0% false-approval** backtest above the
9999+ threshold, vouch-graph stats.
100100+101101+## Live data
102102+103103+```bash
104104+python -m trust.ingest --probe --max-events 300 # confirm real sh.tangled.* NSIDs first
105105+python -m trust.ingest # firehose -> DuckDB, resumable cursor
106106+python -m trust.score # score newly-ingested PRs
107107+```
108108+109109+The collection→record map in `config.COLLECTION_KINDS` is best-guess and marked
110110+`CONFIRM` — verify it against the `--probe` output before trusting derived rows.
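One way to do that check is to tally the `collection` values in the probe sample and list the configured names that never appeared. The sketch below assumes the probe output is available as one Jetstream JSON event per line and that the configured map is a plain dict of collection name to kind; both are assumptions, and the collection names are placeholders, not real Tangled NSIDs.

```python
import json
from collections import Counter


def tally_collections(lines):
    """Count commit events per collection in a sample of Jetstream JSON lines."""
    counts = Counter()
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip truncated or non-JSON lines in the sample
        if event.get("kind") != "commit":
            continue  # identity/account events carry no collection
        collection = event.get("commit", {}).get("collection")
        if collection:
            counts[collection] += 1
    return counts


def unconfirmed(configured, counts):
    """Configured collection names that never showed up in the sample."""
    return sorted(name for name in configured if name not in counts)


# Placeholder names only; the real NSIDs must come from the probe output.
sample = [
    '{"kind": "commit", "time_us": 1, "commit": {"collection": "sh.tangled.placeholder.a"}}',
    '{"kind": "commit", "time_us": 2, "commit": {"collection": "sh.tangled.placeholder.a"}}',
    '{"kind": "identity", "time_us": 3}',
]
configured = {"sh.tangled.placeholder.a": "pull", "sh.tangled.placeholder.b": "vouch"}
seen = tally_collections(sample)
missing = unconfirmed(configured, seen)
```

A name that stays in `missing` after a reasonably long probe is either a wrong guess or a rare record type, so it is worth checking against the lexicon source before trusting rows derived from it.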
111111+112112+## Tests
113113+114114+```bash
115115+python -m pytest # eigentrust starves sybils; gate never lifts untrusted; schema parses
116116+```
117117+118118+## GraphSAGE (M6, optional)
119119+120120+```bash
121121+uv pip install -e '.[gnn]' # torch + torch-geometric (multi-GB)
122122+trust-seed && trust-train && trust-gnn # trains GraphSAGE offline, compares vs M5
123123+```
124124+125125+`trust-gnn` builds a PyG graph (positive vouches + co-contribution edges; per-DID feature
126126+vectors as node features; denounce-count rides as a feature, no signed-edge GNN), trains an
127127+inductive 2-layer GraphSAGE on a time split, then writes a **verdict** comparing its holdout
128128+accuracy to M5's. `fusion.structural_for` serves the GNN **only if `gnn_wins`** — on the
129129+synthetic graph it loses to M5, so the system keeps the calibrated baseline. That gate is the
130130+PRD's rule ("ship the GNN only if it beats the baseline and is stable"), enforced in code.
131131+132132+> lightgbm and torch each bundle `libomp`; loading both in one process hangs on macOS.
133133+> `trust/__init__.py` sets `KMP_DUPLICATE_LIB_OK` / `OMP_NUM_THREADS` before either imports.
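A minimal sketch of that guard, assuming it lives at the top of the package's `__init__.py`. The variable names come from the note above; the values `"TRUE"` and `"1"` are assumptions, and `setdefault` is used so a value already exported in the shell wins.

```python
import os

# These must be set before lightgbm or torch is imported anywhere in the
# process, because each library reads them when its OpenMP runtime loads.
os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")  # tolerate a second libomp copy
os.environ.setdefault("OMP_NUM_THREADS", "1")          # assumed value; caps OpenMP threads

# Only after this point is it safe to import the heavy libraries, e.g.:
#   import lightgbm
#   import torch
```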
134134+135135+## Native + compliance surfaces (M7)
136136+137137+- **Attestation-gated sensitive-repo tier (6.13).** A repo in the `sensitive` tier
138138+ requires a contributor-issued jurisdiction attestation before fast-lane/merge; a
139139+ missing one forces `needs_human` regardless of trust or content risk — the only control
140140+ that overrides the score, so it's checked first in `decide()`. The demo seeds a sensitive
141141+ repo where an attested DID fast-lanes and an unattested high-trust DID is blocked at
142142+ `calibrated_prob 1.00`. Only declared/asserted facts are used; nothing is inferred.
143143+- **AT-Proto writeback (6.11).** `trust-publish` emits each assessment as a public
144144+ `sh.tangled.trust.score` record (lexicon in `lexicons/`) on the service's own PDS, so
145145+ verdicts are auditable provenance on the network. No creds → dry-run (prints the records);
146146+ set `ATPROTO_PDS` / `ATPROTO_IDENTIFIER` / `ATPROTO_PASSWORD` to publish for real.
147147+- **Browser overlay (7.4).** `extension/` is a minimal MV3 content script that injects a
148148+ trust hat onto tangled.org from the same `/score` API. Load unpacked; see `extension/README.md`.
149149+ Confirm the DID selector against the real DOM (the UI analog of confirming NSIDs).
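The ordering described in the attestation bullet (attestation first, then content risk, then structural trust) can be sketched as a small gate. This is not the project's `decide()`: the thresholds, field names, and reason strings are hypothetical. The property it illustrates is that content review can only lower a decision, never lift an untrusted DID.

```python
from dataclasses import dataclass
from typing import Optional

# All three thresholds are illustrative assumptions, not the tuned values.
FAST_LANE_MIN_TRUST = 0.80
NEEDS_HUMAN_MAX_TRUST = 0.20
HIGH_CONTENT_RISK = 0.60


@dataclass
class GateInput:
    structural_trust: float               # EigenTrust or calibrated P(clean)
    content_risk: Optional[float] = None  # None when no Claude review ran
    repo_sensitive: bool = False
    attested: bool = False


def decide(x: GateInput) -> tuple[str, str]:
    """Return (decision, reason). A gate, not an average."""
    # 1. The attestation check overrides the score, so it runs first.
    if x.repo_sensitive and not x.attested:
        return "needs_human", "sensitive repo: attestation missing"
    # 2. Content review can only push a PR toward a human.
    if x.content_risk is not None and x.content_risk >= HIGH_CONTENT_RISK:
        return "needs_human", "content review flagged high risk"
    # 3. Structural trust decides the lane; a clean diff cannot raise it.
    if x.structural_trust <= NEEDS_HUMAN_MAX_TRUST:
        return "needs_human", "structural trust too low"
    if x.structural_trust >= FAST_LANE_MIN_TRUST:
        return "fast_lane", "high structural trust, no content concerns"
    return "normal_queue", "moderate structural trust"
```

For example, `decide(GateInput(structural_trust=1.0, repo_sensitive=True))` routes to a human despite the perfect score, which mirrors the seeded sensitive-repo demo.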
150150+151151+## What's skipped (and when to add it)
152152+153153+- **ElevenLabs voice briefing (M7).** A thin wrapper: the explanation `summary` is already
154154+ "suitable to read aloud" (6.6), so a `/brief/{did}` endpoint that pipes it to TTS is the
155155+ whole job — add when you have an ElevenLabs key.
156156+- **Per-PR writeback subject.** `sh.tangled.trust.score` currently keys on the contributor
157157+ DID; carry `pr_id` on the `scores` table to reference a specific PR's `at://` URI.
158158+- **SvelteKit frontend.** The three surfaces ship as built-in static pages (the PRD blesses
159159+ this for the dashboard); swap to SvelteKit if you need the richer UI kit / native overlay.
160160+- **External signals (6.12): OSV/secret-scan/SAST.** `review_pr` already accepts
161161+ `machine_findings` as structured input — wire the scanners into that arg.
+27
extension/README.md
···11+# Tangled Trust Hat (browser overlay, PRD 7.4)
22+33+A minimal MV3 content script that injects a calibrated **trust hat** next to
44+contributor DIDs on tangled.org, reading the same `/score` API as the dashboard.
55+UI only — the brain stays in the scoring service.
66+77+## Load it
88+99+1. Run the scoring service: `python -m trust.api` (serves `http://127.0.0.1:8000`).
1010+2. Chrome → `chrome://extensions` → enable Developer mode → **Load unpacked** → pick
1111+ this `extension/` folder.
1212+3. Open a tangled.org PR or contributor page. DIDs get a colored pill
1313+ (green fast-lane / amber normal / red needs-human); hover for the reason.
1414+1515+## Two things to confirm against the real site
1616+1717+- **The DID selector.** `content.js` scans text nodes for `did:plc:` / `did:web:`
1818+ patterns — a best guess. If tangled.org renders DIDs in attributes or a different
1919+ shape, adjust the scan (this is the UI analog of confirming the NSIDs).
2020+- **CORS / host.** The API ships permissive CORS for local use; the manifest grants
2121+ `http://127.0.0.1:8000/*`. Point both at your host for a hosted demo.
2222+2323+## Not built
2424+2525+The upstream vision — Tangled's own appview rendering third-party trust records
2626+natively (the 6.11 `sh.tangled.trust.score` records) — is a platform change, not this
2727+extension. Ask Lewis whether the appview can render trust records authored by other DIDs.
+63
extension/content.js
+// Tangled-native overlay (PRD 7.4). UI only: it reads the same /score API the
+// dashboard does and injects a "trust hat" pill next to contributor DIDs on
+// tangled.org. The brain stays in the service; this never touches DuckDB.
+//
+// ponytail: vanilla content script, no build step (same call as the built-in
+// pages) over the Bun/oxlint/zod toolchain. CONFIRM the DID selector against the
+// real tangled.org DOM; like the NSIDs, this is a best guess until verified.
+
+const API = "http://127.0.0.1:8000";
+const DID_RE = /did:(?:plc:[a-z2-7]+|web:[a-z0-9.\-]+)/gi;
+const COLOR = { fast_lane: "#1a7f37", normal_queue: "#9a6700", needs_human: "#cf222e" };
+
+const cache = new Map();
+
+async function score(did) {
+  if (cache.has(did)) return cache.get(did);
+  const p = fetch(`${API}/score/${encodeURIComponent(did)}`)
+    .then((r) => (r.ok ? r.json() : null))
+    .catch(() => null); // service down -> render nothing, never break the page
+  cache.set(did, p);
+  return p;
+}
+
+function pill(s) {
+  const el = document.createElement("span");
+  el.className = "tangled-trust-hat";
+  el.dataset.did = s.did;
+  el.textContent = ` ${Math.round((s.calibrated_prob ?? 0) * 100)}% `;
+  const factors = (s.explanation?.top_factors || []).join("\n");
+  el.title = `${s.decision}\n${factors}`;
+  Object.assign(el.style, {
+    background: COLOR[s.decision] || "#57606a",
+    color: "#fff", borderRadius: "999px", padding: "1px 7px",
+    fontSize: "11px", fontWeight: "600", marginLeft: "6px", verticalAlign: "middle",
+  });
+  return el;
+}
+
+// Find text-node occurrences of a DID and tag their parent once.
+async function scan() {
+  const walker = document.createTreeWalker(document.body, NodeFilter.SHOW_TEXT);
+  const hits = [];
+  for (let n = walker.nextNode(); n; n = walker.nextNode()) {
+    const m = n.nodeValue.match(DID_RE);
+    if (m) hits.push({ node: n, dids: [...new Set(m)] });
+  }
+  for (const { node, dids } of hits) {
+    const host = node.parentElement;
+    if (!host || host.querySelector(":scope > .tangled-trust-hat")) continue;
+    for (const did of dids) {
+      const s = await score(did);
+      if (s) host.appendChild(pill(s));
+    }
+  }
+}
+
+scan();
+// tangled.org is an SPA; re-scan on DOM changes (debounced).
+let t;
+new MutationObserver(() => {
+  clearTimeout(t);
+  t = setTimeout(scan, 400);
+}).observe(document.body, { childList: true, subtree: true });
+14
extension/manifest.json
+{
+  "manifest_version": 3,
+  "name": "Tangled Trust Hat",
+  "version": "0.1.0",
+  "description": "Overlays calibrated contributor trust (from the local scoring service) onto tangled.org PR and contributor pages. UI only; the brain stays in the service.",
+  "content_scripts": [
+    {
+      "matches": ["https://tangled.org/*", "https://*.tangled.org/*"],
+      "js": ["content.js"],
+      "run_at": "document_idle"
+    }
+  ],
+  "host_permissions": ["http://127.0.0.1:8000/*"]
+}
+47
lexicons/sh.tangled.trust.score.json
+{
+  "lexicon": 1,
+  "id": "sh.tangled.trust.score",
+  "description": "A trust assessment of a Tangled contributor, authored by the trust-scoring service's DID (PRD 6.11). Auditable provenance on the network, not a row in a private file.",
+  "defs": {
+    "main": {
+      "type": "record",
+      "key": "tid",
+      "record": {
+        "type": "object",
+        "required": ["subject", "calibratedProb", "decision", "structuralTrust", "createdAt"],
+        "properties": {
+          "subject": {
+            "type": "string",
+            "format": "did",
+            "description": "The assessed contributor's DID. Reference a specific PR by storing its at:// URI here once pr_id is carried on scores."
+          },
+          "calibratedProb": {
+            "type": "number",
+            "description": "Calibrated P(next contribution is a clean merge), 0.0-1.0."
+          },
+          "decision": {
+            "type": "string",
+            "enum": ["fast_lane", "normal_queue", "needs_human"]
+          },
+          "structuralTrust": {
+            "type": "number",
+            "description": "EigenTrust / learned structural signal, 0.0-1.0."
+          },
+          "contentRisk": {
+            "type": "number",
+            "description": "Claude content-review risk when a review ran, else absent."
+          },
+          "summary": {
+            "type": "string",
+            "maxGraphemes": 280,
+            "description": "Human-readable rationale, suitable to read aloud."
+          },
+          "createdAt": {
+            "type": "string",
+            "format": "datetime"
+          }
+        }
+      }
+    }
+  }
+}
+27
mprocs.yaml
+# mprocs: brings up the whole stack. Run `mprocs` in the repo root.
+#
+# DuckDB allows only ONE read-write process at a time and a held lock blocks
+# every other open. So each process opens the file briefly (open -> work ->
+# close) with retry, letting these panes interleave. Don't run `ingest` and
+# `score` at the same time in live mode: they're both writers and will just
+# contend; ingest is the single writer, score reads+writes scores in its gaps.
+procs:
+  seed:
+    shell: uv run trust-seed # one-shot: load demo data, then exits
+    autostart: true
+
+  score:
+    shell: uv run python -m trust.score --loop --interval 5
+    autostart: true
+
+  api:
+    shell: uv run trust-api # http://127.0.0.1:8000 (triage/dashboard/leaderboard)
+    autostart: true
+
+  ingest:
+    shell: uv run python -m trust.ingest # live firehose -> DuckDB
+    autostart: false # enable after `--probe` confirms NSIDs; pause `seed`/`score` first
+
+  probe:
+    shell: uv run python -m trust.ingest --probe --max-events 300
+    autostart: false # confirm real sh.tangled.* collection names
+456
prd.md
···11+# PRD: hybrid contributor trust scoring for Tangled (GNN + Claude)
22+33+You are building a backend service that scores the trustworthiness of contributors on **Tangled**, a code forge built on the **AT Protocol**. The score auto-triages incoming pull requests so maintainers who approve hundreds of PRs a day only review the ones that need a human. The score must be **calibrated** (a real probability), **explainable** (a maintainer can see why), and **adversarially robust** (resistant to throwaway identities pushing machine-generated low-quality code).
44+55+The stack is deliberately lean and self-hosted: a single embedded **DuckDB** store on an external drive, plain Python processes, and no managed cloud data services. Read sections 0, 1, and 2 before writing any code. Build strictly in the order in section 5.
66+77+---
88+99+## 0. Mission
1010+1111+Produce, per contributor DID, a calibrated probability that their next contribution is safe to fast-lane, plus a short human-readable reason, by fusing two independent signals:
1212+1313+- **Structural trust (the "who"):** the contributor's position in the vouch graph and their historical track record. Sybil-resistant. Built with EigenTrust first, then optionally upgraded with a GNN.
1414+- **Content review (the "what"):** Claude reading the actual diff and discussion of a specific PR to catch problems the graph cannot see.
1515+1616+These are fused by a **policy/gate**, not a naive average. The output drives a decision: fast-lane, normal queue, or route to a human with a reason attached.
1717+1818+---
1919+2020+## 1. Threat model and hard constraints (non-negotiable)
2121+2222+The attacker spins up fresh DIDs and pushes LLM-generated code that looks correct but is subtly wrong, to get it merged with minimal review. Every design choice exists to defeat this:
2323+2424+1. **The structural signal must be load-bearing and sybil-resistant.** Trust must flow from a trusted seed set; a cluster of fake DIDs vouching for each other must be starved. This is why EigenTrust (a trust-flow algorithm) is the core, not a vouch count.
2525+2. **Claude judges content, never identity.** Claude must not see or infer author reputation. A clean-looking diff from an untrusted DID must NOT lift that DID into the fast-lane. Identity is the graph's job; content is Claude's job.
2626+3. **The score must be calibrated.** 0.9 must mean roughly 90% of such contributors produce clean PRs.
2727+4. **The score must be explainable.** Emit a structured explanation (top factors plus Claude's rationale), never a bare number.
2828+5. **Inform, do not enforce.** Tangled's vouching has no punitive consequence; it informs a decision. This system recommends and routes; it does not block users.
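Constraint 3 is checkable with a reliability curve: bin the predicted probabilities and compare each bin's mean prediction with the observed clean rate. The sketch below is one simple way to compute it; the function name and the equal-width binning are assumptions, not something this PRD prescribes.

```python
def reliability_curve(probs, labels, n_bins=5):
    """Return (mean_predicted, observed_clean_rate, count) per non-empty bin.

    A calibrated score has mean_predicted close to observed_clean_rate in
    every bin, e.g. the 0.9 bin should be roughly 90% clean.
    """
    bins = [[0.0, 0, 0] for _ in range(n_bins)]  # [sum_prob, positives, count]
    for p, y in zip(probs, labels):
        i = min(int(p * n_bins), n_bins - 1)  # p == 1.0 falls in the last bin
        bins[i][0] += p
        bins[i][1] += y
        bins[i][2] += 1
    return [(s / n, pos / n, n) for s, pos, n in bins if n]


# Ten contributors all scored 0.9; nine of them produced clean PRs.
curve = reliability_curve([0.9] * 10, [1] * 9 + [0])
```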
2929+3030+---
3131+3232+## 2. Two decisions that shape the whole build
3333+3434+**The stack is serviceless and self-hosted.** All state lives in a single embedded DuckDB file on an external drive. The ingester, scoring worker, and API are plain Python processes. There is no message broker, no separate database server, and no managed cloud data service. Models are trained offline; Anthropic serves the Claude inference call. This keeps the moving parts minimal and the footprint small, which suits both the hardware constraint and a hackathon timeline. The resumability a broker would give is already covered by the AT Protocol firehose itself: the Jetstream cursor lets you replay, and the raw event log in DuckDB is the durable record.
3535+3636+**There is no graph database, and the agent must not add one.** You need graph *computation*, not a graph *engine*, and they are different layers. The vouch graph lives as a plain edge list in a `vouches` table in DuckDB. EigenTrust reads those rows into a SciPy sparse matrix and runs power iteration in memory. GraphSAGE builds a PyTorch Geometric `edge_index` tensor from the same query. At hackathon scale the graph is a few thousand edges and fits in memory many times over, so there is no performance case for Neo4j or any graph DB, and adding one only costs a service to run and a query language to wire up. Path-based explanations ("trust reaches this contributor through maintainers X and Y") are done with a short in-memory breadth-first walk from the seed during the EigenTrust run, not with graph-DB traversal.
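To show how little machinery this needs, here is a pure-Python sketch of EigenTrust-style power iteration plus the breadth-first path explanation, run on a toy edge list. It is a simplified stand-in (plain dicts rather than a SciPy sparse matrix), and the damping value `alpha`, the iteration count, and the function names are assumptions.

```python
from collections import deque


def eigentrust(edges, seeds, alpha=0.15, iters=100):
    """Trust flows from the seed set along vouch edges (src vouches for dst)."""
    nodes = sorted({n for edge in edges for n in edge} | set(seeds))
    out = {n: [] for n in nodes}
    for src, dst in edges:
        out[src].append(dst)
    pre = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    trust = dict(pre)
    for _ in range(iters):
        nxt = {n: alpha * pre[n] for n in nodes}  # restart mass goes to seeds only
        for src in nodes:
            mass = (1 - alpha) * trust[src]
            if out[src]:
                for dst in out[src]:
                    nxt[dst] += mass / len(out[src])
            else:
                for n in nodes:  # dangling node: hand its mass back to the seeds
                    nxt[n] += mass * pre[n]
        trust = nxt
    return trust


def trust_path(edges, seeds, target):
    """Shortest vouch path from any seed to target, or None if unreachable."""
    out = {}
    for src, dst in edges:
        out.setdefault(src, []).append(dst)
    parent = {s: None for s in seeds}
    queue = deque(seeds)
    while queue:
        node = queue.popleft()
        if node == target:
            path = []
            while node is not None:
                path.append(node)
                node = parent[node]
            return path[::-1]
        for nxt in out.get(node, []):
            if nxt not in parent:
                parent[nxt] = node
                queue.append(nxt)
    return None


edges = [("maintainer", "alice"), ("alice", "carol"),
         ("sybil-1", "sybil-2"), ("sybil-2", "sybil-1")]
trust = eigentrust(edges, seeds=["maintainer"])
path = trust_path(edges, ["maintainer"], "carol")
```

Because no edge leads from the seed's component into the mutual-vouch pair, the two sybil nodes never receive any mass and stay at exactly zero, while `path` gives the explanation chain for carol.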
3737+3838+---
3939+4040+## 3. Architecture
4141+4242+```
4343+ Jetstream (filtered AT Proto firehose, JSON over WebSocket)
4444+ |
4545+ v
4646+ Ingester (plain Python process)
4747+ (confirm NSIDs; persist cursor; batched appends)
4848+ |
4949+ v
5050+ DuckDB file [on external drive: $DUCKDB_PATH]
5151+ - events (raw append log)
5252+ - contributors
5353+ - vouches (edge list) <- the whole graph; no graph DB
5454+ - pull_requests (lifecycle)
5555+ - features (SQL views / tables)
5656+ - scores
5757+ - ingest_state (cursor)
5858+ |
5959+ +-------------------+--------------------+
6060+ | |
6161+ v v
6262+ STRUCTURAL SIGNAL CONTENT SIGNAL
6363+ (reads the vouches edge list) (Claude via Anthropic API)
6464+ - EigenTrust (SciPy sparse) - reviews a PR's diff +
6565+ - LightGBM on features discussion; returns
6666+ - GraphSAGE (PyG; trained offline, structured risk + flags
6767+ inference served in-process) + rationale
6868+ | |
6969+ +-------------------+--------------------+
7070+ |
7171+ v
7272+ FUSION POLICY / GATE (section 6.7)
7373+ |
7474+ v
7575+ Calibrated score + decision + explanation
7676+ |
7777+ v
7878+ FastAPI (plain process) -> /score /review /leaderboard
7979+ + built-in /dashboard (reads DuckDB)
8080+ |
8181+ v
8282+ (stretch) write assessment back as an AT Proto record
8383+```
8484+8585+**Stack roles**
8686+8787+- **DuckDB (single embedded store).** Holds everything: the raw event log, the curated tables (contributors, the vouch edge list, PR lifecycle, scores), and the feature views. One file on the external drive. Batch-append from the ingester (single writer); the API and the structural step read from it. Excellent at the analytical aggregations the features need.
8888+- **DuckDB VSS extension or sqlite-vec (optional).** Diff-embedding k-NN for the slop-similarity angle: embed diffs and find near-duplicates of known-bad patterns. Keeps vector search serviceless, no separate search engine.
8989+- **Built-in dashboard (recommended, on-theme).** The challenge is about observability and traceability, so the API serves a small static `/dashboard` page that reads DuckDB aggregates. Low effort and your demo centerpiece. A self-hosted Grafana plus Prometheus is an option for richer charts, but it adds services and disk, so default to the built-in page.
9090+- **Plain Python processes.** The ingester, the scoring worker, and the FastAPI service. Run locally during the hackathon. For a hosted demo, one small VM or a single container; no managed platform required.
9191+- **Offline training and Anthropic.** The GNN is trained offline with checkpoints on the drive and served in-process; Anthropic serves the Claude inference call.
9292+9393+---
9494+9595+## 4. Stack
9696+9797+- **Language:** Python 3.11+ throughout (the GNN forces PyTorch; keep one language).
9898+- **Ingest:** `websockets` against a public Jetstream instance; batched appends written directly to DuckDB.
9999+- **Store:** DuckDB, embedded, a single file on the external drive (`$DUCKDB_PATH`), for the event log, curated tables, feature views, and scores. Optional DuckDB VSS extension (or sqlite-vec) for diff-embedding similarity.
100100+- **Structural:** NumPy/SciPy sparse for EigenTrust; PyTorch Geometric for the GNN.
101101+- **Learned baseline:** LightGBM; SHAP for explanations.
102102+- **Content:** Anthropic SDK. Default `claude-sonnet-4-6`; cheap pre-pass `claude-haiku-4-5-20251001`; escalate hard cases to `claude-opus-4-8`. Temperature 0. Force the output schema with tool use / structured outputs.
103103+- **Observability:** a built-in FastAPI `/dashboard` reading DuckDB; self-hosted Grafana plus Prometheus optional if you want richer charts.
104104+- **API and runtime:** FastAPI (Python), co-located with the scorer, run as a plain process. The SvelteKit frontend talks to it directly. If you want the API in your TS house style, a thin Hono (Bun) gateway can front the Python scorer, but the direct path avoids a cross-language hop for the hackathon. For a hosted demo, a single small VM or container.
105105+- **Frontend:** SvelteKit + Svelte 5 (runes), shipped via `@sveltejs/adapter-node`. UI kit bits-ui; styling Lightning CSS with the six-layer cascade; icons unplugin-icons + iconify; charts layerchart; tables tanstack/table-core; toasts svelte-sonner; validation zod. Full screen spec in section 7.
106106+- **Tooling (scoring service):** uv for deps, ruff for lint and format, ty for type checking, pytest for tests.
107107+108108+### 4.1 Local disk: route every large file to the external drive
109109+110110+The development machine is short on space, so all large local artifacts live on a mounted external drive, never on the home or system disk. This is cheap to enforce because the heavy local footprint is small and well contained: the data store is a single DuckDB file, and the rest is the Python and ML toolchain (torch plus torch-geometric are multi-GB), the model and embedding caches, and the transient backfill staging. Route all of it through a single `DATA_ROOT` env var.
111111+112112+Set `DATA_ROOT` to the mounted drive and create the subtree once (macOS shown; on Linux use a path like `/mnt/ext/tangled-trust`):
113113+114114+```bash
115115+# .envrc (source this before running anything in the project)
116116+export DATA_ROOT="/Volumes/EXT/tangled-trust" # the external drive
117117+mkdir -p "$DATA_ROOT"/{venv,pip,hf,torch,pyg,staging,diffs,models,duckdb,logs}
118118+119119+# Python toolchain (the single biggest hog: torch + torch-geometric wheels)
120120+export PIP_CACHE_DIR="$DATA_ROOT/pip"
121121+export UV_CACHE_DIR="$DATA_ROOT/pip" # if using uv
122122+# Create the venv ON the drive, not inside the repo:
123123+# python -m venv "$DATA_ROOT/venv" && source "$DATA_ROOT/venv/bin/activate"
124124+125125+# Model and embedding caches (GBs if you do local diff embeddings)
126126+export HF_HOME="$DATA_ROOT/hf"
127127+export TRANSFORMERS_CACHE="$DATA_ROOT/hf"
128128+export SENTENCE_TRANSFORMERS_HOME="$DATA_ROOT/hf"
129129+export TORCH_HOME="$DATA_ROOT/torch"
130130+131131+# App paths, read from env, all defaulting under DATA_ROOT
132132+export DUCKDB_PATH="$DATA_ROOT/duckdb/trust.duckdb" # primary data store
133133+export PYG_ROOT="$DATA_ROOT/pyg" # PyTorch Geometric processed-dataset cache
134134+export STAGING_DIR="$DATA_ROOT/staging" # Jetstream backfill dumps (NDJSON/Parquet)
135135+export DIFF_CORPUS_DIR="$DATA_ROOT/diffs" # cached PR diffs/patches for eval and training
136136+export MODEL_DIR="$DATA_ROOT/models" # GraphSAGE + LightGBM checkpoints, calibrators
137137+export LOG_DIR="$DATA_ROOT/logs"
138138+```
139139+140140+What this covers, by component:
141141+142142+- **The DuckDB file (`DUCKDB_PATH`):** the entire data store (event log, curated tables, features, scores) is one file on the drive, so the bulk of the data is on the external drive by design.
143143+- **Python venv and pip/uv cache:** the torch and torch-geometric wheels are the largest local cost; both the environment and the download cache live on the drive.
144144+- **PyG dataset cache (`PYG_ROOT`) and checkpoints (`MODEL_DIR`):** the GNN's cached graph tensors and saved weights from offline training.
145145+- **Hugging Face / sentence-transformers / torch-hub caches:** any local embedding model for the diff-similarity path (DuckDB VSS).
146146+- **`STAGING_DIR`:** the raw Jetstream backfill, written as NDJSON or Parquet before it is loaded into DuckDB. Transient but large during a full replay; write it to the drive and delete after load.
147147+- **`DIFF_CORPUS_DIR`:** cached PR patch text for the Claude eval fixture and any training set.
148148+149149+Rules:
150150+151151+- Every component reads these from env and must default its large-output paths under `DATA_ROOT`. Do not hardcode repo-relative or home-relative paths for anything that grows, including the DuckDB file.
152152+- At process startup, assert `DATA_ROOT` exists and is writable, and fail fast with a clear message if the drive is not mounted, so a half-run never scatters files (or the DuckDB file) onto the system disk.
153153+- Only the repo and the small `.env` (the Anthropic API key) stay on the main disk. The data store, the venv, and all caches are on the drive.
154154+- A USB external drive is slower than internal SSD, so DuckDB queries, PyG dataset processing, and disk-heavy steps run somewhat slower. At hackathon-scale data this is fine; keep the drive mounted for the whole run.
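The startup assertion in the second rule might look like the sketch below. The function name and messages are hypothetical; the writability probe uses a throwaway temp file so nothing is left behind on the drive.

```python
import os
import tempfile
from pathlib import Path


def require_data_root(env=os.environ) -> Path:
    """Return DATA_ROOT as a Path, or exit with a clear message."""
    raw = env.get("DATA_ROOT")
    if not raw:
        raise SystemExit("DATA_ROOT is not set; source .envrc first")
    root = Path(raw)
    if not root.is_dir():
        raise SystemExit(
            f"DATA_ROOT {root} does not exist; is the external drive mounted?"
        )
    try:
        # Probe writability by creating and discarding a temp file in place.
        with tempfile.TemporaryFile(dir=root):
            pass
    except OSError as exc:
        raise SystemExit(f"DATA_ROOT {root} is not writable: {exc}")
    return root
```

Calling this once at the top of each process entry point, before any path is derived from it, is what keeps a half-run from scattering files onto the system disk.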
155155+156156+---
157157+158158+## 5. Build order (build in this exact order; each milestone must run before the next)
159159+160160+- **M0 - Set up the local stack.** Mount the external drive, source `.envrc`, create the venv and the DuckDB file under `DATA_ROOT`, install dependencies. Verify `DATA_ROOT` is writable. No services to provision.
161161+- **M1 - Ingest.** Jetstream to DuckDB with a persisted cursor and historical backfill; a step derives typed rows from the raw event log. Confirm the exact Tangled collection names (6.1). Goal: events landing in DuckDB, resumable after a crash.
162162+- **M2 - Dataset.** Reconstruct PR lifecycles, mine the clean-merge label, build per-DID features as DuckDB SQL views or a batch job (6.2, 6.3).
163163+- **M3 - Structural baseline + end-to-end demo.** EigenTrust over the `vouches` table, a `/score/{did}` endpoint, and the triage queue plus leaderboard screens (section 7). After M3 you have a working, sybil-resistant, demoable system with zero ML training.
164164+- **M3.5 - Observability.** The dashboard screen (section 7) reading `/metrics`: trust-score distribution, fast-lane rate, false-approval budget, vouch-graph stats, and ingest lag. Operational telemetry (events/sec, API latency, Claude cost) goes to Prometheus + Grafana, not this screen. Low effort, directly on-theme, and your demo backdrop.
165165+- **M4 - Content layer + decisions.** Claude review component and the fusion gate (6.6, 6.7), optionally enriched with the code-security and supply-chain findings (6.12) as structured input to the reviewer. Now you have the full hybrid: EigenTrust + Claude.
166166+- **M5 - Learned score.** LightGBM on the features (with the EigenTrust score as a feature), calibrated (6.5, 6.8).
167167+- **M6 - GNN upgrade (stretch).** GraphSAGE trained offline; serve inference in-process; compare against M5. Ship only if it beats the baseline and is stable.
168168+- **M7 - Surfaces (stretch).** Write assessments back as AT Proto records (6.11), add the Tangled-native browser-extension overlay (section 7), the attestation-gated sensitive-repo tier (6.13), and/or an ElevenLabs voice briefing on the API.
169169+170170+The GNN is M6 on purpose: on a new, sparsely vouched network it will likely not beat M5 and is the most likely thing to break mid-demo. Always have M4 working first.
171171+172172+---
173173+174174+## 6. Component specs
175175+176176+### 6.1 Ingestion (Jetstream to DuckDB)
177177+178178+Connect a websocket to a public Jetstream instance, filtered server-side to only the collections you need:

```
wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=sh.tangled.*
```
183183+184184+- Also subscribe to `app.bsky.graph.*` if you want the cross-ATmosphere social signal (follower graph, account age).
185185+- **Confirm the exact NSIDs. Do NOT hardcode guesses.** The Tangled lexicons live in the `tangled.org/tangled.org/core` repo; read them, and log a sample of live Jetstream events to see the real `collection` values for pull requests, vouches, CI/pipeline ("spindle") runs, issues, comments, and stars. Known facts: Tangled records live under `sh.tangled.*`; vouch/denounce records are public records on the issuer's PDS and each carries a reason; CI emits pull_request / push / manual pipeline events. Verify everything else against source.
186186+- **Writer:** DuckDB is single-writer and OLAP, so do NOT insert row-by-row from the socket handler. Buffer events in memory and append in batches to the `events` table (or stage them as Parquet under `STAGING_DIR` and load). A single ingester process owns the write path; everything else reads. A derive step turns the raw log into the typed tables (contributors, vouches, pull_requests).
187187+- **Cursor:** persist the `time_us` of the last processed event in an `ingest_state` row (or a small cursor file under `DATA_ROOT`). On reconnect, resume from that cursor minus a few seconds for gapless playback. An absent cursor means live-tail; a past cursor backfills, which is how you build the training history. This cursor plus the durable `events` log is the resumability that a broker would otherwise provide.
188188+- **Account and Identity events** arrive regardless of the collection filter; use Identity events to refresh a DID's handle and document.
189189+- Each event gives `did`, `time_us`, and a `commit` with `operation` (create/update/delete), `collection`, `rkey`, and the JSON `record`.
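
The writer and cursor rules above can be sketched in a few lines. This is a minimal illustration, not the ingester itself: `BatchBuffer`, `resume_cursor`, and the 5-second `REWIND_US` are hypothetical names and values, and the `flushed` list stands in for the batched append into the `events` table.

```python
from dataclasses import dataclass, field
from typing import Optional

REWIND_US = 5_000_000  # assumed rewind on reconnect: 5 seconds, in microseconds


@dataclass
class BatchBuffer:
    """Accumulates socket events and writes them out in batches."""
    batch_size: int
    pending: list = field(default_factory=list)
    flushed: list = field(default_factory=list)  # stand-in for the DuckDB append
    last_time_us: Optional[int] = None           # value to persist in ingest_state

    def add(self, event: dict) -> None:
        self.pending.append(event)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.pending:
            return
        self.flushed.append(list(self.pending))          # one batched write
        self.last_time_us = self.pending[-1]["time_us"]  # advance cursor with the batch
        self.pending.clear()


def resume_cursor(last_time_us: Optional[int]) -> Optional[int]:
    """Cursor to send on reconnect; None means live-tail."""
    if last_time_us is None:
        return None
    return max(last_time_us - REWIND_US, 0)
```

Because the rewind replays a few seconds of events, the append or the derive step has to tolerate duplicates; how that is done (for example a key on `did`, `collection`, `rkey`, `time_us`) is left open here.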
190190+191191+### 6.2 Data model (all in the DuckDB file)
192192+193193+Every table lives in the single DuckDB file at `$DUCKDB_PATH` on the external drive.
194194+195195+- `events(did, time_us, operation, collection, rkey, record JSON, ...)` -- the raw append log, written in batches by the ingester
196196+- `contributors(did PK, handle, did_created_at, pds_host, first_seen)`
197197+- `vouches(voucher_did, subject_did, polarity int{+1,-1}, reason text, evidence_uri, created_at, weight)` -- this is the entire graph; no graph DB
198198+- `pull_requests(pr_id PK, author_did, repo, target, opened_at, ci_status, merged bool, merged_at, closed_unmerged bool, additions int, deletions int, files_touched int, diff_text, discussion_len int)`
199199+- `pr_followups(pr_id, reverted bool, patched_same_lines_within_n_days bool)`
200200+- `features` -- per-DID aggregates as a SQL view or a materialized table refreshed by a batch step (merged counts, revert rate, CI pass rate, diff-size stats, discussion length)
201201+- `scores(did, as_of, structural_trust, content_risk, calibrated_prob, decision, explanation_json)`
202202+- `ingest_state(stream, last_time_us)`
203203+204204+### 6.3 Label mining (the supervised target)
205205+206206+For each historical PR, derive a binary `clean_merge` label automatically:
207207+208208+- **1 (clean):** merged AND CI passed AND not reverted AND the same lines not patched within N days (default N = 14).
209209+- **0 (not clean):** reverted, or closed unmerged, or repeated CI failure, or a quick follow-up fix to the same lines.
210210+- Drop PRs too recent for the N-day window to have elapsed.
211211+212212+Aggregate to a per-DID signal. **Split by time, not randomly**, so you never train on the future.
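
As a sketch of the labeling rules above, assuming each PR arrives as a dict joined from `pull_requests` and `pr_followups`. The `"passed"` value for `ci_status`, the treatment of a merged PR whose CI did not pass as not clean, and the helper names are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def clean_merge_label(pr: dict, now: datetime, n_days: int = 14) -> Optional[int]:
    """Return 1 (clean), 0 (not clean), or None (drop: no label yet)."""
    if pr["closed_unmerged"]:
        return 0
    if not pr["merged"]:
        return None  # still open
    if pr["reverted"] or pr["patched_same_lines_within_n_days"]:
        return 0
    if pr["ci_status"] != "passed":  # assumed status value
        return 0
    if now - pr["merged_at"] < timedelta(days=n_days):
        return None  # too recent: a revert or follow-up fix could still arrive
    return 1


def time_split(rows: list, cutoff: datetime):
    """Train on PRs opened before the cutoff, evaluate on the rest."""
    train = [r for r in rows if r["opened_at"] < cutoff]
    test = [r for r in rows if r["opened_at"] >= cutoff]
    return train, test
```

In the real pipeline the same logic would more likely live in the `clean_merge` SQL over DuckDB; the Python form is only easier to read and test.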
213213+214214+### 6.4 Structural signal: EigenTrust (required baseline)
215215+216216+Read the edge list from the DuckDB `vouches` table, build a row-normalized sparse matrix, seed on the trusted maintainer DID(s), and run personalized power iteration:

```python
# SELECT voucher_did, subject_did, weight FROM vouches -> build sparse C (n x n)
# C[i][j] = normalized trust i places in j; rows sum to 1.
# (A DID with no outgoing vouches has an all-zero row and leaks mass, which is
# why t is renormalized on every step below.)
# Edge weight before normalization: base 1.0, scaled up if the vouch carries
# PR evidence, scaled down by age (time decay).
# p: seed vector, mass on the maintainer DID(s), normalized.
# alpha: restart probability ~0.15.
t = p.copy()
for _ in range(50):
    t = (1 - alpha) * (C.T @ t) + alpha * p
    t = t / t.sum()
# t[did] is the structural trust; expose it as a signal and as a model feature.
```
231231+232232+- **Denounces:** classic EigenTrust assumes a non-negative stochastic matrix. Keep it simple: a denounce zeroes trust into that node and is recorded as a negative node feature for the learned models. Do NOT make distrust flow transitively.
233233+- Seeding on the maintainer makes scores viewer-relative, matching Tangled's circle philosophy but propagated across the whole graph with decay.
234234+- **Path explanation:** during the run, keep the edge list in memory and do a short BFS from the seed to reconstruct the trust path for the explanation object. No graph DB.
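
The three bullets above can be sketched without SciPy on a plain edge list. This is an illustration under stated assumptions, not the shipped `eigentrust` module: the function names are hypothetical, and instead of renormalizing each step it returns the mass of DIDs with no outgoing vouches to the seeds, a common EigenTrust convention that keeps the total at 1.

```python
from collections import defaultdict, deque


def eigentrust(edges, seeds, denounced=frozenset(), alpha=0.15, iters=50):
    """edges: (voucher, subject, weight) triples. Returns {did: trust}, summing to 1."""
    seeds = set(seeds)
    out = defaultdict(dict)
    nodes = set(seeds)
    for src, dst, w in edges:
        nodes.update((src, dst))
        if dst not in denounced and w > 0:  # a denounce zeroes trust INTO the node
            out[src][dst] = out[src].get(dst, 0.0) + w
    p = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    t = dict(p)
    for _ in range(iters):
        # Mass held by nodes with no outgoing trust goes back to the seeds.
        dangling = sum(t[n] for n in nodes if n not in out)
        nxt = {n: (alpha + (1 - alpha) * dangling) * p[n] for n in nodes}
        for src, targets in out.items():
            total = sum(targets.values())  # row normalization
            for dst, w in targets.items():
                nxt[dst] += (1 - alpha) * t[src] * w / total
        t = nxt
    return t


def trust_path(edges, seeds, target, denounced=frozenset()):
    """Shortest vouch chain from any seed to target (BFS), or None."""
    adj = defaultdict(list)
    for src, dst, _ in edges:
        if dst not in denounced:
            adj[src].append(dst)
    parent = {s: None for s in seeds}
    queue = deque(seeds)
    while queue:
        node = queue.popleft()
        if node == target:
            path = []
            while node is not None:
                path.append(node)
                node = parent[node]
            return path[::-1]
        for nxt in adj[node]:
            if nxt not in parent:
                parent[nxt] = node
                queue.append(nxt)
    return None
```

A ring of sybils vouching only for each other receives no trust here, because no mass ever flows in from the seed; that is the property the baseline relies on.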
235235+236236+### 6.5 Learned signal: LightGBM, then GraphSAGE
237237+238238+**LightGBM (M5, reliable):** predict `clean_merge` from per-DID features (read from the DuckDB `features` view). Include `eigentrust_score` as a feature so the model builds on the graph signal. Suggested features:

```
eigentrust_score, did_age_days, merged_pr_count, revert_rate, ci_pass_rate,
close_without_merge_ratio, mean_diff_size, mean_files_touched, churn,
mean_discussion_len, bsky_graph_degree, bsky_account_age, denounce_count
```
245245+246246+Trains in seconds, resists overfitting at small N far better than a net, and gives SHAP explanations. Save the model and calibrator under `MODEL_DIR`. Calibrate the output (6.8).
247247+248248+**GraphSAGE GNN (M6, stretch upgrade):** an inductive node-classification model.

```python
# nodes: contributors, with the feature vector above as node features x
# edges: built from the vouches table into a PyG edge_index tensor (positive,
#        weighted), plus co-contribution edges; no graph DB involved
# task:  node-level binary classification against clean_merge
# model: GraphSAGE, 2 layers, hidden 64, out 1; neighbor sampling (inductive)
# train OFFLINE: BCEWithLogitsLoss on labeled nodes, temporal split;
#        checkpoints + PyG cache under MODEL_DIR / PYG_ROOT on the drive
# serve inference in-process: sigmoid(logit) -> structural_trust_gnn
```
260260+261261+- Use the inductive variant so it generalizes to unseen contributors (cold start).
262262+- **Signed edges:** either use a signed GNN (SignedGCN) or, simpler, keep the GNN on positive vouch edges and pass denounce-count as a node feature.
263263+- GNN explanations are weak; the human-facing explanation stays the SHAP factors and/or the EigenTrust path plus Claude's rationale.
264264+265265+### 6.6 Content signal: Claude review
266266+267267+Assesses ONE PR's actual content. **Cost gate:** do not call the expensive model on every PR.
268268+269269+- `structural_trust >= T_HIGH`: skip the Sonnet review unless the diff touches security-sensitive paths.
270270+- `T_LOW <= structural_trust < T_HIGH` (ambiguous band): run the review. This is where Claude earns its keep.
271271+- `structural_trust < T_LOW`: run the review to attach a concrete reason for the human.
272272+- Optionally run a 1-call Haiku pre-pass everywhere to decide whether a Sonnet review is warranted.
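
A minimal sketch of the cost gate, assuming thresholds live in a small config object. `GateConfig`, its default values, and the `SENSITIVE_PREFIXES` list are placeholders for illustration, not tuned or project-defined values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GateConfig:
    T_LOW: float = 0.3   # placeholder thresholds, not calibrated values
    T_HIGH: float = 0.8


SENSITIVE_PREFIXES = (".github/", "auth/", "crypto/")  # hypothetical path list


def touches_sensitive(paths) -> bool:
    return any(p.startswith(SENSITIVE_PREFIXES) for p in paths)


def should_review(structural_trust: float, changed_paths, cfg: GateConfig = GateConfig()):
    """Return (run_review, why) for one PR, following the three bands above."""
    if structural_trust >= cfg.T_HIGH:
        if touches_sensitive(changed_paths):
            return True, "high trust, but security-sensitive paths touched"
        return False, "high trust: review skipped"
    if structural_trust >= cfg.T_LOW:
        return True, "ambiguous band"
    return True, "low trust: attach a concrete reason for the human"
```

The optional Haiku pre-pass would slot in as one more condition ahead of the final Sonnet call; it is omitted here.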
273273+274274+**Input:** the diff, PR title and description, and discussion text, truncated to a token budget. **No author identity, handle, or history.**
275275+276276+**Model:** `claude-sonnet-4-6`, temperature 0, output forced to the JSON schema via tool use.
277277+278278+**System prompt for this component (use verbatim):**

```
You are a code-contribution reviewer for an open-source trust system. You assess ONE
pull request's actual content for quality and safety. You do not decide whether to
merge; you produce a structured risk assessment that a separate policy layer combines
with an identity-trust signal.

Hard rules:
- Judge only the artifact in front of you: the diff, the PR title and description, and
  the discussion. You are given NO information about the author's identity, reputation,
  or history, and you must not speculate about it. Identity trust is handled elsewhere.
- Your job is to catch problems a reputation signal cannot see: code that looks correct
  but is subtly wrong, plausible-looking machine-generated filler ("slop"),
  security-sensitive changes, leaked secrets or credentials, license violations, and
  changes whose stated intent does not match what the code does.
- Prefer flagging uncertainty over approving. If the diff is large, unclear, or you
  cannot verify correctness, say so and set review_recommended. Never rubber-stamp.
- Be specific. Every flag must point to concrete lines or patterns, not vibes.
- Output ONLY the structured object specified by the tool. No prose outside it.
```
299299+300300+**Output schema (tool use):**

```json
{
  "content_risk": "float 0.0 (clearly safe/trivial) to 1.0 (clearly broken or dangerous)",
  "flags": [
    {
      "type": "subtle_bug | slop | security | secret_leak | license | intent_mismatch | untested | oversized | other",
      "severity": "low | med | high",
      "location": "file and/or line reference",
      "explanation": "concrete reason tied to the code"
    }
  ],
  "summary": "1-3 sentence plain-language rationale, suitable to read aloud to a maintainer",
  "review_recommended": "boolean"
}
```
317317+318318+### 6.7 Fusion and decision policy (a gate, not an average)

```python
def decide(structural_trust, content, cfg):
    # structural_trust: calibrated P(clean) in [0,1]
    # content: dict from 6.6, or None if no Claude call was made
    risk = 0.0 if content is None else content["content_risk"]
    review = False if content is None else content["review_recommended"]
    high_flag = bool(content) and any(f["severity"] == "high" for f in content["flags"])

    if structural_trust < cfg.T_LOW or risk >= cfg.R_HIGH or high_flag:
        return "needs_human", build_reason(structural_trust, content)
    if structural_trust >= cfg.T_HIGH and risk <= cfg.R_LOW and not review:
        return "fast_lane", build_reason(structural_trust, content)
    return "normal_queue", build_reason(structural_trust, content)
```
334334+335335+- **Displayed score:** start from the calibrated structural P(clean), then penalize for content flags. A low structural score can never be lifted into fast-lane by clean-looking content (constraint 2).
336336+- Thresholds are config. Set `T_HIGH` from calibration so the historical false-approval rate above it stays under your chosen budget. Write every decision to the DuckDB `scores` table.
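
One way to set `T_HIGH` from a false-approval budget, sketched on held-out history. `pick_t_high` is a hypothetical helper; it assumes each historical item is a `(calibrated_prob, clean_merge)` pair and treats "within budget" as less than or equal to the budget.

```python
def pick_t_high(scored, budget):
    """Lowest threshold whose false-approval rate at or above it is within budget.

    scored: iterable of (p_clean, clean_merge) pairs, clean_merge in {0, 1}.
    Returns None when no threshold satisfies the budget.
    """
    ordered = sorted(scored, key=lambda s: s[0], reverse=True)
    best = None
    approved = bad = 0
    i = 0
    while i < len(ordered):
        thr = ordered[i][0]
        # Items sharing a score cross the threshold together.
        while i < len(ordered) and ordered[i][0] == thr:
            approved += 1
            bad += 1 - ordered[i][1]
            i += 1
        if bad / approved <= budget:
            best = thr
    return best
```

Choosing the lowest qualifying threshold maximizes how many PRs reach the fast lane for a given budget; a more conservative operator could stop at the first violation instead.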
337337+338338+### 6.8 Calibration
339339+340340+Hold out a time-based split. Fit isotonic regression (or Platt scaling) mapping the raw model score to an empirical P(clean). Report a reliability curve. The fast-lane threshold then corresponds to a concrete false-approval budget.
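
To make the isotonic step concrete, here is a dependency-free sketch of the pool-adjacent-violators fit. It produces a step function rather than an interpolated curve, and the function names are hypothetical; in practice a library implementation such as scikit-learn's `IsotonicRegression` is the likelier choice.

```python
from bisect import bisect_right


def fit_isotonic(scores, labels):
    """Fit a non-decreasing step function from raw score to empirical P(clean).

    Returns (thresholds, values): values[i] applies from thresholds[i] upward.
    """
    grouped = {}
    for s, y in zip(scores, labels):  # pool tied scores first
        total, count = grouped.get(s, (0.0, 0))
        grouped[s] = (total + y, count + 1)

    blocks = []  # each block: [label_sum, count, lowest_score]
    for s in sorted(grouped):
        total, count = grouped[s]
        blocks.append([total, count, s])
        # Merge backwards while the previous block's mean exceeds this one's.
        while len(blocks) > 1 and blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]:
            total, count, _ = blocks.pop()
            blocks[-1][0] += total
            blocks[-1][1] += count

    thresholds = [b[2] for b in blocks]
    values = [b[0] / b[1] for b in blocks]
    return thresholds, values


def calibrate(raw, thresholds, values):
    """Map one raw score to P(clean); scores below the range clip to the first value."""
    i = bisect_right(thresholds, raw) - 1
    return values[max(i, 0)]
```

Fit on the earlier part of the time split and read the reliability curve off the later part, so the calibrator is never evaluated on data it was fitted to.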
341341+342342+### 6.9 Explainability
343343+344344+Emit a structured `explanation` per score: the top SHAP feature contributions (LightGBM) or the dominant EigenTrust path from the in-memory BFS ("vouched by trusted maintainers X, Y; 34 merged PRs; 0 reverts"), plus Claude's `summary` and any flags when a review ran. This is also what a voice layer would read aloud.
345345+346346+### 6.10 API and runtime
347347+348348+Run as plain Python processes.
349349+350350+- `GET /score/{did}` -> `{ calibrated_prob, structural_trust, content_risk?, decision, explanation, top_factors }`
351351+- `POST /review/pr` -> body `{ diff, title, description, discussion }`, runs 6.6, returns the schema object.
352352+- `GET /leaderboard` -> contributors ranked by calibrated_prob.
353353+- `GET /metrics` -> aggregate JSON for the dashboard: score distribution, fast-lane rate, false-approval rate, vouch-graph stats, ingest lag. The UI (section 7) renders it; the API serves JSON only.
354354+- A scoring worker (a separate process or a loop) picks up new PR records (poll the `events` table for unprocessed PRs, or have the ingester hand them off in-process), runs `decide(...)`, and writes results to `scores`. No message broker.
355355+- Optionally cache hot scores in-process; no separate cache service.
356356+- For a hosted demo, package the processes into a single container or run them on one small VM.
357357+358358+### 6.11 AT Proto-native output (stretch, but what the judges reward)
359359+360360+Give the service its own DID. Write each assessment back as a public record on its PDS (its own lexicon, referencing the PR's `at://` URI), so verdicts are auditable provenance on the network, not rows in a private file. Consume state from the firehose; emit state as records. This is the difference between a native ATProto integration and a tool that happens to read Tangled.
361361+362362+### 6.12 External data sources (additional signals)
363363+364364+All of these are public and either contribution-based or track-record-based, fetched on demand and cached. None requires probing a contributor or correlating identity. Each is a weak, advisory feature, never a determination.
365365+366366+Code-security and supply-chain (feed the content-risk signal in 6.6 and the gate in 6.7). This targets the malware half of the brief that the trust graph alone does not cover:
367367+368368+- Vulnerability databases: cross-reference every dependency a PR adds or bumps against OSV.dev, the GitHub Advisory Database, and NVD/CVE, through an index like deps.dev.
369369+- Malicious-package and typosquat signals: flag dependencies that are newly published, low-download, or near-misses of popular names (the classic supply-chain shape), using registry publish age and download stats.
370370+- Secret scanning on the diff (gitleaks or betterleaks) for leaked keys and credentials.
371371+- SAST on the diff (Semgrep rules or CodeQL) for dangerous constructs.
372372+- License data (SPDX) on added files and dependencies, for license violations.
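
The typosquat and registry-age signals can be sketched as a small advisory check. Everything named here is illustrative: the `POPULAR` set, the thresholds, and the flag strings are assumptions, and the publish age and download counts are expected to come from the registry lookups described above.

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance via the standard row-by-row dynamic program."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                # deletion
                           cur[j - 1] + 1,             # insertion
                           prev[j - 1] + (ca != cb)))  # substitution
        prev = cur
    return prev[-1]


POPULAR = {"requests", "numpy", "lodash", "express"}  # illustrative allowlist


def typosquat_candidates(dep, popular=POPULAR, max_dist=2):
    """Popular names that `dep` nearly matches without being one of them."""
    if dep in popular:
        return []
    return sorted(n for n in popular if edit_distance(dep, n) <= max_dist)


def supply_chain_flags(dep, published_days_ago, weekly_downloads,
                       new_days=30, min_downloads=1000):
    """Advisory flags for one added or bumped dependency."""
    flags = [f"near_miss:{n}" for n in typosquat_candidates(dep)]
    if published_days_ago < new_days:
        flags.append("newly_published")
    if weekly_downloads < min_downloads:
        flags.append("low_downloads")
    return flags
```

Each flag is a weak signal on its own; the combination of a near-miss name, a recent publish date, and low downloads is the shape worth surfacing.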
373373+374374+Hand these machine findings to the Claude reviewer (6.6) as structured input, so it reasons over concrete evidence instead of judging code in a vacuum.
375375+376376+Verifiable track record (feed the structural features in 6.5). Strong and hard to fake, but use only links the contributor publicly declares; inferring an undeclared one is the deanonymization line in 6.13:
377377+378378+- Package-registry maintainer history: npm, PyPI, and crates.io tenure, publish history, and download scale for packages they maintain.
379379+- OpenSSF Scorecard and repo-health metrics for repos they own.
380380+- Commit signing: verified SSH/GPG or Sigstore signatures, for cryptographic attribution provenance.
381381+382382+ATmosphere identity depth (feed the structural and DID-provenance features in 6.5). Your best native sybil signal, because the DID is shared across apps:
383383+384384+- Participation across other AT Protocol apps under the same DID (blogs, Frontpage, Smoke Signal, and others), with the age and breadth of that footprint. A DID woven through the ATmosphere for years is expensive to fake; a fresh one tied to a single app is the attacker's profile.
385385+- Verified links in the DID document: a domain-verified did:web, a DNS-verified handle, self-declared verified accounts.
386386+387387+Timezone consistency (a feature, not a location). Derive a coarse activity-timezone band from commit UTC offsets and posting times, which are already in the data, and use it only as a coherence check: a contributor whose declared context, vouch neighborhood, and commit timezone disagree is worth a second look. Never emit it as a location claim.
388388+389389+### 6.13 Provenance, jurisdiction, and repo tiering
390390+391391+The regulatory question is not "where is this contributor" but "is this contribution safe to trust," and jurisdiction, where it genuinely matters, comes from verification, not inference.
392392+393393+- Verified jurisdiction by assertion: a contributor-issued jurisdiction attestation (a signed record), a verified organizational DID with a known jurisdiction, or a domain-verified did:web on an organization domain. This is the only jurisdiction source a compliance reviewer accepts, and a VPN cannot defeat it. Inference clears neither bar (accuracy against a VPN, lawful use against non-consenting third parties), so the system does not attempt it.
394394+- Repo tiering is the actual control, mirroring how export control works by controlling the artifact and the access rather than surveilling the person:
395395+ - Public or civilian tier: open; the trust-graph triage in 6.7 is sufficient.
396396+ - Sensitive or dual-use tier: a valid jurisdiction attestation is required before a contribution can be fast-laned or merged. A missing attestation forces `needs_human` regardless of structural trust or content risk.
397397+- The weak hints in 6.12 (PDS host, DID method, handle TLD, locale, timezone) are fed to the model as features; none is treated as a jurisdiction determination.
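
The tier rule reduces to a small override applied after the gate in 6.7. A minimal sketch, assuming the tier is a string on the repo and that attestation validity has already been checked elsewhere; `apply_repo_tier` and the `"sensitive"` label are hypothetical names.

```python
def apply_repo_tier(decision: str, repo_tier: str, has_valid_attestation: bool) -> str:
    """Force needs_human on sensitive-tier repos that lack a valid attestation."""
    if repo_tier == "sensitive" and not has_valid_attestation:
        return "needs_human"
    return decision  # public tier, or attested: the 6.7 gate decision stands
```

The override only ever tightens a decision: an attestation never upgrades a PR, it just removes the forced escalation.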
398398+399399+This whole layer uses only what a contributor publicly declares or cryptographically asserts. The system never infers or correlates real-world identity or location (see the non-goal in section 8): no IP geolocation, no OSINT location-finding, no cross-platform profile matching, no fingerprinting, no stylometric deanonymization. That is both a legal constraint for an EU operator handling third-party personal data and a fit with the DID and pseudonymity model the platform rests on.
400400+401401+---
402402+403403+## 7. User-facing surfaces (UI)
404404+405405+The scoring service is the brain. Every UI is a thin client that reads the API (`/score`, `/leaderboard`, `/metrics`) and never touches the DuckDB file directly. Two surfaces ship as your own SvelteKit app; one is a native overlay.
406406+407407+Frontend stack: SvelteKit + Svelte 5 with runes, shipped via `@sveltejs/adapter-node`. UI kit bits-ui; styling Lightning CSS with the six-layer cascade (`@layer reset, tokens, base, components, utilities, overrides`); icons unplugin-icons + iconify; charts layerchart; tables tanstack/table-core; toasts svelte-sonner; validation zod. Server state via tanstack query is optional at this scale.
408408+409409+### 7.1 Triage queue (the product, route `/`)
410410+411411+The maintainer's open PRs across their repos, grouped by decision into fast-lane, needs review, and flagged. Each row shows the contributor avatar and handle, the PR title with repo and number, the calibrated score as a pill colored by decision (success / warning / danger), and a one-line reason. Rows expand to the breakdown from the explanation object (6.9): the structural side (the EigenTrust path and top factors) and the content side (Claude's flags and summary). Render the list with tanstack/table-core, sortable and filterable by repo and bucket, with a metric-card strip on top (open, fast-lane, needs review, flagged). Per-row actions: approve a fast-lane row, or pull one into your review anyway. Approving can call Tangled's API to merge, or simply record the action. The decision and the reason come straight from the gate (6.7); the UI renders them, it does not decide.
412412+413413+### 7.2 Observability dashboard (route `/dashboard`, milestone M3.5)
414414+415415+The trust view and your demo backdrop, reading `/metrics`: a score-distribution histogram (layerchart), the fast-lane rate, the false-approval rate from the backtest, vouch-graph stats (contributors, edges, seed), and ingest lag. Keep operational telemetry off this screen. Events per second, API latency, and Claude call cost and latency go to Prometheus + Grafana from your self-hosted stack, and Langfuse can trace the Claude review calls for per-call eval.
416416+417417+### 7.3 Leaderboard (route `/leaderboard`)
418418+419419+Contributors ranked by calibrated trust, the playful nod to the Tangled push-leaderboard tradition. tanstack/table-core, sortable. Cheap to build and good demo candy.
420420+421421+### 7.4 Tangled-native overlay (stretch, the native surface)
422422+423423+A thin browser extension whose content script injects the trust hat and Claude's note onto tangled.org PR and contributor pages, reading the same `/score` API client-side. It is UI only; the brain stays in the service. Build it as a minimal content script with your TS toolchain (Bun build, oxlint and oxfmt, zod to parse the response). This lands inline placement without waiting on Tangled to merge anything. The upstream version, Tangled's own appview rendering third-party trust records natively, is the vision, not the build; ask Lewis whether the appview can render trust records authored by other DIDs.
424424+425425+Build placement: the triage queue and leaderboard land with M3 once `/score` exists, the dashboard with M3.5, and the extension overlay with M7.
426426+427427+---
428428+429429+## 8. Guardrails and non-goals
430430+431431+Do:
432432+- Keep the structural signal sybil-resistant and load-bearing.
433433+- Keep Claude blind to author identity; combine via the gate, not an average.
434434+- Calibrate the score and tie the threshold to a false-approval budget.
435435+- Confirm Tangled NSIDs from source and live stream; never hardcode guesses.
436436+- Keep the stack serviceless and embedded: one DuckDB file on the external drive, plain processes; resumability comes from the Jetstream cursor plus the raw event log, not a message broker.
437437+- Run Claude at temperature 0 with forced schema, and gate calls by cost.
438438+- Keep the brain in the scoring service; the SvelteKit UI and the extension are thin clients that read the API, never the DuckDB file.
439439+- Write every large artifact (the DuckDB file, venv, caches, staging, checkpoints, diffs) under `DATA_ROOT` on the external drive; never on the home or system disk, and fail fast if the drive is not mounted.
440440+- For jurisdiction where it genuinely matters, require a contributor-issued attestation or verified DID, never an inference, and gate sensitive-tier repos on it (6.13).
441441+442442+Do not:
443443+- Add a graph database. Edges are rows; graph compute is in-memory (SciPy / PyG).
444444+- Add a message broker, a separate database server, or a managed cloud data service; the embedded store is enough at this scale.
445445+- Train the GNN online; train it offline and serve inference in-process.
446446+- Block, ban, or punish users; this system informs and routes only.
447447+- Infer or correlate real-world identity or location: no IP geolocation, no OSINT location-finding, no cross-platform profile matching (LinkedIn and similar), no browser or network fingerprinting, no stylometric deanonymization. Use only what a contributor publicly declares or cryptographically asserts.
448448+- Let clean content fast-lane an untrusted DID.
449449+- Make denounces propagate transitively.
450450+- Ship the GNN unless it beats the calibrated LightGBM baseline and is stable.
451451+452452+---
453453+454454+## 9. Deliverable
455455+456456+A running FastAPI scoring service backed by an embedded DuckDB store on the external drive, built from real Tangled data via Jetstream, fronted by a SvelteKit app with a triage queue, an observability dashboard, and a leaderboard, exposing calibrated and explained trust scores and fast-lane / human-review decisions, with EigenTrust + Claude working end to end (M4) before any GNN work. The browser-extension overlay onto Tangled PR pages is the stretch native surface. Include a short demo script that scores a few real contributors and shows one PR routed to a human with Claude's reason and one fast-laned on structural trust.
···
"""Hybrid contributor trust scoring for Tangled (EigenTrust + Claude)."""

import os as _os

# lightgbm (M5) and torch (M6) each bundle their own libomp; loading both in one
# process hangs/crashes on macOS. Allow the duplicate and pin to one thread (all
# compute here is hackathon-scale). Package init is the earliest reliable point:
# it runs before any submodule imports lightgbm or torch.
# ponytail: env guard over subprocess isolation; isolate the M5 baseline call in a
# subprocess if this ever misbehaves on another platform.
_os.environ.setdefault("OMP_NUM_THREADS", "1")
_os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
+182
src/trust/api.py
···
"""M3/M4 API + built-in surfaces (PRD 6.10, section 7).

The scoring service is the brain; the UIs are thin clients that read these JSON
endpoints, never the DuckDB file. Serves the three section-7 surfaces as static
pages (PRD blesses a built-in dashboard; same laziness applies to the others).
ponytail: built-in HTML over a separate SvelteKit/Bun stack; swap if the native
overlay (M7) or richer UI is needed.
"""

from __future__ import annotations

import json
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from .config import CFG
from .db import connection, ensure_schema
from . import eigentrust, fusion, review as review_mod


@asynccontextmanager
async def lifespan(app):
    ensure_schema()  # create tables/view once; readers below open read-only
    yield


app = FastAPI(title="Tangled trust scoring", lifespan=lifespan)
# ponytail: permissive CORS on a local read-only service so the tangled.org overlay
# (7.4) can read /score client-side. Lock to the extension origin for a hosted demo.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["GET"], allow_headers=["*"])
STATIC = Path(__file__).parent / "static"


def get_con():
    # Short-lived read-only connection per request so the score-loop writer can interleave.
    with connection(read_only=True) as con:
        yield con


def _eigen(con):
    # ponytail: recompute per request; few-thousand-edge graph -> sub-ms. Cache if it grows.
    return eigentrust.compute(con)


@app.get("/score/{did}")
def score(did: str, con=Depends(get_con)):
    er = _eigen(con)
    feats = fusion._features_for(con, did)
    structural, model_factors = fusion.structural_for(did, er, feats)  # M5 calibrated, or raw EigenTrust
    latest = con.execute(
        "SELECT content_risk, calibrated_prob, decision, explanation_json FROM scores "
        "WHERE did=? ORDER BY as_of DESC LIMIT 1", [did]
    ).fetchone()
    content_risk = latest[0] if latest else None
    decision = latest[2] if latest else fusion.decide(structural, None)
    reason = json.loads(latest[3]) if latest else fusion.build_reason(
        did, structural, None, er, feats, model_factors)
    prob = latest[1] if latest else structural
    return {"did": did, "structural_trust": structural, "content_risk": content_risk,
            "calibrated_prob": prob, "decision": decision, "explanation": reason,
            "top_factors": reason.get("top_factors", [])}


class ReviewBody(BaseModel):
    diff: str
    title: str = ""
    description: str = ""
    discussion: str = ""


@app.post("/review/pr")
def review(body: ReviewBody):
    out = review_mod.review_pr(body.diff, body.title, body.description, body.discussion)
    if out is None:
        raise HTTPException(503, f"set {CFG.review.api_key_env} to enable Claude review")
    return out


@app.get("/leaderboard")
def leaderboard(limit: int = 50, con=Depends(get_con)):
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    ranked = sorted(er.trust.items(), key=lambda kv: kv[1], reverse=True)[:limit]
    return [{"did": d, "handle": handles.get(d), "calibrated_prob": round(t, 4),
             "decision": fusion.decide(t, None)} for d, t in ranked]


@app.get("/triage")
def triage(con=Depends(get_con)):
    """Open PRs grouped by decision, with the explanation breakdown (section 7.1)."""
    er = _eigen(con)
    handles = dict(con.execute("SELECT did, handle FROM contributors").fetchall())
    rows = con.execute(
        "SELECT pr_id, author_did, repo FROM pull_requests WHERE NOT merged AND NOT closed_unmerged"
    ).fetchall()
    out = []
    for pr_id, did, repo in rows:
        structural = er.trust.get(did, 0.0)
        latest = con.execute(
            "SELECT calibrated_prob, decision, explanation_json FROM scores WHERE did=? "
            "ORDER BY as_of DESC LIMIT 1", [did]
        ).fetchone()
        prob = latest[0] if latest else structural
        decision = latest[1] if latest else fusion.decide(structural, None)
        reason = json.loads(latest[2]) if latest else fusion.build_reason(
            did, structural, None, er, fusion._features_for(con, did))
        out.append({"pr_id": pr_id, "repo": repo, "handle": handles.get(did), "did": did,
                    "calibrated_prob": round(prob, 4), "decision": decision, "explanation": reason})
    return out


@app.get("/metrics")
def metrics(con=Depends(get_con)):
    """Aggregates for the dashboard (PRD 6.10 / 7.2). JSON only; the UI renders it."""
    er = _eigen(con)
    dist = [0] * 10
    for t in er.trust.values():
        dist[min(int(t * 10), 9)] += 1

    decisions = {"fast_lane": 0, "normal_queue": 0, "needs_human": 0}
    for t in er.trust.values():
        decisions[fusion.decide(t, None)] += 1
    total = max(sum(decisions.values()), 1)

    # False-approval backtest: of historical PRs whose author is fast-lane-eligible
    # (structural >= T_HIGH), what fraction were NOT clean_merge? (PRD 6.8)
    fa_total = fa_bad = 0
    for did, clean_rate in con.execute(
        "SELECT did, clean_merge_rate FROM features WHERE clean_merge_rate IS NOT NULL"
    ).fetchall():
        if er.trust.get(did, 0.0) >= CFG.gate.T_HIGH:
            fa_total += 1
            if clean_rate < 1.0:
                fa_bad += 1
    cur = con.execute("SELECT last_time_us FROM ingest_state WHERE stream='jetstream'").fetchone()
    return {
        "score_distribution": dist,
        "decisions": decisions,
        "fast_lane_rate": decisions["fast_lane"] / total,
        "false_approval_rate": (fa_bad / fa_total) if fa_total else None,
        "vouch_graph": {
            "contributors": con.execute("SELECT count(*) FROM contributors").fetchone()[0],
            "edges": con.execute("SELECT count(*) FROM vouches").fetchone()[0],
            "seeds": len(er.seeds),
        },
        "ingest_last_time_us": cur[0] if cur else None,
    }
154154+155155+156156+# --- static surfaces -------------------------------------------------------
157157+@app.get("/")
158158+def root():
159159+ return FileResponse(STATIC / "triage.html")
160160+161161+162162+@app.get("/dashboard")
163163+def dashboard():
164164+ return FileResponse(STATIC / "dashboard.html")
165165+166166+167167+@app.get("/leaderboard.html")
168168+def leaderboard_page():
169169+ return FileResponse(STATIC / "leaderboard.html")
170170+171171+172172+app.mount("/static", StaticFiles(directory=STATIC), name="static")
173173+174174+175175+def main() -> None:
176176+ import uvicorn
177177+178178+ uvicorn.run(app, host="127.0.0.1", port=8000)
179179+180180+181181+if __name__ == "__main__":
182182+ main()
+130
src/trust/atproto.py
···11+"""M7 AT-Proto-native output (PRD 6.11): publish each assessment as a public record.
22+33+The service has its own DID/PDS account; it consumes state from the firehose and
44+emits state as records on the network, so verdicts are auditable provenance rather
55+than rows in a private file. Records use our own lexicon, `sh.tangled.trust.score`.
66+77+Credentials via env (only these touch the network):
88+ ATPROTO_PDS PDS base URL (e.g. https://pds.example)
99+ ATPROTO_IDENTIFIER the service handle or DID
1010+ ATPROTO_PASSWORD an app password
1111+Any of the three unset -> automatic dry-run (prints the records it would publish), so this is
1212+demoable and testable without an account.
1313+"""
1414+1515+from __future__ import annotations
1616+1717+import argparse
1818+import json
1919+import os
2020+2121+from .config import LOG_DIR # noqa: F401 (keeps DATA_ROOT import side effects consistent)
2222+from .db import connection
2323+2424+LEXICON = "sh.tangled.trust.score"
2525+2626+2727+def build_record(row: dict) -> dict:
2828+ """One score row -> an `sh.tangled.trust.score` record (PRD 6.11 lexicon)."""
2929+ reason = row.get("explanation") or {}
3030+ summary = reason.get("compliance_block") or reason.get("content_summary") \
3131+ or "; ".join(reason.get("top_factors", [])[:2]) or "structural trust assessment"
3232+ return {
3333+ "$type": LEXICON,
3434+ "subject": row["did"], # the assessed contributor (a DID is the native key)
3535+ "calibratedProb": round(float(row["calibrated_prob"]), 4),
3636+ "decision": row["decision"],
3737+ "structuralTrust": round(float(row["structural_trust"]), 4),
3838+ "contentRisk": row.get("content_risk"),
3939+ "summary": summary[:280],
4040+ "createdAt": str(row["as_of"]),
4141+ }
4242+4343+4444+def _latest_unpublished(con, limit: int) -> list[dict]:
4545+ rows = con.execute(
4646+ "SELECT s.did, s.as_of, s.structural_trust, s.content_risk, s.calibrated_prob, "
4747+ " s.decision, s.explanation_json "
4848+ "FROM scores s "
4949+ "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
5050+ " ON s.did = l.did AND s.as_of = l.m "
5151+ "WHERE NOT EXISTS (SELECT 1 FROM published_records p WHERE p.did=s.did AND p.as_of=s.as_of) "
5252+ "LIMIT ?", [limit]
5353+ ).fetchall()
5454+ return [{"did": r[0], "as_of": r[1], "structural_trust": r[2], "content_risk": r[3],
5555+ "calibrated_prob": r[4], "decision": r[5], "explanation": json.loads(r[6] or "{}")}
5656+ for r in rows]
5757+5858+5959+def _session(pds: str, identifier: str, password: str) -> tuple[str, str]:
6060+ import httpx
6161+6262+ r = httpx.post(f"{pds}/xrpc/com.atproto.server.createSession",
6363+ json={"identifier": identifier, "password": password}, timeout=30)
6464+ r.raise_for_status()
6565+ d = r.json()
6666+ return d["accessJwt"], d["did"]
6767+6868+6969+def _create_record(pds: str, jwt: str, repo_did: str, record: dict) -> str:
7070+ import httpx
7171+7272+ r = httpx.post(f"{pds}/xrpc/com.atproto.repo.createRecord",
7373+ headers={"Authorization": f"Bearer {jwt}"},
7474+ json={"repo": repo_did, "collection": LEXICON, "record": record}, timeout=30)
7575+ r.raise_for_status()
7676+ return r.json()["uri"]
7777+7878+7979+def publish(dry_run: bool = False, limit: int = 100) -> list[dict]:
8080+ pds = os.environ.get("ATPROTO_PDS")
8181+ ident = os.environ.get("ATPROTO_IDENTIFIER")
8282+ pw = os.environ.get("ATPROTO_PASSWORD")
8383+ live = bool(pds and ident and pw) and not dry_run
8484+8585+ with connection(read_only=not live) as con:
8686+ rows = _latest_unpublished(con, limit)
8787+ out = []
8888+ jwt = repo_did = None
8989+ if live:
9090+ jwt, repo_did = _session(pds, ident, pw)
9191+ for row in rows:
9292+ rec = build_record(row)
9393+ if live:
9494+ uri = _create_record(pds, jwt, repo_did, rec)
9595+ con.execute("INSERT INTO published_records VALUES (?,?,?) "
9696+ "ON CONFLICT DO NOTHING", [row["did"], row["as_of"], uri])
9797+ else:
9898+ uri = None
9999+ out.append({"uri": uri, "record": rec})
100100+ return out
101101+102102+103103+def main() -> None:
104104+ ap = argparse.ArgumentParser(description="publish trust assessments as AT-Proto records")
105105+ ap.add_argument("--dry-run", action="store_true", help="print records without publishing")
106106+ ap.add_argument("--limit", type=int, default=100)
107107+ args = ap.parse_args()
108108+ results = publish(dry_run=args.dry_run, limit=args.limit)
109109+ mode = "published" if (results and results[0]["uri"]) else "dry-run (no creds or --dry-run)"
110110+ for r in results:
111111+ print(f" {r['uri'] or '(dry-run)'} {r['record']['decision']:<12} "
112112+ f"{r['record']['calibratedProb']} {r['record']['subject']}")
113113+ print(f"[publish] {len(results)} assessments, {mode}")
114114+115115+116116+def demo() -> None:
117117+ """Self-check: a score row builds a valid record (no network)."""
118118+ row = {"did": "did:plc:carol", "as_of": "2026-06-25T00:00:00Z", "structural_trust": 1.0,
119119+ "content_risk": None, "calibrated_prob": 1.0, "decision": "fast_lane",
120120+ "explanation": {"top_factors": ["trust reaches did:plc:carol via maintainer -> alice"]}}
121121+ rec = build_record(row)
122122+ assert rec["$type"] == LEXICON and rec["subject"] == "did:plc:carol"
123123+ assert rec["decision"] == "fast_lane" and 0 <= rec["calibratedProb"] <= 1
124124+ assert rec["summary"] and rec["createdAt"]
125125+ print(json.dumps(rec, indent=2))
126126+ print("ok")
127127+128128+129129+if __name__ == "__main__":
130130+ demo()
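Note on `build_record`: as far as I understand the AT Protocol data model, records cannot carry floating-point numbers, and datetime strings are expected in RFC 3339 form with an explicit timezone, whereas `str()` on a Python `datetime` yields a space-separated value with no zone. The sketch below shows one possible float-free encoding. It assumes, hypothetically, that the lexicon stored probabilities as integer basis points; `to_basis_points` and `to_rfc3339` are illustrative helpers that are not part of this PR, and treating naive timestamps as UTC is an assumption about how `as_of` is stored.

```python
from datetime import datetime, timezone


def to_basis_points(p: float) -> int:
    """Encode a probability in [0, 1] as an integer in [0, 10000] (hypothetical scheme)."""
    if not 0.0 <= p <= 1.0:
        raise ValueError(f"probability out of range: {p}")
    return round(p * 10_000)


def to_rfc3339(ts: datetime) -> str:
    """Format a timestamp as RFC 3339 in UTC. Naive values are ASSUMED to be UTC."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    text = ts.astimezone(timezone.utc).isoformat(timespec="milliseconds")
    return text.replace("+00:00", "Z")


# Illustrative values only.
example = {
    "calibratedProb": to_basis_points(0.5),
    "createdAt": to_rfc3339(datetime(2026, 6, 25)),
}
```

If the lexicon kept the current field names, a consumer would divide by 10,000 to recover the probability; a decimal string is another option with the same property.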
+170
src/trust/backfill.py
···11+"""Historical backfill: scrape ALL sh.tangled.* records across the network, once.
22+33+`trust.ingest` is the live tail (Jetstream, ~5s replay). This is the history:
44+enumerate every repo holding a sh.tangled.* collection via the relay's
55+listReposByCollection, pull each repo's records with listRecords (JSON, no CAR
66+parsing), archive them raw to the `events` table, and feed them through the SAME
77+ingest.derive() -> contributors / vouches / pull_requests. Then train as usual
88+(`python -m trust.learned`, `python -m trust.gnn`).
99+1010+Storage: everything goes through connection() -> DUCKDB_PATH under DATA_ROOT, so
1111+point DATA_ROOT at the external drive (`export DATA_ROOT=/Volumes/<drive>`); the
1212+writability assert in ensure_data_root() fails fast if it isn't mounted.
1414+Resumability: derive() upserts / inserts ON CONFLICT DO NOTHING, so the derived tables are
1515+idempotent; just re-run to resume. The raw `events` INSERT has no key, so re-runs append duplicate
1616+archive rows. ponytail: no checkpoint table; add a per-DID cursor only if a full run gets too slow to repeat.
1717+1818+Confirm record shapes FIRST: `python -m trust.backfill --sample` prints real
1919+records so you can verify the field names derive() assumes (the config NSID map and
2020+field guesses are flagged unconfirmed). If pulls turn out to be thin pointers
2121+(diff/CI live on the knot), that's a second fetch — don't build it until --sample
2222+proves it's needed (YAGNI).
2323+"""
2424+2525+from __future__ import annotations
2626+2727+import argparse
2828+import json
2929+import time
3030+import urllib.error
3131+import urllib.parse
3232+import urllib.request
3333+3434+from .db import connection, ensure_schema
3535+from .ingest import derive
3636+3737+# listReposByCollection lives on the relay; listRecords on each repo's PDS.
3838+RELAY = "https://relay1.us-west.bsky.network"
3939+PLC = "https://plc.directory"
4040+
4141+# Collections to derive; each must hit a COLLECTION_KINDS substring in config (as listed there, sh.tangled.graph.follow hits none).
4242+COLLECTIONS = ["sh.tangled.repo.pull", "sh.tangled.graph.vouch", "sh.tangled.graph.follow"]
4343+4444+4545+def _get(url: str, tries: int = 4) -> dict:
4646+ """GET JSON with naive backoff on 429/5xx. ponytail: linear sleep, no token bucket."""
4747+ for i in range(tries):
4848+ try:
4949+ req = urllib.request.Request(url, headers={"User-Agent": "trust-backfill"})
5050+ with urllib.request.urlopen(req, timeout=30) as r:
5151+ return json.load(r)
5252+ except urllib.error.HTTPError as e:
5353+            if (e.code == 429 or 500 <= e.code < 600) and i < tries - 1:
5454+ time.sleep(2 * (i + 1))
5555+ continue
5656+ raise
5757+ return {}
5858+5959+6060+def _pds(did: str) -> str | None:
6161+ """DID -> PDS endpoint. Handles did:plc via PLC directory and did:web inline."""
6262+ if did.startswith("did:web:"):
6363+ doc = _get(f"https://{did[len('did:web:'):]}/.well-known/did.json")
6464+ else:
6565+ doc = _get(f"{PLC}/{urllib.parse.quote(did)}")
6666+ for s in doc.get("service", []):
6767+ if s.get("type") == "AtprotoPersonalDataServer":
6868+ return s["serviceEndpoint"].rstrip("/")
6969+ return None
7070+7171+7272+def _repos(collection: str):
7373+ """All DIDs holding `collection`, paginated via the relay."""
7474+ cursor = None
7575+ while True:
7676+ q = {"collection": collection, "limit": "500"}
7777+ if cursor:
7878+ q["cursor"] = cursor
7979+ page = _get(f"{RELAY}/xrpc/com.atproto.sync.listReposByCollection?{urllib.parse.urlencode(q)}")
8080+ for r in page.get("repos", []):
8181+ yield r["did"]
8282+ cursor = page.get("cursor")
8383+ if not cursor:
8484+ return
8585+8686+8787+def _records(pds: str, did: str, collection: str):
8888+ """All records of one collection in one repo, paginated via listRecords."""
8989+ cursor = None
9090+ while True:
9191+ q = {"repo": did, "collection": collection, "limit": "100"}
9292+ if cursor:
9393+ q["cursor"] = cursor
9494+ page = _get(f"{pds}/xrpc/com.atproto.repo.listRecords?{urllib.parse.urlencode(q)}")
9595+ for rec in page.get("records", []):
9696+ yield rec # {uri, cid, value}
9797+ cursor = page.get("cursor")
9898+ if not cursor:
9999+ return
100100+101101+102102+def _archive_and_derive(buf: list[tuple]) -> None:
103103+ """Durable raw log to `events` (on the external drive) + derive into typed tables.
104104+ Does NOT touch ingest_state — that cursor belongs to the live firehose, not backfill."""
105105+ if not buf:
106106+ return
107107+ with connection(read_only=False) as con:
108108+ con.executemany(
109109+ "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)",
110110+ buf,
111111+ )
112112+ derive(con, buf)
113113+114114+115115+def backfill(collections=COLLECTIONS, max_repos: int | None = None) -> dict:
116116+ ensure_schema()
117117+ counts: dict[str, int] = {}
118118+ for col in collections:
119119+ repos = records = 0
120120+ for did in _repos(col):
121121+ if max_repos and repos >= max_repos:
122122+ break
123123+ repos += 1
124124+ pds = _pds(did)
125125+ if not pds:
126126+ print(f"[backfill] {col} {did}: no PDS, skip")
127127+ continue
128128+ buf = []
129129+ for rec in _records(pds, did, col):
130130+ # time_us=0: listRecords has no firehose seq; the archive key is the (did, collection, rkey).
131131+ rkey = rec["uri"].rsplit("/", 1)[-1]
132132+ buf.append((did, 0, "create", col, rkey, json.dumps(rec["value"])))
133133+ _archive_and_derive(buf)
134134+ records += len(buf)
135135+ print(f"[backfill] {col} {did} -> {len(buf)} records")
136136+ counts[col] = records
137137+ print(f"[backfill] {col}: {records} records from {repos} repos")
138138+ return counts
139139+140140+141141+def sample(collection: str = COLLECTIONS[0], n: int = 3) -> None:
142142+ """Print real record values so you can confirm the fields derive() assumes."""
143143+    shown = 0  # counted across repos, so at most n records print in total
144144+    for did in _repos(collection):
145145+        pds = _pds(did)
146146+        if not pds:
147147+            continue
148148+ for rec in _records(pds, did, collection):
149149+ print(json.dumps(rec["value"], indent=2))
150150+ shown += 1
151151+ if shown >= n:
152152+ return
153153+154154+155155+def main() -> None:
156156+ ap = argparse.ArgumentParser(description="Backfill all sh.tangled.* history into DuckDB (under DATA_ROOT)")
157157+ ap.add_argument("--sample", action="store_true", help="print real records to confirm field shapes, write nothing")
158158+ ap.add_argument("--collection", default=None, help="restrict to one NSID (default: all known)")
159159+ ap.add_argument("--max-repos", type=int, default=None, help="cap repos per collection (smoke test)")
160160+ args = ap.parse_args()
161161+ if args.sample:
162162+ sample(args.collection or COLLECTIONS[0])
163163+ return
164164+ cols = [args.collection] if args.collection else COLLECTIONS
165165+ c = backfill(cols, max_repos=args.max_repos)
166166+ print(f"[backfill] done: {c}")
167167+168168+169169+if __name__ == "__main__":
170170+ main()
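Note on `_repos` and `_records`: both walk the same cursor loop (fetch a page, yield its items, follow `cursor` until the server stops returning one). A minimal sketch of that loop factored into one helper follows. `paginate` and the in-memory `_PAGES` endpoint are hypothetical names used only for illustration, and the sample URIs are made up.

```python
from __future__ import annotations

from typing import Callable, Iterator


def paginate(fetch_page: Callable[[str | None], dict], items_key: str) -> Iterator[dict]:
    """Yield items from successive pages until a page arrives without a cursor."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page.get(items_key, [])
        cursor = page.get("cursor")
        if not cursor:
            return


# Fake two-page endpoint standing in for listRecords (illustrative data only).
_PAGES = {
    None: {"records": [{"uri": "at://did:plc:x/sh.tangled.repo.pull/1"}], "cursor": "a"},
    "a": {"records": [{"uri": "at://did:plc:x/sh.tangled.repo.pull/2"}]},
}

# Same rkey extraction as backfill(): the last path segment of the at:// URI.
rkeys = [r["uri"].rsplit("/", 1)[-1] for r in paginate(_PAGES.__getitem__, "records")]
```

Passing the fetcher in as a callable keeps the loop testable without network access, which is the main reason to factor it out.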
+109
src/trust/config.py
···11+"""Config: env-driven paths (all large artifacts under DATA_ROOT) + tuning knobs.
22+33+PRD section 4.1: every large file lives on the external drive, routed through
44+DATA_ROOT. We fail fast at startup if DATA_ROOT is set-but-unwritable, so a
55+half-run never scatters the DuckDB file onto the system disk.
66+"""
77+88+from __future__ import annotations
99+1010+import os
1111+from dataclasses import dataclass, field
1212+from pathlib import Path
1313+1414+# ponytail: DATA_ROOT unset in dev -> fall back to repo-local .data with a warning.
1515+# The PRD's safety property ("never scatter onto system disk if the drive is gone")
1616+# is satisfied by the writability assert below; point DATA_ROOT at /Volumes/EXT in prod.
1717+DATA_ROOT = Path(os.environ.get("DATA_ROOT") or (Path(__file__).resolve().parents[2] / ".data"))
1818+DUCKDB_PATH = Path(os.environ.get("DUCKDB_PATH") or (DATA_ROOT / "duckdb" / "trust.duckdb"))
1919+STAGING_DIR = Path(os.environ.get("STAGING_DIR") or (DATA_ROOT / "staging"))
2020+MODEL_DIR = Path(os.environ.get("MODEL_DIR") or (DATA_ROOT / "models"))
2121+LOG_DIR = Path(os.environ.get("LOG_DIR") or (DATA_ROOT / "logs"))
2222+2323+2424+_warned = False
2525+2626+2727+def ensure_data_root() -> Path:
2828+ """Create the DATA_ROOT subtree and assert it is writable. Call at startup."""
2929+ global _warned
3030+ if not os.environ.get("DATA_ROOT") and not _warned:
3131+ print(f"[config] DATA_ROOT unset -> dev fallback {DATA_ROOT} (set DATA_ROOT for the external drive)")
3232+ _warned = True
3333+    probe = DATA_ROOT / ".write_probe"
3434+    try:
3535+        for d in (DATA_ROOT, DUCKDB_PATH.parent, STAGING_DIR, MODEL_DIR, LOG_DIR):
3636+            d.mkdir(parents=True, exist_ok=True)  # inside the try: an unmounted drive can fail here too
3737+        probe.write_text("ok")
3838+        probe.unlink()
3939+    except OSError as e:
4040+        raise SystemExit(f"DATA_ROOT not writable ({DATA_ROOT}): {e}. Is the external drive mounted?")
4141+ return DATA_ROOT
4242+4343+4444+# --- Tangled lexicon NSIDs -------------------------------------------------
4545+# PRD 6.1: CONFIRM these against tangled.org core lexicons + a live Jetstream
4646+# sample before trusting them; `python -m trust.ingest --probe` logs the real
4747+# `collection` values seen on the wire. Override via env without touching code.
4848+# Known fact: Tangled records live under `sh.tangled.*`.
4949+WANTED_COLLECTIONS = os.environ.get("WANTED_COLLECTIONS", "sh.tangled.*,app.bsky.graph.*")
5050+JETSTREAM_URL = os.environ.get(
5151+ "JETSTREAM_URL", "wss://jetstream2.us-east.bsky.network/subscribe"
5252+)
5353+5454+# collection -> our internal record kind. Patterns are substring matches on the
5555+# NSID. These are best-guess shapes; `--probe` tells you the truth. ponytail:
5656+# substring map over a lexicon parser; swap to exact NSIDs once confirmed.
5757+COLLECTION_KINDS: dict[str, str] = {
5858+ "tangled.pull": "pull_request",
5959+ "tangled.repo.pull": "pull_request",
6060+ "tangled.vouch": "vouch",
6161+ "tangled.graph.vouch": "vouch",
6262+ "tangled.denounce": "denounce",
6363+ "tangled.pipeline": "ci",
6464+ "tangled.spindle": "ci",
6565+ "tangled.issue": "issue",
6666+ "tangled.star": "star",
6767+ "tangled.attestation": "attestation", # jurisdiction attestation (6.13); CONFIRM NSID
6868+ "tangled.jurisdiction": "attestation",
6969+ "bsky.graph.follow": "follow",
7070+}
7171+7272+7373+@dataclass
7474+class GateConfig:
7575+ """Fusion gate thresholds (PRD 6.7). Tune T_HIGH from calibration so the
7676+ historical false-approval rate above it stays under the chosen budget."""
7777+7878+ T_LOW: float = 0.30 # below -> needs_human
7979+ T_HIGH: float = 0.70 # above (and content clean) -> fast_lane
8080+ R_LOW: float = 0.20 # content risk considered clean
8181+ R_HIGH: float = 0.60 # content risk considered dangerous
8282+8383+8484+@dataclass
8585+class EigenConfig:
8686+ alpha: float = 0.15 # restart probability
8787+ iters: int = 50
8888+ age_halflife_days: float = 180.0 # vouch weight time-decay
8989+ evidence_boost: float = 1.5 # vouch carrying PR evidence weighs more
9090+9191+9292+@dataclass
9393+class ReviewConfig:
9494+ model: str = os.environ.get("CLAUDE_MODEL", "claude-sonnet-4-6")
9595+ prepass_model: str = "claude-haiku-4-5-20251001"
9696+ escalate_model: str = "claude-opus-4-8"
9797+ max_diff_chars: int = 24_000 # token budget guard
9898+ api_key_env: str = "ANTHROPIC_API_KEY"
9999+100100+101101+@dataclass
102102+class Config:
103103+ gate: GateConfig = field(default_factory=GateConfig)
104104+ eigen: EigenConfig = field(default_factory=EigenConfig)
105105+ review: ReviewConfig = field(default_factory=ReviewConfig)
106106+ clean_merge_window_days: int = 14 # PRD 6.3 label-mining N
107107+108108+109109+CFG = Config()
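Note on `COLLECTION_KINDS`: the map is keyed by NSID substrings, so several patterns can match one collection. How `ingest` actually resolves a match is not shown in this diff. The sketch below assumes a longest-match rule so the most specific pattern wins; `kind_for` is a hypothetical helper and `KINDS` is a trimmed copy of the map above.

```python
from __future__ import annotations

# Trimmed copy of COLLECTION_KINDS, for illustration only.
KINDS = {
    "tangled.pull": "pull_request",
    "tangled.repo.pull": "pull_request",
    "tangled.graph.vouch": "vouch",
    "tangled.pipeline": "ci",
}


def kind_for(nsid: str, kinds: dict[str, str] = KINDS) -> str | None:
    """Return the kind of the longest pattern contained in `nsid`, or None if none match."""
    matches = [pattern for pattern in kinds if pattern in nsid]
    if not matches:
        return None
    return kinds[max(matches, key=len)]
```

Returning `None` for unmatched collections lets a caller log them, which fits the "confirm against a live sample" workflow the comments describe.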
+168
src/trust/db.py
···11+"""DuckDB store: schema, connection helpers, and feature/label SQL.
22+
33+Single embedded file at DUCKDB_PATH (PRD 2: no graph DB, no server). Writers (ingester,
44+backfill, AT-Proto publisher) hold the lock briefly via connection(); everything else reads.
55+"""
66+77+from __future__ import annotations
88+99+import time
1010+from contextlib import contextmanager
1111+1212+import duckdb
1313+1414+from .config import CFG, DUCKDB_PATH, ensure_data_root
1515+1616+SCHEMA = """
1717+CREATE TABLE IF NOT EXISTS events (
1818+ did VARCHAR, time_us BIGINT, operation VARCHAR, collection VARCHAR,
1919+ rkey VARCHAR, record JSON, ingested_at TIMESTAMP DEFAULT now()
2020+);
2121+CREATE TABLE IF NOT EXISTS contributors (
2222+ did VARCHAR PRIMARY KEY, handle VARCHAR, did_created_at TIMESTAMP,
2323+ pds_host VARCHAR, first_seen TIMESTAMP DEFAULT now()
2424+);
2525+-- vouches IS the whole graph (PRD 2): a plain edge list, no graph engine.
2626+CREATE TABLE IF NOT EXISTS vouches (
2727+ voucher_did VARCHAR, subject_did VARCHAR, polarity INTEGER DEFAULT 1,
2828+ reason VARCHAR, evidence_uri VARCHAR, created_at TIMESTAMP, weight DOUBLE DEFAULT 1.0,
2929+ PRIMARY KEY (voucher_did, subject_did)
3030+);
3131+CREATE TABLE IF NOT EXISTS pull_requests (
3232+ pr_id VARCHAR PRIMARY KEY, author_did VARCHAR, repo VARCHAR, target VARCHAR,
3333+ opened_at TIMESTAMP, ci_status VARCHAR, merged BOOLEAN, merged_at TIMESTAMP,
3434+ closed_unmerged BOOLEAN, additions INTEGER, deletions INTEGER,
3535+ files_touched INTEGER, diff_text VARCHAR, discussion_len INTEGER
3636+);
3737+CREATE TABLE IF NOT EXISTS pr_followups (
3838+ pr_id VARCHAR PRIMARY KEY, reverted BOOLEAN DEFAULT FALSE,
3939+ patched_same_lines_within_n_days BOOLEAN DEFAULT FALSE
4040+);
4141+CREATE TABLE IF NOT EXISTS scores (
4242+ did VARCHAR, as_of TIMESTAMP DEFAULT now(), structural_trust DOUBLE,
4343+ content_risk DOUBLE, calibrated_prob DOUBLE, decision VARCHAR, explanation_json JSON
4444+);
4545+CREATE TABLE IF NOT EXISTS ingest_state (stream VARCHAR PRIMARY KEY, last_time_us BIGINT);
4646+-- trusted maintainer seed set for personalized EigenTrust (PRD 6.4)
4747+CREATE TABLE IF NOT EXISTS seeds (did VARCHAR PRIMARY KEY);
4848+-- repo tiering (PRD 6.13): sensitive/dual-use repos gate fast-lane on an attestation
4949+CREATE TABLE IF NOT EXISTS repo_tiers (repo VARCHAR PRIMARY KEY, tier VARCHAR DEFAULT 'public');
5050+-- contributor-issued jurisdiction attestations (signed records); declared, never inferred
5151+CREATE TABLE IF NOT EXISTS attestations (
5252+ did VARCHAR, jurisdiction VARCHAR, method VARCHAR, created_at TIMESTAMP,
5353+ PRIMARY KEY (did, jurisdiction)
5454+);
5555+-- AT-Proto writeback (PRD 6.11): the at:// URI of each assessment published as a record
5656+CREATE TABLE IF NOT EXISTS published_records (
5757+ did VARCHAR, as_of TIMESTAMP, uri VARCHAR, PRIMARY KEY (did, as_of)
5858+);
5959+"""
6060+6161+# Per-DID feature view (PRD 6.3/6.5). eigentrust_score + bsky_* are joined in
6262+# at scoring time (computed in Python / from app.bsky events).
6363+FEATURES_VIEW = f"""
6464+CREATE OR REPLACE VIEW features AS
6565+WITH pr AS (
6666+ SELECT p.*, COALESCE(f.reverted, FALSE) AS reverted,
6767+ COALESCE(f.patched_same_lines_within_n_days, FALSE) AS patched_quick,
6868+ -- clean_merge label (PRD 6.3); NULL when too recent to have elapsed the N-day window
6969+ CASE
7070+ WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
7171+ WHEN p.merged AND p.ci_status = 'passed'
7272+ AND NOT COALESCE(f.reverted, FALSE)
7373+ AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
7474+ ELSE 0
7575+ END AS clean_merge
7676+ FROM pull_requests p LEFT JOIN pr_followups f USING (pr_id)
7777+)
7878+SELECT
7979+ c.did,
8080+ date_diff('day', c.did_created_at, now()) AS did_age_days,
8181+ COUNT(*) FILTER (WHERE pr.merged) AS merged_pr_count,
8282+ COALESCE(AVG(CASE WHEN pr.reverted THEN 1.0 ELSE 0.0 END), 0) AS revert_rate,
8383+ COALESCE(AVG(CASE WHEN pr.ci_status='passed' THEN 1.0 ELSE 0.0 END), 0) AS ci_pass_rate,
8484+ COALESCE(AVG(CASE WHEN pr.closed_unmerged THEN 1.0 ELSE 0.0 END), 0) AS close_without_merge_ratio,
8585+ COALESCE(AVG(pr.additions + pr.deletions), 0) AS mean_diff_size,
8686+ COALESCE(AVG(pr.files_touched), 0) AS mean_files_touched,
8787+ COALESCE(SUM(pr.additions + pr.deletions), 0) AS churn,
8888+ COALESCE(AVG(pr.discussion_len), 0) AS mean_discussion_len,
8989+ (SELECT COUNT(*) FROM vouches v WHERE v.subject_did = c.did AND v.polarity < 0) AS denounce_count,
9090+ AVG(pr.clean_merge) AS clean_merge_rate
9191+FROM contributors c
9292+LEFT JOIN pr ON pr.author_did = c.did
9393+GROUP BY c.did, c.did_created_at;
9494+"""
9595+9696+9797+# Per-PR clean_merge label (PRD 6.3) for supervised training; NULL when too recent.
9898+PR_LABELS_VIEW = f"""
9999+CREATE OR REPLACE VIEW pr_labels AS
100100+SELECT p.pr_id, p.author_did, p.opened_at,
101101+ CASE
102102+ WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
103103+ WHEN p.merged AND p.ci_status = 'passed'
104104+ AND NOT COALESCE(f.reverted, FALSE)
105105+ AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
106106+ ELSE 0
107107+ END AS clean_merge
108108+FROM pull_requests p LEFT JOIN pr_followups f USING (pr_id);
109109+"""
110110+111111+112112+def connect(read_only: bool = False) -> duckdb.DuckDBPyConnection:
113113+ ensure_data_root()
114114+ con = duckdb.connect(str(DUCKDB_PATH), read_only=read_only)
115115+ return con
116116+117117+118118+def init_db(con: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyConnection:
119119+ con = con or connect()
120120+ con.execute(SCHEMA)
121121+ con.execute(FEATURES_VIEW)
122122+ con.execute(PR_LABELS_VIEW)
123123+ return con
124124+125125+126126+@contextmanager
127127+def connection(read_only: bool = False, attempts: int = 40, delay: float = 0.25):
128128+ """Short-lived connection with retry on DuckDB's cross-process file lock.
129129+130130+ DuckDB allows only one read-write process; a held lock blocks every other
131131+ open (even read-only). So long-running processes (API, score loop, ingester)
132132+ must open->work->close per operation, letting panes interleave under mprocs.
133133+ ponytail: open/close + retry over a single-writer daemon; fine at hackathon
134134+ scale, revisit if write throughput matters.
135135+ """
136136+ ensure_data_root()
137137+ con = last = None
138138+ for _ in range(attempts):
139139+ try:
140140+ con = duckdb.connect(str(DUCKDB_PATH), read_only=read_only)
141141+ break
142142+ except duckdb.IOException as e:
143143+ last = e
144144+ time.sleep(delay)
145145+ if con is None:
146146+ raise last
147147+ try:
148148+ yield con
149149+ finally:
150150+ con.close()
151151+152152+153153+def ensure_schema() -> None:
154154+    """Create tables + the features and pr_labels views once (read-write), then release the lock."""
155155+ with connection(read_only=False) as con:
156156+ con.execute(SCHEMA)
157157+ con.execute(FEATURES_VIEW)
158158+ con.execute(PR_LABELS_VIEW)
159159+160160+161161+def main() -> None:
162162+ con = init_db()
163163+ tables = con.execute("SHOW TABLES").fetchall()
164164+ print(f"[db] initialised {DUCKDB_PATH} with {len(tables)} tables/views")
165165+166166+167167+if __name__ == "__main__":
168168+ main()
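Note on the `clean_merge` label: the same `CASE` expression appears in both `features` and `pr_labels`. A small Python mirror of it, useful for unit-testing the rule without a database, could look like the sketch below. `clean_merge_label` is a hypothetical helper; the 14-day default matches `clean_merge_window_days`, and `None` stands in for SQL `NULL`.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def clean_merge_label(opened_at: datetime | None, merged: bool | None, ci_status: str | None,
                      reverted: bool | None, patched_quick: bool | None,
                      now: datetime, window_days: int = 14) -> int | None:
    """Mirror of the SQL CASE: None while the window is still open, else 1 (clean) or 0."""
    # Too recent: the follow-up window has not elapsed, so there is no label yet.
    if opened_at is not None and opened_at > now - timedelta(days=window_days):
        return None
    # A missing follow-up row counts as "not reverted / not patched" (the COALESCE in SQL).
    clean = bool(merged) and ci_status == "passed" and not reverted and not patched_quick
    return 1 if clean else 0
```

Unmerged PRs and PRs with unknown CI status fall through to 0, the same as the `ELSE 0` branch in the views.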
+141
src/trust/eigentrust.py
···11+"""M3 structural signal: EigenTrust over the vouches edge list (PRD 6.4).
22+33+Reads rows into a SciPy sparse matrix and runs personalized power iteration in
44+memory. No graph DB (PRD 2). Path explanations come from an in-memory BFS from
55+the seed, not graph-DB traversal.
66+"""
77+88+from __future__ import annotations
99+1010+import math
1111+from collections import deque
1212+from dataclasses import dataclass
1313+1414+import numpy as np
1515+from scipy import sparse
1616+1717+from .config import CFG
1818+1919+2020+@dataclass
2121+class EigenResult:
2222+ trust: dict[str, float] # did -> structural trust in [0,1] (max-normalized)
2323+ index: dict[str, int]
2424+ seeds: list[str]
2525+ _adj: dict[str, list[str]] # positive-edge adjacency for BFS paths
2626+2727+ def path_from_seed(self, did: str, max_hops: int = 4) -> list[str]:
2828+ """Shortest positive-vouch path seed -> did, for the explanation (PRD 6.4)."""
2929+ if did in self.seeds:
3030+ return [did]
3131+ seen = set(self.seeds)
3232+ q: deque[list[str]] = deque([[s] for s in self.seeds])
3333+ while q:
3434+ path = q.popleft()
3535+ if len(path) > max_hops:
3636+ continue
3737+ for nxt in self._adj.get(path[-1], ()):
3838+ if nxt in seen:
3939+ continue
4040+ if nxt == did:
4141+ return path + [nxt]
4242+ seen.add(nxt)
4343+ q.append(path + [nxt])
4444+ return []
4545+4646+4747+def _age_weight(created_at, now_us: float) -> float:
4848+ if created_at is None:
4949+ return 1.0
5050+ try:
5151+ age_days = (now_us - created_at.timestamp()) / 86400.0
5252+ except (AttributeError, TypeError):
5353+ return 1.0
5454+ return 0.5 ** (max(age_days, 0) / CFG.eigen.age_halflife_days)
5555+5656+5757+def compute(con) -> EigenResult:
5858+ import datetime
5959+
6060+    now_us = datetime.datetime.now(datetime.timezone.utc).timestamp()  # NOTE: epoch seconds, despite the _us name
6161+ rows = con.execute(
6262+ "SELECT voucher_did, subject_did, polarity, weight, evidence_uri, created_at FROM vouches"
6363+ ).fetchall()
6464+ seeds = [r[0] for r in con.execute("SELECT did FROM seeds").fetchall()]
6565+ dids = {d for (d,) in con.execute("SELECT did FROM contributors").fetchall()}
6666+ for v, s, *_ in rows:
6767+ dids.update((v, s))
6868+ dids.update(seeds)
6969+7070+ if not dids:
7171+ return EigenResult({}, {}, seeds, {})
7272+ index = {d: i for i, d in enumerate(sorted(dids))}
7373+ n = len(index)
7474+7575+ # denounced nodes get incoming trust zeroed (PRD 6.4: distrust does NOT flow transitively)
7676+ denounced = {s for (v, s, pol, *_) in rows if pol is not None and pol < 0}
7777+7878+ src, dst, data = [], [], []
7979+ adj: dict[str, list[str]] = {}
8080+ for voucher, subject, polarity, weight, evidence, created_at in rows:
8181+ if polarity is not None and polarity < 0:
8282+ continue # denounce: recorded as a feature, never a positive edge
8383+ if subject in denounced:
8484+ continue # ponytail: any denounce starves the node; per-edge weighting if it matters
8585+ w = (weight or 1.0) * _age_weight(created_at, now_us)
8686+ if evidence:
8787+ w *= CFG.eigen.evidence_boost
8888+ src.append(index[voucher]); dst.append(index[subject]); data.append(w)
8989+ adj.setdefault(voucher, []).append(subject)
9090+9191+ C = sparse.csr_matrix((data, (src, dst)), shape=(n, n))
9292+ row_sums = np.asarray(C.sum(axis=1)).ravel()
9393+ row_sums[row_sums == 0] = 1.0
9494+ C = sparse.diags(1.0 / row_sums) @ C # row-normalize: C[i,j] = trust i places in j
9595+9696+ p = np.zeros(n)
9797+ if seeds:
9898+ for s in seeds:
9999+ p[index[s]] = 1.0
100100+ else:
101101+ p[:] = 1.0 # ponytail: no seed configured -> uniform restart (global PageRank fallback)
102102+ p /= p.sum()
103103+104104+ Ct = C.T.tocsr()
105105+ t = p.copy()
106106+ a = CFG.eigen.alpha
107107+ for _ in range(CFG.eigen.iters):
108108+ t = (1 - a) * (Ct @ t) + a * p
109109+ s = t.sum()
110110+ if s > 0:
111111+ t /= s
112112+113113+ hi = t.max() or 1.0
114114+ trust = {d: float(t[i] / hi) for d, i in index.items()} # max-normalize to [0,1]
115115+ return EigenResult(trust, index, seeds, adj)
116116+117117+118118+def demo() -> None:
119119+ """Self-check: a sybil cluster vouching for itself must be starved (PRD 1.1)."""
120120+    import duckdb
121121+    from .db import init_db
122122+    con = init_db(duckdb.connect(":memory:"))  # in-memory: the DELETEs below must never hit DUCKDB_PATH
123123+ con.execute("DELETE FROM vouches; DELETE FROM seeds; DELETE FROM contributors")
124124+ edges = [("seed", "alice"), ("alice", "bob"), ("bob", "carol"), # trusted chain
125125+ ("sybil1", "sybil2"), ("sybil2", "sybil1"), ("sybil2", "sybil3"), ("sybil3", "sybil1")]
126126+ for v, s in edges:
127127+ con.execute("INSERT INTO contributors (did) VALUES (?) ON CONFLICT DO NOTHING", [v])
128128+ con.execute("INSERT INTO contributors (did) VALUES (?) ON CONFLICT DO NOTHING", [s])
129129+ con.execute("INSERT INTO vouches (voucher_did, subject_did) VALUES (?,?)", [v, s])
130130+ con.execute("INSERT INTO seeds VALUES ('seed')")
131131+ r = compute(con)
132132+ trusted = r.trust.get("bob", 0)
133133+ sybil = max(r.trust.get(f"sybil{i}", 0) for i in (1, 2, 3))
134134+ print(f"bob={trusted:.3f} sybil_max={sybil:.3f} path(carol)={r.path_from_seed('carol')}")
135135+ assert trusted > sybil, "sybil cluster should be starved relative to the trusted chain"
136136+ assert r.path_from_seed("carol") == ["seed", "alice", "bob", "carol"]
137137+ print("ok")
138138+139139+140140+if __name__ == "__main__":
141141+ demo()
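Note on the sybil-starvation property: the reason a self-vouching cluster scores zero is that restart mass only ever enters at the seeds, so nodes unreachable from a seed never receive any. A dependency-free sketch of personalized power iteration on the demo graph is below. It is a simplification, not a port of `compute()`: edges are unweighted (no age decay or evidence boost), seeds are assumed non-empty, and the vector is not renormalized between iterations, only max-normalized at the end. `personalized_trust` is a hypothetical name.

```python
from __future__ import annotations


def personalized_trust(edges: list[tuple[str, str]], seeds: list[str],
                       alpha: float = 0.15, iters: int = 50) -> dict[str, float]:
    """Personalized PageRank-style trust: restart mass lands only on the seeds."""
    nodes = sorted({n for edge in edges for n in edge} | set(seeds))
    out: dict[str, list[str]] = {n: [] for n in nodes}
    for voucher, subject in edges:
        out[voucher].append(subject)
    p = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    t = dict(p)
    for _ in range(iters):
        nxt = {n: alpha * p[n] for n in nodes}
        for voucher, subjects in out.items():
            if not subjects:
                continue  # dangling node: its trust is not redistributed
            share = (1 - alpha) * t[voucher] / len(subjects)
            for subject in subjects:
                nxt[subject] += share
        t = nxt
    hi = max(t.values())
    return {n: v / hi for n, v in t.items()}


EDGES = [("seed", "alice"), ("alice", "bob"), ("bob", "carol"),
         ("sybil1", "sybil2"), ("sybil2", "sybil1"), ("sybil2", "sybil3"), ("sybil3", "sybil1")]
trust = personalized_trust(EDGES, ["seed"])
```

In this simplified form trust decays by a factor of `1 - alpha` per hop along the trusted chain, while the sybil nodes stay at exactly zero because nothing reachable from the seed vouches for them.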
+222
src/trust/fusion.py
···11+"""M4 fusion gate + scoring worker (PRD 6.7, 6.9).
22+33+The gate is NOT an average: a low structural score can never be lifted into the
44+fast lane by clean-looking content (constraint 2). Content can only penalize.
55+"""
66+77+from __future__ import annotations
88+99+import json
1010+1111+from .config import CFG
1212+from . import eigentrust, review as review_mod
1313+1414+1515+def decide(structural_trust: float, content: dict | None, cfg=CFG.gate, *,
1616+ attestation_required: bool = False, attested: bool = True):
1717+ """PRD 6.7 gate. structural_trust is calibrated P(clean) in [0,1].
1818+1919+ 6.13: a sensitive/dual-use repo requires a valid jurisdiction attestation; a
2020+ missing one forces needs_human regardless of structural trust or content risk.
2121+ This is the only control that overrides the score, so it is checked first.
2222+ """
2323+ if attestation_required and not attested:
2424+ return "needs_human"
2525+ risk = 0.0 if content is None else content["content_risk"]
2626+ review = False if content is None else content["review_recommended"]
2727+ high_flag = bool(content) and any(f["severity"] == "high" for f in content["flags"])
2828+2929+ if structural_trust < cfg.T_LOW or risk >= cfg.R_HIGH or high_flag:
3030+ return "needs_human"
3131+ if structural_trust >= cfg.T_HIGH and risk <= cfg.R_LOW and not review:
3232+ return "fast_lane"
3333+ return "normal_queue"
3434+3535+3636+def repo_tier(con, repo: str | None) -> str:
3737+ """'sensitive' if the repo is in the sensitive/dual-use tier (6.13), else 'public'."""
3838+ if not repo:
3939+ return "public"
4040+ row = con.execute("SELECT tier FROM repo_tiers WHERE repo=?", [repo]).fetchone()
4141+ return row[0] if row else "public"
4242+4343+4444+def is_attested(con, did: str) -> bool:
4545+ """True if the DID has a contributor-issued jurisdiction attestation (declared, not inferred)."""
4646+ return con.execute("SELECT 1 FROM attestations WHERE did=? LIMIT 1", [did]).fetchone() is not None
4747+4848+4949+def displayed_prob(structural_trust: float, content: dict | None) -> float:
5050+ """Start from structural P(clean), penalize for content risk. Never lifts (6.7)."""
5151+ if content is None:
5252+ return structural_trust
5353+ return structural_trust * (1.0 - content["content_risk"])
5454+5555+5656+def _scorer():
5757+ """Load the M5 LightGBM scorer if trained AND lightgbm is installed; else None."""
5858+ try:
5959+ from . import learned
6060+ return learned.load() # load() imports lightgbm lazily, so it must sit inside the try
6161+ except ImportError:
6262+ return None
6363+6464+6565+def _gnn_winner():
6666+ """M6 GraphSAGE scorer ONLY if it beat M5 on the holdout (PRD guardrail); else None."""
6767+ try:
6868+ from . import gnn
6969+ except ImportError:
7070+ return None
7171+ return gnn.load_if_winner()
7272+7373+7474+def structural_for(did, er: eigentrust.EigenResult, feats: dict | None):
7575+ """Calibrated P(clean) for the gate. Precedence: winning GNN (M6) -> LightGBM (M5)
7676+ -> raw EigenTrust (M3). The GNN is used only if it provably beat the baseline."""
7777+ g = _gnn_winner()
7878+ if g is not None:
7979+ return g.prob(did), None # GNN explanations are weak; reason falls back to path/SHAP
8080+ scorer = _scorer()
8181+ if scorer is not None:
8282+ return scorer.prob(did, feats or {}, er), scorer.contributions(did, feats or {}, er)
8383+ return er.trust.get(did, 0.0), None
8484+8585+8686+def build_reason(did, structural_trust, content, er: eigentrust.EigenResult, feats: dict | None,
8787+ model_factors: list | None = None, gate_note: str | None = None):
8888+ """Structured explanation (6.9): EigenTrust path + top factors + Claude's rationale."""
8989+ path = er.path_from_seed(did)
9090+ top_factors = []
9191+ if gate_note: # 6.13 compliance override, surfaced first so the human sees why
9292+ top_factors.append(gate_note)
9393+ if path:
9494+ top_factors.append(f"trust reaches {did} via {' -> '.join(path)}")
9595+ if feats:
9696+ if feats.get("merged_pr_count"):
9797+ top_factors.append(f"{int(feats['merged_pr_count'])} merged PRs")
9898+ if feats.get("revert_rate") is not None:
9999+ top_factors.append(f"revert rate {feats['revert_rate']:.0%}")
100100+ if feats.get("denounce_count"):
101101+ top_factors.append(f"{int(feats['denounce_count'])} denounce(s)")
102102+ if model_factors: # M5 LightGBM TreeSHAP contributions (6.9)
103103+ for mf in model_factors:
104104+ top_factors.append(f"{mf['feature']} ({mf['contribution'] + 0.0:+.3f})")
105105+ reason = {
106106+ "structural_trust": round(structural_trust, 4),
107107+ "trust_path": path,
108108+ "top_factors": top_factors,
109109+ "model_factors": model_factors or [],
110110+ "compliance_block": gate_note,
111111+ }
112112+ if content is not None:
113113+ reason["content_summary"] = content["summary"]
114114+ reason["flags"] = content["flags"]
115115+ reason["content_risk"] = content["content_risk"]
116116+ return reason
117117+118118+119119+def should_review(structural_trust: float, security_sensitive: bool, cfg=CFG.gate) -> bool:
120120+ """Cost gate (6.6): skip Sonnet for clearly-trusted unless security-sensitive."""
121121+ if structural_trust >= cfg.T_HIGH:
122122+ return security_sensitive
123123+ return True # ambiguous band and below: review earns its keep / attaches a reason
124124+125125+126126+def score_pr(con, pr_id: str, run_review: bool = True) -> dict:
127127+ """Full hybrid score for one PR: EigenTrust + (gated) Claude -> decision + write."""
128128+ row = con.execute(
129129+ "SELECT author_did, diff_text, repo, target FROM pull_requests WHERE pr_id=?", [pr_id]
130130+ ).fetchone()
131131+ if not row:
132132+ raise ValueError(f"unknown pr {pr_id}")
133133+ did, diff, repo, target = row
134134+135135+ er = eigentrust.compute(con)
136136+ feats = _features_for(con, did)
137137+ structural, model_factors = structural_for(did, er, feats)
138138+139139+ tier = repo_tier(con, repo) # 6.13 repo tiering
140140+ attested = is_attested(con, did)
141141+ sensitive = tier == "sensitive"
142142+ content = None
143143+ if run_review and should_review(structural, sensitive):
144144+ content = review_mod.review_pr(diff or "", title=repo or "", discussion="")
145145+146146+ decision = decide(structural, content, attestation_required=sensitive, attested=attested)
147147+ prob = displayed_prob(structural, content)
148148+ gate_note = ("sensitive-tier repo: a valid jurisdiction attestation is required before "
149149+ "fast-lane/merge (6.13)") if sensitive and not attested else None
150150+ reason = build_reason(did, structural, content, er, feats, model_factors, gate_note)
151151+152152+ con.execute(
153153+ "INSERT INTO scores (did, structural_trust, content_risk, calibrated_prob, decision, explanation_json) "
154154+ "VALUES (?,?,?,?,?,?)",
155155+ [did, structural, (content or {}).get("content_risk"), prob, decision, json.dumps(reason)],
156156+ )
157157+ return {"did": did, "structural_trust": structural, "calibrated_prob": prob,
158158+ "decision": decision, "explanation": reason}
159159+160160+161161+def _features_for(con, did: str) -> dict | None:
162162+ cols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
163163+ row = con.execute("SELECT * FROM features WHERE did=?", [did]).fetchone()
164164+ return dict(zip(cols, row)) if row else None
165165+166166+167167+def _process_pending(con) -> int:
168168+ pending = con.execute(
169169+ "SELECT pr_id FROM pull_requests WHERE author_did NOT IN (SELECT did FROM scores)"
170170+ ).fetchall()
171171+ for (pr_id,) in pending:
172172+ r = score_pr(con, pr_id)
173173+ print(f"{r['decision']:<13} {r['calibrated_prob']:.3f} {pr_id}", flush=True)
174174+ return len(pending)
175175+176176+177177+def main() -> None:
178178+ """Scoring worker: score PRs whose author DID has no score row yet (scores is keyed by DID, not PR), write decisions (6.10).
179179+180180+ Default is one-shot; --loop polls forever (a long-lived pane under mprocs),
181181+ opening a short-lived read-write connection per cycle so the API can read
182182+ between cycles.
183183+ """
184184+ import argparse
185185+ import time
186186+187187+ from .db import connection, ensure_schema
188188+189189+ ap = argparse.ArgumentParser(description="trust scoring worker")
190190+ ap.add_argument("--loop", action="store_true", help="poll forever instead of one pass")
191191+ ap.add_argument("--interval", type=float, default=5.0, help="seconds between polls")
192192+ args = ap.parse_args()
193193+194194+ ensure_schema()
195195+ while True:
196196+ with connection(read_only=False) as con:
197197+ n = _process_pending(con)
198198+ if not args.loop:
199199+ print(f"[score] processed {n} PRs")
200200+ return
201201+ time.sleep(args.interval)
202202+203203+204204+def demo() -> None:
205205+ """Self-check: gate never fast-lanes a low-trust DID, even on clean content (constraint 2)."""
206206+ clean = {"content_risk": 0.0, "review_recommended": False, "flags": [], "summary": "ok"}
207207+ risky = {"content_risk": 0.9, "review_recommended": True,
208208+ "flags": [{"severity": "high", "type": "subtle_bug", "location": "x", "explanation": "y"}],
209209+ "summary": "bad"}
210210+ assert decide(0.1, clean) == "needs_human", "low trust + clean content must NOT fast-lane"
211211+ assert decide(0.95, clean) == "fast_lane"
212212+ assert decide(0.95, risky) == "needs_human", "high-severity flag forces human"
213213+ assert decide(0.5, None) == "normal_queue"
214214+ assert displayed_prob(0.9, risky) < 0.9, "content risk must penalize, never lift"
215215+ # 6.13: a sensitive-tier repo with no attestation forces human even for a perfect score.
216216+ assert decide(0.99, clean, attestation_required=True, attested=False) == "needs_human"
217217+ assert decide(0.99, clean, attestation_required=True, attested=True) == "fast_lane"
218218+ print("ok")
219219+220220+221221+if __name__ == "__main__":
222222+ demo()
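The gate in `decide` is easier to trust once its "content can only penalize" property is checked exhaustively rather than on a handful of cases. The sketch below re-implements the gate standalone and sweeps a grid of trust and risk values. The threshold values in `Gate` are placeholders chosen for illustration; the real ones live in `CFG.gate` and are not shown in this diff:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Gate:
    # Hypothetical thresholds, for illustration only.
    T_LOW: float = 0.3
    T_HIGH: float = 0.8
    R_LOW: float = 0.2
    R_HIGH: float = 0.7


RANK = {"needs_human": 0, "normal_queue": 1, "fast_lane": 2}


def decide_sketch(trust, content, cfg=Gate()):
    """Same branch order as the gate above, minus the attestation override."""
    risk = 0.0 if content is None else content["content_risk"]
    review = False if content is None else content["review_recommended"]
    high = bool(content) and any(f["severity"] == "high" for f in content["flags"])
    if trust < cfg.T_LOW or risk >= cfg.R_HIGH or high:
        return "needs_human"
    if trust >= cfg.T_HIGH and risk <= cfg.R_LOW and not review:
        return "fast_lane"
    return "normal_queue"


def content_never_lifts(steps=20):
    """True if no content assessment ever yields a better lane than no review at all."""
    for i in range(steps + 1):
        trust = i / steps
        baseline = RANK[decide_sketch(trust, None)]
        for j in range(steps + 1):
            for review in (False, True):
                content = {"content_risk": j / steps, "review_recommended": review, "flags": []}
                if RANK[decide_sketch(trust, content)] > baseline:
                    return False
    return True
```

The property holds by construction: a missing review is treated as risk 0 with no flags, which is the most favourable content possible, so any real assessment can only match or lower the lane.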
+212
src/trust/gnn.py
···11+"""M6 GraphSAGE (stretch). Inductive node classification on the vouch graph.
22+33+PRD M6 / 6.5: GraphSAGE, 2 layers, hidden 64, out 1; nodes are contributors with
44+the per-DID feature vector as node features; edges are positive vouches + co-
55+contribution edges (denounce-count rides as a node feature, no signed-edge GNN).
66+Trained OFFLINE, served in-process.
77+88+Guardrail (PRD section 8, repeated): SHIP THE GNN ONLY IF IT BEATS THE CALIBRATED
99+LightGBM BASELINE AND IS STABLE. So `train_and_compare` writes a verdict, and
1010+`load_if_winner` (used by fusion) returns a scorer ONLY when the GNN actually beat
1111+M5 on the time-split holdout. On a small, sparsely-vouched graph it won't, and the
1212+system correctly keeps serving M5 — "always have M4/M5 working first."
1313+1414+Optional: needs `uv pip install -e '.[gnn]'` (torch + torch-geometric, multi-GB).
1515+"""
1616+1717+from __future__ import annotations
1818+1919+import json
2020+from types import SimpleNamespace
2121+# OpenMP dual-libomp guard (lightgbm + torch) is set in trust/__init__.py — it must
2222+# run before either library imports, which package init guarantees.
2323+2424+from .config import MODEL_DIR
2525+from .db import connection
2626+from . import eigentrust, learned
2727+2828+CKPT = MODEL_DIR / "gnn.pt"
2929+VERDICT = MODEL_DIR / "gnn_verdict.json"
3030+HIDDEN = 64
3131+3232+3333+def _sage(in_dim: int):
3434+ import torch
3535+ from torch_geometric.nn import SAGEConv
3636+3737+ class SAGE(torch.nn.Module):
3838+ def __init__(self):
3939+ super().__init__()
4040+ self.c1 = SAGEConv(in_dim, HIDDEN) # inductive: generalizes to unseen nodes
4141+ self.c2 = SAGEConv(HIDDEN, 1)
4242+4343+ def forward(self, x, ei):
4444+ import torch.nn.functional as F
4545+4646+ return self.c2(F.relu(self.c1(x, ei)), ei).squeeze(-1)
4747+4848+ return SAGE()
4949+5050+5151+def _build_graph(con, mean=None, std=None):
5252+ import torch
5353+5454+ er = eigentrust.compute(con)
5555+ dids = [r[0] for r in con.execute("SELECT did FROM contributors ORDER BY did").fetchall()]
5656+ didx = {d: i for i, d in enumerate(dids)}
5757+ fcols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
5858+ feats = {r[0]: dict(zip(fcols, r)) for r in con.execute("SELECT * FROM features").fetchall()}
5959+6060+ raw = torch.tensor([learned._vec(d, feats.get(d, {}), er) for d in dids], dtype=torch.float)
6161+ if mean is None:
6262+ mean, std = raw.mean(0, keepdim=True), raw.std(0, keepdim=True).clamp_min(1e-6)
6363+ x = (raw - mean) / std
6464+6565+ src, dst = [], []
6666+ for v, s in con.execute("SELECT voucher_did, subject_did FROM vouches WHERE polarity > 0").fetchall():
6767+ if v in didx and s in didx: # undirected edges for SAGE mean-aggregation
6868+ src += [didx[v], didx[s]]; dst += [didx[s], didx[v]]
6969+ for a, b in con.execute( # co-contribution: authored PRs to the same repo (PRD 6.5)
7070+ "SELECT DISTINCT a.author_did, b.author_did FROM pull_requests a JOIN pull_requests b "
7171+ "ON a.repo = b.repo AND a.author_did < b.author_did"
7272+ ).fetchall():
7373+ if a in didx and b in didx:
7474+ src += [didx[a], didx[b]]; dst += [didx[b], didx[a]]
7575+ edge_index = (torch.tensor([src, dst], dtype=torch.long) if src
7676+ else torch.empty((2, 0), dtype=torch.long))
7777+7878+ # node labels: soft target = clean_merge_rate; temporal split by latest labelled PR
7979+ lab = con.execute(
8080+ "SELECT author_did, AVG(clean_merge), MAX(opened_at) FROM pr_labels "
8181+ "WHERE clean_merge IS NOT NULL GROUP BY author_did ORDER BY MAX(opened_at)"
8282+ ).fetchall()
8383+ label = {d: float(r) for d, r, _ in lab}
8484+ ordered = [d for d, _, _ in lab]
8585+ k = max(1, int(len(ordered) * 0.7))
8686+ train_dids, val_dids = ordered[:k], ordered[k:]
8787+ y = torch.zeros(len(dids))
8888+ train_mask = torch.zeros(len(dids), dtype=torch.bool)
8989+ val_mask = torch.zeros(len(dids), dtype=torch.bool)
9090+ for d in ordered:
9191+ y[didx[d]] = label[d]
9292+ for d in train_dids:
9393+ train_mask[didx[d]] = True
9494+ for d in val_dids:
9595+ val_mask[didx[d]] = True
9696+9797+ return SimpleNamespace(x=x, edge_index=edge_index, y=y, train_mask=train_mask, val_mask=val_mask,
9898+ dids=dids, didx=didx, val_dids=val_dids, label=label, feats=feats, er=er,
9999+ mean=mean, std=std)
100100+101101+102102+def _acc(prob, y) -> float:
103103+ return float(((prob >= 0.5) == (y >= 0.5)).float().mean()) if len(y) else float("nan")
104104+105105+106106+def _m5_val_acc(g) -> float | None:
107107+ try:
108108+ s = learned.load()
109109+ except ImportError:
110110+ return None
111111+ if s is None or not g.val_dids:
112112+ return None
113113+ hits = sum(int((s.prob(d, g.feats.get(d, {}), g.er) >= 0.5) == (g.label[d] >= 0.5))
114114+ for d in g.val_dids)
115115+ return hits / len(g.val_dids)
116116+117117+118118+def train_and_compare(epochs: int = 300) -> dict:
119119+ import torch
120120+121121+ with connection(read_only=True) as con:
122122+ g = _build_graph(con)
123123+ if not g.val_dids or int(g.train_mask.sum()) == 0:
124124+ raise SystemExit("not enough labelled nodes for a temporal split; seed/ingest more history")
125125+126126+ model = _sage(g.x.size(1))
127127+ opt = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
128128+ lossfn = torch.nn.BCEWithLogitsLoss()
129129+ for _ in range(epochs):
130130+ model.train(); opt.zero_grad()
131131+ loss = lossfn(model(g.x, g.edge_index)[g.train_mask], g.y[g.train_mask])
132132+ loss.backward(); opt.step()
133133+134134+ model.eval()
135135+ with torch.no_grad():
136136+ prob = torch.sigmoid(model(g.x, g.edge_index))
137137+ stable = bool(torch.isfinite(prob).all())
138138+ gnn_acc = _acc(prob[g.val_mask], g.y[g.val_mask])
139139+ m5_acc = _m5_val_acc(g)
140140+ # Beat the baseline strictly, and only when a baseline exists (PRD guardrail).
141141+ gnn_wins = bool(stable and m5_acc is not None and gnn_acc > m5_acc)
142142+143143+ MODEL_DIR.mkdir(parents=True, exist_ok=True)
144144+ torch.save({"state": model.state_dict(), "mean": g.mean, "std": g.std, "in_dim": g.x.size(1)}, CKPT)
145145+ verdict = {"gnn_val_acc": round(gnn_acc, 3), "m5_val_acc": m5_acc,
146146+ "val_nodes": len(g.val_dids), "stable": stable, "gnn_wins": gnn_wins}
147147+ VERDICT.write_text(json.dumps(verdict, indent=2))
148148+ return verdict
149149+150150+151151+class GNNScorer:
152152+ """In-process inductive inference: rebuild the current graph, forward, read the node."""
153153+154154+ def __init__(self, ckpt):
155155+ self.ckpt = ckpt
156156+157157+ def prob(self, did: str) -> float:
158158+ import torch
159159+160160+ with connection(read_only=True) as con:
161161+ g = _build_graph(con, mean=self.ckpt["mean"], std=self.ckpt["std"])
162162+ model = _sage(self.ckpt["in_dim"])
163163+ model.load_state_dict(self.ckpt["state"])
164164+ model.eval()
165165+ with torch.no_grad():
166166+ p = torch.sigmoid(model(g.x, g.edge_index))
167167+ i = g.didx.get(did)
168168+ return float(p[i]) if i is not None else 0.0
169169+170170+171171+def load_if_winner() -> GNNScorer | None:
172172+ """Serving hook used by fusion: a GNN scorer ONLY if it beat M5 (else None)."""
173173+ if not (VERDICT.exists() and CKPT.exists()):
174174+ return None
175175+ if not json.loads(VERDICT.read_text()).get("gnn_wins"):
176176+ return None
177177+ try:
178178+ import torch
179179+ except ImportError:
180180+ return None
181181+ return GNNScorer(torch.load(CKPT, weights_only=False))
182182+183183+184184+def main() -> None:
185185+ v = train_and_compare()
186186+ print(f"[gnn] val nodes={v['val_nodes']} GNN acc={v['gnn_val_acc']} "
187187+ f"M5 acc={v['m5_val_acc']} stable={v['stable']}")
188188+ print(f"[gnn] gnn_wins={v['gnn_wins']} -> "
189189+ + ("SERVED (beats calibrated M5)" if v["gnn_wins"]
190190+ else "NOT served; system keeps M5 (PRD guardrail: ship only if it beats the baseline)"))
191191+192192+193193+def demo() -> None:
194194+ """Self-check: trains, produces finite probs, writes a verdict — stability, not winning."""
195195+ from .db import connection as conn, init_db
196196+ from .seed import seed as load_seed
197197+198198+ with conn(read_only=False) as con:
199199+ init_db(con)
200200+ load_seed(con)
201201+ try:
202202+ learned.train() # so there's an M5 baseline to compare against
203203+ except (Exception, SystemExit): # train() raises SystemExit when labels are too sparse
204204+ pass
205205+ v = train_and_compare(epochs=200)
206206+ assert v["stable"], "GNN produced non-finite outputs"
207207+ assert isinstance(v["gnn_wins"], bool)
208208+ print(f"gnn_val_acc={v['gnn_val_acc']} m5_val_acc={v['m5_val_acc']} gnn_wins={v['gnn_wins']} ok")
209209+210210+211211+if __name__ == "__main__":
212212+ demo()
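Two pieces of `gnn.py` carry the guardrail: the time-ordered 70/30 split and the strict "beats the baseline" verdict. A dependency-free sketch of both, assuming labelled nodes arrive as `(did, latest_opened_at)` pairs. The helper names `temporal_split` and `gnn_wins` are illustrative, not functions in the module:

```python
def temporal_split(labelled, frac=0.7):
    """Order DIDs by their latest labelled PR; the earliest `frac` train, the rest validate."""
    ordered = [did for did, _ in sorted(labelled, key=lambda row: row[1])]
    k = max(1, int(len(ordered) * frac))
    return ordered[:k], ordered[k:]


def gnn_wins(gnn_acc, m5_acc, stable):
    """Serve the GNN only if it is stable AND strictly beats an existing M5 baseline."""
    return bool(stable and m5_acc is not None and gnn_acc > m5_acc)


train_dids, val_dids = temporal_split([("a", 3), ("b", 1), ("c", 2), ("d", 4)])
```

A tie, a missing baseline, or a non-finite output all resolve to "keep serving M5", which matches the stated bias toward the simpler model.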
+159
src/trust/ingest.py
···11+"""M1 ingest: Jetstream firehose -> events (batched) -> derive typed tables.
22+33+Single writer (PRD 6.1). Buffer in memory, append in batches, persist the
44+`time_us` cursor so a crash resumes gaplessly. The cursor + the durable events
55+log ARE the resumability a broker would give (PRD 2).
66+"""
77+88+from __future__ import annotations
99+1010+import argparse
1111+import asyncio
1212+import json
1313+from collections import Counter
1414+1515+import websockets
1616+1717+from .config import COLLECTION_KINDS, JETSTREAM_URL, WANTED_COLLECTIONS
1818+from .db import connection, ensure_schema
1919+2020+STREAM = "jetstream"
2121+BATCH = 200
2222+FLUSH_SECONDS = 2.0
2323+2424+2525+def _kind(collection: str) -> str | None:
2626+ for needle, kind in COLLECTION_KINDS.items():
2727+ if needle in collection:
2828+ return kind
2929+ return None
3030+3131+3232+def _url(con) -> str:
3333+ row = con.execute("SELECT last_time_us FROM ingest_state WHERE stream=?", [STREAM]).fetchone()
3434+ cursor = f"&cursor={row[0] - 5_000_000}" if row and row[0] else "" # -5s for gapless replay
3535+ cols = "".join(f"&wantedCollections={c}" for c in WANTED_COLLECTIONS.split(","))
3636+ return f"{JETSTREAM_URL}?{cols.lstrip('&')}{cursor}"
3737+3838+3939+def flush(con, buf: list[tuple]) -> None:
4040+ if not buf:
4141+ return
4242+ con.executemany(
4343+ "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)",
4444+ buf,
4545+ )
4646+ last = max(e[1] for e in buf)
4747+ con.execute(
4848+ "INSERT INTO ingest_state (stream, last_time_us) VALUES (?, ?) "
4949+ "ON CONFLICT (stream) DO UPDATE SET last_time_us=excluded.last_time_us",
5050+ [STREAM, last],
5151+ )
5252+ derive(con, buf)
5353+ buf.clear()
5454+5555+5656+def derive(con, events: list[tuple]) -> None:
5757+ """Raw event tuples -> contributors / vouches / pull_requests (PRD 6.1, 6.2)."""
5858+ for did, time_us, op, collection, rkey, record_json in events:
5959+ kind = _kind(collection)
6060+ rec = json.loads(record_json) if record_json else {}
6161+ con.execute(
6262+ "INSERT INTO contributors (did, first_seen) VALUES (?, now()) ON CONFLICT (did) DO NOTHING",
6363+ [did],
6464+ )
6565+ if kind in ("vouch", "denounce") and op != "delete":
6666+ # Real sh.tangled.graph.vouch: subject is the RKEY (at://voucher/.../<subject_did>),
6767+ # not a record field; vouch-vs-denounce is in rec["kind"]. Confirmed via listRecords.
6868+ subject = (rec.get("subject") or rec.get("subjectDid")
6969+ or (rkey if str(rkey).startswith("did:") else None))
7070+ if not subject:
7171+ continue
7272+ polarity = -1 if (kind == "denounce" or rec.get("kind") == "denounce") else 1
7373+ con.execute(
7474+ "INSERT INTO vouches (voucher_did, subject_did, polarity, reason, evidence_uri, created_at, weight) "
7575+ "VALUES (?,?,?,?,?,?,1.0) ON CONFLICT (voucher_did, subject_did) DO UPDATE SET "
7676+ "polarity=excluded.polarity, reason=excluded.reason",
7777+ [did, subject, polarity, rec.get("reason"), rec.get("evidence") or rec.get("uri"),
7878+ rec.get("createdAt")],
7979+ )
8080+ elif kind == "attestation" and op != "delete": # 6.13 jurisdiction attestation
8181+ con.execute(
8282+ "INSERT INTO attestations (did, jurisdiction, method, created_at) VALUES (?,?,?,?) "
8383+ "ON CONFLICT (did, jurisdiction) DO NOTHING",
8484+ [did, rec.get("jurisdiction"), rec.get("method", "signed_record"), rec.get("createdAt")],
8585+ )
8686+ elif kind == "pull_request" and op != "delete":
8787+ pr_id = f"{did}/{collection}/{rkey}"
8888+ tgt = rec.get("target") if isinstance(rec.get("target"), dict) else {}
8989+ # Real sh.tangled.repo.pull carries identity + branches + body, but NOT the
9090+ # label-bearing outcome: merged / ci_status / diff-stats are appview/knot state,
9191+ # absent from the PDS record (merged->NULL here). The diff is rounds[].patchBlob
9292+ # (a gzipped blob CID), not inline. ci_status/merged stay NULL until joined from
9393+ # the appview; see backfill.py header. ponytail: ingest what's in the record,
9494+ # leave outcome columns NULL rather than fabricating bool(missing)=False.
9595+ con.execute(
9696+ "INSERT INTO pull_requests (pr_id, author_did, repo, target, opened_at, ci_status, "
9797+ "merged, closed_unmerged, additions, deletions, files_touched, diff_text, discussion_len) "
9898+ "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (pr_id) DO NOTHING",
9999+ [pr_id, did, tgt.get("repo") or rec.get("repo"), tgt.get("branch") or rec.get("target"),
100100+ rec.get("createdAt"), rec.get("ciStatus"), rec.get("merged"), False,
101101+ rec.get("additions"), rec.get("deletions"), rec.get("filesTouched"),
102102+ rec.get("diff"), len(json.dumps(rec.get("body", "")))],
103103+ )
104104+105105+106106+def _flush(buf: list[tuple]) -> None:
107107+ # Short-lived read-write connection per batch so the API can read between flushes.
108108+ with connection(read_only=False) as con:
109109+ flush(con, buf)
110110+111111+112112+async def run(probe: bool = False, max_events: int | None = None) -> None:
113113+ ensure_schema()
114114+ with connection(read_only=True) as con:
115115+ url = _url(con)
116116+ buf: list[tuple] = []
117117+ seen: Counter[str] = Counter()
118118+ n = 0
119119+ async with websockets.connect(url, max_size=None) as ws:
120120+ loop = asyncio.get_running_loop() # inside a coroutine, prefer the running loop
121121+ last_flush = loop.time()
122122+ async for raw in ws:
123123+ evt = json.loads(raw)
124124+ if evt.get("kind") != "commit":
125125+ continue
126126+ c = evt["commit"]
127127+ collection = c.get("collection", "")
128128+ seen[collection] += 1
129129+ if probe:
130130+ n += 1
131131+ if n % 50 == 0:
132132+ print(f"[probe] {n} events; top collections: {seen.most_common(10)}")
133133+ if max_events and n >= max_events:
134134+ print(f"[probe] distinct collections seen:\n " +
135135+ "\n ".join(f"{k}: {v}" for k, v in seen.most_common()))
136136+ return
137137+ continue
138138+ buf.append((evt["did"], evt["time_us"], c.get("operation"), collection,
139139+ c.get("rkey"), json.dumps(c.get("record"))))
140140+ n += 1
141141+ if len(buf) >= BATCH or loop.time() - last_flush > FLUSH_SECONDS:
142142+ _flush(buf)
143143+ last_flush = loop.time()
144144+ if max_events and n >= max_events:
145145+ break
146146+ _flush(buf)
147147+148148+149149+def main() -> None:
150150+ ap = argparse.ArgumentParser(description="Jetstream -> DuckDB ingester")
151151+ ap.add_argument("--probe", action="store_true",
152152+ help="log live `collection` values to CONFIRM NSIDs; writes nothing")
153153+ ap.add_argument("--max-events", type=int, default=None)
154154+ args = ap.parse_args()
155155+ asyncio.run(run(probe=args.probe, max_events=args.max_events))
156156+157157+158158+if __name__ == "__main__":
159159+ main()
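The resume logic in `_url` rewinds the stored cursor by five seconds so a reconnect overlaps the last flushed batch instead of leaving a gap. A minimal sketch of that URL construction plus one way to drop the replayed overlap, assuming events are dicts keyed like the Jetstream payload. The base URL, `subscribe_url`, and `drop_replayed` are hypothetical names used only for illustration:

```python
from urllib.parse import urlencode

REWIND_US = 5_000_000  # 5 seconds, in microseconds


def subscribe_url(base, collections, last_time_us=None):
    """Build the subscribe URL; rewind the cursor so the replay window overlaps."""
    params = [("wantedCollections", c) for c in collections]
    if last_time_us:
        params.append(("cursor", max(0, last_time_us - REWIND_US)))
    return f"{base}?{urlencode(params)}"


def drop_replayed(events, seen_keys):
    """Filter events already processed; seen_keys is mutated to remember new ones."""
    fresh = []
    for evt in events:
        key = (evt["did"], evt["time_us"], evt["rkey"])
        if key not in seen_keys:
            seen_keys.add(key)
            fresh.append(evt)
    return fresh


url = subscribe_url(
    "wss://jetstream.example/subscribe",  # placeholder host, not the real endpoint
    ["sh.tangled.graph.vouch", "sh.tangled.repo.pull"],
    last_time_us=1_700_000_000_000_000,
)
```

The rewind trades duplicates for completeness: anything inside the overlap window is delivered twice, so writes downstream of the replay need to be idempotent or filtered as above.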
+160
src/trust/learned.py
···11+"""M5 learned signal: LightGBM on per-DID features, isotonic-calibrated (PRD 6.5/6.8).
22+33+Predicts clean_merge from the feature vector (eigentrust_score included as a
44+feature, so the model builds on the graph signal). Trained offline on a
55+time-based split, calibrated with isotonic regression so the output is a real
66+P(clean). Explanations use LightGBM's native TreeSHAP (`pred_contrib`) — no
77+separate shap/numba dependency.
88+99+Stretch milestone: needs `uv pip install -e '.[learned]'`. The system runs
1010+without it (fusion falls back to raw EigenTrust).
1111+"""
1212+1313+from __future__ import annotations
1414+1515+import pickle
1616+1717+import numpy as np
1818+1919+from .config import MODEL_DIR
2020+from .db import connection
2121+from . import eigentrust
2222+2323+# PRD 6.5 feature list, restricted to what the features view currently produces.
2424+# bsky_graph_degree / bsky_account_age join in once the app.bsky social graph is ingested.
2525+FEATURE_COLS = [
2626+ "eigentrust_score", "did_age_days", "merged_pr_count", "revert_rate", "ci_pass_rate",
2727+ "close_without_merge_ratio", "mean_diff_size", "mean_files_touched", "churn",
2828+ "mean_discussion_len", "denounce_count",
2929+]
3030+MODEL_PATH = MODEL_DIR / "learned.pkl"
3131+3232+3333+def _vec(did: str, feats: dict, er: eigentrust.EigenResult) -> list[float]:
3434+ out = []
3535+ for c in FEATURE_COLS:
3636+ out.append(er.trust.get(did, 0.0) if c == "eigentrust_score" else float(feats.get(c) or 0.0))
3737+ return out
3838+3939+4040+class LearnedScorer:
4141+ def __init__(self, booster, iso, cols):
4242+ self.booster, self.iso, self.cols = booster, iso, cols
4343+4444+ def prob(self, did, feats, er) -> float:
4545+ raw = float(self.booster.predict(np.array([_vec(did, feats, er)]))[0])
4646+ return float(self.iso.predict([raw])[0]) if self.iso is not None else raw
4747+4848+ def contributions(self, did, feats, er, top: int = 3) -> list[dict]:
4949+ c = self.booster.predict(np.array([_vec(did, feats, er)]), pred_contrib=True)[0][:-1]
5050+ idx = np.argsort(np.abs(c))[::-1][:top]
5151+ return [{"feature": self.cols[i], "contribution": round(float(c[i]), 3)} for i in idx]
5252+5353+5454+_cache: LearnedScorer | None = None
5555+_loaded = False
5656+5757+5858+def load() -> LearnedScorer | None:
5959+ global _cache, _loaded
6060+ if not _loaded:
6161+ _loaded = True
6262+ if MODEL_PATH.exists():
6363+ import lightgbm as lgb
6464+6565+ d = pickle.loads(MODEL_PATH.read_bytes())
6666+ _cache = LearnedScorer(lgb.Booster(model_str=d["booster"]), d["iso"], d["cols"])
6767+ return _cache
6868+6969+7070+def _matrix(con, er):
7171+ rows = con.execute(
7272+ "SELECT author_did, opened_at, clean_merge FROM pr_labels WHERE clean_merge IS NOT NULL "
7373+ "ORDER BY opened_at"
7474+ ).fetchall()
7575+ fcols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
7676+ feats = {r[0]: dict(zip(fcols, r)) for r in con.execute("SELECT * FROM features").fetchall()}
7777+ X = np.array([_vec(did, feats.get(did, {}), er) for did, _, _ in rows], dtype=float)
7878+ y = np.array([int(lbl) for _, _, lbl in rows], dtype=int)
7979+ return X, y
8080+8181+8282+def _reliability(p, y, bins=5):
8383+ """Reliability curve (PRD 6.8): predicted vs empirical P(clean) per bin."""
8484+ edges = np.linspace(0, 1, bins + 1)
8585+ out = []
8686+ for lo, hi in zip(edges, edges[1:]):
8787+ m = (p >= lo) & (p <= hi if hi == 1 else p < hi)
8888+ if m.any():
8989+ out.append({"bin": f"{lo:.1f}-{hi:.1f}", "predicted": round(float(p[m].mean()), 3),
9090+ "actual": round(float(y[m].mean()), 3), "n": int(m.sum())})
9191+ return out
9292+9393+9494+def train(split: float = 0.7) -> dict:
9595+ import lightgbm as lgb
9696+ from sklearn.isotonic import IsotonicRegression
9797+9898+ with connection(read_only=True) as con:
9999+ er = eigentrust.compute(con)
100100+ X, y = _matrix(con, er)
101101+ if len(X) < 4 or len(set(y.tolist())) < 2:
102102+ raise SystemExit(f"need >=4 labelled PRs spanning both classes; got {len(X)} rows, "
103103+ f"classes={set(y.tolist())}. Seed/ingest more history first.")
104104+105105+ k = max(2, int(len(X) * split))
106106+ Xtr, ytr, Xval, yval = X[:k], y[:k], X[k:], y[k:]
107107+ params = dict(objective="binary", num_leaves=15, min_data_in_leaf=1, min_data_in_bin=1,
108108+ learning_rate=0.1, verbose=-1, feature_pre_filter=False)
109109+ booster = lgb.train(params, lgb.Dataset(Xtr, label=ytr, feature_name=FEATURE_COLS),
110110+ num_boost_round=60)
111111+112112+ raw_val = booster.predict(Xval)
113113+ iso = None
114114+ if len(set(yval.tolist())) > 1: # isotonic needs both classes in the holdout
115115+ iso = IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0).fit(raw_val, yval)
116116+ cal_val = iso.predict(raw_val) if iso is not None else raw_val
117117+118118+ MODEL_DIR.mkdir(parents=True, exist_ok=True)
119119+ MODEL_PATH.write_bytes(pickle.dumps(
120120+ {"booster": booster.model_to_string(), "iso": iso, "cols": FEATURE_COLS}))
121121+ global _loaded, _cache
122122+ _loaded, _cache = False, None # force reload of the fresh model
123123+124124+ rel = _reliability(np.asarray(cal_val), yval)
125125+ return {"rows": len(X), "train": k, "val": len(Xval),
126126+ "calibrated": iso is not None, "reliability": rel, "model": str(MODEL_PATH)}
127127+128128+129129+def main() -> None:
130130+ r = train()
131131+ print(f"[train] {r['rows']} labelled PRs (train={r['train']} / val={r['val']}), "
132132+ f"calibrated={r['calibrated']} -> {r['model']}")
133133+ print("[train] reliability curve (predicted vs actual P(clean)):")
134134+ for b in r["reliability"]:
135135+ print(f" {b['bin']} predicted={b['predicted']} actual={b['actual']} n={b['n']}")
136136+137137+138138+def demo() -> None:
139139+ """Self-check: after training, a trusted DID scores above a sybil (graph + history)."""
140140+ from .db import connection as conn, init_db
141141+ from .seed import seed as load_seed
142142+143143+ with conn(read_only=False) as con:
144144+ init_db(con) # schema + features + pr_labels views
145145+ load_seed(con)
146146+ train()
147147+ s = load()
148148+ with conn(read_only=True) as con:
149149+ er = eigentrust.compute(con)
150150+ fcols = [c[0] for c in con.execute("DESCRIBE features").fetchall()]
151151+ feats = {r[0]: dict(zip(fcols, r)) for r in con.execute("SELECT * FROM features").fetchall()}
152152+ trusted = s.prob("did:plc:carol", feats.get("did:plc:carol", {}), er)
153153+ sybil = s.prob("did:plc:sybil2", feats.get("did:plc:sybil2", {}), er)
154154+ print(f"calibrated P(clean): carol={trusted:.3f} sybil2={sybil:.3f}")
155155+ assert trusted > sybil, "learned score must rank the trusted DID above the sybil"
156156+ print("ok")
157157+158158+159159+if __name__ == "__main__":
160160+ demo()
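The calibration step in `train` maps raw booster scores onto a monotone curve and then bins the result into a reliability table. A pure-Python sketch of both ideas, assuming distinct raw scores. `pav` is a simplified pool-adjacent-violators fit standing in for scikit-learn's `IsotonicRegression`, and `reliability` mirrors the binning in `_reliability`:

```python
def pav(scores, labels):
    """Pool-adjacent-violators: non-decreasing fit of labels ordered by score."""
    pairs = sorted(zip(scores, labels))
    blocks = []  # each block is [sum_of_labels, count]
    for _, y in pairs:
        blocks.append([float(y), 1])
        # Merge backwards while the previous block's mean exceeds the current one.
        while len(blocks) > 1 and blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]:
            s, n = blocks.pop()
            blocks[-1][0] += s
            blocks[-1][1] += n
    fitted = []
    for s, n in blocks:
        fitted.extend([s / n] * n)
    return [(x, f) for (x, _), f in zip(pairs, fitted)]


def reliability(probs, labels, bins=5):
    """Per-bin mean predicted vs mean actual; the last bin includes 1.0."""
    out = []
    for b in range(bins):
        lo, hi = b / bins, (b + 1) / bins
        idx = [i for i, p in enumerate(probs)
               if lo <= p and (p <= hi if b == bins - 1 else p < hi)]
        if idx:
            out.append({"bin": f"{lo:.1f}-{hi:.1f}",
                        "predicted": sum(probs[i] for i in idx) / len(idx),
                        "actual": sum(labels[i] for i in idx) / len(idx),
                        "n": len(idx)})
    return out


fit = pav([0.1, 0.2, 0.3, 0.4], [0, 1, 0, 1])
calibrated = [f for _, f in fit]
curve = reliability(calibrated, [0, 1, 0, 1])
```

In this toy run predicted and actual agree in every bin because the curve is evaluated on the same points it was fitted on. `train` above likewise fits the isotonic map and reports reliability on the same holdout, so its curve is probably optimistic; a third, untouched slice would give a more honest reading.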
+121
src/trust/review.py
···11+"""M4 content signal: Claude reviews ONE PR's diff + discussion (PRD 6.6).
22+33+Claude judges content, never identity (constraint 2): no author handle, DID, or
44+history is ever passed in. Output is forced to the schema via tool use, temp 0.
55+"""
66+77+from __future__ import annotations
88+99+import json
1010+import os
1111+1212+from .config import CFG
+
+# Verbatim from PRD 6.6; do not paraphrase.
+SYSTEM_PROMPT = """\
+You are a code-contribution reviewer for an open-source trust system. You assess ONE
+pull request's actual content for quality and safety. You do not decide whether to
+merge; you produce a structured risk assessment that a separate policy layer combines
+with an identity-trust signal.
+
+Hard rules:
+- Judge only the artifact in front of you: the diff, the PR title and description, and
+  the discussion. You are given NO information about the author's identity, reputation,
+  or history, and you must not speculate about it. Identity trust is handled elsewhere.
+- Your job is to catch problems a reputation signal cannot see: code that looks correct
+  but is subtly wrong, plausible-looking machine-generated filler ("slop"),
+  security-sensitive changes, leaked secrets or credentials, license violations, and
+  changes whose stated intent does not match what the code does.
+- Prefer flagging uncertainty over approving. If the diff is large, unclear, or you
+  cannot verify correctness, say so and set review_recommended. Never rubber-stamp.
+- Be specific. Every flag must point to concrete lines or patterns, not vibes.
+- Output ONLY the structured object specified by the tool. No prose outside it.\
+"""
+
+ASSESSMENT_TOOL = {
+    "name": "submit_assessment",
+    "description": "Submit the structured risk assessment for this PR.",
+    "input_schema": {
+        "type": "object",
+        "properties": {
+            "content_risk": {"type": "number", "description": "0.0 safe/trivial to 1.0 broken/dangerous"},
+            "flags": {
+                "type": "array",
+                "items": {
+                    "type": "object",
+                    "properties": {
+                        "type": {"type": "string", "enum": [
+                            "subtle_bug", "slop", "security", "secret_leak", "license",
+                            "intent_mismatch", "untested", "oversized", "other"]},
+                        "severity": {"type": "string", "enum": ["low", "med", "high"]},
+                        "location": {"type": "string"},
+                        "explanation": {"type": "string"},
+                    },
+                    "required": ["type", "severity", "location", "explanation"],
+                    "additionalProperties": False,
+                },
+            },
+            "summary": {"type": "string"},
+            "review_recommended": {"type": "boolean"},
+        },
+        "required": ["content_risk", "flags", "summary", "review_recommended"],
+        "additionalProperties": False,
+    },
+}
+
+# Models that reject the temperature param (Opus 4.7+/Fable). Sonnet 4.6 accepts it.
+_NO_TEMPERATURE = ("opus-4-7", "opus-4-8", "fable")
+
+
+def _client():
+    if not os.environ.get(CFG.review.api_key_env):
+        return None
+    import anthropic
+
+    return anthropic.Anthropic()
+
+
+def review_pr(diff: str, title: str = "", description: str = "", discussion: str = "",
+              machine_findings: dict | None = None, model: str | None = None) -> dict | None:
+    """Return the 6.6 schema object, or None if no API key is set or no tool_use block is returned."""
+    client = _client()
+    if client is None:
+        return None
+    model = model or CFG.review.model
+
+    parts = [f"PR title: {title}", f"PR description: {description}",
+             f"Discussion:\n{discussion}", f"Diff:\n{diff[:CFG.review.max_diff_chars]}"]
+    if machine_findings:  # 6.12 structured evidence, no identity
+        parts.append("Automated scan findings (advisory evidence):\n"
+                     + json.dumps(machine_findings, indent=2))
+    user = "\n\n".join(parts)
+
+    kwargs = dict(
+        model=model, max_tokens=1500, system=SYSTEM_PROMPT,
+        tools=[ASSESSMENT_TOOL], tool_choice={"type": "tool", "name": "submit_assessment"},
+        messages=[{"role": "user", "content": user}],
+    )
+    if not any(m in model for m in _NO_TEMPERATURE):
+        kwargs["temperature"] = 0
+
+    resp = client.messages.create(**kwargs)
+    for block in resp.content:
+        if block.type == "tool_use" and block.name == "submit_assessment":
+            return block.input
+    return None
+
+
+def demo() -> None:
+    """Self-check: schema shape is parseable; live call only if a key is set."""
+    out = review_pr("def add(a,b):\n    return a-b  # says add, does subtract",
+                    title="Add helper", description="adds two numbers")
+    if out is None:
+        print("no ANTHROPIC_API_KEY -> content signal skipped (gate treats as None). ok")
+        return
+    assert 0.0 <= out["content_risk"] <= 1.0
+    assert isinstance(out["flags"], list) and "summary" in out
+    print(f"content_risk={out['content_risk']} flags={len(out['flags'])} :: {out['summary']}")
+
+
+if __name__ == "__main__":
+    demo()
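`review_pr` returns `block.input` as-is, and `demo()` only spot-checks `content_risk` and `flags`. A caller that wants a stricter check before handing the result to the gate could mirror the tool schema's required keys and enums. The sketch below is a stdlib-only illustration; `validate_assessment` and its constants are hypothetical names that do not appear in the PR.

```python
# Hypothetical helper, not part of the PR: a shape check for the dict returned
# by review_pr(), mirroring the required keys and enums in ASSESSMENT_TOOL.

TOP_KEYS = {"content_risk", "flags", "summary", "review_recommended"}
FLAG_KEYS = {"type", "severity", "location", "explanation"}
FLAG_TYPES = {"subtle_bug", "slop", "security", "secret_leak", "license",
              "intent_mismatch", "untested", "oversized", "other"}
SEVERITIES = {"low", "med", "high"}


def validate_assessment(obj) -> list[str]:
    """Return a list of problems; an empty list means the shape is acceptable."""
    if not isinstance(obj, dict):
        return ["assessment is not an object"]
    problems = []
    missing = TOP_KEYS - set(obj)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    risk = obj.get("content_risk")
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(risk, bool) or not isinstance(risk, (int, float)) or not 0.0 <= risk <= 1.0:
        problems.append("content_risk must be a number in [0, 1]")
    if not isinstance(obj.get("summary"), str):
        problems.append("summary must be a string")
    if not isinstance(obj.get("review_recommended"), bool):
        problems.append("review_recommended must be a boolean")
    flags = obj.get("flags")
    if not isinstance(flags, list):
        problems.append("flags must be a list")
        flags = []
    for i, flag in enumerate(flags):
        if not isinstance(flag, dict) or set(flag) != FLAG_KEYS:
            problems.append(f"flags[{i}] must have exactly the keys {sorted(FLAG_KEYS)}")
            continue
        if not isinstance(flag["type"], str) or flag["type"] not in FLAG_TYPES:
            problems.append(f"flags[{i}].type is not an allowed value")
        if not isinstance(flag["severity"], str) or flag["severity"] not in SEVERITIES:
            problems.append(f"flags[{i}].severity is not an allowed value")
    return problems


example = {
    "content_risk": 0.8,
    "flags": [{"type": "subtle_bug", "severity": "high",
               "location": "add()", "explanation": "subtracts instead of adding"}],
    "summary": "Function name and behaviour disagree.",
    "review_recommended": True,
}
print(validate_assessment(example))
```

Returning a list of problems rather than raising keeps the decision about how to treat a malformed assessment with the caller.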
+6
src/trust/score.py
+"""Scoring worker entry point. The logic lives in fusion (gate + score_pr)."""
+
+from .fusion import main
+
+if __name__ == "__main__":
+    main()
+123
src/trust/seed.py
+"""Demo data so the full pipeline runs without a live Jetstream or external drive.
+
+NOT real Tangled data: a synthetic vouch graph (trusted core + sybil cluster)
+plus labelled PRs, enough to demo M3 (EigenTrust triage) and M4 (Claude + gate).
+Run real ingest (`python -m trust.ingest`) to replace this with live data.
+"""
+
+from __future__ import annotations
+
+import datetime as dt
+
+from .db import connection, ensure_schema
+
+# did -> handle. The maintainer is the EigenTrust seed.
+PEOPLE = {
+    "did:plc:maintainer": "lewis.tangled.sh",
+    "did:plc:alice": "alice.dev",
+    "did:plc:bob": "bob.codes",
+    "did:plc:carol": "carol.sh",
+    "did:plc:newcomer": "newcomer.xyz",  # legit but unvouched (cold start)
+    "did:plc:sybil1": "throwaway1",
+    "did:plc:sybil2": "throwaway2",
+    "did:plc:sybil3": "throwaway3",
+}
+# voucher -> subject (positive vouches). Sybils only vouch for each other.
+VOUCHES = [
+    ("did:plc:maintainer", "did:plc:alice"),
+    ("did:plc:maintainer", "did:plc:bob"),
+    ("did:plc:alice", "did:plc:carol"),
+    ("did:plc:bob", "did:plc:carol"),
+    ("did:plc:sybil1", "did:plc:sybil2"),
+    ("did:plc:sybil2", "did:plc:sybil3"),
+    ("did:plc:sybil3", "did:plc:sybil1"),
+    ("did:plc:sybil1", "did:plc:sybil3"),
+]
+
+_CLEAN_DIFF = """--- a/util.py
++++ b/util.py
+@@
+-def clamp(x, lo, hi):
+-    return x
++def clamp(x, lo, hi):
++    return max(lo, min(x, hi))
+"""
+_BUGGY_DIFF = """--- a/auth.py
++++ b/auth.py
+@@
+-def verify(token, secret):
+-    return hmac.compare_digest(sign(token), secret)
++def verify(token, secret):
++    return sign(token) == secret  # timing-unsafe; intent says 'verify' but weakens it
+"""
+
+
+def seed(con) -> None:
+    con.execute("DELETE FROM vouches; DELETE FROM contributors; DELETE FROM seeds; "
+                "DELETE FROM pull_requests; DELETE FROM pr_followups; DELETE FROM scores")
+    now = dt.datetime.now(dt.timezone.utc)
+    for did, handle in PEOPLE.items():
+        age = 400 if "sybil" not in did and did != "did:plc:newcomer" else 3
+        con.execute(
+            "INSERT INTO contributors (did, handle, did_created_at) VALUES (?,?,?)",
+            [did, handle, now - dt.timedelta(days=age)],
+        )
+    con.execute("INSERT INTO seeds VALUES ('did:plc:maintainer')")
+    for v, s in VOUCHES:
+        con.execute(
+            "INSERT INTO vouches (voucher_did, subject_did, polarity, created_at) VALUES (?,?,1,?)",
+            [v, s, now - dt.timedelta(days=30)],
+        )
+
+    def add_pr(pr_id, did, merged, ci, reverted, diff, is_open=False, age=40, repo="tangled/core",
+               add=20, dele=5, files=2):
+        # Historical PRs staggered in time so the M5 train/val split is by-time, not a tie.
+        opened = now - dt.timedelta(days=1 if is_open else age)
+        closed_unmerged = (not merged) and (not is_open)
+        con.execute(
+            "INSERT INTO pull_requests (pr_id, author_did, repo, target, opened_at, ci_status, "
+            "merged, closed_unmerged, additions, deletions, files_touched, diff_text, discussion_len) "
+            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
+            [pr_id, did, repo, "main", opened, ci, merged, closed_unmerged,
+             add, dele, files, diff, 120],
+        )
+        con.execute("INSERT INTO pr_followups (pr_id, reverted) VALUES (?,?)", [pr_id, reverted])
+
+    for i in range(8):
+        add_pr(f"alice/{i}", "did:plc:alice", True, "passed", False, _CLEAN_DIFF, age=90 - i * 3)
+    for i in range(5):
+        add_pr(f"bob/{i}", "did:plc:bob", True, "passed", i == 0, _CLEAN_DIFF, age=85 - i * 3)  # one revert
+    for i in range(6):
+        add_pr(f"carol/{i}", "did:plc:carol", True, "passed", False, _CLEAN_DIFF, age=80 - i * 3)
+    for i in range(3):
+        add_pr(f"sybil1/{i}", "did:plc:sybil1", i == 0, "failed", False, _BUGGY_DIFF, age=70 - i * 3)
+
+    # Two open PRs for the live demo: one clean from a trusted DID, one buggy from a sybil.
+    add_pr("live/trusted-clean", "did:plc:carol", False, "passed", False, _CLEAN_DIFF, is_open=True)
+    add_pr("live/sybil-buggy", "did:plc:sybil2", False, "passed", False, _BUGGY_DIFF, is_open=True)
+
+    # 6.13 repo tiering: a sensitive/dual-use repo gates fast-lane on a jurisdiction attestation.
+    con.execute("INSERT INTO repo_tiers VALUES ('tangled/secure-enclave', 'sensitive') "
+                "ON CONFLICT DO NOTHING")  # seed() is re-run across tests; keep it idempotent
+    con.execute("INSERT INTO attestations VALUES (?,?,?,?) ON CONFLICT DO NOTHING",
+                ["did:plc:carol", "FI", "signed_record", now - dt.timedelta(days=10)])
+    # carol is attested -> her clean PR can fast-lane even on the sensitive repo.
+    add_pr("live/sensitive-attested", "did:plc:carol", False, "passed", False, _CLEAN_DIFF,
+           is_open=True, repo="tangled/secure-enclave")
+    # alice is highly trusted but NOT attested -> forced to needs_human on the sensitive repo.
+    add_pr("live/sensitive-blocked", "did:plc:alice", False, "passed", False, _CLEAN_DIFF,
+           is_open=True, repo="tangled/secure-enclave")
+
+
+def main() -> None:
+    ensure_schema()  # retries past the cross-process lock if other panes hold it
+    with connection(read_only=False) as con:
+        seed(con)
+        n = con.execute("SELECT count(*) FROM contributors").fetchone()[0]
+        e = con.execute("SELECT count(*) FROM vouches").fetchone()[0]
+        p = con.execute("SELECT count(*) FROM pull_requests").fetchone()[0]
+        print(f"[seed] {n} contributors, {e} vouches, {p} PRs (DEMO DATA)")
+
+
+if __name__ == "__main__":
+    main()
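The seed graph is built so that the sybil cluster only vouches for itself and the maintainer is the single EigenTrust seed. The sketch below shows why that shape matters for a seed-anchored trust walk: trust that starts at the seed never reaches a cluster with no inbound vouch from the trusted core. It is a textbook-style power iteration written for illustration, not the repo's `eigentrust.compute` (which is not shown in this section). The function name, `alpha`, `iters`, and the handling of nodes with no outgoing vouches are all assumptions.

```python
# Illustrative power iteration with a pre-trusted restart; NOT the repo's
# eigentrust.compute. alpha, iters and the dangling-node rule are assumptions.

NODES = ["did:plc:maintainer", "did:plc:alice", "did:plc:bob", "did:plc:carol",
         "did:plc:newcomer", "did:plc:sybil1", "did:plc:sybil2", "did:plc:sybil3"]
VOUCHES = [  # voucher -> subject, copied from the seed data above
    ("did:plc:maintainer", "did:plc:alice"),
    ("did:plc:maintainer", "did:plc:bob"),
    ("did:plc:alice", "did:plc:carol"),
    ("did:plc:bob", "did:plc:carol"),
    ("did:plc:sybil1", "did:plc:sybil2"),
    ("did:plc:sybil2", "did:plc:sybil3"),
    ("did:plc:sybil3", "did:plc:sybil1"),
    ("did:plc:sybil1", "did:plc:sybil3"),
]


def eigentrust(edges, nodes, seeds, alpha=0.15, iters=50):
    """Iterate t <- (1 - alpha) * C^T t + alpha * p, with p uniform over the seeds."""
    p = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    out = {n: [] for n in nodes}
    for voucher, subject in edges:
        out[voucher].append(subject)
    t = dict(p)  # start with all trust on the seeds
    for _ in range(iters):
        nxt = {n: alpha * p[n] for n in nodes}
        for n in nodes:
            share = (1.0 - alpha) * t[n]
            if out[n]:
                # Split this node's trust evenly across the DIDs it vouches for.
                for m in out[n]:
                    nxt[m] += share / len(out[n])
            else:
                # A node that vouches for no one returns its trust to the seeds.
                for m in nodes:
                    nxt[m] += share * p[m]
        t = nxt
    return t


trust = eigentrust(VOUCHES, NODES, seeds={"did:plc:maintainer"})
for did, score in sorted(trust.items(), key=lambda kv: -kv[1]):
    print(f"{did:22s} {score:.4f}")
```

In this sketch the three sybils and the unvouched newcomer all end at exactly zero, so structural trust alone cannot tell a cold-start contributor from a sybil, which fits the README's description of a separate content-review signal.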