This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 18 kB View raw
"""Stage 2-network: discover Tangled repos via the tangled.org search index.

The stage crawls the search index for ``(owner_handle, repo_slug)`` links,
resolves each owner handle to a DID and PDS host, fetches the owner's
``sh.tangled.repo`` record, and upserts the result into ``tangled_repos``.
"""

from __future__ import annotations

import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any

import httpx

from appview_client import fetch_search_page
from db import connect, set_crawl_state, upsert_atproto_record
from parallel import concurrency_env
from pds_client import DEFAULT_PDS, list_records, pds_host_for_did
from progress import banner, log, metric, phase, step, summary_block

CRAWL_KEY = "stage2:network"
COLLECTION = "sh.tangled.repo"
RESOLVE_PDS = ("https://bsky.social", "https://tngl.sh")


def _page_limit() -> int:
    """Return the search page size from TANGLED_NETWORK_PAGE_SIZE, clamped to 1..100."""
    return max(1, min(100, int(os.getenv("TANGLED_NETWORK_PAGE_SIZE", "100"))))


def _repo_limit() -> int | None:
    """Return the optional repo cap from TANGLED_STAGE2_NETWORK_LIMIT (min 1), or None if unset."""
    raw = os.getenv("TANGLED_STAGE2_NETWORK_LIMIT", "").strip()
    if not raw:
        return None
    return max(1, int(raw))


def _skip_existing() -> bool:
    """Return True unless TANGLED_STAGE2_NETWORK_REFRESH is set to 1/true/yes."""
    return os.getenv("TANGLED_STAGE2_NETWORK_REFRESH", "").strip().lower() not in (
        "1",
        "true",
        "yes",
    )


def _link_key(handle: str, slug: str) -> tuple[str, str]:
    """Return the case-insensitive comparison key for a (handle, slug) link."""
    return handle.lower(), slug.lower()


def _load_existing_links(conn) -> set[tuple[str, str]]:
    """(owner_handle, slug) pairs already stored — match on name or rkey.

    Each stored repo contributes up to two keys, one for its ``name`` and one
    for its ``rkey``, so a search link matches on either.
    """
    rows = conn.execute(
        """
        select owner_handle, name, rkey
        from tangled_repos
        where owner_handle is not null
        """
    ).fetchall()
    existing: set[tuple[str, str]] = set()
    for row in rows:
        handle = row.get("owner_handle")
        if not isinstance(handle, str) or not handle:
            continue
        for slug in (row.get("name"), row.get("rkey")):
            if isinstance(slug, str) and slug:
                existing.add(_link_key(handle, slug))
    return existing


def _partition_links(
    links: list[tuple[str, str]], existing: set[tuple[str, str]]
) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    """Split *links* into ``(pending, skipped)`` by membership in *existing*.

    Membership is tested on the lowercased key; the returned tuples keep the
    original casing and order.
    """
    pending: list[tuple[str, str]] = []
    skipped: list[tuple[str, str]] = []
    for handle, slug in links:
        if _link_key(handle, slug) in existing:
            skipped.append((handle, slug))
        else:
            pending.append((handle, slug))
    return pending, skipped


def resolve_handle(client: httpx.Client, handle: str) -> str | None:
    """Resolve *handle* to a DID, trying each host in ``RESOLVE_PDS`` in order.

    Returns the first string ``did`` found, or None when no host yields one.
    Transport errors, non-200 responses, and bodies that are not a JSON object
    all fall through to the next host instead of aborting resolution.
    """
    for base in RESOLVE_PDS:
        try:
            resp = client.get(
                f"{base}/xrpc/com.atproto.identity.resolveHandle",
                params={"handle": handle},
            )
            if resp.status_code != 200:
                continue
            payload = resp.json()
        except (httpx.HTTPError, ValueError):
            # ValueError covers a 200 response whose body is not valid JSON.
            continue
        did = payload.get("did") if isinstance(payload, dict) else None
        if isinstance(did, str):
            return did
    return None


def _is_repo_record(payload: Any) -> bool:
    """Return True when *payload* has the ``uri``/``value`` shape the upsert code indexes."""
    return (
        isinstance(payload, dict)
        and isinstance(payload.get("uri"), str)
        and isinstance(payload.get("value"), dict)
    )


