Sunstead trust scoring project
1"""DuckDB store: schema, connection, the derive step, and feature/label SQL.
2
3Single embedded file at DUCKDB_PATH (PRD 2: no graph DB, no server). The
4ingester is the only writer; everything else reads.
5"""
6
7from __future__ import annotations
8
9import time
10from contextlib import contextmanager
11
12import duckdb
13
14from .config import CFG, DUCKDB_PATH, ensure_data_root
15
# Base-table DDL, executed as one multi-statement script by init_db() and
# ensure_schema(). Every statement is CREATE TABLE IF NOT EXISTS, so running it
# against an existing database leaves tables that are already there untouched --
# which also means editing a column list here does not alter an existing table.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
 did VARCHAR, time_us BIGINT, operation VARCHAR, collection VARCHAR,
 rkey VARCHAR, record JSON, ingested_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS contributors (
 did VARCHAR PRIMARY KEY, handle VARCHAR, did_created_at TIMESTAMP,
 pds_host VARCHAR, first_seen TIMESTAMP DEFAULT now()
);
-- vouches IS the whole graph (PRD 2): a plain edge list, no graph engine.
CREATE TABLE IF NOT EXISTS vouches (
 voucher_did VARCHAR, subject_did VARCHAR, polarity INTEGER DEFAULT 1,
 reason VARCHAR, evidence_uri VARCHAR, created_at TIMESTAMP, weight DOUBLE DEFAULT 1.0,
 PRIMARY KEY (voucher_did, subject_did)
);
CREATE TABLE IF NOT EXISTS pull_requests (
 pr_id VARCHAR PRIMARY KEY, author_did VARCHAR, repo VARCHAR, target VARCHAR,
 opened_at TIMESTAMP, ci_status VARCHAR, merged BOOLEAN, merged_at TIMESTAMP,
 closed_unmerged BOOLEAN, additions INTEGER, deletions INTEGER,
 files_touched INTEGER, diff_text VARCHAR, discussion_len INTEGER
);
CREATE TABLE IF NOT EXISTS pr_followups (
 pr_id VARCHAR PRIMARY KEY, reverted BOOLEAN DEFAULT FALSE,
 patched_same_lines_within_n_days BOOLEAN DEFAULT FALSE
);
-- authoritative pull outcome from sh.tangled.repo.pull.status (public record). Separate
-- table so a status arriving before its pull record (ordering not guaranteed) is never lost.
CREATE TABLE IF NOT EXISTS pull_status (
 pr_id VARCHAR PRIMARY KEY, status VARCHAR, updated_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS scores (
 did VARCHAR, as_of TIMESTAMP DEFAULT now(), structural_trust DOUBLE,
 content_risk DOUBLE, calibrated_prob DOUBLE, decision VARCHAR, explanation_json JSON
);
CREATE TABLE IF NOT EXISTS ingest_state (stream VARCHAR PRIMARY KEY, last_time_us BIGINT);
-- trusted maintainer seed set for personalized EigenTrust (PRD 6.4)
CREATE TABLE IF NOT EXISTS seeds (did VARCHAR PRIMARY KEY);
-- repo tiering (PRD 6.13): sensitive/dual-use repos gate fast-lane on an attestation
CREATE TABLE IF NOT EXISTS repo_tiers (repo VARCHAR PRIMARY KEY, tier VARCHAR DEFAULT 'public');
-- star graph (sh.tangled.feed.star): starrer -> repo owner. NOT sybil-resistant on its
-- own (a star is cheap), so it's a model FEATURE, never a trust-graph edge. Keyed by
-- (starrer, owner) so one DID endorsing an owner counts once, not once per repo.
CREATE TABLE IF NOT EXISTS stars (
 starrer_did VARCHAR, owner_did VARCHAR, created_at TIMESTAMP,
 PRIMARY KEY (starrer_did, owner_did)
);
-- contributor-issued jurisdiction attestations (signed records); declared, never inferred
CREATE TABLE IF NOT EXISTS attestations (
 did VARCHAR, jurisdiction VARCHAR, method VARCHAR, created_at TIMESTAMP,
 PRIMARY KEY (did, jurisdiction)
);
-- AT-Proto writeback (PRD 6.11): the at:// URI of each assessment published as a record
CREATE TABLE IF NOT EXISTS published_records (
 did VARCHAR, as_of TIMESTAMP, uri VARCHAR, PRIMARY KEY (did, as_of)
);
-- diff-embedding corpus (PRD 6.12 / section 4): near-duplicate detection of known-bad
-- patterns. Vector search stays in DuckDB (list_cosine_similarity) -- no separate engine.
CREATE TABLE IF NOT EXISTS diff_vectors (pr_id VARCHAR PRIMARY KEY, label VARCHAR, embedding DOUBLE[]);
"""
75
# Per-DID feature view (PRD 6.3/6.5). eigentrust_score + bsky_* are joined in
# at scoring time (computed in Python / from app.bsky events).
#
# This is an f-string: CFG.clean_merge_window_days is substituted when the
# module is imported, so the view text is fixed from then on.
# The `pr` CTE annotates each pull request with its follow-up flags, merge/close
# outcome and a clean_merge label; the outer query then aggregates per
# contributor. The LEFT JOIN keeps contributors that have no pull requests.
# clean_merge_rate has no COALESCE, so it is NULL (not 0) when a contributor
# has no labelled PR.
# NOTE(review): the clean_merge CASE is repeated in PR_LABELS_VIEW -- change
# both together.
# NOTE(review): the "too recent" cutoff is measured from opened_at rather than
# a merge time -- confirm that is the intended start of the N-day window.
FEATURES_VIEW = f"""
CREATE OR REPLACE VIEW features AS
WITH pr AS (
 SELECT p.*, COALESCE(f.reverted, FALSE) AS reverted,
 COALESCE(f.patched_same_lines_within_n_days, FALSE) AS patched_quick,
 -- merge outcome: pull_status (authoritative, public record) overrides the PDS
 -- record's merged field, which is always NULL on real sh.tangled.repo.pull.
 COALESCE(ps.status LIKE '%.merged', p.merged, FALSE) AS is_merged,
 COALESCE(ps.status LIKE '%.closed', p.closed_unmerged, FALSE) AS is_closed,
 -- clean_merge label (PRD 6.3); NULL when too recent to have elapsed the N-day window.
 -- CI relaxed: pass/fail isn't a public Tangled record, and a `merged` PR already
 -- cleared the merge-gate (which runs CI). So merged-and-not-CI-failed counts; only an
 -- explicit ci_status='failed' disqualifies. Tighten if a CI verdict source is wired.
 CASE
 WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
 WHEN COALESCE(ps.status LIKE '%.merged', p.merged, FALSE)
 AND COALESCE(p.ci_status, 'passed') <> 'failed'
 AND NOT COALESCE(f.reverted, FALSE)
 AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
 ELSE 0
 END AS clean_merge
 FROM pull_requests p
 LEFT JOIN pr_followups f USING (pr_id)
 LEFT JOIN pull_status ps USING (pr_id)
)
SELECT
 c.did,
 date_diff('day', c.did_created_at, now()) AS did_age_days,
 COUNT(*) FILTER (WHERE pr.is_merged) AS merged_pr_count,
 COALESCE(AVG(CASE WHEN pr.reverted THEN 1.0 ELSE 0.0 END), 0) AS revert_rate,
 COALESCE(AVG(CASE WHEN pr.ci_status='passed' THEN 1.0 ELSE 0.0 END), 0) AS ci_pass_rate,
 COALESCE(AVG(CASE WHEN pr.is_closed THEN 1.0 ELSE 0.0 END), 0) AS close_without_merge_ratio,
 COALESCE(AVG(pr.additions + pr.deletions), 0) AS mean_diff_size,
 COALESCE(AVG(pr.files_touched), 0) AS mean_files_touched,
 COALESCE(SUM(pr.additions + pr.deletions), 0) AS churn,
 COALESCE(AVG(pr.discussion_len), 0) AS mean_discussion_len,
 (SELECT COUNT(*) FROM vouches v WHERE v.subject_did = c.did AND v.polarity < 0) AS denounce_count,
 -- raw star count (advisory feature, gameable); the sybil-resistant trust-weighted
 -- version (stars_trust) is computed in Python and rides on the EigenResult.
 (SELECT COUNT(*) FROM stars st WHERE st.owner_did = c.did) AS stars_received,
 AVG(pr.clean_merge) AS clean_merge_rate
FROM contributors c
LEFT JOIN pr ON pr.author_did = c.did
GROUP BY c.did, c.did_created_at;
"""
123
124
# Per-PR clean_merge label (PRD 6.3) for supervised training; NULL when too recent.
#
# One row per pull_requests row (the two LEFT JOINs are on pr_id, which is the
# primary key of both pr_followups and pull_status). The label is 1 only for a
# merged PR whose ci_status is not 'failed' and that was neither reverted nor
# patched on the same lines; otherwise 0.
# This is an f-string: CFG.clean_merge_window_days is substituted when the
# module is imported.
# NOTE(review): the CASE below mirrors the clean_merge CASE inside
# FEATURES_VIEW -- change both together.
PR_LABELS_VIEW = f"""
CREATE OR REPLACE VIEW pr_labels AS
SELECT p.pr_id, p.author_did, p.opened_at,
 CASE
 WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
 WHEN COALESCE(ps.status LIKE '%.merged', p.merged, FALSE)
 AND COALESCE(p.ci_status, 'passed') <> 'failed'
 AND NOT COALESCE(f.reverted, FALSE)
 AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
 ELSE 0
 END AS clean_merge
FROM pull_requests p
LEFT JOIN pr_followups f USING (pr_id)
LEFT JOIN pull_status ps USING (pr_id);
"""
141
142
def connect(read_only: bool = False) -> duckdb.DuckDBPyConnection:
    """Open the DuckDB file at DUCKDB_PATH and hand back the connection.

    The data root is ensured first. The caller owns the returned connection
    and is responsible for closing it; no retry is attempted here.
    """
    ensure_data_root()
    db_file = str(DUCKDB_PATH)
    return duckdb.connect(db_file, read_only=read_only)
147
148
def init_db(con: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyConnection:
    """Create the base tables and both views, returning the connection used.

    Args:
        con: An already-open read-write connection. When omitted, a new one is
            opened with ``connect()`` and the caller becomes responsible for
            closing it.

    Returns:
        The connection the DDL was executed on (``con`` itself when given).
    """
    # Compare against None explicitly: a supplied connection must always be
    # used as-is, regardless of how the object evaluates in a boolean context.
    if con is None:
        con = connect()
    # Tables first; the views select from them.
    for ddl in (SCHEMA, FEATURES_VIEW, PR_LABELS_VIEW):
        con.execute(ddl)
    return con
155
156
@contextmanager
def connection(read_only: bool = False, attempts: int = 500, delay: float = 0.02):
    """Short-lived connection with retry on DuckDB's cross-process file lock.

    DuckDB allows only one read-write process; a held lock blocks every other
    open (even read-only). So long-running processes (API, score loop, ingester)
    must open->work->close per operation, letting panes interleave under mprocs.
    ponytail: open/close + retry over a single-writer daemon; fine at hackathon
    scale, revisit if write throughput matters.

    Args:
        read_only: Open the database file read-only.
        attempts: Maximum number of open attempts; must be at least 1.
        delay: Seconds to sleep between two failed attempts.

    Yields:
        The open connection; it is closed when the ``with`` block exits.

    Raises:
        ValueError: If ``attempts`` is less than 1.
        duckdb.IOException: If the last attempt still fails to open the file.
    """
    if attempts < 1:
        # Without this guard the loop never runs and there is no exception to
        # re-raise, which used to surface as an unrelated TypeError.
        raise ValueError(f"attempts must be >= 1, got {attempts!r}")
    ensure_data_root()
    con = None
    for remaining in range(attempts - 1, -1, -1):
        try:
            con = duckdb.connect(str(DUCKDB_PATH), read_only=read_only)
            break
        except duckdb.IOException:
            if remaining == 0:
                # Out of attempts: propagate the last failure right away
                # instead of sleeping one more time first.
                raise
            time.sleep(delay)
    try:
        yield con
    finally:
        con.close()
182
183
def ensure_schema() -> None:
    """Create tables + the features and pr_labels views, then release the lock.

    Opens a read-write connection through ``connection()`` (so the open is
    retried while another process holds the file) and closes it on exit.
    """
    with connection(read_only=False) as con:
        # Delegate to init_db so there is a single list of DDL statements.
        init_db(con)
190
191
def main() -> None:
    """Initialise the database file and print how many tables/views it lists."""
    con = init_db()
    try:
        tables = con.execute("SHOW TABLES").fetchall()
    finally:
        # init_db() opened this connection for us; close it so the file is
        # released even if the query fails.
        con.close()
    print(f"[db] initialised {DUCKDB_PATH} with {len(tables)} tables/views")


if __name__ == "__main__":
    main()