Sunstead trust scoring project
1"""Historical backfill: scrape ALL sh.tangled.* records across the network, once.
2
3`trust.ingest` is the live tail (Jetstream, ~5s replay). This is the history:
4enumerate every repo holding a sh.tangled.* collection via the relay's
5listReposByCollection, pull each repo's records with listRecords (JSON, no CAR
6parsing), archive them raw to the `events` table, and feed them through the SAME
7ingest.derive() -> contributors / vouches / pull_requests. Then train as usual
8(`python -m trust.learned`, `python -m trust.gnn`).
9
10Storage: everything goes through connection() -> DUCKDB_PATH under DATA_ROOT, so
11point DATA_ROOT at the external drive (`export DATA_ROOT=/Volumes/<drive>`); the
12writability assert in ensure_data_root() fails fast if it isn't mounted.
13
14Resumability: derive() upserts / inserts ON CONFLICT DO NOTHING, so re-running is
15idempotent — just run it again to resume. ponytail: idempotent writes instead of a
16checkpoint table; add a per-DID cursor table only if a full run gets too slow to repeat.
17
18Confirm record shapes FIRST: `python -m trust.backfill --sample` prints real
19records so you can verify the field names derive() assumes (the config NSID map and
20field guesses are flagged unconfirmed). If pulls turn out to be thin pointers
21(diff/CI live on the knot), that's a second fetch — don't build it until --sample
22proves it's needed (YAGNI).
23"""
24
25from __future__ import annotations
26
27import argparse
28import json
29import time
30import urllib.error
31import urllib.parse
32import urllib.request
33
34from .db import connection, ensure_schema
35from .ingest import derive
36
37# listReposByCollection lives on the relay; listRecords on each repo's PDS.
38RELAY = "https://relay1.us-west.bsky.network"
39PLC = "https://plc.directory"
40
41# The WHOLE sh.tangled.* lexicon that actually holds records (live census via
42# listReposByCollection; git.refUpdate and bobbin came back empty, so they're omitted).
43# Every record is archived raw to `events`; derive() typed-extracts the subset it
44# knows (pull / vouch / follow), the rest just lives in the raw mirror.
45COLLECTIONS = [
46 "sh.tangled.repo.pull", "sh.tangled.repo.pull.status", "sh.tangled.graph.vouch",
47 "sh.tangled.graph.follow", "sh.tangled.repo.issue", "sh.tangled.repo",
48 "sh.tangled.actor.profile", "sh.tangled.feed.star", "sh.tangled.knot",
49 "sh.tangled.knot.member", "sh.tangled.repo.collaborator", "sh.tangled.spindle.member",
50 "sh.tangled.repo.artifact", "sh.tangled.pipeline",
51]
52
53_PDS_CACHE: dict[str, str | None] = {} # DIDs recur across collections; resolve each once.
54
55
def _get(url: str, tries: int = 4) -> dict:
    """GET `url` and return its JSON body, retrying transient HTTP failures.

    ponytail: linear sleep, no token bucket.

    Args:
        url: Absolute URL to fetch.
        tries: Total number of attempts. Values below 1 are treated as 1, so the
            request is always issued at least once instead of silently yielding
            an empty dict that looks like an empty page.

    Returns:
        The decoded JSON document.

    Raises:
        urllib.error.HTTPError: On a non-retryable status, or on a retryable one
            once the attempts are used up.
        urllib.error.URLError / OSError: On connection-level failures. These are
            not retried here, so an unreachable host costs one timeout, not `tries`.
        ValueError: If the body is not valid JSON.
    """
    # Rate limiting plus the 5xx codes that signal a server-side / gateway hiccup.
    retryable = (429, 500, 502, 503, 504)
    attempts = max(1, tries)
    for attempt in range(1, attempts + 1):
        req = urllib.request.Request(url, headers={"User-Agent": "trust-backfill"})
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.load(resp)
        except urllib.error.HTTPError as err:
            if err.code not in retryable or attempt == attempts:
                raise
            time.sleep(2 * attempt)  # linear backoff: 2s, 4s, 6s, ...
    raise AssertionError("unreachable: every attempt either returns or raises")
