This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 14 kB View raw
#!/usr/bin/env python3
"""Backfill tangled_repos for issues that reference repos not yet ingested.

Issues are scraped from issue authors' PDSes; repos come from separate crawls
(stage2-network, stage2 PDS, manual seed). This script closes the gap by
fetching sh.tangled.repo from each missing repo owner's PDS using repo_uri on
the issue record.

Usage:
    python scraper/scrape.py backfill-repos-from-issues
    TANGLED_BACKFILL_REPO_LIMIT=50 python scraper/scrape.py backfill-repos-from-issues

After a successful run, fetch READMEs and embeddings for the new repos:
    python scraper/scrape.py check-readmes
    python scraper/scrape.py embed-readmes
"""

from __future__ import annotations

import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any

import httpx

from db import connect, set_crawl_state, upsert_atproto_record
from parallel import concurrency_env
from pds_client import DEFAULT_PDS, handle_from_plc, pds_host_for_did
from progress import banner, log, phase, step, summary_block
from stage2_network import COLLECTION, fetch_repo_record, upsert_identity

# Key under which this job records its progress via set_crawl_state().
CRAWL_KEY = "repos:issue_backfill"
# Value written to tangled_repos.discovered_via for rows inserted by this job.
DISCOVERED_VIA = "issue_backfill"


def _repo_limit() -> int | None:
    """Return the optional per-run repo cap from TANGLED_BACKFILL_REPO_LIMIT.

    Returns:
        None when the variable is unset or blank, otherwise the parsed value
        clamped to a minimum of 1.

    Raises:
        ValueError: if the variable is set but is not an integer.
    """
    raw = os.getenv("TANGLED_BACKFILL_REPO_LIMIT", "").strip()
    if not raw:
        return None
    try:
        parsed = int(raw)
    except ValueError as exc:
        # Name the variable so a typo in the environment is easy to trace.
        raise ValueError(
            f"TANGLED_BACKFILL_REPO_LIMIT must be an integer, got {raw!r}"
        ) from exc
    return max(1, parsed)


def _missing_repos_sql(*, limit: int | None) -> str:
    """Build the query listing repos referenced by issues but not ingested.

    For every repo_did present in tangled_issues and absent from
    tangled_repos, the query picks the most recently fetched issue whose
    repo_uri matches 'at://did:%/sh.tangled.repo/%', splits that URI into
    owner DID and record key, and left-joins tangled_identities for a known
    handle / PDS host. Rows are ordered by issue count, highest first.

    Args:
        limit: optional maximum number of rows; falsy means no limit.
    """
    query = """
        with missing as (
            select i.repo_did
            from tangled_issues i
            left join tangled_repos r on r.repo_did = i.repo_did
            where i.repo_did is not null
              and r.repo_did is null
            group by i.repo_did
        ),
        best_uri as (
            select distinct on (i.repo_did)
                i.repo_did,
                i.repo_uri,
                count(*) over (partition by i.repo_did) as issue_count
            from tangled_issues i
            inner join missing m on m.repo_did = i.repo_did
            where i.repo_uri is not null
              and i.repo_uri like 'at://did:%/sh.tangled.repo/%'
            order by i.repo_did, i.fetched_at desc nulls last
        )
        select
            b.repo_did,
            b.repo_uri,
            b.issue_count,
            split_part(replace(b.repo_uri, 'at://', ''), '/', 1) as owner_did,
            split_part(b.repo_uri, '/', 5) as repo_rkey,
            ti.handle as owner_handle,
            ti.pds_host
        from best_uri b
        left join tangled_identities ti
            on ti.did = split_part(replace(b.repo_uri, 'at://', ''), '/', 1)
        order by b.issue_count desc, b.repo_did
    """
    if limit:
        # The value is spliced into SQL text, so force it to be an integer.
        query += f" limit {int(limit)}"
    return query


