Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.5 kB View raw
1"""M1 ingest: Jetstream firehose -> events (batched) -> derive typed tables. 2 3Single writer (PRD 6.1). Buffer in memory, append in batches, persist the 4`time_us` cursor so a crash resumes gaplessly. The cursor + the durable events 5log ARE the resumability a broker would give (PRD 2). 6""" 7 8from __future__ import annotations 9 10import argparse 11import asyncio 12import json 13from collections import Counter 14 15import websockets 16 17from .config import COLLECTION_KINDS, JETSTREAM_URL, WANTED_COLLECTIONS 18from .db import connection, ensure_schema 19 20STREAM = "jetstream" 21BATCH = 200 22FLUSH_SECONDS = 2.0 23 24 25def _kind(collection: str) -> str | None: 26 # Longest matching needle wins, so "tangled.repo.pull.status" maps to pull_status 27 # rather than being swallowed by the shorter "tangled.repo.pull" -> pull_request. 28 best, blen = None, -1 29 for needle, kind in COLLECTION_KINDS.items(): 30 if needle in collection and len(needle) > blen: 31 best, blen = kind, len(needle) 32 return best 33 34 35def _url(con) -> str: 36 row = con.execute("SELECT last_time_us FROM ingest_state WHERE stream=?", [STREAM]).fetchone() 37 cursor = f"&cursor={row[0] - 5_000_000}" if row and row[0] else "" # -5s for gapless replay 38 cols = "".join(f"&wantedCollections={c}" for c in WANTED_COLLECTIONS.split(",")) 39 return f"{JETSTREAM_URL}?{cols.lstrip('&')}{cursor}" 40 41 42def flush(con, buf: list[tuple]) -> None: 43 if not buf: 44 return 45 con.executemany( 46 "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)", 47 buf, 48 ) 49 last = max(e[1] for e in buf) 50 con.execute( 51 "INSERT INTO ingest_state (stream, last_time_us) VALUES (?, ?) " 52 "ON CONFLICT (stream) DO UPDATE SET last_time_us=excluded.last_time_us", 53 [STREAM, last], 54 ) 55 derive(con, buf) 56 buf.clear() 57 58 59def _ts(v): 60 """createdAt -> a value DuckDB's TIMESTAMP accepts, or NULL. Real records sometimes 61 carry createdAt="" (or missing), which crashes the insert; coerce those to None.""" 62 v = (v or "").strip() if isinstance(v, str) else v 63 return v or None 64 65 66def derive(con, events: list[tuple]) -> None: 67 """Raw event tuples -> contributors / vouches / pull_requests (PRD 6.1, 6.2). 68 Per-record try/except: one malformed record is skipped, never aborts the batch.""" 69 for did, time_us, op, collection, rkey, record_json in events: 70 try: 71 kind = _kind(collection) 72 rec = json.loads(record_json) if record_json else {} 73 con.execute( 74 "INSERT INTO contributors (did, first_seen) VALUES (?, now()) ON CONFLICT (did) DO NOTHING", 75 [did], 76 ) 77 if kind in ("vouch", "denounce") and op != "delete": 78 # Real sh.tangled.graph.vouch: subject is the RKEY (at://voucher/.../<subject_did>), 79 # not a record field; vouch-vs-denounce is in rec["kind"]. Confirmed via listRecords. 80 subject = (rec.get("subject") or rec.get("subjectDid") 81 or (rkey if str(rkey).startswith("did:") else None)) 82 if not subject: 83 continue 84 polarity = -1 if (kind == "denounce" or rec.get("kind") == "denounce") else 1 85 con.execute( 86 "INSERT INTO vouches (voucher_did, subject_did, polarity, reason, evidence_uri, created_at, weight) " 87 "VALUES (?,?,?,?,?,?,1.0) ON CONFLICT (voucher_did, subject_did) DO UPDATE SET " 88 "polarity=excluded.polarity, reason=excluded.reason", 89 [did, subject, polarity, rec.get("reason"), rec.get("evidence") or rec.get("uri"), 90 _ts(rec.get("createdAt"))], 91 ) 92 elif kind == "pull_status" and op != "delete": 93 # sh.tangled.repo.pull.status: authoritative outcome (.merged/.closed/.open), 94 # the label signal absent from the pull record itself. It references the pull by 95 # AT-URI and may be authored by a DIFFERENT did than the pull owner, so the pr_id 96 # comes from the `pull` field (at://<did>/<coll>/<rkey> -> our pr_id), never from 97 # this record's own did/rkey. Side table so it's insertion-order independent. 98 pull_uri = rec.get("pull") or "" 99 pr_id = pull_uri[len("at://"):] if pull_uri.startswith("at://") else None 100 status = rec.get("status") 101 if pr_id and status: 102 con.execute( 103 "INSERT INTO pull_status (pr_id, status, updated_at) VALUES (?,?,now()) " 104 "ON CONFLICT (pr_id) DO UPDATE SET status=excluded.status, updated_at=now()", 105 [pr_id, status], 106 ) 107 elif kind == "star" and op != "delete": 108 # sh.tangled.feed.star: subject.did is the starred repo's OWNER; the record 109 # author (did) is the starrer. A star is cheap -> feature only, never a trust 110 # edge. PK (starrer, owner) dedups multi-repo stars; skip self-stars. 111 subj = rec.get("subject") if isinstance(rec.get("subject"), dict) else {} 112 owner = subj.get("did") 113 if owner and owner != did: 114 con.execute( 115 "INSERT INTO stars (starrer_did, owner_did, created_at) VALUES (?,?,?) " 116 "ON CONFLICT (starrer_did, owner_did) DO NOTHING", 117 [did, owner, _ts(rec.get("createdAt"))], 118 ) 119 elif kind == "attestation" and op != "delete": # 6.13 jurisdiction attestation 120 con.execute( 121 "INSERT INTO attestations (did, jurisdiction, method, created_at) VALUES (?,?,?,?) " 122 "ON CONFLICT (did, jurisdiction) DO NOTHING", 123 [did, rec.get("jurisdiction"), rec.get("method", "signed_record"), _ts(rec.get("createdAt"))], 124 ) 125 elif kind == "pull_request" and op != "delete": 126 pr_id = f"{did}/{collection}/{rkey}" 127 tgt = rec.get("target") if isinstance(rec.get("target"), dict) else {} 128 # Real sh.tangled.repo.pull carries identity + branches + body, but NOT the 129 # label-bearing outcome: merged / ci_status / diff-stats are appview/knot state, 130 # absent from the PDS record (merged->NULL here). The diff is rounds[].patchBlob 131 # (a gzipped blob CID), not inline. ci_status/merged stay NULL until joined from 132 # the appview; see backfill.py header. ponytail: ingest what's in the record, 133 # leave outcome columns NULL rather than fabricating bool(missing)=False. 134 con.execute( 135 "INSERT INTO pull_requests (pr_id, author_did, repo, target, opened_at, ci_status, " 136 "merged, closed_unmerged, additions, deletions, files_touched, diff_text, discussion_len) " 137 "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (pr_id) DO NOTHING", 138 [pr_id, did, tgt.get("repo") or rec.get("repo"), tgt.get("branch") or rec.get("target"), 139 _ts(rec.get("createdAt")), rec.get("ciStatus"), rec.get("merged"), False, 140 rec.get("additions"), rec.get("deletions"), rec.get("filesTouched"), 141 rec.get("diff"), len(json.dumps(rec.get("body", "")))], 142 ) 143 except Exception as e: # skip a single malformed record; never abort the batch 144 print(f"[derive] skip {collection} {rkey}: {type(e).__name__}", flush=True) 145 146 147def _flush(buf: list[tuple]) -> None: 148 # Short-lived read-write connection per batch so the API can read between flushes. 149 with connection(read_only=False) as con: 150 flush(con, buf) 151 152 153async def run(probe: bool = False, max_events: int | None = None) -> None: 154 ensure_schema() 155 with connection(read_only=True) as con: 156 url = _url(con) 157 buf: list[tuple] = [] 158 seen: Counter[str] = Counter() 159 n = 0 160 async with websockets.connect(url, max_size=None) as ws: 161 loop = asyncio.get_event_loop() 162 last_flush = loop.time() 163 async for raw in ws: 164 evt = json.loads(raw) 165 if evt.get("kind") != "commit": 166 continue 167 c = evt["commit"] 168 collection = c.get("collection", "") 169 seen[collection] += 1 170 if probe: 171 n += 1 172 if n % 50 == 0: 173 print(f"[probe] {n} events; top collections: {seen.most_common(10)}") 174 if max_events and n >= max_events: 175 print(f"[probe] distinct collections seen:\n " + 176 "\n ".join(f"{k}: {v}" for k, v in seen.most_common())) 177 return 178 continue 179 buf.append((evt["did"], evt["time_us"], c.get("operation"), collection, 180 c.get("rkey"), json.dumps(c.get("record")))) 181 n += 1 182 if len(buf) >= BATCH or loop.time() - last_flush > FLUSH_SECONDS: 183 _flush(buf) 184 last_flush = loop.time() 185 if max_events and n >= max_events: 186 break 187 _flush(buf) 188 189 190def main() -> None: 191 ap = argparse.ArgumentParser(description="Jetstream -> DuckDB ingester") 192 ap.add_argument("--probe", action="store_true", 193 help="log live `collection` values to CONFIRM NSIDs; writes nothing") 194 ap.add_argument("--max-events", type=int, default=None) 195 args = ap.parse_args() 196 asyncio.run(run(probe=args.probe, max_events=args.max_events)) 197 198 199if __name__ == "__main__": 200 main()