Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.0 kB View raw
"""Historical backfill: scrape ALL sh.tangled.* records across the network, once.

`trust.ingest` is the live tail (Jetstream, ~5s replay). This is the history:
enumerate every repo holding a sh.tangled.* collection via the relay's
listReposByCollection, pull each repo's records with listRecords (JSON, no CAR
parsing), archive them raw to the `events` table, and feed them through the SAME
ingest.derive() -> contributors / vouches / pull_requests. Then train as usual
(`python -m trust.learned`, `python -m trust.gnn`).

Storage: everything goes through connection() -> DUCKDB_PATH under DATA_ROOT, so
point DATA_ROOT at the external drive (`export DATA_ROOT=/Volumes/<drive>`); the
writability assert in ensure_data_root() fails fast if it isn't mounted.

Resumability: derive() upserts / inserts ON CONFLICT DO NOTHING, so re-running is
idempotent — just run it again to resume. ponytail: idempotent writes instead of a
checkpoint table; add a per-DID cursor table only if a full run gets too slow to repeat.

Confirm record shapes FIRST: `python -m trust.backfill --sample` prints real
records so you can verify the field names derive() assumes (the config NSID map and
field guesses are flagged unconfirmed). If pulls turn out to be thin pointers
(diff/CI live on the knot), that's a second fetch — don't build it until --sample
proves it's needed (YAGNI).
"""

from __future__ import annotations

import argparse
import itertools
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor

from .db import connection, ensure_schema
from .ingest import derive

# listReposByCollection lives on the relay; listRecords on each repo's PDS.
RELAY = "https://relay1.us-west.bsky.network"
PLC = "https://plc.directory"

# The WHOLE sh.tangled.* lexicon that actually holds records (live census via
# listReposByCollection; git.refUpdate and bobbin came back empty, so they're omitted).
# Every record is archived raw to `events`; derive() typed-extracts the subset it
# knows (pull / vouch / follow), the rest just lives in the raw mirror.
COLLECTIONS = [
    "sh.tangled.repo.pull", "sh.tangled.repo.pull.status", "sh.tangled.graph.vouch",
    "sh.tangled.graph.follow", "sh.tangled.repo.issue", "sh.tangled.repo",
    "sh.tangled.actor.profile", "sh.tangled.feed.star", "sh.tangled.knot",
    "sh.tangled.knot.member", "sh.tangled.repo.collaborator", "sh.tangled.spindle.member",
    "sh.tangled.repo.artifact", "sh.tangled.pipeline",
]

_PDS_CACHE: dict[str, str | None] = {}  # DIDs recur across collections; resolve each once.

# HTTP statuses worth retrying: rate limiting plus the usual transient server errors.
_RETRY_STATUSES = frozenset({429, 500, 502, 503, 504})


def _get(url: str, tries: int = 4) -> dict:
    """GET `url` and decode the JSON body, with naive backoff on 429/5xx.

    Retries only HTTP statuses in _RETRY_STATUSES, sleeping 2s, 4s, 6s, ... between
    attempts. Any other HTTPError, the final failed attempt, and non-HTTP failures
    (connection errors, timeouts, bad JSON) propagate to the caller unchanged.
    ponytail: linear sleep, no token bucket.
    """
    request = urllib.request.Request(url, headers={"User-Agent": "trust-backfill"})
    for attempt in range(1, tries + 1):
        try:
            with urllib.request.urlopen(request, timeout=30) as resp:
                return json.load(resp)
        except urllib.error.HTTPError as err:
            if err.code not in _RETRY_STATUSES or attempt >= tries:
                raise
            time.sleep(2 * attempt)
    return {}  # only reachable when tries <= 0 (no attempt was made)


def _pds(did: str) -> str | None:
    """DID -> PDS endpoint (cached). Handles did:plc via PLC directory and did:web inline.

    Returns the `serviceEndpoint` of the first `AtprotoPersonalDataServer` service in
    the DID document (trailing slash stripped), or None when the document cannot be
    fetched or has no such service. Failures are cached too, so a DID that fails once
    is not retried for the rest of the process.
    """
    if did in _PDS_CACHE:
        return _PDS_CACHE[did]
    endpoint = None
    try:
        if did.startswith("did:web:"):
            doc = _get(f"https://{did[len('did:web:'):]}/.well-known/did.json")
        else:
            doc = _get(f"{PLC}/{urllib.parse.quote(did)}")
        for service in doc.get("service", []):
            if service.get("type") == "AtprotoPersonalDataServer":
                endpoint = service["serviceEndpoint"].rstrip("/")
                break
    except Exception:  # best-effort: an unresolvable DID is skipped, not fatal
        endpoint = None
    _PDS_CACHE[did] = endpoint
    return endpoint


def _paged(endpoint: str, params: dict[str, str], key: str) -> Iterator[dict]:
    """Yield every item under `key` from a cursor-paginated XRPC query endpoint.

    Follows the response's `cursor` until it is absent/empty. Also stops if the
    server hands back the cursor it was just given, which would otherwise loop forever.
    """
    query = dict(params)
    while True:
        page = _get(f"{endpoint}?{urllib.parse.urlencode(query)}")
        yield from page.get(key, [])
        cursor = page.get("cursor")
        if not cursor or cursor == query.get("cursor"):
            return
        query["cursor"] = cursor


def _repos(collection: str) -> Iterator[str]:
    """All DIDs holding `collection`, paginated via the relay."""
    endpoint = f"{RELAY}/xrpc/com.atproto.sync.listReposByCollection"
    for repo in _paged(endpoint, {"collection": collection, "limit": "500"}, "repos"):
        yield repo["did"]


