This repository has no description
1from __future__ import annotations
2
3import json
4import os
5from typing import Any
6from urllib.parse import urlparse
7
8import httpx
9
10from db import connect, set_crawl_state
11from pds_client import (
12 DEFAULT_PDS,
13 describe_pds,
14 describe_repo_on_knot,
15 handle_from_plc,
16 list_repo_records,
17 sync_list_repos,
18)
19from progress import banner, log, metric, phase, step, summary_block
20
# Keys passed to set_crawl_state(): phase 1 (account enumeration) and
# phase 2 (repo-record scan) record their "running"/"complete" status under these.
CRAWL_KEY_ACCOUNTS = "stage2:accounts"
CRAWL_KEY_REPOS = "stage2:repos"
# NSID of the Tangled repo record collection.
# NOTE(review): not referenced by the code in this module; presumably
# list_repo_records() targets this collection itself — confirm before removing.
COLLECTION = "sh.tangled.repo"
24
25
def _pds_host() -> str:
    """Return the base URL of the PDS to crawl.

    Reads ``TANGLED_PDS_URL`` (whitespace-stripped). Falls back to
    ``DEFAULT_PDS`` when the variable is unset *or* blank. A blank value,
    such as an empty ``TANGLED_PDS_URL=`` line in an env file, previously
    produced an empty host string and broke every request.
    """
    configured = (os.getenv("TANGLED_PDS_URL") or "").strip()
    return configured or DEFAULT_PDS.strip()
28
29
30def _account_limit() -> int | None:
31 raw = os.getenv("TANGLED_STAGE2_ACCOUNT_LIMIT", "").strip()
32 if not raw:
33 return None
34 return max(1, int(raw))
35
36
37def _resolve_handles() -> bool:
38 return os.getenv("TANGLED_RESOLVE_HANDLES", "0").strip() in {"1", "true", "yes"}
39
40
41def _enrich_knots() -> bool:
42 return os.getenv("TANGLED_STAGE2_ENRICH_KNOTS", "1").strip() not in {"0", "false", "no"}
43
44
45def _rkey_from_uri(uri: str) -> str:
46 return uri.rsplit("/", 1)[-1]
47
48
49def _repo_name(value: dict[str, Any], rkey: str) -> str | None:
50 name = value.get("name")
51 if isinstance(name, str) and name:
52 return name
53 if rkey and not rkey.startswith("3l"):
54 return rkey
55 return None
56
57
58def update_account_scan(
59 conn,
60 *,
61 did: str,
62 handle: str | None,
63 repo_record_count: int,
64) -> None:
65 conn.execute(
66 """
67 update tangled_pds_accounts
68 set
69 handle = coalesce(%s, handle),
70 repo_record_count = %s,
71 last_synced_at = now()
72 where did = %s
73 """,
74 (handle, repo_record_count, did),
75 )
76
77
def upsert_accounts_batch(
    conn,
    *,
    pds_host: str,
    entries: list[dict[str, Any]],
) -> None:
    """Insert or refresh one ``listRepos`` page of accounts.

    Entries whose ``did`` is not a string are ignored. New rows start with a
    NULL ``handle`` and ``repo_record_count = 0``. Existing rows keep those
    two columns and only get ``pds_host``, ``head``, ``rev``, ``active``,
    the raw JSON entry and ``last_synced_at`` refreshed. Does not commit.
    """
    rows = [
        (
            entry["did"],
            pds_host,
            entry.get("head"),
            entry.get("rev"),
            entry.get("active"),
            json.dumps(entry),
        )
        for entry in entries
        if isinstance(entry.get("did"), str)
    ]
    # Check after filtering so a page made only of malformed entries opens no cursor.
    if not rows:
        return
    # Close the cursor deterministically instead of leaving it to garbage collection.
    with conn.cursor() as cur:
        cur.executemany(
            """
            insert into tangled_pds_accounts (
                did, pds_host, head, rev, active, handle, list_repos_raw,
                repo_record_count, last_synced_at
            )
            values (%s, %s, %s, %s, %s, null, %s::jsonb, 0, now())
            on conflict (did) do update set
                pds_host = excluded.pds_host,
                head = excluded.head,
                rev = excluded.rev,
                active = excluded.active,
                list_repos_raw = excluded.list_repos_raw,
                last_synced_at = now()
            """,
            rows,
        )
