This repository has no description
1from __future__ import annotations
2
3import json
4import os
5import threading
6from concurrent.futures import ThreadPoolExecutor, as_completed
7from dataclasses import dataclass
8from typing import Any
9
10import httpx
11
12from appview_client import fetch_search_page
13from db import connect, set_crawl_state, upsert_atproto_record
14from parallel import concurrency_env
15from pds_client import DEFAULT_PDS, list_records, pds_host_for_did
16from progress import banner, log, metric, phase, step, summary_block
17
# Key under which this stage records its progress via set_crawl_state.
CRAWL_KEY = "stage2:network"
# Collection name passed to getRecord / list_records when fetching repo records.
COLLECTION = "sh.tangled.repo"
# Hosts tried, in order, by resolve_handle for com.atproto.identity.resolveHandle.
RESOLVE_PDS = ("https://bsky.social", "https://tngl.sh")
21
22
def _page_limit() -> int:
    """Return the search-index page size.

    Read from TANGLED_NETWORK_PAGE_SIZE (default 100) and clamped to the
    inclusive range 1..100. A non-integer value raises ValueError.
    """
    requested = int(os.getenv("TANGLED_NETWORK_PAGE_SIZE", "100"))
    if requested < 1:
        return 1
    if requested > 100:
        return 100
    return requested
25
26
def _repo_limit() -> int | None:
    """Return the optional cap on search links to process.

    Read from TANGLED_STAGE2_NETWORK_LIMIT. Unset or blank means no cap
    (None); otherwise the integer value, raised to at least 1. A non-integer
    value raises ValueError.
    """
    value = os.getenv("TANGLED_STAGE2_NETWORK_LIMIT", "").strip()
    return max(1, int(value)) if value else None
32
33
def _skip_existing() -> bool:
    """Return True unless a full refresh was requested.

    TANGLED_STAGE2_NETWORK_REFRESH set to "1", "true" or "yes"
    (case-insensitive, surrounding whitespace ignored) disables skipping of
    links already stored in the database.
    """
    refresh_flag = os.getenv("TANGLED_STAGE2_NETWORK_REFRESH", "").strip().lower()
    refresh_requested = refresh_flag in {"1", "true", "yes"}
    return not refresh_requested
40
41
def _link_key(handle: str, slug: str) -> tuple[str, str]:
    """Return the case-insensitive identity of an (owner handle, repo slug) pair."""
    lowered_handle = handle.lower()
    lowered_slug = slug.lower()
    return lowered_handle, lowered_slug
44
45
def _load_existing_links(conn) -> set[tuple[str, str]]:
    """Collect lowercased (owner_handle, slug) keys for repos already stored.

    Each tangled_repos row with a non-empty owner_handle contributes one key
    for its ``name`` and one for its ``rkey`` (whichever are non-empty
    strings), so a search link matches whether its slug is the repo name or
    the record key. Rows are read with ``row.get``, i.e. the connection is
    expected to yield mapping-like rows.
    """
    query = """
        select owner_handle, name, rkey
        from tangled_repos
        where owner_handle is not null
    """
    known: set[tuple[str, str]] = set()
    for record in conn.execute(query).fetchall():
        owner = record.get("owner_handle")
        if not (isinstance(owner, str) and owner):
            continue
        known.update(
            _link_key(owner, candidate)
            for candidate in (record.get("name"), record.get("rkey"))
            if isinstance(candidate, str) and candidate
        )
    return known
64
65
def _partition_links(
    links: list[tuple[str, str]], existing: set[tuple[str, str]]
) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    """Split *links* into (pending, skipped) by case-insensitive membership in *existing*.

    Input order is preserved within each output list.
    """
    pending: list[tuple[str, str]] = []
    skipped: list[tuple[str, str]] = []
    for handle, slug in links:
        target = skipped if _link_key(handle, slug) in existing else pending
        target.append((handle, slug))
    return pending, skipped