def _count_missing_sql() -> str:
    """Return the query that counts missing repos for the run summary.

    The result has three columns: `backfillable` (distinct repo_dids with at
    least one issue carrying a parseable repo_uri), `not_backfillable`
    (distinct repo_dids with at least one issue lacking one) and
    `total_missing`. A repo_did with both kinds of issue is counted in both
    of the first two columns.
    """
    return """
        select
            count(distinct i.repo_did) filter (
                where i.repo_uri is not null
                  and i.repo_uri like 'at://did:%/sh.tangled.repo/%'
            ) as backfillable,
            count(distinct i.repo_did) filter (
                where i.repo_uri is null
                   or i.repo_uri not like 'at://did:%/sh.tangled.repo/%'
            ) as not_backfillable,
            count(distinct i.repo_did) as total_missing
        from tangled_issues i
        left join tangled_repos r on r.repo_did = i.repo_did
        where i.repo_did is not null
          and r.repo_did is null
    """


@dataclass
class MissingRepo:
    """One row of the missing-repos query: a repo to fetch and where from."""

    repo_did: str
    repo_uri: str
    issue_count: int
    owner_did: str
    repo_rkey: str
    owner_handle: str | None
    pds_host: str | None


@dataclass
class BackfillResult:
    """Outcome of one fetch attempt, produced by a worker thread."""

    row: MissingRepo
    status: str  # ok | pds_failed | record_failed | error
    owner_handle: str | None = None
    pds_host: str | None = None
    record: dict[str, Any] | None = None
    error: str | None = None


class _PdsCache:
    """Per-run, lock-protected memo of PDS host and handle lookups by DID.

    Failed lookups (httpx.HTTPError) are stored as None, so a DID that fails
    once is not retried during the same run. The lock is released while the
    network call is in flight, so two workers that miss at the same time may
    both perform the lookup; the last writer wins.
    """

    def __init__(self) -> None:
        self._hosts: dict[str, str | None] = {}
        self._handles: dict[str, str | None] = {}
        self._lock = threading.Lock()

    def resolve_pds(
        self, client: httpx.Client, owner_did: str, hint: str | None
    ) -> str | None:
        """Return the PDS base URL for *owner_did* without a trailing slash.

        A truthy *hint* is returned directly (trailing '/' stripped) and
        bypasses the cache. Returns None when resolution fails.
        """
        if hint:
            return hint.rstrip("/")
        with self._lock:
            if owner_did in self._hosts:
                return self._hosts[owner_did]
        try:
            pds = pds_host_for_did(client, owner_did)
        except httpx.HTTPError:
            pds = None
        host = pds.rstrip("/") if pds else None
        with self._lock:
            self._hosts[owner_did] = host
        return host

    def resolve_handle(
        self, client: httpx.Client, owner_did: str, hint: str | None
    ) -> str | None:
        """Return the handle for *owner_did*, or None when lookup fails.

        A truthy *hint* is returned directly and bypasses the cache.
        """
        if hint:
            return hint
        with self._lock:
            if owner_did in self._handles:
                return self._handles[owner_did]
        try:
            handle = handle_from_plc(client, owner_did)
        except httpx.HTTPError:
            handle = None
        with self._lock:
            self._handles[owner_did] = handle
        return handle


def _str_field(mapping: dict[str, Any], key: str) -> str | None:
    """Return ``mapping[key]`` when it is a string, otherwise None."""
    candidate = mapping.get(key)
    return candidate if isinstance(candidate, str) else None


def _short_error(exc: BaseException) -> str:
    """Return a message for *exc* capped at 200 characters.

    Falls back to the exception class name when the message is empty so the
    progress log never degrades to a bare "unknown".
    """
    return (str(exc) or type(exc).__name__)[:200]