114
115
116def upsert_repo_record(
117 conn,
118 *,
119 uri: str,
120 owner_did: str,
121 rkey: str,
122 repo_did: str | None,
123 name: str | None,
124 knot_hostname: str | None,
125 cid: str | None,
126 record_raw: dict[str, Any],
127 describe_raw: dict[str, Any] | None = None,
128) -> None:
129 conn.execute(
130 """
131 insert into tangled_repos (
132 uri, owner_did, rkey, repo_did, name, knot_hostname, cid,
133 record_raw, describe_raw, last_synced_at
134 )
135 values (%s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, now())
136 on conflict (uri) do update set
137 repo_did = excluded.repo_did,
138 name = excluded.name,
139 knot_hostname = excluded.knot_hostname,
140 cid = excluded.cid,
141 record_raw = excluded.record_raw,
142 describe_raw = coalesce(excluded.describe_raw, tangled_repos.describe_raw),
143 last_synced_at = now()
144 """,
145 (
146 uri,
147 owner_did,
148 rkey,
149 repo_did,
150 name,
151 knot_hostname,
152 cid,
153 json.dumps(record_raw),
154 json.dumps(describe_raw) if describe_raw else None,
155 ),
156 )
157
158
def phase1_enumerate_accounts(dsn: str, pds_host: str, client: httpx.Client) -> list[str]:
    """Phase 1: list the accounts hosted on the PDS and persist them.

    First logs ``describeServer`` metadata; an HTTP failure there is only a
    warning. Then pages ``com.atproto.sync.listRepos``, upserting and
    committing each page into ``tangled_pds_accounts``. Paging stops when the
    PDS returns no cursor or an empty page, or once
    ``TANGLED_STAGE2_ACCOUNT_LIMIT`` accounts have been collected. The crawl
    state under ``CRAWL_KEY_ACCOUNTS`` is set to "running" at the start and
    to "complete" (with host, account count and page count) at the end.

    Returns:
        The collected DIDs, in listing order.
    """
    phase(1, "Enumerate accounts on Tangled PDS")
    log("stage 2", f"PDS host: {pds_host}")
    log("stage 2", "Calling com.atproto.server.describeServer ...")

    try:
        info = describe_pds(client, pds_host)
    except httpx.HTTPError as exc:
        log("stage 2", f"WARNING: describeServer failed ({exc}) — continuing anyway")
    else:
        user_domains = info.get("availableUserDomains") or []
        metric("PDS DID", info.get("did", "?"))
        metric("User domains", ", ".join(user_domains) if user_domains else "(none listed)")

    limit = _account_limit()
    if limit:
        log("stage 2", f"Account limit active: {limit} (unset TANGLED_STAGE2_ACCOUNT_LIMIT for full crawl)")

    log("stage 2", "Paging com.atproto.sync.listRepos ...")

    collected: list[str] = []

    def limit_reached() -> bool:
        # No limit configured (None) never stops the crawl.
        return bool(limit) and len(collected) >= limit

    next_cursor: str | None = None
    pages = 0

    with connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY_ACCOUNTS, status="running")
        conn.commit()

        while True:
            pages += 1
            response = sync_list_repos(client, pds_host, cursor=next_cursor)
            repos = response.get("repos") or []
            next_cursor = response.get("cursor")

            kept: list[dict[str, Any]] = []
            for repo in repos:
                repo_did = repo.get("did")
                if isinstance(repo_did, str):
                    kept.append(repo)
                    collected.append(repo_did)
                    if limit_reached():
                        break

            upsert_accounts_batch(conn, pds_host=pds_host, entries=kept)
            # Commit per page so an interrupted crawl keeps what it already fetched.
            conn.commit()
            log(
                "stage 2",
                f" page {pages}: +{len(kept)} accounts (running total: {len(collected)})",
            )

            if limit_reached():
                log("stage 2", f" stopped at account limit ({limit})")
                break
            if not next_cursor or not repos:
                break

        set_crawl_state(
            conn,
            key=CRAWL_KEY_ACCOUNTS,
            status="complete",
            meta={"pds_host": pds_host, "account_count": len(collected), "pages": pages},
        )
        conn.commit()

    metric("Total accounts on PDS", len(collected))
    return collected