77
78
def resolve_handle(client: httpx.Client, handle: str) -> str | None:
    """Resolve an atproto *handle* to a DID.

    Each host in RESOLVE_PDS is asked in order via
    ``com.atproto.identity.resolveHandle``; the first non-empty string
    ``did`` in a 200 JSON response wins.

    Args:
        client: HTTP client used for the requests.
        handle: The handle to resolve.

    Returns:
        The DID, or None when no host produced a usable answer. None covers
        transport errors, non-200 statuses (including rate limits and server
        errors) and malformed response bodies alike.
    """
    for base in RESOLVE_PDS:
        try:
            resp = client.get(
                f"{base}/xrpc/com.atproto.identity.resolveHandle",
                params={"handle": handle},
            )
        except httpx.HTTPError:
            continue
        if resp.status_code != 200:
            continue
        try:
            payload = resp.json()
        except ValueError:
            # A 200 with a non-JSON body (e.g. an HTML error page) must not
            # abort resolution; try the next host instead.
            continue
        did = payload.get("did") if isinstance(payload, dict) else None
        if isinstance(did, str) and did:
            return did
    return None
93
94
def fetch_repo_record(
    client: httpx.Client,
    *,
    pds_host: str,
    owner_did: str,
    rkey: str,
    repo_slug: str,
) -> dict[str, Any] | None:
    """Fetch one sh.tangled.repo record from the owner's PDS (Bluesky or tngl).

    First tries a direct ``com.atproto.repo.getRecord`` with *rkey*. If that
    does not yield a well-formed record, pages through the owner's COLLECTION
    via ``list_records``. It returns the first record whose URI ends in
    ``/<repo_slug>`` or whose ``value["name"]`` equals *repo_slug*.

    Args:
        client: HTTP client used for all requests.
        pds_host: Base URL of the owner's PDS (a trailing slash is tolerated).
        owner_did: DID of the repo owner.
        rkey: Record key to try with getRecord.
        repo_slug: Slug to match during the listing fallback.

    Returns:
        A dict with at least ``uri`` (str) and ``value`` (dict), plus ``cid``
        when the server supplied one. Returns None when no record matched or
        a listing request raised ``httpx.HTTPError``.
    """
    base = pds_host.rstrip("/")
    try:
        resp = client.get(
            f"{base}/xrpc/com.atproto.repo.getRecord",
            params={
                "repo": owner_did,
                "collection": COLLECTION,
                "rkey": rkey,
            },
        )
    except httpx.HTTPError:
        resp = None

    if resp is not None and resp.status_code == 200:
        try:
            payload = resp.json()
        except ValueError:
            payload = None
        # Callers index record["uri"] / record["value"] directly, so only
        # accept a payload with that shape; anything else falls through to
        # the listing scan below.
        if (
            isinstance(payload, dict)
            and isinstance(payload.get("uri"), str)
            and isinstance(payload.get("value"), dict)
        ):
            return payload

    cursor: str | None = None
    followed: set[str] = set()
    while True:
        try:
            data = list_records(
                client, pds_host, owner_did, COLLECTION, cursor=cursor, limit=100
            )
        except httpx.HTTPError:
            return None

        records = data.get("records") or []
        for rec in records:
            if not isinstance(rec, dict):
                continue
            value = rec.get("value")
            uri = rec.get("uri")
            if not isinstance(value, dict) or not isinstance(uri, str):
                continue
            name = value.get("name")
            if uri.endswith(f"/{repo_slug}") or name == repo_slug:
                return {"uri": uri, "cid": rec.get("cid"), "value": value}

        cursor = data.get("cursor")
        # Stop on the last page, and also when the server hands back a cursor
        # we have already followed (which would otherwise loop forever).
        if (
            not records
            or not isinstance(cursor, str)
            or not cursor
            or cursor in followed
        ):
            break
        followed.add(cursor)
    return None
