Sunstead trust scoring project
1"""M1 ingest: Jetstream firehose -> events (batched) -> derive typed tables.
2
3Single writer (PRD 6.1). Buffer in memory, append in batches, persist the
4`time_us` cursor so a crash resumes gaplessly. The cursor + the durable events
5log ARE the resumability a broker would give (PRD 2).
6"""
7
8from __future__ import annotations
9
10import argparse
11import asyncio
12import json
13from collections import Counter
14
15import websockets
16
17from .config import COLLECTION_KINDS, JETSTREAM_URL, WANTED_COLLECTIONS
18from .db import connection, ensure_schema
19
STREAM: str = "jetstream"  # key of this ingester's row in ingest_state (its resume cursor)
BATCH: int = 200  # flush once this many events are buffered ...
FLUSH_SECONDS: float = 2.0  # ... or once this long has passed since the last flush (checked per event)
23
24
25def _kind(collection: str) -> str | None:
26 # Longest matching needle wins, so "tangled.repo.pull.status" maps to pull_status
27 # rather than being swallowed by the shorter "tangled.repo.pull" -> pull_request.
28 best, blen = None, -1
29 for needle, kind in COLLECTION_KINDS.items():
30 if needle in collection and len(needle) > blen:
31 best, blen = kind, len(needle)
32 return best
33
34
def _url(con) -> str:
    """Build the Jetstream subscribe URL, resuming from the persisted cursor if any.

    Emits one ``wantedCollections`` parameter per comma-separated entry of
    WANTED_COLLECTIONS. Surrounding whitespace is stripped and blank entries are
    dropped. When ``ingest_state`` holds a non-zero ``last_time_us`` for STREAM,
    ``cursor`` is set 5 s earlier, so events near the crash point are replayed rather
    than skipped.
    """
    from urllib.parse import urlencode

    row = con.execute("SELECT last_time_us FROM ingest_state WHERE stream=?", [STREAM]).fetchone()
    params = [
        ("wantedCollections", c.strip())
        for c in WANTED_COLLECTIONS.split(",")
        if c.strip()
    ]
    if row and row[0]:
        params.append(("cursor", row[0] - 5_000_000))  # -5s for gapless replay
    # Percent-encode values, but keep "*" literal so NSID wildcards pass through unchanged.
    return f"{JETSTREAM_URL}?{urlencode(params, safe='*')}"
40
41
def flush(con, buf: list[tuple]) -> None:
    """Persist one batch: append raw events, derive typed rows, then advance the cursor.

    ``buf`` holds ``(did, time_us, operation, collection, rkey, record_json)`` tuples
    and is cleared once everything has been written. An empty buffer is a no-op.

    The cursor (``ingest_state.last_time_us``) is written LAST so it acts as the
    batch's commit marker. If the process dies before that point, the next run
    resumes from the previous cursor and replays this batch. Otherwise the derive
    step would be silently lost for events already counted as ingested. Replayed
    events are re-inserted with a plain INSERT. NOTE(review): confirm how the events
    table treats duplicates. The derived upserts in ``derive`` use ON CONFLICT.
    """
    if not buf:
        return
    con.executemany(
        "INSERT INTO events (did, time_us, operation, collection, rkey, record) VALUES (?,?,?,?,?,?)",
        buf,
    )
    derive(con, buf)
    last = max(e[1] for e in buf)  # newest time_us in this batch
    con.execute(
        "INSERT INTO ingest_state (stream, last_time_us) VALUES (?, ?) "
        "ON CONFLICT (stream) DO UPDATE SET last_time_us=excluded.last_time_us",
        [STREAM, last],
    )
    buf.clear()