226
227
def _store_repo_record(conn, did: str, rec: dict[str, Any]) -> bool:
    """Upsert one ``listRecords`` entry owned by *did*; return False if skipped.

    Entries without a string ``uri`` or a dict ``value`` are skipped.
    Non-string ``repoDid`` / ``knot`` / ``cid`` fields are stored as NULL.
    Stored rows are tagged ``discovered_via = 'tngl_pds'``. Does not commit.
    """
    uri = rec.get("uri")
    value = rec.get("value")
    if not isinstance(uri, str) or not isinstance(value, dict):
        return False

    rkey = _rkey_from_uri(uri)
    repo_did = value.get("repoDid")
    knot = value.get("knot")
    cid = rec.get("cid")
    upsert_repo_record(
        conn,
        uri=uri,
        owner_did=did,
        rkey=rkey,
        repo_did=repo_did if isinstance(repo_did, str) else None,
        name=_repo_name(value, rkey),
        knot_hostname=knot if isinstance(knot, str) else None,
        cid=cid if isinstance(cid, str) else None,
        record_raw=value,
    )
    conn.execute(
        "update tangled_repos set discovered_via = 'tngl_pds' where uri = %s",
        (uri,),
    )
    return True


def phase2_scan_repo_records(
    dsn: str,
    pds_host: str,
    client: httpx.Client,
    account_dids: list[str],
) -> dict[str, int]:
    """Phase 2: fetch each account's repo records from the PDS and store them.

    For every DID:
    - optionally resolve its handle via PLC (``TANGLED_RESOLVE_HANDLES``);
    - page ``list_repo_records``;
    - upsert each valid record into ``tangled_repos``;
    - record the per-account count in ``tangled_pds_accounts``.

    An HTTP failure while listing an account's records is counted in
    ``errors`` and the scan moves on. A failed handle lookup is logged and
    the account is scanned without a handle. Commits every 50 accounts and
    at the end. The crawl state under ``CRAWL_KEY_REPOS`` goes from
    "running" to "complete" (with the stats as meta).

    Returns:
        Counters ``accounts_with_repos``, ``accounts_without_repos``,
        ``repo_records`` and ``errors``.
    """
    phase(2, "Scan sh.tangled.repo records per account")
    total = len(account_dids)
    log("stage 2", f"Checking {total} accounts for repo records ...")

    stats = {"accounts_with_repos": 0, "accounts_without_repos": 0, "repo_records": 0, "errors": 0}
    resolve_handles = _resolve_handles()
    if resolve_handles:
        log("stage 2", "Handle resolution enabled (PLC lookup per account — slower)")

    with connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY_REPOS, status="running")
        conn.commit()

        for i, did in enumerate(account_dids, start=1):
            handle: str | None = None
            if resolve_handles:
                # The handle is optional metadata. Previously this call sat
                # outside any try, so one PLC failure aborted the whole stage
                # and lost every account processed since the last commit.
                try:
                    handle = handle_from_plc(client, did)
                except httpx.HTTPError as exc:
                    step("stage 2", i, total, f"WARNING handle lookup failed for {did}: {exc}")

            try:
                cursor: str | None = None
                repo_count = 0
                while True:
                    data = list_repo_records(client, pds_host, did, cursor=cursor)
                    records = data.get("records") or []
                    cursor = data.get("cursor")

                    for rec in records:
                        if _store_repo_record(conn, did, rec):
                            repo_count += 1
                            stats["repo_records"] += 1

                    if not cursor or not records:
                        break

                update_account_scan(
                    conn,
                    did=did,
                    handle=handle,
                    repo_record_count=repo_count,
                )

                if repo_count:
                    stats["accounts_with_repos"] += 1
                    label = handle or did
                    step("stage 2", i, total, f"{label} → {repo_count} repo(s)")
                else:
                    stats["accounts_without_repos"] += 1
                    # Empty accounts are only reported periodically to keep output short.
                    if i % 100 == 0 or i == total:
                        step(
                            "stage 2",
                            i,
                            total,
                            f"… {stats['accounts_with_repos']} accounts with repos so far",
                        )

            except httpx.HTTPError as exc:
                stats["errors"] += 1
                step("stage 2", i, total, f"ERROR {did}: {exc}")

            if i % 50 == 0:
                conn.commit()

        set_crawl_state(
            conn,
            key=CRAWL_KEY_REPOS,
            status="complete",
            meta=stats,
        )
        conn.commit()

    return stats