def fetch_repo_record(
    client: httpx.Client,
    *,
    pds_host: str,
    owner_did: str,
    rkey: str,
    repo_slug: str,
) -> dict[str, Any] | None:
    """Fetch sh.tangled.repo from owner's PDS (Bluesky or tngl).

    First tries a direct ``getRecord`` with *rkey*. If that fails, or returns
    something that is not a well-formed record, pages through the owner's
    records looking for one whose URI ends in ``/{repo_slug}`` or whose
    ``name`` equals *repo_slug*.

    Returns a dict with at least ``uri`` and ``value``, or None if no record
    was found or listing failed with an HTTP error.
    """
    base = pds_host.rstrip("/")
    try:
        resp = client.get(
            f"{base}/xrpc/com.atproto.repo.getRecord",
            params={
                "repo": owner_did,
                "collection": COLLECTION,
                "rkey": rkey,
            },
        )
        if resp.status_code == 200:
            payload = resp.json()
            # Only trust payloads the caller can index; anything else falls
            # back to the listRecords scan below.
            if _is_repo_record(payload):
                return payload
    except (httpx.HTTPError, ValueError):
        # ValueError: 200 response with a non-JSON body.
        pass

    cursor: str | None = None
    while True:
        try:
            data = list_records(
                client, pds_host, owner_did, COLLECTION, cursor=cursor, limit=100
            )
        except httpx.HTTPError:
            return None

        for rec in data.get("records") or []:
            value = rec.get("value")
            uri = rec.get("uri")
            if not isinstance(value, dict) or not isinstance(uri, str):
                continue
            name = value.get("name")
            if uri.endswith(f"/{repo_slug}") or name == repo_slug:
                return {"uri": uri, "cid": rec.get("cid"), "value": value}

        cursor = data.get("cursor")
        # Stop on the last page (no cursor) or on an empty page.
        if not cursor or not data.get("records"):
            break
    return None


@dataclass
class NetworkFetchResult:
    """Outcome of resolving one search link and fetching its repo record."""

    owner_handle: str
    repo_slug: str
    status: str  # ok | resolve_failed | record_failed | error
    owner_did: str | None = None
    pds_host: str | None = None
    record: dict[str, Any] | None = None
    error: str | None = None


class _ResolveCache:
    """Cache of handle → DID and DID → PDS host lookups shared by worker threads.

    The lock guards only the dictionaries; network lookups run outside it, so
    two threads may resolve the same handle concurrently and both store the
    result.
    """

    def __init__(self) -> None:
        self._handle_did: dict[str, str | None] = {}
        self._did_pds: dict[str, str | None] = {}
        self._lock = threading.Lock()

    def resolve_owner(
        self, client: httpx.Client, handle: str
    ) -> tuple[str | None, str | None]:
        """Return ``(did, pds_host)`` for *handle*, or ``(None, None)`` if unresolved.

        A failed resolution is cached too, so the same handle is not retried.
        """
        with self._lock:
            if handle in self._handle_did:
                did = self._handle_did[handle]
                if did is None:
                    return None, None
                pds = self._did_pds.get(did)
                if pds is not None:
                    return did, pds

        did = resolve_handle(client, handle)
        pds = None
        if did:
            pds = pds_host_for_did(client, did) or DEFAULT_PDS

        with self._lock:
            self._handle_did[handle] = did
            if did:
                self._did_pds[did] = pds
        return did, pds


def _fetch_one_link(
    owner_handle: str,
    repo_slug: str,
    cache: _ResolveCache,
) -> NetworkFetchResult:
    """Resolve one owner and fetch one repo record; never raises.

    Runs in a worker thread with its own ``httpx.Client``. Any exception is
    converted into a result with ``status="error"`` and the message in
    ``error``.
    """
    result = NetworkFetchResult(
        owner_handle=owner_handle,
        repo_slug=repo_slug,
        status="error",
    )
    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            owner_did, pds_host = cache.resolve_owner(client, owner_handle)
            if not owner_did:
                result.status = "resolve_failed"
                return result

            result.owner_did = owner_did
            result.pds_host = pds_host

            record = fetch_repo_record(
                client,
                pds_host=pds_host or DEFAULT_PDS,
                owner_did=owner_did,
                rkey=repo_slug,
                repo_slug=repo_slug,
            )
            if not record:
                result.status = "record_failed"
                return result

            result.record = record
            result.status = "ok"
            return result
    except Exception as exc:
        # Worker boundary: report the failure instead of killing the pool.
        result.status = "error"
        result.error = str(exc)
        return result


