This repository has no description
1#!/usr/bin/env python3
2"""Backfill tangled_repos for issues that reference repos not yet ingested.
3
4Issues are scraped from issue authors' PDSes; repos come from separate crawls
5(stage2-network, stage2 PDS, manual seed). This script closes the gap by
6fetching sh.tangled.repo from each missing repo owner's PDS using repo_uri on
7the issue record.
8
9Usage:
10 python scraper/scrape.py backfill-repos-from-issues
11 TANGLED_BACKFILL_REPO_LIMIT=50 python scraper/scrape.py backfill-repos-from-issues
12
13After a successful run, fetch READMEs and embeddings for the new repos:
14 python scraper/scrape.py check-readmes
15 python scraper/scrape.py embed-readmes
16"""
17
18from __future__ import annotations
19
20import json
21import os
22import threading
23from concurrent.futures import ThreadPoolExecutor, as_completed
24from dataclasses import dataclass
25from typing import Any
26
27import httpx
28
29from db import connect, set_crawl_state, upsert_atproto_record
30from parallel import concurrency_env
31from pds_client import DEFAULT_PDS, handle_from_plc, pds_host_for_did
32from progress import banner, log, phase, step, summary_block
33from stage2_network import COLLECTION, fetch_repo_record, upsert_identity
34
# Key under which this job records its status via set_crawl_state().
CRAWL_KEY: str = "repos:issue_backfill"
# Value written to tangled_repos.discovered_via for rows this job inserts.
DISCOVERED_VIA: str = "issue_backfill"
37
38
def _repo_limit() -> int | None:
    """Return the optional per-run repo cap from ``TANGLED_BACKFILL_REPO_LIMIT``.

    Returns:
        ``None`` when the variable is unset or blank; otherwise the parsed
        integer, clamped to a minimum of 1.

    Raises:
        ValueError: if the variable is set to something that is not an integer.
            The message names the variable so a misconfigured run is easy to
            diagnose.
    """
    raw = os.getenv("TANGLED_BACKFILL_REPO_LIMIT", "").strip()
    if not raw:
        return None
    try:
        parsed = int(raw)
    except ValueError:
        raise ValueError(
            f"TANGLED_BACKFILL_REPO_LIMIT must be an integer, got {raw!r}"
        ) from None
    # Zero or negative values are treated as "at least one repo".
    return max(1, parsed)
44
45
def _missing_repos_sql(*, limit: int | None) -> str:
    """Build the query listing repos that issues reference but tangled_repos lacks.

    The query yields one row per missing ``repo_did`` that has at least one
    issue whose ``repo_uri`` matches ``at://did:…/sh.tangled.repo/…``. For each
    such repo the most recently fetched matching ``repo_uri`` is chosen.

    Result columns: ``repo_did``, ``repo_uri``, ``issue_count`` (number of that
    repo's issues with a matching ``repo_uri``), ``owner_did`` and
    ``repo_rkey`` (both parsed out of ``repo_uri``), plus ``owner_handle`` and
    ``pds_host`` from ``tangled_identities`` when the owner is known there.
    Rows are ordered by ``issue_count`` descending, then ``repo_did``.

    Args:
        limit: Maximum number of rows to return; a falsy value means no limit.

    Raises:
        ValueError / TypeError: if ``limit`` is truthy but not convertible to
            an integer.
    """
    query = """
        with missing as (
            select i.repo_did
            from tangled_issues i
            left join tangled_repos r on r.repo_did = i.repo_did
            where i.repo_did is not null
              and r.repo_did is null
            group by i.repo_did
        ),
        best_uri as (
            select distinct on (i.repo_did)
                i.repo_did,
                i.repo_uri,
                count(*) over (partition by i.repo_did) as issue_count
            from tangled_issues i
            inner join missing m on m.repo_did = i.repo_did
            where i.repo_uri is not null
              and i.repo_uri like 'at://did:%/sh.tangled.repo/%'
            order by i.repo_did, i.fetched_at desc nulls last
        )
        select
            b.repo_did,
            b.repo_uri,
            b.issue_count,
            split_part(replace(b.repo_uri, 'at://', ''), '/', 1) as owner_did,
            split_part(b.repo_uri, '/', 5) as repo_rkey,
            ti.handle as owner_handle,
            ti.pds_host
        from best_uri b
        left join tangled_identities ti
            on ti.did = split_part(replace(b.repo_uri, 'at://', ''), '/', 1)
        order by b.issue_count desc, b.repo_did
    """
    if limit:
        # The limit is spliced into the SQL text, so force it through int():
        # only a numeric literal can ever reach the query.
        query += f" limit {int(limit)}"
    return query