333
334
def phase3_enrich_from_knots(dsn: str, client: httpx.Client) -> dict[str, int]:
    """Phase 3 (optional): attach knot ``describeRepo`` output to stored repos.

    Disabled when ``TANGLED_STAGE2_ENRICH_KNOTS`` is 0/false/no. Otherwise
    it looks at every repo that has both a ``repo_did`` and a
    ``knot_hostname``. For repos whose knot is marked reachable in
    ``tangled_knots``, it calls ``describe_repo_on_knot`` and saves a
    non-empty response to ``describe_raw``. Commits every 50 rows and at
    the end.

    Returns:
        Counters ``enriched``, ``skipped`` (unreachable knot or empty
        response) and ``errors`` (HTTP failures).
    """
    phase(3, "Enrich repos from knot describeRepo (optional)")
    stats = {"enriched": 0, "skipped": 0, "errors": 0}

    if not _enrich_knots():
        log("stage 2", "Skipped (TANGLED_STAGE2_ENRICH_KNOTS=0)")
        return stats

    # Snapshot the work list on a short-lived connection before any knot requests.
    with connect(dsn) as conn:
        knot_rows = conn.execute(
            "select hostname from tangled_knots where reachable = true order by hostname"
        ).fetchall()
        candidates = conn.execute(
            """
            select uri, repo_did, knot_hostname
            from tangled_repos
            where repo_did is not null and knot_hostname is not null
            order by uri
            """
        ).fetchall()

    reachable_knots = {knot_row["hostname"] for knot_row in knot_rows}
    total = len(candidates)
    log("stage 2", f"Enriching {total} repos via {len(reachable_knots)} reachable knot(s) ...")

    with connect(dsn) as conn:
        for idx, repo in enumerate(candidates, start=1):
            knot_host = repo["knot_hostname"]
            repo_did = repo["repo_did"]
            if knot_host not in reachable_knots:
                stats["skipped"] += 1
                continue

            try:
                described = describe_repo_on_knot(client, knot_host, repo_did)
                if not described:
                    stats["skipped"] += 1
                else:
                    conn.execute(
                        """
                        update tangled_repos
                        set describe_raw = %s::jsonb, last_synced_at = now()
                        where uri = %s
                        """,
                        (json.dumps(described), repo["uri"]),
                    )
                    stats["enriched"] += 1
                    # Log the first few successes, then only every 25th row.
                    if idx <= 10 or idx % 25 == 0:
                        step("stage 2", idx, total, f"describeRepo OK {repo_did}")
            except httpx.HTTPError as exc:
                stats["errors"] += 1
                step("stage 2", idx, total, f"describeRepo FAIL {repo_did}: {exc}")

            if idx % 50 == 0:
                conn.commit()
        conn.commit()

    metric("Knot enrichments", stats["enriched"])
    return stats
393
394
def run_stage2_accounts_only(dsn: str) -> dict[str, Any]:
    """Stage 2a: enumerate PDS accounts only (phase 1) and print a summary.

    Returns:
        ``{"account_count": <number of DIDs collected>}``.
    """
    banner("STAGE 2a — Count accounts on Tangled PDS")
    pds_host = _pds_host()
    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        dids = phase1_enumerate_accounts(dsn, pds_host, client)

    account_count = len(dids)
    summary_lines = [
        f"PDS: {pds_host}",
        f"Accounts: {account_count}",
        "Next step: python scraper/scrape.py stage2-repos",
    ]
    summary_block("Stage 2a complete", summary_lines)
    return {"account_count": account_count}