def upsert_identity(conn, *, did: str, handle: str | None, pds_host: str | None) -> None:
    """Insert or refresh a row in ``tangled_identities`` keyed by *did*.

    On conflict, a None *handle* or *pds_host* leaves the stored value intact.
    """
    conn.execute(
        """
        insert into tangled_identities (did, handle, pds_host, last_synced_at)
        values (%s, %s, %s, now())
        on conflict (did) do update set
            handle = coalesce(excluded.handle, tangled_identities.handle),
            pds_host = coalesce(excluded.pds_host, tangled_identities.pds_host),
            last_synced_at = now()
        """,
        (did, handle, pds_host),
    )


def upsert_network_repo(
    conn,
    *,
    owner_did: str,
    owner_handle: str,
    repo_slug: str,
    pds_host: str,
    record: dict[str, Any],
) -> None:
    """Upsert one fetched repo record into ``tangled_repos`` and the raw record store.

    *record* must carry a string ``uri`` and a dict ``value``. *pds_host* is
    accepted for interface compatibility but is not written by this function.
    """
    uri = record["uri"]
    value = record["value"]
    rkey = uri.rsplit("/", 1)[-1]
    repo_did = value.get("repoDid") if isinstance(value.get("repoDid"), str) else None
    knot = value.get("knot") if isinstance(value.get("knot"), str) else None
    name = value.get("name") if isinstance(value.get("name"), str) else None
    if not name:
        # Fall back to the slug unless it starts with "3l"
        # (presumably a generated record key rather than a name — TODO confirm).
        name = repo_slug if not repo_slug.startswith("3l") else None
    cid = record.get("cid") if isinstance(record.get("cid"), str) else None

    conn.execute(
        """
        insert into tangled_repos (
            uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname,
            cid, record_raw, discovered_via, last_synced_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, 'appview_search', now())
        on conflict (uri) do update set
            owner_did = excluded.owner_did,
            owner_handle = excluded.owner_handle,
            repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did),
            name = coalesce(excluded.name, tangled_repos.name),
            knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname),
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            discovered_via = coalesce(tangled_repos.discovered_via, excluded.discovered_via),
            last_synced_at = now()
        """,
        (
            uri,
            owner_did,
            owner_handle,
            rkey,
            repo_did,
            name,
            knot,
            cid,
            json.dumps(value),
        ),
    )

    upsert_atproto_record(
        conn,
        uri=uri,
        author_did=owner_did,
        collection=COLLECTION,
        rkey=rkey,
        payload=value,
        cid=cid,
        repo_did=repo_did,
    )


def _crawl_search_links(
    page_size: int, repo_limit: int | None
) -> tuple[list[tuple[str, str]], int | None]:
    """Page through the search index and return ``(unique_links, total_index)``.

    Stops when *repo_limit* is reached, when the reported total is covered, or
    when a page past the first adds no new links.
    """
    all_links: list[tuple[str, str]] = []
    seen_links: set[tuple[str, str]] = set()
    total_index: int | None = None

    with httpx.Client(timeout=60.0, follow_redirects=True) as client:
        offset = 0
        while True:
            _html, links, total = fetch_search_page(
                client, offset=offset, limit=page_size
            )
            if total is not None:
                total_index = total

            new = 0
            for link in links:
                if link not in seen_links:
                    seen_links.add(link)
                    all_links.append(link)
                    new += 1

            log(
                "stage 2-network",
                f" search offset {offset}: +{new} links (unique: {len(all_links)}"
                + (f" / {total_index})" if total_index else ")"),
            )

            if repo_limit and len(all_links) >= repo_limit:
                all_links = all_links[:repo_limit]
                break
            if total_index is not None and offset + page_size >= total_index:
                break
            if new == 0 and offset > 0:
                break
            offset += page_size

    return all_links, total_index


def _handle_result(
    conn,
    result: NetworkFetchResult,
    n: int,
    total_work: int,
    stats: dict[str, int],
) -> None:
    """Store one fetch result (when successful), bump *stats*, and log sampled progress."""
    label = f"{result.owner_handle}/{result.repo_slug}"

    if result.status == "ok" and result.record and result.owner_did:
        upsert_identity(
            conn,
            did=result.owner_did,
            handle=result.owner_handle,
            pds_host=result.pds_host,
        )
        upsert_network_repo(
            conn,
            owner_did=result.owner_did,
            owner_handle=result.owner_handle,
            repo_slug=result.repo_slug,
            pds_host=result.pds_host or DEFAULT_PDS,
            record=result.record,
        )
        stats["repos_stored"] += 1
        if n <= 10 or n % 50 == 0:
            pds_label = (
                "bsky"
                if result.pds_host and "bsky" in result.pds_host
                else "tngl"
            )
            step(
                "stage 2-network",
                n,
                total_work,
                f"OK {label} did={result.owner_did[:20]}… pds={pds_label}",
            )
        return

    if result.status == "resolve_failed":
        stats["resolve_failed"] += 1
        message = f"SKIP {label} — handle not resolved"
    elif result.status == "record_failed":
        stats["record_failed"] += 1
        message = f"FAIL {label} — no record on {result.pds_host or '?'}"
    else:
        stats["errors"] += 1
        message = f"ERROR {label}: {result.error or 'unknown'}"

    # Failures are logged more sparsely than successes.
    if n <= 10 or n % 100 == 0:
        step("stage 2-network", n, total_work, message)