69
70
def _pds(did: str) -> str | None:
    """Resolve a DID to its PDS endpoint, caching the answer in `_PDS_CACHE`.

    did:web identifiers are resolved from the host's /.well-known/did.json; anything
    else is looked up in the PLC directory. The first service entry of type
    "AtprotoPersonalDataServer" with a usable endpoint wins.

    Args:
        did: The DID to resolve.

    Returns:
        The endpoint without a trailing slash, or None when the document could not
        be fetched or lists no usable PDS. Failures are cached as None too, so a DID
        is only ever resolved once per process.
    """
    if did in _PDS_CACHE:
        return _PDS_CACHE[did]
    endpoint = None
    try:
        if did.startswith("did:web:"):
            # did:web percent-encodes the optional port separator (host%3A8080);
            # decode only that, so the host is usable in a URL.
            host = did[len("did:web:"):].replace("%3A", ":").replace("%3a", ":")
            doc = _get(f"https://{host}/.well-known/did.json")
        else:
            doc = _get(f"{PLC}/{urllib.parse.quote(did)}")
        for service in doc.get("service", []):
            if service.get("type") != "AtprotoPersonalDataServer":
                continue
            candidate = service.get("serviceEndpoint")
            # The DID document is untrusted input: only accept a plain http(s) URL,
            # never another scheme or a non-string value.
            if isinstance(candidate, str) and candidate.startswith(("https://", "http://")):
                endpoint = candidate.rstrip("/")
                break
    except Exception:  # best-effort: any resolution failure just means "no PDS"
        endpoint = None
    _PDS_CACHE[did] = endpoint
    return endpoint
89
90
def _repos(collection: str):
    """Yield the DID of every repo holding `collection`, paging through the relay.

    Pages of up to 500 repos are requested from listReposByCollection on RELAY.
    Iteration ends when the relay omits the cursor, or hands back the very cursor it
    was just sent (a non-advancing cursor would otherwise loop on one page forever).

    Raises:
        Whatever `_get` raises for a failed page request.
    """
    cursor = None
    while True:
        params = {"collection": collection, "limit": "500"}
        if cursor:
            params["cursor"] = cursor
        page = _get(f"{RELAY}/xrpc/com.atproto.sync.listReposByCollection?{urllib.parse.urlencode(params)}")
        for repo in page.get("repos", []):
            yield repo["did"]
        next_cursor = page.get("cursor")
        if not next_cursor or next_cursor == cursor:
            return
        cursor = next_cursor
104
105
def _records(pds: str, did: str, collection: str):
    """Yield every record of `collection` in repo `did`, paging through listRecords on `pds`.

    Each yielded item is the raw listRecords entry ({uri, cid, value}), requested in
    pages of up to 100. Iteration ends when the server omits the cursor, or returns
    the same cursor it was just sent: `pds` is an arbitrary third-party host, and a
    non-advancing cursor would otherwise pin the calling thread on one page forever.

    Raises:
        Whatever `_get` raises for a failed page request.
    """
    cursor = None
    while True:
        params = {"repo": did, "collection": collection, "limit": "100"}
        if cursor:
            params["cursor"] = cursor
        page = _get(f"{pds}/xrpc/com.atproto.repo.listRecords?{urllib.parse.urlencode(params)}")
        for rec in page.get("records", []):
            yield rec  # {uri, cid, value}
        next_cursor = page.get("cursor")
        if not next_cursor or next_cursor == cursor:
            return
        cursor = next_cursor
119
120
def _archive_and_derive(buf: list[tuple]) -> None:
    """Append one batch of event tuples to `events`, then derive typed rows from it.

    `buf` holds (did, time_us, operation, collection, rkey, record) tuples; an empty
    batch is a no-op and opens no connection. Both the raw insert and derive() run on
    the same writable connection.
    Does NOT touch ingest_state — that cursor belongs to the live firehose, not backfill.
    """
    insert_sql = "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)"
    if buf:
        with connection(read_only=False) as db:
            # NOTE(review): plain INSERT with no conflict clause. Whether a re-run
            # duplicates raw rows or fails depends on the `events` schema, which is not
            # visible here — confirm it matches the module's idempotent-rerun expectation.
            db.executemany(insert_sql, buf)
            derive(db, buf)
132
133
def _fetch_repo(col: str, did: str) -> list[tuple] | None:
    """Fetch every `col` record of one repo and shape it into event tuples.

    Network work only, with no DB access, so backfill() can run many of these in its
    thread pool while the main thread stays the only writer.

    Returns:
        A list of (did, time_us, operation, collection, rkey, record_json) tuples, or
        None when the PDS cannot be resolved or anything fails along the way.
    """
    try:
        endpoint = _pds(did)
        if not endpoint:
            return None
        # time_us=0: listRecords has no firehose seq; archive key is (did, collection, rkey).
        # The rkey is the last "/"-separated segment of the record's uri.
        return [
            (did, 0, "create", col, rec["uri"].rpartition("/")[2], json.dumps(rec["value"]))
            for rec in _records(endpoint, did, col)
        ]
    except Exception:  # a dead PDS / 400 must not abort the run
        return None