57
58
59def _ts(v):
60 """createdAt -> a value DuckDB's TIMESTAMP accepts, or NULL. Real records sometimes
61 carry createdAt="" (or missing), which crashes the insert; coerce those to None."""
62 v = (v or "").strip() if isinstance(v, str) else v
63 return v or None
64
65
def derive(con, events: list[tuple]) -> None:
    """Raw event tuples -> contributors / vouches / pull_requests (PRD 6.1, 6.2).

    Also feeds the pull_status, stars and attestations side tables. Each tuple is
    ``(did, time_us, operation, collection, rkey, record_json)``; ``time_us`` is not
    used here. Every event's author is upserted into ``contributors``. Delete
    operations stop there: no derived row is removed on delete.

    Per-record try/except: one malformed record is skipped (and logged), never
    aborts the batch.
    """
    for did, _time_us, op, collection, rkey, record_json in events:
        try:
            kind = _kind(collection)
            rec = json.loads(record_json) if record_json else {}
            con.execute(
                "INSERT INTO contributors (did, first_seen) VALUES (?, now()) ON CONFLICT (did) DO NOTHING",
                [did],
            )
            if op == "delete":
                # Every derived table below ignores deletes; the author is recorded above.
                continue
            if kind in ("vouch", "denounce"):
                # Real sh.tangled.graph.vouch: subject is the RKEY (at://voucher/.../<subject_did>),
                # not a record field; vouch-vs-denounce is in rec["kind"]. Confirmed via listRecords.
                subject = (rec.get("subject") or rec.get("subjectDid")
                           or (rkey if str(rkey).startswith("did:") else None))
                if not subject:
                    continue
                polarity = -1 if (kind == "denounce" or rec.get("kind") == "denounce") else 1
                con.execute(
                    "INSERT INTO vouches (voucher_did, subject_did, polarity, reason, evidence_uri, created_at, weight) "
                    "VALUES (?,?,?,?,?,?,1.0) ON CONFLICT (voucher_did, subject_did) DO UPDATE SET "
                    "polarity=excluded.polarity, reason=excluded.reason",
                    [did, subject, polarity, rec.get("reason"), rec.get("evidence") or rec.get("uri"),
                     _ts(rec.get("createdAt"))],
                )
            elif kind == "pull_status":
                # sh.tangled.repo.pull.status: authoritative outcome (.merged/.closed/.open),
                # the label signal absent from the pull record itself. It references the pull by
                # AT-URI and may be authored by a DIFFERENT did than the pull owner, so the pr_id
                # comes from the `pull` field (at://<did>/<coll>/<rkey> -> our pr_id), never from
                # this record's own did/rkey. Side table so it's insertion-order independent.
                pull_uri = rec.get("pull") or ""
                pr_id = pull_uri[len("at://"):] if pull_uri.startswith("at://") else None
                status = rec.get("status")
                if pr_id and status:
                    con.execute(
                        "INSERT INTO pull_status (pr_id, status, updated_at) VALUES (?,?,now()) "
                        "ON CONFLICT (pr_id) DO UPDATE SET status=excluded.status, updated_at=now()",
                        [pr_id, status],
                    )
            elif kind == "star":
                # sh.tangled.feed.star: subject.did is the starred repo's OWNER; the record
                # author (did) is the starrer. A star is cheap -> feature only, never a trust
                # edge. PK (starrer, owner) dedups multi-repo stars; skip self-stars.
                subj = rec.get("subject") if isinstance(rec.get("subject"), dict) else {}
                owner = subj.get("did")
                if owner and owner != did:
                    con.execute(
                        "INSERT INTO stars (starrer_did, owner_did, created_at) VALUES (?,?,?) "
                        "ON CONFLICT (starrer_did, owner_did) DO NOTHING",
                        [did, owner, _ts(rec.get("createdAt"))],
                    )
            elif kind == "attestation":  # 6.13 jurisdiction attestation
                con.execute(
                    "INSERT INTO attestations (did, jurisdiction, method, created_at) VALUES (?,?,?,?) "
                    "ON CONFLICT (did, jurisdiction) DO NOTHING",
                    [did, rec.get("jurisdiction"), rec.get("method", "signed_record"), _ts(rec.get("createdAt"))],
                )
            elif kind == "pull_request":
                pr_id = f"{did}/{collection}/{rkey}"
                # `target` is either an object carrying repo/branch, or some other value.
                # Only fall back to the raw value when it is NOT a dict, so a target object
                # that lacks "branch" is never bound as the whole dict into the target column.
                raw_target = rec.get("target")
                if isinstance(raw_target, dict):
                    tgt, branch = raw_target, raw_target.get("branch")
                else:
                    tgt, branch = {}, raw_target
                # Length of the body text itself. len(json.dumps(...)) would add 2 for the
                # quotes, count a missing/null body as 2/4, and inflate escapes and
                # non-ASCII text (e.g. "é" -> "\u00e9").
                body = rec.get("body")
                if body is None:
                    discussion_len = 0
                elif isinstance(body, str):
                    discussion_len = len(body)
                else:
                    discussion_len = len(json.dumps(body))
                # Real sh.tangled.repo.pull carries identity + branches + body, but NOT the
                # label-bearing outcome: merged / ci_status / diff-stats are appview/knot state,
                # absent from the PDS record (merged->NULL here). The diff is rounds[].patchBlob
                # (a gzipped blob CID), not inline. ci_status/merged stay NULL until joined from
                # the appview; see backfill.py header. ponytail: ingest what's in the record,
                # leave outcome columns NULL rather than fabricating bool(missing)=False.
                # NOTE(review): closed_unmerged is still written as False, which does fabricate
                # an outcome the record doesn't carry; kept because the column's nullability
                # and downstream use aren't visible here -- confirm.
                con.execute(
                    "INSERT INTO pull_requests (pr_id, author_did, repo, target, opened_at, ci_status, "
                    "merged, closed_unmerged, additions, deletions, files_touched, diff_text, discussion_len) "
                    "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (pr_id) DO NOTHING",
                    [pr_id, did, tgt.get("repo") or rec.get("repo"), branch,
                     _ts(rec.get("createdAt")), rec.get("ciStatus"), rec.get("merged"), False,
                     rec.get("additions"), rec.get("deletions"), rec.get("filesTouched"),
                     rec.get("diff"), discussion_len],
                )
        except Exception as e:  # skip a single malformed record; never abort the batch
            print(f"[derive] skip {collection} {rkey}: {type(e).__name__}", flush=True)