def upsert_issue_backfill_repo(
    conn,
    *,
    owner_did: str,
    owner_handle: str | None,
    repo_rkey: str,
    pds_host: str,
    record: dict[str, Any],
) -> None:
    """Insert or refresh one repo in tangled_repos and mirror the raw record.

    Args:
        conn: open database connection; the caller is responsible for commit.
        owner_did: DID of the repo owner.
        owner_handle: owner's handle, if known.
        repo_rkey: record key taken from the issue's repo_uri; used as a
            fallback name when the record has none.
        pds_host: accepted for interface compatibility; not used here.
        record: fetched record; must contain "uri" and a dict "value".

    Raises:
        KeyError: if *record* lacks "uri" or "value".
    """
    uri = record["uri"]
    value = record["value"]
    rkey = uri.rsplit("/", 1)[-1]
    repo_did = _str_field(value, "repoDid")
    knot = _str_field(value, "knot")
    name = _str_field(value, "name")
    cid = _str_field(record, "cid")
    # Fall back to the rkey as a name unless it starts with "3l" —
    # presumably those are generated (TID-style) keys rather than
    # human-chosen slugs; confirm against the record format.
    if not name and not repo_rkey.startswith("3l"):
        name = repo_rkey

    conn.execute(
        """
        insert into tangled_repos (
            uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname,
            cid, record_raw, discovered_via, last_synced_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, now())
        on conflict (uri) do update set
            owner_did = excluded.owner_did,
            owner_handle = coalesce(excluded.owner_handle, tangled_repos.owner_handle),
            repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did),
            name = coalesce(excluded.name, tangled_repos.name),
            knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname),
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            discovered_via = coalesce(tangled_repos.discovered_via, excluded.discovered_via),
            last_synced_at = now()
        """,
        (
            uri,
            owner_did,
            owner_handle,
            rkey,
            repo_did,
            name,
            knot,
            cid,
            json.dumps(value),
            DISCOVERED_VIA,
        ),
    )

    upsert_atproto_record(
        conn,
        uri=uri,
        author_did=owner_did,
        collection=COLLECTION,
        rkey=rkey,
        payload=value,
        cid=cid,
        repo_did=repo_did,
    )


def _fetch_one(row: MissingRepo, cache: _PdsCache) -> BackfillResult:
    """Resolve the owner's PDS and fetch the repo record for *row*.

    Runs on a worker thread and never raises for ordinary failures: every
    outcome, including unexpected exceptions, is reported through the
    returned BackfillResult's ``status`` / ``error`` fields.
    """
    result = BackfillResult(row=row, status="error")
    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            pds = cache.resolve_pds(client, row.owner_did, row.pds_host)
            if not pds:
                result.status = "pds_failed"
                return result

            owner_handle = cache.resolve_handle(
                client, row.owner_did, row.owner_handle
            )
            result.owner_handle = owner_handle
            result.pds_host = pds

            record = fetch_repo_record(
                client,
                pds_host=pds,
                owner_did=row.owner_did,
                rkey=row.repo_rkey,
                repo_slug=row.repo_rkey,
            )
            if not record:
                result.status = "record_failed"
                return result

            result.record = record
            result.status = "ok"
            return result
    except Exception as exc:
        # Worker boundary: HTTP errors and anything unexpected are both
        # converted into an "error" result so one bad repo cannot stop the run.
        result.status = "error"
        result.error = _short_error(exc)
        return result


