This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 16 kB View raw
from __future__ import annotations

import json
import os
import re
from typing import Any
from urllib.parse import urlparse

import httpx

from db import connect, set_crawl_state
from pds_client import (
    DEFAULT_PDS,
    describe_pds,
    describe_repo_on_knot,
    handle_from_plc,
    list_repo_records,
    sync_list_repos,
)
from progress import banner, log, metric, phase, step, summary_block

# Stage 2 of the scraper:
#   phase 1 - page com.atproto.sync.listRepos on the Tangled PDS into
#             tangled_pds_accounts,
#   phase 2 - list each account's repo records into tangled_repos,
#   phase 3 - optionally attach describeRepo output from reachable knots.

CRAWL_KEY_ACCOUNTS = "stage2:accounts"
CRAWL_KEY_REPOS = "stage2:repos"
COLLECTION = "sh.tangled.repo"

# Environment values accepted for boolean feature flags (case-insensitive).
_TRUTHY = frozenset({"1", "true", "yes"})
_FALSY = frozenset({"0", "false", "no"})

# AT Protocol TID syntax: 13 base32-sortable characters; the first character is
# restricted to the lower half of the alphabet because the top bit is always 0.
# A prefix check such as startswith("3l") only holds for a limited time window,
# since TIDs encode a timestamp.
_TID_RE = re.compile(r"[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}")


def _pds_host() -> str:
    """Return the PDS base URL from TANGLED_PDS_URL, falling back to DEFAULT_PDS.

    An empty or whitespace-only value is treated the same as unset.
    """
    return os.getenv("TANGLED_PDS_URL", "").strip() or DEFAULT_PDS


def _account_limit() -> int | None:
    """Return the optional account cap from TANGLED_STAGE2_ACCOUNT_LIMIT.

    Returns None when the variable is unset or empty; values below 1 are
    clamped to 1.

    Raises:
        ValueError: if the variable is set but is not an integer.
    """
    raw = os.getenv("TANGLED_STAGE2_ACCOUNT_LIMIT", "").strip()
    if not raw:
        return None
    try:
        limit = int(raw)
    except ValueError:
        raise ValueError(
            f"TANGLED_STAGE2_ACCOUNT_LIMIT must be an integer, got {raw!r}"
        ) from None
    return max(1, limit)


def _env_flag(name: str, *, default: bool) -> bool:
    """Read a boolean flag from environment variable *name*.

    Recognises 1/true/yes and 0/false/no case-insensitively; an unset, empty,
    or unrecognised value yields *default*.
    """
    value = os.getenv(name, "").strip().lower()
    if value in _TRUTHY:
        return True
    if value in _FALSY:
        return False
    return default


def _resolve_handles() -> bool:
    """Whether to look up each account's handle via PLC (off by default)."""
    return _env_flag("TANGLED_RESOLVE_HANDLES", default=False)


def _enrich_knots() -> bool:
    """Whether to run the knot describeRepo enrichment phase (on by default)."""
    return _env_flag("TANGLED_STAGE2_ENRICH_KNOTS", default=True)


def _rkey_from_uri(uri: str) -> str:
    """Return the last path segment of an at:// record URI (the rkey)."""
    return uri.rsplit("/", 1)[-1]


def _is_tid(rkey: str) -> bool:
    """Return True if *rkey* has AT Protocol TID syntax."""
    return _TID_RE.fullmatch(rkey) is not None


def _repo_name(value: dict[str, Any], rkey: str) -> str | None:
    """Pick a display name for a repo record.

    Prefers the record's non-empty string ``name`` field. Otherwise falls back
    to the rkey, unless the rkey is TID-shaped (an auto-generated key that
    carries no name), in which case None is returned.
    """
    name = value.get("name")
    if isinstance(name, str) and name:
        return name
    if rkey and not _is_tid(rkey):
        return rkey
    return None


def update_account_scan(
    conn,
    *,
    did: str,
    handle: str | None,
    repo_record_count: int,
) -> None:
    """Record the result of scanning one account's repo records.

    A None *handle* keeps the previously stored handle (via coalesce).
    """
    conn.execute(
        """
        update tangled_pds_accounts
        set
            handle = coalesce(%s, handle),
            repo_record_count = %s,
            last_synced_at = now()
        where did = %s
        """,
        (handle, repo_record_count, did),
    )