149
150
def backfill(collections=COLLECTIONS, max_repos: int | None = None, workers: int = 12) -> dict:
    """Scrape every repo of each collection in parallel and write the results in chunks.

    A thread pool fetches repos concurrently (the slow network part) while the main
    thread writes in chunks (DuckDB is single-writer; chunking also frees the file
    between flushes so the dashboard's read-only polls interleave). ponytail:
    ThreadPoolExecutor over a 12-wide pool, not asyncio — the work is I/O-bound and the
    GIL releases on socket waits.

    Args:
        collections: NSIDs to scrape, processed one after another.
        max_repos: Cap on repos per collection (smoke test). None or 0 means no cap.
        workers: Number of concurrent repo fetchers.

    Returns:
        Mapping of collection NSID -> number of records fetched for it.

    Raises:
        ValueError: If `max_repos` is negative.
    """
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial
    from itertools import islice

    if max_repos is not None and max_repos < 0:
        raise ValueError(f"max_repos must be >= 0, got {max_repos}")

    ensure_schema()
    counts: dict[str, int] = {}
    for col in collections:
        repo_iter = _repos(col)
        if max_repos:
            # Stop paging the relay once the cap is reached instead of enumerating the
            # whole collection and throwing most of it away.
            repo_iter = islice(repo_iter, max_repos)
        dids = list(repo_iter)
        records = errors = 0
        buf: list[tuple] = []
        fetch = partial(_fetch_repo, col)  # binds this iteration's collection explicitly
        with ThreadPoolExecutor(max_workers=workers) as ex:
            for i, res in enumerate(ex.map(fetch, dids), 1):
                if res is None:
                    errors += 1
                else:
                    buf.extend(res)
                    records += len(res)
                if len(buf) >= 1000:  # flush in chunks: one connection per ~1k records, not per repo
                    _archive_and_derive(buf)
                    buf = []
                if i % 200 == 0:
                    print(f"[backfill] {col}: {i}/{len(dids)} repos, {records} records ({errors} skipped)", flush=True)
        _archive_and_derive(buf)  # final partial chunk (no-op when empty)
        counts[col] = records
        print(f"[backfill] DONE {col}: {records} records from {len(dids)} repos ({errors} skipped)", flush=True)
    return counts
182
183
def sample(collection: str = COLLECTIONS[0], n: int = 3) -> None:
    """Print up to `n` real record values so you can confirm the fields derive() assumes.

    Walks the repos holding `collection` in relay order and prints each record's
    `value` as indented JSON until `n` records have been printed in total. Repos whose
    PDS cannot be resolved are skipped silently; repos whose listing fails part-way are
    skipped with a note on stderr, so stdout stays pure JSON.

    Args:
        collection: NSID to sample from.
        n: Maximum number of records to print overall; nothing is printed (and no
            network request is made) when it is not positive.
    """
    import sys

    if n <= 0:
        return
    # Counted across repos: resetting it per repo would print every record of every
    # repo whenever no single repo holds `n` of them.
    shown = 0
    for did in _repos(collection):
        pds = _pds(did)
        if not pds:
            continue
        try:
            for rec in _records(pds, did, collection):
                print(json.dumps(rec["value"], indent=2))
                shown += 1
                if shown >= n:
                    return
        except (OSError, ValueError, KeyError) as err:  # dead PDS / bad JSON / odd record
            print(f"[sample] skipping {did}: {err}", file=sys.stderr)
196
197
def main() -> None:
    """CLI entry point: parse the flags, then either print sample records or run the backfill.

    With --sample nothing is written; records of --collection (or the first known
    collection) are printed instead. Otherwise the selected collection, or all known
    ones, are backfilled and the per-collection record counts are printed.
    """
    parser = argparse.ArgumentParser(
        description="Backfill all sh.tangled.* history into DuckDB (under DATA_ROOT)"
    )
    parser.add_argument(
        "--sample",
        action="store_true",
        help="print real records to confirm field shapes, write nothing",
    )
    parser.add_argument("--collection", default=None, help="restrict to one NSID (default: all known)")
    parser.add_argument("--max-repos", type=int, default=None, help="cap repos per collection (smoke test)")
    parser.add_argument("--workers", type=int, default=12, help="concurrent repo fetchers (default 12)")
    opts = parser.parse_args()

    if opts.sample:
        sample(opts.collection or COLLECTIONS[0])
    else:
        targets = [opts.collection] if opts.collection else COLLECTIONS
        totals = backfill(targets, max_repos=opts.max_repos, workers=opts.workers)
        print(f"[backfill] done: {totals}")


if __name__ == "__main__":
    main()