def _records(pds: str, did: str, collection: str) -> Iterator[dict]:
    """All records of one collection in one repo, paginated via listRecords."""
    endpoint = f"{pds}/xrpc/com.atproto.repo.listRecords"
    params = {"repo": did, "collection": collection, "limit": "100"}
    yield from _paged(endpoint, params, "records")  # each item: {uri, cid, value}


def _archive_and_derive(buf: list[tuple]) -> None:
    """Durable raw log to `events` (on the external drive) + derive into typed tables.

    Does NOT touch ingest_state — that cursor belongs to the live firehose, not backfill.
    A no-op for an empty buffer, so no connection is opened for nothing.
    """
    if not buf:
        return
    with connection(read_only=False) as con:
        # NOTE(review): this INSERT has no conflict clause, so whether a re-run
        # duplicates raw rows or fails depends on the `events` schema, which is not
        # visible here — confirm before relying on re-runs being idempotent for `events`.
        con.executemany(
            "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)",
            buf,
        )
        derive(con, buf)


def _fetch_repo(col: str, did: str) -> list[tuple] | None:
    """Network only: all of one repo's records as event tuples, or None on error.

    No DB access here, so WORKERS of these run concurrently without touching the
    single writer. The only shared state is _PDS_CACHE; a race there at worst
    resolves the same DID twice.
    """
    try:
        pds = _pds(did)
        if not pds:
            return None
        out = []
        for rec in _records(pds, did, col):
            # time_us=0: listRecords has no firehose seq; archive key is (did, collection, rkey).
            rkey = rec["uri"].rsplit("/", 1)[-1]
            out.append((did, 0, "create", col, rkey, json.dumps(rec["value"])))
        return out
    except Exception:  # a dead PDS / 400 must not abort the run
        return None


def backfill(collections=COLLECTIONS, max_repos: int | None = None, workers: int = 12) -> dict:
    """Parallel scrape: a thread pool fetches repos concurrently (the slow network part),
    the main thread writes in chunks (DuckDB is single-writer; chunking also frees the file
    between flushes so the dashboard's read-only polls interleave).

    Args:
        collections: NSIDs to scrape, processed one after another.
        max_repos: cap on repos per collection (smoke test); None, 0 or a negative
            value means no cap. With a cap, relay paging stops as soon as it is reached.
        workers: size of the fetcher thread pool.

    Returns:
        Mapping of collection NSID -> number of records fetched.

    ponytail: ThreadPoolExecutor over a 12-wide pool, not asyncio — the work is
    I/O-bound and the GIL releases on socket waits.
    """
    ensure_schema()
    counts: dict[str, int] = {}
    for col in collections:
        repo_iter = _repos(col)
        if max_repos and max_repos > 0:
            # Lazy cut-off: don't page through the whole relay listing for a capped run.
            repo_iter = itertools.islice(repo_iter, max_repos)
        dids = list(repo_iter)
        records = errors = 0
        buf: list[tuple] = []
        with ThreadPoolExecutor(max_workers=workers) as ex:
            # The lambda closes over `col`, which is safe: the map is fully consumed
            # inside this iteration, before `col` is rebound.
            for i, res in enumerate(ex.map(lambda d: _fetch_repo(col, d), dids), 1):
                if res is None:
                    errors += 1
                else:
                    buf.extend(res)
                    records += len(res)
                if len(buf) >= 1000:  # flush in chunks: one connection per ~1k records, not per repo
                    _archive_and_derive(buf)
                    buf = []
                if i % 200 == 0:
                    print(f"[backfill] {col}: {i}/{len(dids)} repos, {records} records ({errors} skipped)", flush=True)
        _archive_and_derive(buf)
        counts[col] = records
        print(f"[backfill] DONE {col}: {records} records from {len(dids)} repos ({errors} skipped)", flush=True)
    return counts


def sample(collection: str = COLLECTIONS[0], n: int = 3) -> None:
    """Print real record values so you can confirm the fields derive() assumes.

    Prints up to `n` records in total, walking repos in relay order until enough
    have been shown (at least one record is printed if any exists). Repos without a
    resolvable PDS are skipped silently; repos whose PDS fails mid-fetch are skipped
    with a note on stderr. Writes nothing to the database.
    """
    remaining = n
    for did in _repos(collection):
        pds = _pds(did)
        if not pds:
            continue
        try:
            for rec in _records(pds, did, collection):
                print(json.dumps(rec["value"], indent=2))
                remaining -= 1
                if remaining <= 0:
                    return
        except (OSError, ValueError, KeyError) as err:
            # URLError/HTTPError/timeouts are OSError; bad JSON is ValueError.
            print(f"[backfill] sample: skipping {did}: {err}", file=sys.stderr, flush=True)


def main() -> None:
    """CLI entry point: `--sample` prints records and exits, otherwise run the backfill."""
    ap = argparse.ArgumentParser(description="Backfill all sh.tangled.* history into DuckDB (under DATA_ROOT)")
    ap.add_argument("--sample", action="store_true", help="print real records to confirm field shapes, write nothing")
    ap.add_argument("--collection", default=None, help="restrict to one NSID (default: all known)")
    ap.add_argument("--max-repos", type=int, default=None, help="cap repos per collection (smoke test)")
    ap.add_argument("--workers", type=int, default=12, help="concurrent repo fetchers (default 12)")
    args = ap.parse_args()
    if args.sample:
        sample(args.collection or COLLECTIONS[0])
        return
    cols = [args.collection] if args.collection else COLLECTIONS
    c = backfill(cols, max_repos=args.max_repos, workers=args.workers)
    print(f"[backfill] done: {c}")


if __name__ == "__main__":
    main()