def upsert_accounts_batch(
    conn,
    *,
    pds_host: str,
    entries: list[dict[str, Any]],
) -> None:
    """Insert or update one listRepos page of accounts in tangled_pds_accounts.

    Entries without a string ``did`` are ignored. On conflict, handle and
    repo_record_count are left untouched; the full entry is stored as
    ``list_repos_raw``.
    """
    rows = [
        (
            entry["did"],
            pds_host,
            entry.get("head"),
            entry.get("rev"),
            entry.get("active"),
            json.dumps(entry),
        )
        for entry in entries
        if isinstance(entry.get("did"), str)
    ]
    if not rows:
        return
    # Use the cursor as a context manager so it is closed deterministically.
    with conn.cursor() as cur:
        cur.executemany(
            """
            insert into tangled_pds_accounts (
                did, pds_host, head, rev, active, handle, list_repos_raw,
                repo_record_count, last_synced_at
            )
            values (%s, %s, %s, %s, %s, null, %s::jsonb, 0, now())
            on conflict (did) do update set
                pds_host = excluded.pds_host,
                head = excluded.head,
                rev = excluded.rev,
                active = excluded.active,
                list_repos_raw = excluded.list_repos_raw,
                last_synced_at = now()
            """,
            rows,
        )


def upsert_repo_record(
    conn,
    *,
    uri: str,
    owner_did: str,
    rkey: str,
    repo_did: str | None,
    name: str | None,
    knot_hostname: str | None,
    cid: str | None,
    record_raw: dict[str, Any],
    describe_raw: dict[str, Any] | None = None,
) -> None:
    """Insert or update one sh.tangled.repo record in tangled_repos.

    A falsy *describe_raw* is stored as NULL, and on conflict the existing
    describe_raw is kept (coalesce), so a rescan does not wipe prior knot
    enrichment.
    """
    conn.execute(
        """
        insert into tangled_repos (
            uri, owner_did, rkey, repo_did, name, knot_hostname, cid,
            record_raw, describe_raw, last_synced_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, now())
        on conflict (uri) do update set
            repo_did = excluded.repo_did,
            name = excluded.name,
            knot_hostname = excluded.knot_hostname,
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            describe_raw = coalesce(excluded.describe_raw, tangled_repos.describe_raw),
            last_synced_at = now()
        """,
        (
            uri,
            owner_did,
            rkey,
            repo_did,
            name,
            knot_hostname,
            cid,
            json.dumps(record_raw),
            json.dumps(describe_raw) if describe_raw else None,
        ),
    )


def phase1_enumerate_accounts(dsn: str, pds_host: str, client: httpx.Client) -> list[str]:
    """Page com.atproto.sync.listRepos and store every account found.

    Each page is upserted and committed before the next is fetched. Paging
    stops when the cursor or page is empty, or when the optional account
    limit is reached. The crawl state under CRAWL_KEY_ACCOUNTS is marked
    "running", then "complete" with summary meta.

    Returns:
        The DIDs seen, in listRepos order.
    """
    phase(1, "Enumerate accounts on Tangled PDS")
    log("stage 2", f"PDS host: {pds_host}")
    log("stage 2", "Calling com.atproto.server.describeServer ...")

    # describeServer is informational only; a failure is not fatal.
    try:
        info = describe_pds(client, pds_host)
        domains = info.get("availableUserDomains") or []
        metric("PDS DID", info.get("did", "?"))
        metric("User domains", ", ".join(domains) if domains else "(none listed)")
    except httpx.HTTPError as exc:
        log("stage 2", f"WARNING: describeServer failed ({exc}) — continuing anyway")

    account_limit = _account_limit()
    if account_limit:
        log("stage 2", f"Account limit active: {account_limit} (unset TANGLED_STAGE2_ACCOUNT_LIMIT for full crawl)")

    log("stage 2", "Paging com.atproto.sync.listRepos ...")

    all_dids: list[str] = []
    cursor: str | None = None
    page = 0

    with connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY_ACCOUNTS, status="running")
        conn.commit()

        while True:
            page += 1
            data = sync_list_repos(client, pds_host, cursor=cursor)
            batch = data.get("repos") or []
            cursor = data.get("cursor")

            page_entries: list[dict[str, Any]] = []
            for entry in batch:
                did = entry.get("did")
                if not isinstance(did, str):
                    continue
                page_entries.append(entry)
                all_dids.append(did)
                if account_limit and len(all_dids) >= account_limit:
                    break

            upsert_accounts_batch(conn, pds_host=pds_host, entries=page_entries)

            conn.commit()
            log(
                "stage 2",
                f"  page {page}: +{len(page_entries)} accounts (running total: {len(all_dids)})",
            )

            if account_limit and len(all_dids) >= account_limit:
                log("stage 2", f"  stopped at account limit ({account_limit})")
                break
            if not cursor or not batch:
                break

        set_crawl_state(
            conn,
            key=CRAWL_KEY_ACCOUNTS,
            status="complete",
            meta={"pds_host": pds_host, "account_count": len(all_dids), "pages": page},
        )
        conn.commit()

    metric("Total accounts on PDS", len(all_dids))
    return all_dids