141
142
@dataclass
class NetworkFetchResult:
    """Outcome of processing one (owner_handle, repo_slug) search link in a worker."""

    owner_handle: str
    repo_slug: str
    status: str  # ok | resolve_failed | record_failed | error
    owner_did: str | None = None  # set once the owner handle has resolved
    pds_host: str | None = None  # PDS host returned alongside the resolved DID
    record: dict[str, Any] | None = None  # fetched record; populated only when status == "ok"
    error: str | None = None  # exception text when status == "error"
152
153
class _ResolveCache:
    """Thread-safe memo of owner handle -> (DID, PDS host), shared by worker threads.

    Only successful resolutions are cached. ``resolve_handle`` returns None
    both for handles that do not exist and for transient failures
    (transport errors, non-200 responses such as rate limits). Caching None
    would let a single blip mark every remaining repo of that owner as
    unresolved for the rest of the run.

    A per-handle lock makes concurrent workers that need the same owner
    wait for a single in-flight lookup instead of each hitting the network.
    Handles are keyed case-insensitively, matching ``_link_key``.
    """

    def __init__(self) -> None:
        self._resolved: dict[str, tuple[str, str]] = {}
        self._key_locks: dict[str, threading.Lock] = {}
        # Guards both dicts above; never held while doing network I/O.
        self._lock = threading.Lock()

    def resolve_owner(
        self, client: httpx.Client, handle: str
    ) -> tuple[str | None, str | None]:
        """Return ``(did, pds_host)`` for *handle*, or ``(None, None)`` if it did not resolve.

        When a DID is found, the PDS host is ``pds_host_for_did`` or
        DEFAULT_PDS as a fallback. Exceptions raised by the lookup helpers
        propagate to the caller.
        """
        key = handle.lower()
        with self._lock:
            cached = self._resolved.get(key)
            if cached is not None:
                return cached
            key_lock = self._key_locks.setdefault(key, threading.Lock())

        with key_lock:
            # Another worker may have completed the lookup while we waited.
            with self._lock:
                cached = self._resolved.get(key)
            if cached is not None:
                return cached

            did = resolve_handle(client, handle)
            if not did:
                return None, None
            pds = pds_host_for_did(client, did) or DEFAULT_PDS
            with self._lock:
                self._resolved[key] = (did, pds)
            return did, pds
182
183
def _fetch_one_link(
    owner_handle: str,
    repo_slug: str,
    cache: _ResolveCache,
) -> NetworkFetchResult:
    """Worker task: resolve one search link's owner and fetch its repo record.

    Runs in a pool thread with its own HTTP client. It never raises: every
    outcome is reported through ``status``:

    - ``"resolve_failed"``: the handle did not resolve to a DID.
    - ``"record_failed"``: no matching record was found on the owner's PDS.
    - ``"error"``: any exception, described in ``error``.
    - ``"ok"``: ``record`` is set.
    """
    result = NetworkFetchResult(
        owner_handle=owner_handle,
        repo_slug=repo_slug,
        status="error",
    )
    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            owner_did, pds_host = cache.resolve_owner(client, owner_handle)
            if not owner_did:
                result.status = "resolve_failed"
                return result

            result.owner_did = owner_did
            result.pds_host = pds_host

            record = fetch_repo_record(
                client,
                pds_host=pds_host or DEFAULT_PDS,
                owner_did=owner_did,
                rkey=repo_slug,
                repo_slug=repo_slug,
            )
    except Exception as exc:  # worker boundary: report the failure, don't propagate
        result.status = "error"
        detail = str(exc)
        # Keep the exception type: some exceptions have an empty message,
        # which would otherwise be logged as "unknown".
        result.error = (
            f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__
        )
        return result

    if not record:
        result.status = "record_failed"
        return result

    result.record = record
    result.status = "ok"
    return result
