Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.0 kB View raw
"""DuckDB store: schema, connection, the derive step, and feature/label SQL.

Single embedded file at DUCKDB_PATH (PRD 2: no graph DB, no server). The
ingester is the only writer; everything else reads.

The tables are created with ``CREATE TABLE IF NOT EXISTS``. Re-running the DDL
against an existing file therefore leaves existing tables untouched. A column
added to ``SCHEMA`` later is NOT applied to a database file created before that
change.
"""

from __future__ import annotations

import time
from contextlib import contextmanager

import duckdb

from .config import CFG, DUCKDB_PATH, ensure_data_root

# DDL for every base table, executed as one multi-statement script.
# Every statement is CREATE TABLE IF NOT EXISTS, so executing it on each startup
# is safe. The derived views (features, pr_labels) are defined separately and
# recreated with CREATE OR REPLACE.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    did VARCHAR, time_us BIGINT, operation VARCHAR, collection VARCHAR,
    rkey VARCHAR, record JSON, ingested_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS contributors (
    did VARCHAR PRIMARY KEY, handle VARCHAR, did_created_at TIMESTAMP,
    pds_host VARCHAR, first_seen TIMESTAMP DEFAULT now()
);
-- vouches IS the whole graph (PRD 2): a plain edge list, no graph engine.
CREATE TABLE IF NOT EXISTS vouches (
    voucher_did VARCHAR, subject_did VARCHAR, polarity INTEGER DEFAULT 1,
    reason VARCHAR, evidence_uri VARCHAR, created_at TIMESTAMP, weight DOUBLE DEFAULT 1.0,
    PRIMARY KEY (voucher_did, subject_did)
);
CREATE TABLE IF NOT EXISTS pull_requests (
    pr_id VARCHAR PRIMARY KEY, author_did VARCHAR, repo VARCHAR, target VARCHAR,
    opened_at TIMESTAMP, ci_status VARCHAR, merged BOOLEAN, merged_at TIMESTAMP,
    closed_unmerged BOOLEAN, additions INTEGER, deletions INTEGER,
    files_touched INTEGER, diff_text VARCHAR, discussion_len INTEGER
);
CREATE TABLE IF NOT EXISTS pr_followups (
    pr_id VARCHAR PRIMARY KEY, reverted BOOLEAN DEFAULT FALSE,
    patched_same_lines_within_n_days BOOLEAN DEFAULT FALSE
);
-- authoritative pull outcome from sh.tangled.repo.pull.status (public record). Separate
-- table so a status arriving before its pull record (ordering not guaranteed) is never lost.
CREATE TABLE IF NOT EXISTS pull_status (
    pr_id VARCHAR PRIMARY KEY, status VARCHAR, updated_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS scores (
    did VARCHAR, as_of TIMESTAMP DEFAULT now(), structural_trust DOUBLE,
    content_risk DOUBLE, calibrated_prob DOUBLE, decision VARCHAR, explanation_json JSON
);
CREATE TABLE IF NOT EXISTS ingest_state (stream VARCHAR PRIMARY KEY, last_time_us BIGINT);
-- trusted maintainer seed set for personalized EigenTrust (PRD 6.4)
CREATE TABLE IF NOT EXISTS seeds (did VARCHAR PRIMARY KEY);
-- repo tiering (PRD 6.13): sensitive/dual-use repos gate fast-lane on an attestation
CREATE TABLE IF NOT EXISTS repo_tiers (repo VARCHAR PRIMARY KEY, tier VARCHAR DEFAULT 'public');
-- star graph (sh.tangled.feed.star): starrer -> repo owner. NOT sybil-resistant on its
-- own (a star is cheap), so it's a model FEATURE, never a trust-graph edge. Keyed by
-- (starrer, owner) so one DID endorsing an owner counts once, not once per repo.
CREATE TABLE IF NOT EXISTS stars (
    starrer_did VARCHAR, owner_did VARCHAR, created_at TIMESTAMP,
    PRIMARY KEY (starrer_did, owner_did)
);
-- contributor-issued jurisdiction attestations (signed records); declared, never inferred
CREATE TABLE IF NOT EXISTS attestations (
    did VARCHAR, jurisdiction VARCHAR, method VARCHAR, created_at TIMESTAMP,
    PRIMARY KEY (did, jurisdiction)
);
-- AT-Proto writeback (PRD 6.11): the at:// URI of each assessment published as a record
CREATE TABLE IF NOT EXISTS published_records (
    did VARCHAR, as_of TIMESTAMP, uri VARCHAR, PRIMARY KEY (did, as_of)
);
-- diff-embedding corpus (PRD 6.12 / section 4): near-duplicate detection of known-bad
-- patterns. Vector search stays in DuckDB (list_cosine_similarity) -- no separate engine.
CREATE TABLE IF NOT EXISTS diff_vectors (pr_id VARCHAR PRIMARY KEY, label VARCHAR, embedding DOUBLE[]);
"""
# Per-DID feature view (PRD 6.3/6.5). eigentrust_score + bsky_* are joined in
# at scoring time (computed in Python / from app.bsky events).

# Merge outcome of one PR row. Expects aliases p = pull_requests, ps = pull_status.
# pull_status (authoritative, public record) overrides the PDS record's merged field,
# which is always NULL on real sh.tangled.repo.pull. When a status row exists but is
# not '*.merged', the LIKE yields FALSE (not NULL), so it deliberately wins over p.merged.
_IS_MERGED_SQL = "COALESCE(ps.status LIKE '%.merged', p.merged, FALSE)"

# clean_merge label (PRD 6.3). This is the single definition shared by the features and
# pr_labels views, so the training label and the per-DID rate cannot drift apart.
# Expects aliases p = pull_requests, f = pr_followups, ps = pull_status.
# The label is NULL when the PR is too recent to have elapsed the N-day window.
# CI is relaxed: pass/fail isn't a public Tangled record, and a `merged` PR already
# cleared the merge-gate (which runs CI). So merged-and-not-CI-failed counts, and only
# an explicit ci_status='failed' disqualifies. Tighten this if a CI verdict source is wired.
_CLEAN_MERGE_SQL = f"""CASE
        WHEN p.opened_at > now() - INTERVAL {CFG.clean_merge_window_days} DAY THEN NULL
        WHEN {_IS_MERGED_SQL}
            AND COALESCE(p.ci_status, 'passed') <> 'failed'
            AND NOT COALESCE(f.reverted, FALSE)
            AND NOT COALESCE(f.patched_same_lines_within_n_days, FALSE) THEN 1
        ELSE 0
    END"""

FEATURES_VIEW = f"""
CREATE OR REPLACE VIEW features AS
WITH pr AS (
    SELECT p.*, COALESCE(f.reverted, FALSE) AS reverted,
        COALESCE(f.patched_same_lines_within_n_days, FALSE) AS patched_quick,
        {_IS_MERGED_SQL} AS is_merged,
        COALESCE(ps.status LIKE '%.closed', p.closed_unmerged, FALSE) AS is_closed,
        {_CLEAN_MERGE_SQL} AS clean_merge
    FROM pull_requests p
    LEFT JOIN pr_followups f USING (pr_id)
    LEFT JOIN pull_status ps USING (pr_id)
)
SELECT
    c.did,
    date_diff('day', c.did_created_at, now()) AS did_age_days,
    COUNT(*) FILTER (WHERE pr.is_merged) AS merged_pr_count,
    COALESCE(AVG(CASE WHEN pr.reverted THEN 1.0 ELSE 0.0 END), 0) AS revert_rate,
    COALESCE(AVG(CASE WHEN pr.ci_status='passed' THEN 1.0 ELSE 0.0 END), 0) AS ci_pass_rate,
    COALESCE(AVG(CASE WHEN pr.is_closed THEN 1.0 ELSE 0.0 END), 0) AS close_without_merge_ratio,
    COALESCE(AVG(pr.additions + pr.deletions), 0) AS mean_diff_size,
    COALESCE(AVG(pr.files_touched), 0) AS mean_files_touched,
    COALESCE(SUM(pr.additions + pr.deletions), 0) AS churn,
    COALESCE(AVG(pr.discussion_len), 0) AS mean_discussion_len,
    (SELECT COUNT(*) FROM vouches v WHERE v.subject_did = c.did AND v.polarity < 0) AS denounce_count,
    -- raw star count (advisory feature, gameable); the sybil-resistant trust-weighted
    -- version (stars_trust) is computed in Python and rides on the EigenResult.
    (SELECT COUNT(*) FROM stars st WHERE st.owner_did = c.did) AS stars_received,
    AVG(pr.clean_merge) AS clean_merge_rate
FROM contributors c
LEFT JOIN pr ON pr.author_did = c.did
GROUP BY c.did, c.did_created_at;
"""


# Per-PR clean_merge label (PRD 6.3) for supervised training; NULL when too recent.
PR_LABELS_VIEW = f"""
CREATE OR REPLACE VIEW pr_labels AS
SELECT p.pr_id, p.author_did, p.opened_at,
    {_CLEAN_MERGE_SQL} AS clean_merge
FROM pull_requests p
LEFT JOIN pr_followups f USING (pr_id)
LEFT JOIN pull_status ps USING (pr_id);
"""


def _apply_schema(con: duckdb.DuckDBPyConnection) -> None:
    """Create all tables (if missing) and (re)create the derived views on ``con``."""
    con.execute(SCHEMA)
    con.execute(FEATURES_VIEW)
    con.execute(PR_LABELS_VIEW)


def connect(read_only: bool = False) -> duckdb.DuckDBPyConnection:
    """Open a connection to the DuckDB file at DUCKDB_PATH, creating the data root first.

    No retry is attempted. If another process holds the file lock, the error from
    ``duckdb.connect`` propagates. Use ``connection()`` for a retrying, auto-closing variant.
    """
    ensure_data_root()
    return duckdb.connect(str(DUCKDB_PATH), read_only=read_only)


def init_db(con: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyConnection:
    """Apply the schema and views, then return the (still open) connection.

    Args:
        con: connection to initialise. When None, a new read-write connection is
            opened via ``connect()``. If schema application then fails, that
            connection is closed before the error propagates. A caller-supplied
            connection is never closed here.

    Returns:
        The initialised connection; the caller owns it and must close it.
    """
    owned = con is None
    if owned:
        con = connect()
    try:
        _apply_schema(con)
    except BaseException:
        if owned:
            # Don't leak the file lock we just took when the DDL fails.
            con.close()
        raise
    return con


@contextmanager
def connection(read_only: bool = False, attempts: int = 500, delay: float = 0.02):
    """Short-lived connection with retry on DuckDB's cross-process file lock.

    DuckDB allows only one read-write process; a held lock blocks every other
    open (even read-only). So long-running processes (API, score loop, ingester)
    must open->work->close per operation, letting panes interleave under mprocs.
    ponytail: open/close + retry over a single-writer daemon; fine at hackathon
    scale, revisit if write throughput matters.

    Args:
        read_only: open the file read-only.
        attempts: total number of open attempts; must be >= 1.
        delay: seconds to sleep between failed attempts (not after the last one).

    Yields:
        An open connection, closed when the ``with`` block exits.

    Raises:
        ValueError: if ``attempts`` < 1.
        duckdb.IOException: the last lock/open error once all attempts fail.
    """
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts}")
    ensure_data_root()
    for attempt in range(1, attempts + 1):
        try:
            con = duckdb.connect(str(DUCKDB_PATH), read_only=read_only)
            break
        except duckdb.IOException:
            if attempt == attempts:
                raise  # out of retries: surface the last lock error to the caller
            time.sleep(delay)
    try:
        yield con
    finally:
        con.close()


def ensure_schema() -> None:
    """Create tables + the features and pr_labels views once (read-write), then release the lock."""
    with connection(read_only=False) as con:
        _apply_schema(con)


def main() -> None:
    """CLI entry point: initialise the database file and report how many relations it holds."""
    con = init_db()
    try:
        tables = con.execute("SHOW TABLES").fetchall()
    finally:
        # Release the read-write file lock; no other process can open the file while it is held.
        con.close()
    print(f"[db] initialised {DUCKDB_PATH} with {len(tables)} tables/views")


if __name__ == "__main__":
    main()