def _store_repo_record(conn, *, owner_did: str, rec: dict[str, Any]) -> bool:
    """Upsert one listRecords entry into tangled_repos and tag it as PDS-discovered.

    Returns:
        True if stored; False (nothing written) when the entry lacks a string
        ``uri`` or a dict ``value``.
    """
    uri = rec.get("uri")
    value = rec.get("value")
    if not isinstance(uri, str) or not isinstance(value, dict):
        return False

    rkey = _rkey_from_uri(uri)
    repo_did = value.get("repoDid")
    knot = value.get("knot")
    cid = rec.get("cid")

    upsert_repo_record(
        conn,
        uri=uri,
        owner_did=owner_did,
        rkey=rkey,
        repo_did=repo_did if isinstance(repo_did, str) else None,
        name=_repo_name(value, rkey),
        knot_hostname=knot if isinstance(knot, str) else None,
        cid=cid if isinstance(cid, str) else None,
        record_raw=value,
    )
    # upsert_repo_record does not set discovered_via, so tag the row here.
    conn.execute(
        "update tangled_repos set discovered_via = 'tngl_pds' where uri = %s",
        (uri,),
    )
    return True


def phase2_scan_repo_records(
    dsn: str,
    pds_host: str,
    client: httpx.Client,
    account_dids: list[str],
) -> dict[str, int]:
    """List repo records for each account and store them in tangled_repos.

    HTTP errors while listing an account's records are counted in
    ``stats["errors"]`` and the scan moves on to the next account. Handle
    lookups (when enabled) are best effort. Commits every 50 accounts and at
    the end; marks CRAWL_KEY_REPOS "running" then "complete".

    Returns:
        Counters: accounts_with_repos, accounts_without_repos, repo_records,
        errors.
    """
    phase(2, "Scan sh.tangled.repo records per account")
    log("stage 2", f"Checking {len(account_dids)} accounts for repo records ...")

    stats = {"accounts_with_repos": 0, "accounts_without_repos": 0, "repo_records": 0, "errors": 0}
    resolve_handles = _resolve_handles()
    if resolve_handles:
        log("stage 2", "Handle resolution enabled (PLC lookup per account — slower)")

    total = len(account_dids)
    with connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY_REPOS, status="running")
        conn.commit()

        for i, did in enumerate(account_dids, start=1):
            handle: str | None = None
            if resolve_handles:
                # A failed PLC lookup must not abort the crawl or skip the
                # account's repo scan; fall back to no handle.
                try:
                    handle = handle_from_plc(client, did)
                except httpx.HTTPError as exc:
                    log("stage 2", f"WARNING: handle lookup failed for {did} ({exc})")

            try:
                cursor: str | None = None
                repo_count = 0
                while True:
                    data = list_repo_records(client, pds_host, did, cursor=cursor)
                    records = data.get("records") or []
                    cursor = data.get("cursor")

                    for rec in records:
                        if _store_repo_record(conn, owner_did=did, rec=rec):
                            repo_count += 1
                            stats["repo_records"] += 1

                    if not cursor or not records:
                        break

                update_account_scan(
                    conn,
                    did=did,
                    handle=handle,
                    repo_record_count=repo_count,
                )

                if repo_count:
                    stats["accounts_with_repos"] += 1
                    label = handle or did
                    step("stage 2", i, total, f"{label} → {repo_count} repo(s)")
                else:
                    stats["accounts_without_repos"] += 1
                    if i % 100 == 0 or i == total:
                        step(
                            "stage 2",
                            i,
                            total,
                            f"{stats['accounts_with_repos']} accounts with repos so far",
                        )

            except httpx.HTTPError as exc:
                stats["errors"] += 1
                step("stage 2", i, total, f"ERROR {did}: {exc}")

            if i % 50 == 0:
                conn.commit()

        set_crawl_state(
            conn,
            key=CRAWL_KEY_REPOS,
            status="complete",
            meta=stats,
        )
        conn.commit()

    return stats