226
227
def upsert_identity(conn, *, did: str, handle: str | None, pds_host: str | None) -> None:
    """Insert or refresh the tangled_identities row keyed by *did*.

    On conflict, a None *handle* or *pds_host* keeps the stored value
    (coalesce), and ``last_synced_at`` is always set to ``now()``. The caller
    is responsible for committing.
    """
    sql = """
        insert into tangled_identities (did, handle, pds_host, last_synced_at)
        values (%s, %s, %s, now())
        on conflict (did) do update set
            handle = coalesce(excluded.handle, tangled_identities.handle),
            pds_host = coalesce(excluded.pds_host, tangled_identities.pds_host),
            last_synced_at = now()
    """
    params = (did, handle, pds_host)
    conn.execute(sql, params)
240
241
# Alphabet of the base32-sortable encoding used for atproto TID record keys.
_TID_ALPHABET = frozenset("234567abcdefghijklmnopqrstuvwxyz")
# The first TID character carries only 4 bits (the top bit is always 0), so it
# is limited to the first 16 symbols of the alphabet.
_TID_FIRST_CHARS = frozenset("234567abcdefghij")


def _looks_like_tid(value: str) -> bool:
    """Return True if *value* has the syntax of an atproto TID (13 base32-sortable chars)."""
    return (
        len(value) == 13
        and value[0] in _TID_FIRST_CHARS
        and all(ch in _TID_ALPHABET for ch in value)
    )


def upsert_network_repo(
    conn,
    *,
    owner_did: str,
    owner_handle: str,
    repo_slug: str,
    pds_host: str,
    record: dict[str, Any],
) -> None:
    """Upsert one discovered sh.tangled.repo record into tangled_repos and the raw record store.

    New rows are tagged ``discovered_via = 'appview_search'``. On conflict,
    an existing ``discovered_via`` is kept, and optional columns
    (repo_did, name, knot_hostname) are only overwritten by non-null values.
    The caller is responsible for committing.

    Args:
        conn: Open DB connection.
        owner_did: Resolved DID of the repo owner.
        owner_handle: Owner handle as found in the search index.
        repo_slug: Slug from the search link. It is used as the repo name when
            the record has no ``name`` and the slug is not a TID record key.
        pds_host: Accepted for interface compatibility; not used here.
        record: Mapping with ``uri`` (str), ``value`` (dict) and optional
            ``cid``.

    Raises:
        KeyError: If *record* lacks ``uri`` or ``value``.
    """
    uri = record["uri"]
    value = record["value"]
    rkey = uri.rsplit("/", 1)[-1]
    raw_cid = record.get("cid")
    cid = raw_cid if isinstance(raw_cid, str) else None
    repo_did = value.get("repoDid") if isinstance(value.get("repoDid"), str) else None
    knot = value.get("knot") if isinstance(value.get("knot"), str) else None
    name = value.get("name") if isinstance(value.get("name"), str) else None
    if not name:
        # Fall back to the slug unless it is really a TID rkey (a repo with no
        # human-readable name). TIDs encode a timestamp, so their leading
        # characters drift over time; a fixed-prefix test such as
        # startswith("3l") misses TIDs minted outside one window.
        name = repo_slug if repo_slug and not _looks_like_tid(repo_slug) else None

    conn.execute(
        """
        insert into tangled_repos (
            uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname,
            cid, record_raw, discovered_via, last_synced_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, 'appview_search', now())
        on conflict (uri) do update set
            owner_did = excluded.owner_did,
            owner_handle = excluded.owner_handle,
            repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did),
            name = coalesce(excluded.name, tangled_repos.name),
            knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname),
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            discovered_via = coalesce(tangled_repos.discovered_via, excluded.discovered_via),
            last_synced_at = now()
        """,
        (
            uri,
            owner_did,
            owner_handle,
            rkey,
            repo_did,
            name,
            knot,
            cid,
            json.dumps(value),
        ),
    )

    upsert_atproto_record(
        conn,
        uri=uri,
        author_did=owner_did,
        collection=COLLECTION,
        rkey=rkey,
        payload=value,
        cid=cid,
        repo_did=repo_did,
    )