def run_stage2_network(dsn: str) -> dict[str, Any]:
    """Run the stage: crawl the search index, fetch repo records, store them.

    Args:
        dsn: Database connection string passed to ``connect``.

    Returns:
        Counters for the run: ``search_links``, ``repos_stored``,
        ``already_in_db``, ``resolve_failed``, ``record_failed``, ``errors``.
    """
    workers = concurrency_env("TANGLED_STAGE2_NETWORK_CONCURRENCY", default=20)

    banner("STAGE 2-network — All Tangled repos (Bluesky + tngl.sh)")
    log("stage 2-network", "Uses tangled.org search index — only accounts WITH repos.")
    log("stage 2-network", "Does NOT scan all Bluesky users — only Tangled repo creators.")
    log("stage 2-network", "Resolves each owner handle → DID → PDS, then fetches sh.tangled.repo.")
    log("stage 2-network", f"Concurrency: {workers}")

    page_size = _page_limit()
    repo_limit = _repo_limit()
    skip_existing = _skip_existing()
    if repo_limit:
        log("stage 2-network", f"Repo limit: {repo_limit}")
    if skip_existing:
        log("stage 2-network", "Skip existing: on (set TANGLED_STAGE2_NETWORK_REFRESH=1 to re-fetch all)")
    else:
        log("stage 2-network", "Skip existing: off — refreshing every link")

    stats = {
        "search_links": 0,
        "repos_stored": 0,
        "already_in_db": 0,
        "resolve_failed": 0,
        "record_failed": 0,
        "errors": 0,
    }

    phase(1, "Crawl tangled.org/search index")

    all_links, total_index = _crawl_search_links(page_size, repo_limit)

    metric("Unique repos in search index", len(all_links))
    stats["search_links"] = len(all_links)

    pending_links = all_links
    if skip_existing:
        with connect(dsn) as conn:
            existing = _load_existing_links(conn)
        pending_links, skipped_links = _partition_links(all_links, existing)
        stats["already_in_db"] = len(skipped_links)
        metric("Already in DB (skipped)", len(skipped_links))
        metric("To fetch", len(pending_links))
        if not pending_links:
            log("stage 2-network", "Nothing new to fetch.")
        elif len(skipped_links) <= 10:
            for handle, slug in skipped_links:
                log("stage 2-network", f" skip {handle}/{slug}")

    phase(2, f"Resolve owners & fetch repo records ({workers} workers)")

    cache = _ResolveCache()
    total_work = len(pending_links)

    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={
                "link_count": len(all_links),
                "pending_count": len(pending_links),
                "skipped_count": stats["already_in_db"],
                "total_index": total_index,
                "workers": workers,
            },
        )
        conn.commit()

        if pending_links:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {
                    pool.submit(_fetch_one_link, handle, slug, cache): (handle, slug)
                    for handle, slug in pending_links
                }

                # Results are consumed, and the database written, on this
                # thread only, so a plain counter is enough.
                for n, future in enumerate(as_completed(futures), start=1):
                    owner_handle, repo_slug = futures[future]

                    try:
                        result = future.result()
                    except Exception as exc:
                        result = NetworkFetchResult(
                            owner_handle=owner_handle,
                            repo_slug=repo_slug,
                            status="error",
                            error=str(exc),
                        )

                    _handle_result(conn, result, n, total_work, stats)

                    # Commit in batches so a long run persists incrementally.
                    if n % 50 == 0:
                        conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "Stage 2-network complete",
        [
            f"Search index links: {len(all_links)}",
            f"Already in DB (skip): {stats['already_in_db']}",
            f"Repos stored/updated: {stats['repos_stored']}",
            f"Handle resolve failed: {stats['resolve_failed']}",
            f"Record fetch failed: {stats['record_failed']}",
            f"Errors: {stats['errors']}",
            "",
            "Query: select discovered_via, count(*) from tangled_repos group by 1;",
        ],
    )
    return stats