def phase3_enrich_from_knots(dsn: str, client: httpx.Client) -> dict[str, int]:
    """Fetch describeRepo from each repo's knot and store it as describe_raw.

    Only repos with both repo_did and knot_hostname, whose knot is marked
    reachable in tangled_knots, are queried. Repos on unreachable knots, and
    empty describe responses, count as "skipped"; HTTP failures count as
    "errors". Disabled entirely when TANGLED_STAGE2_ENRICH_KNOTS is off.

    Returns:
        Counters: enriched, skipped, errors.
    """
    phase(3, "Enrich repos from knot describeRepo (optional)")
    stats = {"enriched": 0, "skipped": 0, "errors": 0}

    if not _enrich_knots():
        log("stage 2", "Skipped (TANGLED_STAGE2_ENRICH_KNOTS=0)")
        return stats

    with connect(dsn) as conn:
        knots = conn.execute(
            "select hostname from tangled_knots where reachable = true order by hostname"
        ).fetchall()
        repos = conn.execute(
            """
            select uri, repo_did, knot_hostname
            from tangled_repos
            where repo_did is not null and knot_hostname is not null
            order by uri
            """
        ).fetchall()

    reachable = {row["hostname"] for row in knots}
    log("stage 2", f"Enriching {len(repos)} repos via {len(reachable)} reachable knot(s) ...")

    with connect(dsn) as conn:
        for i, row in enumerate(repos, start=1):
            knot = row["knot_hostname"]
            repo_did = row["repo_did"]
            if knot not in reachable:
                stats["skipped"] += 1
                continue

            try:
                describe = describe_repo_on_knot(client, knot, repo_did)
                if describe:
                    conn.execute(
                        """
                        update tangled_repos
                        set describe_raw = %s::jsonb, last_synced_at = now()
                        where uri = %s
                        """,
                        (json.dumps(describe), row["uri"]),
                    )
                    stats["enriched"] += 1
                    # Throttle progress output: first 10, then every 25th.
                    if i <= 10 or i % 25 == 0:
                        step("stage 2", i, len(repos), f"describeRepo OK {repo_did}")
                else:
                    stats["skipped"] += 1
            except httpx.HTTPError as exc:
                stats["errors"] += 1
                step("stage 2", i, len(repos), f"describeRepo FAIL {repo_did}: {exc}")

            if i % 50 == 0:
                conn.commit()
        conn.commit()

    metric("Knot enrichments", stats["enriched"])
    return stats


def run_stage2_accounts_only(dsn: str) -> dict[str, Any]:
    """Run only phase 1 (account enumeration) and print a summary."""
    banner("STAGE 2a — Count accounts on Tangled PDS")
    pds_host = _pds_host()
    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        dids = phase1_enumerate_accounts(dsn, pds_host, client)
    summary_block(
        "Stage 2a complete",
        [
            f"PDS: {pds_host}",
            f"Accounts: {len(dids)}",
            "Next step: python scraper/scrape.py stage2-repos",
        ],
    )
    return {"account_count": len(dids)}


def run_stage2_repos_only(dsn: str) -> dict[str, Any]:
    """Run phases 2 and 3 over accounts already stored in tangled_pds_accounts.

    Raises:
        RuntimeError: if tangled_pds_accounts is empty.
    """
    banner("STAGE 2b — Scan repo records (accounts must exist in DB)")
    pds_host = _pds_host()

    with connect(dsn) as conn:
        rows = conn.execute(
            "select did from tangled_pds_accounts order by did"
        ).fetchall()
    if not rows:
        raise RuntimeError(
            "No accounts in tangled_pds_accounts. Run stage2-accounts first:\n"
            "  python scraper/scrape.py stage2-accounts"
        )

    account_dids = [row["did"] for row in rows]
    log("stage 2", f"Loaded {len(account_dids)} accounts from DB")

    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids)
        knot_stats = phase3_enrich_from_knots(dsn, client)

    summary_block(
        "Stage 2b complete",
        [
            f"Accounts scanned: {len(account_dids)}",
            f"Accounts with repos: {repo_stats['accounts_with_repos']}",
            f"Repo records stored: {repo_stats['repo_records']}",
            f"Knot enrichments: {knot_stats['enriched']}",
            f"Errors: {repo_stats['errors'] + knot_stats['errors']}",
        ],
    )
    # Both dicts define "errors"; the knot value wins in the merged result.
    return {**repo_stats, **knot_stats}