145
146
147def _flush(buf: list[tuple]) -> None:
148 # Short-lived read-write connection per batch so the API can read between flushes.
149 with connection(read_only=False) as con:
150 flush(con, buf)
151
152
async def run(probe: bool = False, max_events: int | None = None) -> None:
    """Stream Jetstream commit events into the events log, or just tally them.

    Args:
        probe: if True, only count and print ``collection`` values; nothing is
            appended to the write buffer.
        max_events: stop after this many commit events (None or 0 = run until the
            socket closes).

    Buffered commits are flushed via ``_flush`` once BATCH are pending, or once
    FLUSH_SECONDS have elapsed since the last flush (checked as each commit
    arrives). A final flush runs when the loop ends without error and events
    remain buffered.
    """
    ensure_schema()
    with connection(read_only=True) as con:
        url = _url(con)
    buf: list[tuple] = []
    seen: Counter[str] = Counter()
    n = 0
    async with websockets.connect(url, max_size=None) as ws:
        # Inside a coroutine, get_running_loop() is the documented way to reach the loop.
        loop = asyncio.get_running_loop()
        last_flush = loop.time()
        async for raw in ws:
            evt = json.loads(raw)
            if evt.get("kind") != "commit":
                continue  # non-commit events are skipped
            c = evt["commit"]
            collection = c.get("collection", "")
            seen[collection] += 1
            if probe:
                n += 1
                if n % 50 == 0:
                    print(f"[probe] {n} events; top collections: {seen.most_common(10)}")
                if max_events and n >= max_events:
                    print(f"[probe] distinct collections seen:\n " +
                          "\n ".join(f"{k}: {v}" for k, v in seen.most_common()))
                    return
                continue
            buf.append((evt["did"], evt["time_us"], c.get("operation"), collection,
                        c.get("rkey"), json.dumps(c.get("record"))))
            n += 1
            # The time-based flush is only checked when a commit arrives, so a quiet stream
            # can hold a partial batch until the next event (or the normal loop exit).
            if len(buf) >= BATCH or loop.time() - last_flush > FLUSH_SECONDS:
                _flush(buf)
                last_flush = loop.time()
            if max_events and n >= max_events:
                break
    if buf:  # don't open a read-write connection when nothing is pending (e.g. probe mode)
        _flush(buf)
187 _flush(buf)
188
189
def main() -> None:
    """Command-line entry point for the Jetstream -> DuckDB ingester."""
    parser = argparse.ArgumentParser(description="Jetstream -> DuckDB ingester")
    parser.add_argument(
        "--probe",
        action="store_true",
        help="log live `collection` values to CONFIRM NSIDs; writes nothing",
    )
    parser.add_argument("--max-events", type=int, default=None)
    opts = parser.parse_args()
    asyncio.run(run(max_events=opts.max_events, probe=opts.probe))


if __name__ == "__main__":
    main()
200 main()