This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1#!/usr/bin/env python3 2"""Scrape sh.tangled.repo.issue (+ state) from every known user PDS.""" 3 4from __future__ import annotations 5 6import json 7import os 8import sys 9import threading 10import time 11from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait 12from dataclasses import dataclass, field 13from pathlib import Path 14from typing import Any 15 16import httpx 17from dotenv import load_dotenv 18 19from db import connect, init_schema, set_crawl_state 20from parallel import concurrency_env 21from pds_client import list_records, pds_host_for_did 22from progress import banner, log, metric, phase, step, summary_block 23 24REPO_ROOT = Path(__file__).resolve().parent.parent 25CRAWL_KEY = "issues:fetch" 26ISSUE_COLLECTION = "sh.tangled.repo.issue" 27STATE_COLLECTION = "sh.tangled.repo.issue.state" 28STATE_OPEN = "sh.tangled.repo.issue.state.open" 29STATE_CLOSED = "sh.tangled.repo.issue.state.closed" 30HTTP_TIMEOUT = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=10.0) 31LOG_EVERY = 10 32HEARTBEAT_SEC = 15 33INFLIGHT_CHUNK = 200 34 35 36class _PdsCache: 37 def __init__(self) -> None: 38 self._hosts: dict[str, str | None] = {} 39 self._lock = threading.Lock() 40 41 def resolve(self, client: httpx.Client, user_did: str, hint: str | None) -> str | None: 42 if hint: 43 return hint.rstrip("/") 44 with self._lock: 45 if user_did in self._hosts: 46 return self._hosts[user_did] 47 try: 48 pds = pds_host_for_did(client, user_did) 49 except httpx.HTTPError: 50 pds = None 51 with self._lock: 52 self._hosts[user_did] = pds.rstrip("/") if pds else None 53 return self._hosts[user_did] 54 55 56@dataclass 57class UserIssueResult: 58 user_did: str 59 handle: str | None 60 status: str # ok | error 61 issues: list[dict[str, Any]] = field(default_factory=list) 62 states: list[dict[str, Any]] = field(default_factory=list) 63 error: str | None = None 64 65 66def _user_limit() -> int | None: 67 raw = os.getenv("TANGLED_ISSUE_USER_LIMIT", "").strip() 68 if not raw: 69 return None 70 return max(1, int(raw)) 71 72 73def _max_pages() -> int: 74 raw = os.getenv("TANGLED_ISSUE_MAX_PAGES", "50").strip() 75 return max(1, int(raw)) 76 77 78def _skip_existing() -> bool: 79 return os.getenv("TANGLED_ISSUE_REFRESH", "").strip().lower() not in ( 80 "1", 81 "true", 82 "yes", 83 ) 84 85 86def _all_users() -> bool: 87 return os.getenv("TANGLED_ISSUE_ALL_USERS", "1").strip().lower() not in ( 88 "0", 89 "false", 90 "no", 91 ) 92 93 94def _users_query(*, skip_existing: bool, user_limit: int | None, all_users: bool) -> str: 95 skip_clause = "" 96 if skip_existing: 97 skip_clause = """ 98 and not exists ( 99 select 1 from tangled_issue_user_sync s where s.user_did = u.did 100 ) 101 """ 102 pds_union = "" 103 if all_users: 104 pds_union = """ 105 union all 106 select did, handle, pds_host from tangled_pds_accounts 107 """ 108 query = f""" 109 select distinct on (u.did) u.did, u.handle, u.pds_host 110 from ( 111 select did, handle, pds_host from tangled_identities 112 union all 113 select owner_did as did, 114 max(owner_handle) as handle, 115 null::text as pds_host 116 from tangled_repos 117 where owner_did is not null 118 group by owner_did 119 {pds_union} 120 ) u 121 where u.did is not null 122 {skip_clause} 123 order by u.did, u.pds_host nulls last, u.handle nulls last 124 """ 125 if user_limit: 126 query += f" limit {user_limit}" 127 return query 128 129 130def _total_users_sql(*, all_users: bool) -> str: 131 pds_union = "" 132 if all_users: 133 pds_union = "union select did from tangled_pds_accounts" 134 return f""" 135 select count(*) as n from ( 136 select did from tangled_identities 137 union 138 select owner_did from tangled_repos where owner_did is not null 139 {pds_union} 140 ) x 141 """ 142 143 144def _rkey_from_uri(uri: str) -> str: 145 return uri.rsplit("/", 1)[-1] 146 147 148def _parse_repo_refs(value: dict[str, Any]) -> tuple[str | None, str | None]: 149 repo = value.get("repo") 150 if isinstance(repo, str): 151 if repo.startswith("did:"): 152 return repo, None 153 if repo.startswith("at://"): 154 return _repo_did_from_at_uri(repo), repo 155 return None, repo if isinstance(repo, str) else None 156 157 158def _repo_did_from_at_uri(uri: str) -> str | None: 159 if not uri.startswith("at://"): 160 return None 161 parts = uri.removeprefix("at://").split("/") 162 return parts[0] if parts and parts[0].startswith("did:") else None 163 164 165def _list_all_records( 166 client: httpx.Client, 167 pds_host: str, 168 user_did: str, 169 collection: str, 170 *, 171 max_pages: int, 172) -> list[dict[str, Any]]: 173 records: list[dict[str, Any]] = [] 174 cursor: str | None = None 175 seen_cursors: set[str] = set() 176 177 for _ in range(max_pages): 178 data = list_records( 179 client, pds_host, user_did, collection, cursor=cursor, limit=100 180 ) 181 page = data.get("records") or [] 182 records.extend(rec for rec in page if isinstance(rec, dict)) 183 next_cursor = data.get("cursor") 184 if not next_cursor or not page: 185 break 186 if not isinstance(next_cursor, str) or next_cursor in seen_cursors: 187 break 188 seen_cursors.add(next_cursor) 189 cursor = next_cursor 190 return records 191 192 193def _state_map(states: list[dict[str, Any]]) -> dict[str, str]: 194 mapping: dict[str, str] = {} 195 for rec in states: 196 value = rec.get("value") 197 if not isinstance(value, dict): 198 continue 199 issue_uri = value.get("issue") 200 state = value.get("state") 201 if not isinstance(state, str): 202 continue 203 if state == STATE_CLOSED: 204 normalized = "closed" 205 elif state == STATE_OPEN: 206 normalized = "open" 207 else: 208 normalized = "open" 209 if isinstance(issue_uri, str) and issue_uri: 210 mapping[issue_uri] = normalized 211 else: 212 rkey = _rkey_from_uri(rec["uri"]) if isinstance(rec.get("uri"), str) else None 213 if rkey: 214 mapping[f"rkey:{rkey}"] = normalized 215 return mapping 216 217 218def _issue_state(uri: str, rkey: str, states: dict[str, str]) -> str: 219 if uri in states: 220 return states[uri] 221 return states.get(f"rkey:{rkey}", "open") 222 223 224def _optional_timestamp(value: Any) -> str | None: 225 if not isinstance(value, str): 226 return None 227 value = value.strip() 228 return value if value else None 229 230 231def upsert_issue( 232 conn, 233 *, 234 record: dict[str, Any], 235 author_did: str, 236 author_handle: str | None, 237 state: str, 238) -> None: 239 uri = record["uri"] 240 value = record["value"] 241 rkey = _rkey_from_uri(uri) 242 repo_did, repo_uri = _parse_repo_refs(value) 243 title = value.get("title") if isinstance(value.get("title"), str) else None 244 body = value.get("body") if isinstance(value.get("body"), str) else None 245 created = _optional_timestamp(value.get("createdAt")) 246 247 conn.execute( 248 """ 249 insert into tangled_issues ( 250 uri, author_did, author_handle, rkey, repo_did, repo_uri, 251 title, body, state, issue_created_at, cid, record_raw, fetched_at 252 ) 253 values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::timestamptz, %s, %s::jsonb, now()) 254 on conflict (uri) do update set 255 author_did = excluded.author_did, 256 author_handle = excluded.author_handle, 257 rkey = excluded.rkey, 258 repo_did = coalesce(excluded.repo_did, tangled_issues.repo_did), 259 repo_uri = coalesce(excluded.repo_uri, tangled_issues.repo_uri), 260 title = excluded.title, 261 body = excluded.body, 262 state = excluded.state, 263 issue_created_at = excluded.issue_created_at, 264 cid = excluded.cid, 265 record_raw = excluded.record_raw, 266 fetched_at = now(), 267 embedding = case 268 when tangled_issues.title is distinct from excluded.title 269 or tangled_issues.body is distinct from excluded.body 270 then null else tangled_issues.embedding end, 271 embedding_model = case 272 when tangled_issues.title is distinct from excluded.title 273 or tangled_issues.body is distinct from excluded.body 274 then null else tangled_issues.embedding_model end, 275 embedded_at = case 276 when tangled_issues.title is distinct from excluded.title 277 or tangled_issues.body is distinct from excluded.body 278 then null else tangled_issues.embedded_at end 279 """, 280 ( 281 uri, 282 author_did, 283 author_handle, 284 rkey, 285 repo_did, 286 repo_uri, 287 title, 288 body, 289 state, 290 created, 291 record.get("cid") if isinstance(record.get("cid"), str) else None, 292 json.dumps(value), 293 ), 294 ) 295 296 297def _mark_user_synced( 298 conn, 299 *, 300 user_did: str, 301 issue_count: int, 302 status: str, 303 error_message: str | None = None, 304) -> None: 305 conn.execute( 306 """ 307 insert into tangled_issue_user_sync ( 308 user_did, issue_count, synced_at, status, error_message 309 ) 310 values (%s, %s, now(), %s, %s) 311 on conflict (user_did) do update set 312 issue_count = excluded.issue_count, 313 synced_at = now(), 314 status = excluded.status, 315 error_message = excluded.error_message 316 """, 317 (user_did, issue_count, status, error_message), 318 ) 319 320 321def _fetch_user_issues( 322 user_did: str, 323 handle: str | None, 324 pds_host: str | None, 325 cache: _PdsCache, 326 max_pages: int, 327) -> UserIssueResult: 328 result = UserIssueResult(user_did=user_did, handle=handle, status="error") 329 try: 330 with httpx.Client(timeout=HTTP_TIMEOUT, follow_redirects=True) as client: 331 pds = cache.resolve(client, user_did, pds_host) 332 if not pds: 333 result.error = "could not resolve PDS" 334 return result 335 issues = _list_all_records( 336 client, pds, user_did, ISSUE_COLLECTION, max_pages=max_pages 337 ) 338 states: list[dict[str, Any]] = [] 339 if issues: 340 states = _list_all_records( 341 client, pds, user_did, STATE_COLLECTION, max_pages=max_pages 342 ) 343 result.issues = issues 344 result.states = states 345 result.status = "ok" 346 return result 347 except httpx.TimeoutException: 348 result.error = "PDS timeout" 349 return result 350 except httpx.HTTPError as exc: 351 result.error = str(exc)[:200] 352 return result 353 except Exception as exc: 354 result.error = str(exc)[:200] 355 return result 356 357 358def _heartbeat_loop( 359 *, 360 done: list[int], 361 total: int, 362 inflight: list[int], 363 last_done_at: list[float], 364 stop: threading.Event, 365) -> None: 366 while not stop.wait(HEARTBEAT_SEC): 367 n = done[0] 368 pending = total - n 369 active = inflight[0] 370 idle = time.monotonic() - last_done_at[0] 371 log( 372 "issues", 373 f"… heartbeat {n}/{total} done ({active} in-flight, " 374 f"{pending} pending, last +{idle:.0f}s)", 375 ) 376 377 378def run_fetch_issues(dsn: str) -> dict[str, int]: 379 workers = concurrency_env("TANGLED_ISSUE_CONCURRENCY", default=10) 380 user_limit = _user_limit() 381 skip_existing = _skip_existing() 382 all_users = _all_users() 383 max_pages = _max_pages() 384 385 banner("ISSUES — scrape sh.tangled.repo.issue from user PDSes") 386 log("issues", f"Concurrency: {workers} PDS read timeout: 15s") 387 log("issues", f"Max listRecords pages/user/collection: {max_pages}") 388 log("issues", f"User scope: {'all known DIDs (+ tngl PDS accounts)' if all_users else 'identities + repo owners'}") 389 if user_limit: 390 log("issues", f"User limit: {user_limit}") 391 if skip_existing: 392 log("issues", "Skip existing: on (set TANGLED_ISSUE_REFRESH=1 to re-scan all)") 393 else: 394 log("issues", "Skip existing: off — re-scanning every user (daily sync)") 395 396 with connect(dsn) as conn: 397 users = conn.execute( 398 _users_query(skip_existing=skip_existing, user_limit=user_limit, all_users=all_users) 399 ).fetchall() 400 total_users = conn.execute(_total_users_sql(all_users=all_users)).fetchone()["n"] 401 synced = 0 402 if skip_existing: 403 synced = conn.execute("select count(*) as n from tangled_issue_user_sync").fetchone()["n"] 404 405 if not users: 406 log("issues", "Nothing to fetch — all users already scanned.") 407 return { 408 "users_scanned": 0, 409 "issues_upserted": 0, 410 "open_issues": 0, 411 "already_synced": synced, 412 "errors": 0, 413 } 414 415 already_synced = synced if skip_existing else 0 416 metric("Known users", total_users) 417 if skip_existing: 418 metric("Already synced (skipped)", already_synced) 419 metric("To scan", len(users)) 420 421 stats = { 422 "users_scanned": 0, 423 "issues_upserted": 0, 424 "open_issues": 0, 425 "already_synced": already_synced, 426 "errors": 0, 427 } 428 done_box = [0] 429 inflight_box = [0] 430 last_done_at = [time.monotonic()] 431 done_lock = threading.Lock() 432 pds_cache = _PdsCache() 433 434 phase(1, f"Parallel PDS listRecords ({workers} workers)") 435 log("issues", f"Progress every {LOG_EVERY} users + heartbeat every {HEARTBEAT_SEC}s") 436 437 stop_heartbeat = threading.Event() 438 heartbeat = threading.Thread( 439 target=_heartbeat_loop, 440 kwargs={ 441 "done": done_box, 442 "total": len(users), 443 "inflight": inflight_box, 444 "last_done_at": last_done_at, 445 "stop": stop_heartbeat, 446 }, 447 daemon=True, 448 ) 449 heartbeat.start() 450 451 try: 452 with connect(dsn) as conn: 453 set_crawl_state( 454 conn, 455 key=CRAWL_KEY, 456 status="running", 457 meta={"user_count": len(users), "workers": workers}, 458 ) 459 conn.commit() 460 461 user_iter = iter(users) 462 pending_futures: dict[Any, dict[str, Any]] = {} 463 464 def submit_more(pool: ThreadPoolExecutor) -> None: 465 while len(pending_futures) < INFLIGHT_CHUNK: 466 try: 467 row = next(user_iter) 468 except StopIteration: 469 break 470 fut = pool.submit( 471 _fetch_user_issues, 472 row["did"], 473 row.get("handle"), 474 row.get("pds_host"), 475 pds_cache, 476 max_pages, 477 ) 478 pending_futures[fut] = row 479 inflight_box[0] = len(pending_futures) 480 481 with ThreadPoolExecutor(max_workers=workers) as pool: 482 submit_more(pool) 483 484 while pending_futures: 485 done_set, _ = wait(pending_futures, timeout=HEARTBEAT_SEC, return_when=FIRST_COMPLETED) 486 if not done_set: 487 continue 488 489 for future in done_set: 490 row = pending_futures.pop(future) 491 label = row.get("handle") or row["did"][:20] 492 493 try: 494 result = future.result() 495 except Exception as exc: 496 result = UserIssueResult( 497 user_did=row["did"], 498 handle=row.get("handle"), 499 status="error", 500 error=str(exc)[:200], 501 ) 502 503 with done_lock: 504 done_box[0] += 1 505 n = done_box[0] 506 last_done_at[0] = time.monotonic() 507 508 if result.status == "ok": 509 states = _state_map(result.states) 510 upserted = 0 511 open_n = 0 512 for rec in result.issues: 513 if not isinstance(rec.get("uri"), str) or not isinstance( 514 rec.get("value"), dict 515 ): 516 continue 517 rkey = _rkey_from_uri(rec["uri"]) 518 state = _issue_state(rec["uri"], rkey, states) 519 upsert_issue( 520 conn, 521 record=rec, 522 author_did=result.user_did, 523 author_handle=result.handle, 524 state=state, 525 ) 526 upserted += 1 527 if state == "open": 528 open_n += 1 529 530 _mark_user_synced( 531 conn, 532 user_did=result.user_did, 533 issue_count=upserted, 534 status="ok", 535 ) 536 stats["users_scanned"] += 1 537 stats["issues_upserted"] += upserted 538 stats["open_issues"] += open_n 539 msg = f"OK {label} {upserted} issue(s) ({open_n} open)" 540 else: 541 _mark_user_synced( 542 conn, 543 user_did=result.user_did, 544 issue_count=0, 545 status="error", 546 error_message=result.error, 547 ) 548 stats["errors"] += 1 549 msg = f"ERROR {label} {result.error or 'unknown'}" 550 551 if n <= 10 or n % LOG_EVERY == 0 or result.issues: 552 step("issues", n, len(users), msg) 553 554 if n % 25 == 0: 555 conn.commit() 556 557 submit_more(pool) 558 inflight_box[0] = len(pending_futures) 559 560 set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats) 561 conn.commit() 562 finally: 563 stop_heartbeat.set() 564 heartbeat.join(timeout=1) 565 566 summary_block( 567 "Issues fetch complete", 568 [ 569 f"Users scanned: {stats['users_scanned']}", 570 f"Issues upserted: {stats['issues_upserted']}", 571 f"Open (this run): {stats['open_issues']}", 572 f"Already synced: {stats['already_synced']}", 573 f"Errors: {stats['errors']}", 574 "", 575 "Query open issues:", 576 " select count(*) from tangled_open_issues;", 577 ], 578 ) 579 return stats 580 581 582def main() -> None: 583 for candidate in (REPO_ROOT / ".env", Path(__file__).parent / ".env"): 584 if candidate.exists(): 585 load_dotenv(candidate) 586 break 587 else: 588 load_dotenv() 589 590 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 591 if not dsn: 592 print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr) 593 raise SystemExit(1) 594 595 init_schema(dsn) 596 run_fetch_issues(dsn) 597 598 599if __name__ == "__main__": 600 try: 601 main() 602 except KeyboardInterrupt: 603 print("\nInterrupted.", file=sys.stderr) 604 raise SystemExit(130) from None