def run_stage2(dsn: str) -> dict[str, Any]:
    """Run all three stage-2 phases end to end and print a summary."""
    banner("STAGE 2 — Discover repos via Tangled PDS (tngl.sh)")
    log("stage 2", "Step-by-step: accounts → repo records → knot enrichment")
    log("stage 2", "Note: sh.tangled.sync.listRepos on knots returns 404 — we use PDS instead.")

    pds_host = _pds_host()
    host_label = urlparse(pds_host).netloc or pds_host

    with httpx.Client(timeout=30.0, follow_redirects=True) as client:
        account_dids = phase1_enumerate_accounts(dsn, pds_host, client)
        repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids)
        knot_stats = phase3_enrich_from_knots(dsn, client)

    summary_block(
        "Stage 2 complete",
        [
            f"PDS ({host_label}): {len(account_dids)} accounts",
            f"Accounts with repos: {repo_stats['accounts_with_repos']}",
            f"Empty accounts: {repo_stats['accounts_without_repos']}",
            f"Repo records stored: {repo_stats['repo_records']}",
            f"Knot enrichments: {knot_stats['enriched']}",
            f"Errors: {repo_stats['errors'] + knot_stats['errors']}",
        ],
    )
    return {
        "account_count": len(account_dids),
        **repo_stats,
        **knot_stats,
    }
Run stage2-accounts first:\n" 422 " python scraper/scrape.py stage2-accounts" 423 ) 424 425 account_dids = [row["did"] for row in rows] 426 log("stage 2", f"Loaded {len(account_dids)} accounts from DB") 427 428 with httpx.Client(timeout=30.0, follow_redirects=True) as client: 429 repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids) 430 knot_stats = phase3_enrich_from_knots(dsn, client) 431 432 summary_block( 433 "Stage 2b complete", 434 [ 435 f"Accounts scanned: {len(account_dids)}", 436 f"Accounts with repos: {repo_stats['accounts_with_repos']}", 437 f"Repo records stored: {repo_stats['repo_records']}", 438 f"Knot enrichments: {knot_stats['enriched']}", 439 f"Errors: {repo_stats['errors'] + knot_stats['errors']}", 440 ], 441 ) 442 return {**repo_stats, **knot_stats} 443 444 445def run_stage2(dsn: str) -> dict[str, Any]: 446 banner("STAGE 2 — Discover repos via Tangled PDS (tngl.sh)") 447 log("stage 2", "Step-by-step: accounts → repo records → knot enrichment") 448 log("stage 2", "Note: sh.tangled.sync.listRepos on knots returns 404 — we use PDS instead.") 449 450 pds_host = _pds_host() 451 host_label = urlparse(pds_host).netloc or pds_host 452 453 with httpx.Client(timeout=30.0, follow_redirects=True) as client: 454 account_dids = phase1_enumerate_accounts(dsn, pds_host, client) 455 repo_stats = phase2_scan_repo_records(dsn, pds_host, client, account_dids) 456 knot_stats = phase3_enrich_from_knots(dsn, client) 457 458 summary_block( 459 "Stage 2 complete", 460 [ 461 f"PDS ({host_label}): {len(account_dids)} accounts", 462 f"Accounts with repos: {repo_stats['accounts_with_repos']}", 463 f"Empty accounts: {repo_stats['accounts_without_repos']}", 464 f"Repo records stored: {repo_stats['repo_records']}", 465 f"Knot enrichments: {knot_stats['enriched']}", 466 f"Errors: {repo_stats['errors'] + knot_stats['errors']}", 467 ], 468 ) 469 return { 470 "account_count": len(account_dids), 471 **repo_stats, 472 **knot_stats, 473 }