83
84
def _count_missing_sql() -> str:
    """Return the query summarising repos referenced by issues but not ingested.

    The query returns a single row with three columns:

    * ``backfillable`` - missing repos with at least one issue whose
      ``repo_uri`` matches ``at://did:…/sh.tangled.repo/…``.
    * ``not_backfillable`` - missing repos with no such issue.
    * ``total_missing`` - all missing repos.

    Each repo is classified exactly once, so the first two columns always sum
    to the third. (Counting the two conditions independently per issue row
    would put a repo with both a matching and a non-matching ``repo_uri`` in
    both buckets.)
    """
    return """
        with missing as (
            select
                i.repo_did,
                bool_or(
                    i.repo_uri is not null
                    and i.repo_uri like 'at://did:%/sh.tangled.repo/%'
                ) as has_parseable_uri
            from tangled_issues i
            left join tangled_repos r on r.repo_did = i.repo_did
            where i.repo_did is not null
              and r.repo_did is null
            group by i.repo_did
        )
        select
            count(*) filter (where has_parseable_uri) as backfillable,
            count(*) filter (where not has_parseable_uri) as not_backfillable,
            count(*) as total_missing
        from missing
    """
102
103
@dataclass
class MissingRepo:
    """One repo that issues point at but that has no row in tangled_repos."""

    # Repo DID as recorded on the referencing issues.
    repo_did: str
    # at:// URI of the sh.tangled.repo record, taken from an issue row.
    repo_uri: str
    # Number of issues counted for this repo by the selection query.
    issue_count: int
    # DID of the repo owner, parsed out of repo_uri.
    owner_did: str
    # Record key of the repo, parsed out of repo_uri.
    repo_rkey: str
    # Owner handle, when already known; otherwise None.
    owner_handle: str | None
    # Owner PDS host, when already known; otherwise None.
    pds_host: str | None
113
114
@dataclass
class BackfillResult:
    """Outcome of trying to fetch one missing repo's record."""

    # The work item this result belongs to.
    row: MissingRepo
    # One of: ok | pds_failed | record_failed | error
    status: str
    # Owner handle resolved during the fetch, if any.
    owner_handle: str | None = None
    # PDS host the record was requested from, if one was resolved.
    pds_host: str | None = None
    # The fetched repo record; set only when the fetch succeeded.
    record: dict[str, Any] | None = None
    # Truncated error text; set only when status is "error".
    error: str | None = None