def run_backfill_repos_from_issues(dsn: str) -> dict[str, Any]:
    """Fetch and store repos that issues reference but tangled_repos lacks.

    Counts the missing repos, fetches each one's sh.tangled.repo record on a
    thread pool, stores successful results on the calling thread, and records
    progress under CRAWL_KEY.

    Args:
        dsn: database connection string passed to connect().

    Returns:
        The stats dict (counts of missing, queued, stored and failed repos).
    """
    workers = concurrency_env("TANGLED_BACKFILL_REPO_CONCURRENCY", default=20)
    repo_limit = _repo_limit()

    banner("BACKFILL — Repos referenced by issues but missing from tangled_repos")
    log("backfill", f"Concurrency: {workers}")
    if repo_limit:
        log("backfill", f"Repo limit: {repo_limit}")

    stats: dict[str, Any] = {
        "backfillable": 0,
        "not_backfillable": 0,
        "total_missing": 0,
        "queued": 0,
        "repos_stored": 0,
        "pds_failed": 0,
        "record_failed": 0,
        "errors": 0,
    }

    with connect(dsn) as conn:
        counts = conn.execute(_count_missing_sql()).fetchone()
        if counts:
            stats["backfillable"] = int(counts.get("backfillable") or 0)
            stats["not_backfillable"] = int(counts.get("not_backfillable") or 0)
            stats["total_missing"] = int(counts.get("total_missing") or 0)

        log(
            "backfill",
            f"Missing repos: {stats['total_missing']} "
            f"({stats['backfillable']} with parseable repo_uri, "
            f"{stats['not_backfillable']} without)",
        )

        rows = conn.execute(_missing_repos_sql(limit=repo_limit)).fetchall()
        pending = [
            MissingRepo(
                repo_did=row["repo_did"],
                repo_uri=row["repo_uri"],
                issue_count=int(row["issue_count"] or 0),
                owner_did=row["owner_did"],
                repo_rkey=row["repo_rkey"],
                owner_handle=row.get("owner_handle"),
                pds_host=row.get("pds_host"),
            )
            for row in rows
            # split_part() yields '' for a malformed URI; skip those rows.
            if row.get("owner_did") and row.get("repo_rkey")
        ]
        stats["queued"] = len(pending)

        if not pending:
            log("backfill", "Nothing to backfill.")
            set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
            conn.commit()
            return stats

        total = len(pending)
        phase(1, f"Fetch sh.tangled.repo for {total} missing repos")
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={**stats, "workers": workers},
        )
        conn.commit()
        # NOTE(review): if storing a result raises below, the committed crawl
        # state stays "running"; nothing here marks the run as failed.

        cache = _PdsCache()

        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {
                pool.submit(_fetch_one, row, cache): row for row in pending
            }

            # Results are consumed on this thread only, so the completion
            # counter and all database writes need no locking.
            for n, future in enumerate(as_completed(futures), start=1):
                row = futures[future]
                label = f"{row.owner_did[:20]}…/{row.repo_rkey}"

                try:
                    result = future.result()
                except Exception as exc:
                    result = BackfillResult(
                        row=row,
                        status="error",
                        error=_short_error(exc),
                    )

                if result.status == "ok" and result.record:
                    upsert_identity(
                        conn,
                        did=row.owner_did,
                        handle=result.owner_handle,
                        pds_host=result.pds_host,
                    )
                    upsert_issue_backfill_repo(
                        conn,
                        owner_did=row.owner_did,
                        owner_handle=result.owner_handle,
                        repo_rkey=row.repo_rkey,
                        pds_host=result.pds_host or DEFAULT_PDS,
                        record=result.record,
                    )
                    stats["repos_stored"] += 1
                    if n <= 10 or n % 25 == 0:
                        step(
                            "backfill",
                            n,
                            total,
                            f"OK {label} issues={row.issue_count}",
                        )
                elif result.status == "pds_failed":
                    stats["pds_failed"] += 1
                    if n <= 10 or n % 50 == 0:
                        step(
                            "backfill",
                            n,
                            total,
                            f"SKIP {label} — could not resolve PDS",
                        )
                elif result.status == "record_failed":
                    stats["record_failed"] += 1
                    if n <= 10 or n % 50 == 0:
                        step(
                            "backfill",
                            n,
                            total,
                            f"FAIL {label} — no sh.tangled.repo on PDS",
                        )
                else:
                    stats["errors"] += 1
                    if n <= 10 or n % 50 == 0:
                        step(
                            "backfill",
                            n,
                            total,
                            f"ERROR {label}: {result.error or 'unknown'}",
                        )

                # Commit in batches so an interrupted run keeps its progress.
                if n % 25 == 0:
                    conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "Issue repo backfill complete",
        [
            f"Missing repos (total): {stats['total_missing']}",
            f"Backfillable (repo_uri): {stats['backfillable']}",
            f"Queued this run: {stats['queued']}",
            f"Repos stored/updated: {stats['repos_stored']}",
            f"PDS resolve failed: {stats['pds_failed']}",
            f"Record fetch failed: {stats['record_failed']}",
            f"Errors: {stats['errors']}",
            "",
            "Next: python scraper/scrape.py check-readmes",
            " python scraper/scrape.py embed-readmes",
        ],
    )
    return stats