301
302
def run_stage2_network(dsn: str) -> dict[str, Any]:
    """Discover Tangled repos via the tangled.org search index and store them.

    Phase 1 pages through the search index (``fetch_search_page``), collecting
    unique (owner_handle, repo_slug) links in discovery order. It is
    optionally capped by TANGLED_STAGE2_NETWORK_LIMIT. Unless
    TANGLED_STAGE2_NETWORK_REFRESH is set, links already present in
    tangled_repos are skipped.

    Phase 2 fans the remaining links out to a thread pool (``_fetch_one_link``)
    and upserts each successful result from the calling thread. It commits
    every 50 completed links and marks the crawl state "complete" at the end.

    Args:
        dsn: Database connection string passed to ``connect``.

    Returns:
        The ``stats`` counters: search_links, repos_stored, already_in_db,
        resolve_failed, record_failed, errors.
    """
    workers = concurrency_env("TANGLED_STAGE2_NETWORK_CONCURRENCY", default=20)

    banner("STAGE 2-network — All Tangled repos (Bluesky + tngl.sh)")
    log("stage 2-network", "Uses tangled.org search index — only accounts WITH repos.")
    log("stage 2-network", "Does NOT scan all Bluesky users — only Tangled repo creators.")
    log("stage 2-network", "Resolves each owner handle → DID → PDS, then fetches sh.tangled.repo.")
    log("stage 2-network", f"Concurrency: {workers}")

    page_size = _page_limit()
    repo_limit = _repo_limit()
    if repo_limit:
        log("stage 2-network", f"Repo limit: {repo_limit}")
    if _skip_existing():
        log("stage 2-network", "Skip existing: on (set TANGLED_STAGE2_NETWORK_REFRESH=1 to re-fetch all)")
    else:
        log("stage 2-network", "Skip existing: off — refreshing every link")

    stats = {
        "search_links": 0,
        "repos_stored": 0,
        "already_in_db": 0,
        "resolve_failed": 0,
        "record_failed": 0,
        "errors": 0,
    }

    # Links in first-seen order; seen_links deduplicates across pages.
    all_links: list[tuple[str, str]] = []
    seen_links: set[tuple[str, str]] = set()
    total_index: int | None = None

    phase(1, "Crawl tangled.org/search index")

    with httpx.Client(timeout=60.0, follow_redirects=True) as client:
        offset = 0
        while True:
            _html, links, total = fetch_search_page(
                client, offset=offset, limit=page_size
            )
            # Remember the most recent total the index reported, if any.
            if total is not None:
                total_index = total

            new = 0
            for link in links:
                if link not in seen_links:
                    seen_links.add(link)
                    all_links.append(link)
                    new += 1

            log(
                "stage 2-network",
                f" search offset {offset}: +{new} links (unique: {len(all_links)}"
                + (f" / {total_index})" if total_index else ")"),
            )

            # Stop when: the optional cap is reached, the reported total has
            # been paged past, or a non-first page added nothing new.
            if repo_limit and len(all_links) >= repo_limit:
                all_links = all_links[:repo_limit]
                break
            if total_index is not None and offset + page_size >= total_index:
                break
            if new == 0 and offset > 0:
                break
            offset += page_size

    metric("Unique repos in search index", len(all_links))
    stats["search_links"] = len(all_links)

    pending_links = all_links
    if _skip_existing():
        with connect(dsn) as conn:
            existing = _load_existing_links(conn)
        pending_links, skipped_links = _partition_links(all_links, existing)
        stats["already_in_db"] = len(skipped_links)
        metric("Already in DB (skipped)", len(skipped_links))
        metric("To fetch", len(pending_links))
        if not pending_links:
            log("stage 2-network", "Nothing new to fetch.")
        elif len(skipped_links) <= 10:
            for handle, slug in skipped_links:
                log("stage 2-network", f" skip {handle}/{slug}")

    phase(2, f"Resolve owners & fetch repo records ({workers} workers)")

    cache = _ResolveCache()
    done = 0
    # NOTE(review): done/done_lock are only touched from this thread (the
    # as_completed loop below), so the lock is not strictly needed.
    done_lock = threading.Lock()
    total_work = len(pending_links)

    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={
                "link_count": len(all_links),
                "pending_count": len(pending_links),
                "skipped_count": stats["already_in_db"],
                "total_index": total_index,
                "workers": workers,
            },
        )
        conn.commit()

        if not pending_links:
            set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
            conn.commit()
        else:
            # Workers only do network I/O; every DB write below happens on
            # this thread, so `conn` is never used from pool threads.
            # NOTE(review): if an upsert raises here, the crawl state stays
            # "running", and leaving the executor's `with` waits for all
            # outstanding fetches before the error propagates.
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {
                    pool.submit(_fetch_one_link, handle, slug, cache): (handle, slug)
                    for handle, slug in pending_links
                }

                for future in as_completed(futures):
                    owner_handle, repo_slug = futures[future]
                    label = f"{owner_handle}/{repo_slug}"

                    try:
                        result = future.result()
                    except Exception as exc:
                        # Defensive: turn an escaped worker exception into an error result.
                        result = NetworkFetchResult(
                            owner_handle=owner_handle,
                            repo_slug=repo_slug,
                            status="error",
                            error=str(exc),
                        )

                    with done_lock:
                        done += 1
                        n = done

                    if result.status == "ok" and result.record and result.owner_did:
                        upsert_identity(
                            conn,
                            did=result.owner_did,
                            handle=owner_handle,
                            pds_host=result.pds_host,
                        )
                        upsert_network_repo(
                            conn,
                            owner_did=result.owner_did,
                            owner_handle=owner_handle,
                            repo_slug=repo_slug,
                            pds_host=result.pds_host or DEFAULT_PDS,
                            record=result.record,
                        )
                        stats["repos_stored"] += 1
                        # Log the first 10 and then every 50th success.
                        if n <= 10 or n % 50 == 0:
                            # Label is "bsky" only if the host string contains "bsky";
                            # every other host (including self-hosted PDSes) shows "tngl".
                            pds_label = (
                                "bsky"
                                if result.pds_host and "bsky" in result.pds_host
                                else "tngl"
                            )
                            step(
                                "stage 2-network",
                                n,
                                total_work,
                                f"OK {label} did={result.owner_did[:20]}… pds={pds_label}",
                            )
                    elif result.status == "resolve_failed":
                        stats["resolve_failed"] += 1
                        if n <= 10 or n % 100 == 0:
                            step(
                                "stage 2-network",
                                n,
                                total_work,
                                f"SKIP {label} — handle not resolved",
                            )
                    elif result.status == "record_failed":
                        stats["record_failed"] += 1
                        if n <= 10 or n % 100 == 0:
                            step(
                                "stage 2-network",
                                n,
                                total_work,
                                f"FAIL {label} — no record on {result.pds_host or '?'}",
                            )
                    else:
                        stats["errors"] += 1
                        if n <= 10 or n % 100 == 0:
                            step(
                                "stage 2-network",
                                n,
                                total_work,
                                f"ERROR {label}: {result.error or 'unknown'}",
                            )

                    # Periodic commit so progress survives a crash mid-run.
                    if n % 50 == 0:
                        conn.commit()

            set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
            conn.commit()

    summary_block(
        "Stage 2-network complete",
        [
            f"Search index links: {len(all_links)}",
            f"Already in DB (skip): {stats['already_in_db']}",
            f"Repos stored/updated: {stats['repos_stored']}",
            f"Handle resolve failed: {stats['resolve_failed']}",
            f"Record fetch failed: {stats['record_failed']}",
            f"Errors: {stats['errors']}",
            "",
            "Query: select discovered_via, count(*) from tangled_repos group by 1;",
        ],
    )
    return stats
509 return stats