123
124
class _PdsCache:
    """Lock-guarded memo of PDS hosts and handles, keyed by owner DID.

    A caller-supplied hint always wins and is never stored. Otherwise the
    value is looked up once and remembered, including a ``None`` result when
    the lookup raised ``httpx.HTTPError``. The network call itself runs
    outside the lock, so two threads asking for the same uncached DID at the
    same moment may both perform the lookup.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._hosts: dict[str, str | None] = {}
        self._handles: dict[str, str | None] = {}

    def _remembered(self, store, owner_did, fetch):
        """Return ``store[owner_did]``, filling it from ``fetch()`` on a miss."""
        with self._lock:
            try:
                return store[owner_did]
            except KeyError:
                pass
        try:
            value = fetch()
        except httpx.HTTPError:
            # A failed lookup is remembered as None for the rest of the run.
            value = None
        with self._lock:
            store[owner_did] = value
        return value

    def resolve_pds(
        self, client: httpx.Client, owner_did: str, hint: str | None
    ) -> str | None:
        """Return the PDS base URL for ``owner_did`` without a trailing slash.

        Uses ``hint`` when it is non-empty; otherwise consults the cache and
        falls back to ``pds_host_for_did``. Returns ``None`` when no host
        could be resolved.
        """
        if hint:
            return hint.rstrip("/")

        def fetch() -> str | None:
            found = pds_host_for_did(client, owner_did)
            return found.rstrip("/") if found else None

        return self._remembered(self._hosts, owner_did, fetch)

    def resolve_handle(
        self, client: httpx.Client, owner_did: str, hint: str | None
    ) -> str | None:
        """Return the handle for ``owner_did``.

        Uses ``hint`` when it is non-empty; otherwise consults the cache and
        falls back to ``handle_from_plc``. Returns ``None`` when the lookup
        failed with ``httpx.HTTPError``.
        """
        if hint:
            return hint
        return self._remembered(
            self._handles, owner_did, lambda: handle_from_plc(client, owner_did)
        )
163
164
def _str_field(mapping: dict[str, Any], key: str) -> str | None:
    """Return ``mapping[key]`` when it is a string, otherwise ``None``."""
    candidate = mapping.get(key)
    return candidate if isinstance(candidate, str) else None


def upsert_issue_backfill_repo(
    conn,
    *,
    owner_did: str,
    owner_handle: str | None,
    repo_rkey: str,
    pds_host: str,
    record: dict[str, Any],
) -> None:
    """Insert or refresh one repo row from a fetched sh.tangled.repo record.

    Writes the repo into ``tangled_repos`` (upserting on ``uri``) and then
    mirrors the raw record through ``upsert_atproto_record``. On conflict,
    existing ``owner_handle``, ``repo_did``, ``name`` and ``knot_hostname``
    are kept when the new record lacks them, and an existing
    ``discovered_via`` is never overwritten. The caller commits.

    Args:
        conn: Open database connection exposing ``execute``.
        owner_did: DID of the repo owner.
        owner_handle: Owner handle, if known.
        repo_rkey: Record key from the issue's repo URI; used as a fallback
            repo name.
        pds_host: Accepted for call-site compatibility; not used here.
        record: Fetched record; must contain ``"uri"`` and ``"value"``, and
            may contain ``"cid"``.

    Raises:
        KeyError: if ``record`` has no ``"uri"`` or ``"value"`` key.
    """
    uri = record["uri"]
    value = record["value"]
    rkey = uri.rsplit("/", 1)[-1]

    repo_did = _str_field(value, "repoDid")
    knot = _str_field(value, "knot")
    name = _str_field(value, "name")
    # Fall back to the rkey as a name unless it starts with "3l".
    # NOTE(review): presumably "3l…" rkeys are generated, timestamp-style keys
    # rather than human-readable slugs - confirm the prefix is still right.
    if not (name or repo_rkey.startswith("3l")):
        name = repo_rkey
    cid = _str_field(record, "cid")

    params = (
        uri,
        owner_did,
        owner_handle,
        rkey,
        repo_did,
        name,
        knot,
        cid,
        json.dumps(value),
        DISCOVERED_VIA,
    )
    conn.execute(
        """
        insert into tangled_repos (
            uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname,
            cid, record_raw, discovered_via, last_synced_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, now())
        on conflict (uri) do update set
            owner_did = excluded.owner_did,
            owner_handle = coalesce(excluded.owner_handle, tangled_repos.owner_handle),
            repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did),
            name = coalesce(excluded.name, tangled_repos.name),
            knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname),
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            discovered_via = coalesce(tangled_repos.discovered_via, excluded.discovered_via),
            last_synced_at = now()
        """,
        params,
    )

    upsert_atproto_record(
        conn,
        uri=uri,
        author_did=owner_did,
        collection=COLLECTION,
        rkey=rkey,
        payload=value,
        cid=cid,
        repo_did=repo_did,
    )
225
226
def _fetch_one(row: MissingRepo, cache: _PdsCache) -> BackfillResult:
    """Fetch the sh.tangled.repo record for one missing repo.

    Runs inside a worker thread and never raises: every failure is reported
    through the returned result's ``status``:

    * ``"ok"`` - ``record``, ``pds_host`` and ``owner_handle`` are populated.
    * ``"pds_failed"`` - no PDS host could be resolved for the owner.
    * ``"record_failed"`` - the PDS returned no record for the rkey.
    * ``"error"`` - an exception occurred; ``error`` holds its class name and
      message, truncated to 200 characters.

    Args:
        row: The missing repo to fetch.
        cache: Shared PDS/handle cache for this run.
    """
    result = BackfillResult(row=row, status="error")
    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            pds = cache.resolve_pds(client, row.owner_did, row.pds_host)
            if not pds:
                result.status = "pds_failed"
                return result

            result.owner_handle = cache.resolve_handle(
                client, row.owner_did, row.owner_handle
            )
            result.pds_host = pds

            record = fetch_repo_record(
                client,
                pds_host=pds,
                owner_did=row.owner_did,
                rkey=row.repo_rkey,
                repo_slug=row.repo_rkey,
            )
            if not record:
                result.status = "record_failed"
                return result

            result.record = record
            result.status = "ok"
    except Exception as exc:
        # Worker boundary: turn any failure (HTTP or otherwise) into a result
        # so one bad repo cannot abort the whole run.
        result.status = "error"
        # str(exc) can be empty, so always include the exception class;
        # otherwise the failure would be reported as "unknown".
        kind = type(exc).__name__
        detail = str(exc).strip()
        result.error = (f"{kind}: {detail}" if detail else kind)[:200]
    return result
262
263
def _pending_from_rows(rows) -> list[MissingRepo]:
    """Convert query rows into work items, skipping rows without owner DID or rkey."""
    pending: list[MissingRepo] = []
    for row in rows:
        if not row.get("owner_did") or not row.get("repo_rkey"):
            continue
        pending.append(
            MissingRepo(
                repo_did=row["repo_did"],
                repo_uri=row["repo_uri"],
                issue_count=int(row["issue_count"] or 0),
                owner_did=row["owner_did"],
                repo_rkey=row["repo_rkey"],
                owner_handle=row.get("owner_handle"),
                pds_host=row.get("pds_host"),
            )
        )
    return pending


def run_backfill_repos_from_issues(dsn: str) -> dict[str, Any]:
    """Backfill tangled_repos with repos that issues reference but were never ingested.

    Steps:
        1. Count missing repos and log how many have a parseable ``repo_uri``.
        2. Select the missing repos (optionally capped by
           ``TANGLED_BACKFILL_REPO_LIMIT``) and fetch their records
           concurrently with ``_fetch_one``.
        3. As each fetch completes, upsert the owner identity and the repo row
           from the calling thread, committing every 25 completed repos.
        4. Record the final crawl state and print a summary.

    Args:
        dsn: Database connection string passed to ``connect``.

    Returns:
        The stats dict with keys ``backfillable``, ``not_backfillable``,
        ``total_missing``, ``queued``, ``repos_stored``, ``pds_failed``,
        ``record_failed`` and ``errors``.
    """
    workers = concurrency_env("TANGLED_BACKFILL_REPO_CONCURRENCY", default=20)
    repo_limit = _repo_limit()

    banner("BACKFILL — Repos referenced by issues but missing from tangled_repos")
    log("backfill", f"Concurrency: {workers}")
    if repo_limit:
        log("backfill", f"Repo limit: {repo_limit}")

    stats: dict[str, Any] = dict.fromkeys(
        (
            "backfillable",
            "not_backfillable",
            "total_missing",
            "queued",
            "repos_stored",
            "pds_failed",
            "record_failed",
            "errors",
        ),
        0,
    )

    with connect(dsn) as conn:
        counts = conn.execute(_count_missing_sql()).fetchone()
        if counts:
            for key in ("backfillable", "not_backfillable", "total_missing"):
                stats[key] = int(counts.get(key) or 0)

        log(
            "backfill",
            "Missing repos: {total_missing} "
            "({backfillable} with parseable repo_uri, "
            "{not_backfillable} without)".format(**stats),
        )

        rows = conn.execute(_missing_repos_sql(limit=repo_limit)).fetchall()
        pending = _pending_from_rows(rows)
        total = len(pending)
        stats["queued"] = total

        if not pending:
            log("backfill", "Nothing to backfill.")
            set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
            conn.commit()
            return stats

        phase(1, f"Fetch sh.tangled.repo for {total} missing repos")
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={**stats, "workers": workers},
        )
        conn.commit()

        cache = _PdsCache()

        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(_fetch_one, item, cache): item for item in pending}

            # Results are consumed only in this thread, so a plain running
            # count (n) is enough to track progress.
            for n, future in enumerate(as_completed(futures), start=1):
                row = futures[future]
                label = f"{row.owner_did[:20]}…/{row.repo_rkey}"

                try:
                    result = future.result()
                except Exception as exc:
                    result = BackfillResult(
                        row=row,
                        status="error",
                        error=str(exc)[:200],
                    )

                # Each branch picks the counter to bump, the progress message,
                # and how often (every N repos) that kind of message is shown.
                if result.status == "ok" and result.record:
                    upsert_identity(
                        conn,
                        did=row.owner_did,
                        handle=result.owner_handle,
                        pds_host=result.pds_host,
                    )
                    upsert_issue_backfill_repo(
                        conn,
                        owner_did=row.owner_did,
                        owner_handle=result.owner_handle,
                        repo_rkey=row.repo_rkey,
                        pds_host=result.pds_host or DEFAULT_PDS,
                        record=result.record,
                    )
                    stats["repos_stored"] += 1
                    every = 25
                    message = f"OK {label} issues={row.issue_count}"
                elif result.status == "pds_failed":
                    stats["pds_failed"] += 1
                    every = 50
                    message = f"SKIP {label} — could not resolve PDS"
                elif result.status == "record_failed":
                    stats["record_failed"] += 1
                    every = 50
                    message = f"FAIL {label} — no sh.tangled.repo on PDS"
                else:
                    stats["errors"] += 1
                    every = 50
                    message = f"ERROR {label}: {result.error or 'unknown'}"

                # Always show the first ten results, then sample.
                if n <= 10 or n % every == 0:
                    step("backfill", n, total, message)

                # Commit in batches of 25 completed repos.
                if n % 25 == 0:
                    conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_fields = (
        ("Missing repos (total):", "total_missing"),
        ("Backfillable (repo_uri):", "backfillable"),
        ("Queued this run:", "queued"),
        ("Repos stored/updated:", "repos_stored"),
        ("PDS resolve failed:", "pds_failed"),
        ("Record fetch failed:", "record_failed"),
        ("Errors:", "errors"),
    )
    summary_lines = [f"{title} {stats[key]}" for title, key in summary_fields]
    summary_lines += [
        "",
        "Next: python scraper/scrape.py check-readmes",
        " python scraper/scrape.py embed-readmes",
    ]
    summary_block("Issue repo backfill complete", summary_lines)
    return stats