409
410
def run_stage2_repos_only(dsn: str) -> dict[str, Any]:
    """Stage 2b: scan repo records for accounts already stored by stage 2a.

    Loads every DID from ``tangled_pds_accounts``, then runs phase 2 (repo
    record scan) and phase 3 (knot enrichment) and prints a summary.

    Returns:
        Phase 2 and phase 3 counters merged into one dict. Both phases
        report an ``errors`` key, so ``errors`` here is their sum (matching
        the printed summary). The per-phase values are kept as
        ``repo_errors`` and ``knot_errors``.

    Raises:
        RuntimeError: if ``tangled_pds_accounts`` is empty.
    """
    banner("STAGE 2b — Scan repo records (accounts must exist in DB)")
    pds_host = _pds_host()

    with connect(dsn) as conn:
        rows = conn.execute(
            "select did from tangled_pds_accounts order by did"
        ).fetchall()
    if not rows:
        raise RuntimeError(
            "No accounts in tangled_pds_accounts. Run stage2-accounts first:\n"
            " python scraper/scrape.py stage2-accounts"
        )

    account_dids = [row["did"] for row in rows]
    log("stage 2", f"Loaded {len(account_dids)} accounts from DB")

    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids)
        knot_stats = phase3_enrich_from_knots(dsn, client)

    total_errors = repo_stats["errors"] + knot_stats["errors"]
    summary_block(
        "Stage 2b complete",
        [
            f"Accounts scanned: {len(account_dids)}",
            f"Accounts with repos: {repo_stats['accounts_with_repos']}",
            f"Repo records stored: {repo_stats['repo_records']}",
            f"Knot enrichments: {knot_stats['enriched']}",
            f"Errors: {total_errors}",
        ],
    )
    return {
        **repo_stats,
        **knot_stats,
        # A plain merge let knot_stats["errors"] silently overwrite repo_stats["errors"].
        "errors": total_errors,
        "repo_errors": repo_stats["errors"],
        "knot_errors": knot_stats["errors"],
    }
443
444
def run_stage2(dsn: str) -> dict[str, Any]:
    """Run the full stage 2: accounts → repo records → knot enrichment.

    Executes phases 1–3 against the PDS from ``TANGLED_PDS_URL`` (or the
    default) and prints a summary.

    Returns:
        ``account_count`` plus the phase 2 and phase 3 counters. Both later
        phases report an ``errors`` key, so ``errors`` here is their sum
        (matching the printed summary). The per-phase values are kept as
        ``repo_errors`` and ``knot_errors``.
    """
    banner("STAGE 2 — Discover repos via Tangled PDS (tngl.sh)")
    log("stage 2", "Step-by-step: accounts → repo records → knot enrichment")
    log("stage 2", "Note: sh.tangled.sync.listRepos on knots returns 404 — we use PDS instead.")

    pds_host = _pds_host()
    host_label = urlparse(pds_host).netloc or pds_host

    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        account_dids = phase1_enumerate_accounts(dsn, pds_host, client)
        repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids)
        knot_stats = phase3_enrich_from_knots(dsn, client)

    total_errors = repo_stats["errors"] + knot_stats["errors"]
    summary_block(
        "Stage 2 complete",
        [
            f"PDS ({host_label}): {len(account_dids)} accounts",
            f"Accounts with repos: {repo_stats['accounts_with_repos']}",
            f"Empty accounts: {repo_stats['accounts_without_repos']}",
            f"Repo records stored: {repo_stats['repo_records']}",
            f"Knot enrichments: {knot_stats['enriched']}",
            f"Errors: {total_errors}",
        ],
    )
    return {
        "account_count": len(account_dids),
        **repo_stats,
        **knot_stats,
        # A plain merge let knot_stats["errors"] silently overwrite repo_stats["errors"].
        "errors": total_errors,
        "repo_errors": repo_stats["errors"],
        "knot_errors": knot_stats["errors"],
    }