This repository has no description
1#!/usr/bin/env python3
2"""Scrape sh.tangled.repo.issue (+ state) from every known user PDS."""
3
4from __future__ import annotations
5
6import json
7import os
8import sys
9import threading
10import time
11from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
12from dataclasses import dataclass, field
13from pathlib import Path
14from typing import Any
15
16import httpx
17from dotenv import load_dotenv
18
19from db import connect, init_schema, set_crawl_state
20from parallel import concurrency_env
21from pds_client import list_records, pds_host_for_did
22from progress import banner, log, metric, phase, step, summary_block
23
# Two directory levels above this file's resolved location.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Key under which this job records its progress via set_crawl_state().
CRAWL_KEY = "issues:fetch"
# Record collections listed from each user's PDS.
ISSUE_COLLECTION = "sh.tangled.repo.issue"
STATE_COLLECTION = "sh.tangled.repo.issue.state"
# Values of a state record's "state" field that _state_map() recognises.
STATE_OPEN = "sh.tangled.repo.issue.state.open"
STATE_CLOSED = "sh.tangled.repo.issue.state.closed"
# Timeouts (seconds) applied to every httpx.Client created by the fetch workers.
HTTP_TIMEOUT = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=10.0)
# After the first ten users, a progress line is logged every LOG_EVERY users.
LOG_EVERY = 10
# Interval of the heartbeat log line; also the timeout of each wait() on futures.
HEARTBEAT_SEC = 15
# Upper bound on futures that are submitted but not yet processed.
INFLIGHT_CHUNK = 200
34
35
36class _PdsCache:
37 def __init__(self) -> None:
38 self._hosts: dict[str, str | None] = {}
39 self._lock = threading.Lock()
40
41 def resolve(self, client: httpx.Client, user_did: str, hint: str | None) -> str | None:
42 if hint:
43 return hint.rstrip("/")
44 with self._lock:
45 if user_did in self._hosts:
46 return self._hosts[user_did]
47 try:
48 pds = pds_host_for_did(client, user_did)
49 except httpx.HTTPError:
50 pds = None
51 with self._lock:
52 self._hosts[user_did] = pds.rstrip("/") if pds else None
53 return self._hosts[user_did]
54
55
@dataclass
class UserIssueResult:
    """Outcome of fetching one user's issue and issue-state records."""

    # DID of the user whose repository was listed.
    user_did: str
    # Handle carried over from the user row, if any.
    handle: str | None
    status: str  # ok | error
    # Raw records listed from the issue collection (empty on error).
    issues: list[dict[str, Any]] = field(default_factory=list)
    # Raw records listed from the issue-state collection (empty on error).
    states: list[dict[str, Any]] = field(default_factory=list)
    # Short failure description; only set when status is "error".
    error: str | None = None
64
65
66def _user_limit() -> int | None:
67 raw = os.getenv("TANGLED_ISSUE_USER_LIMIT", "").strip()
68 if not raw:
69 return None
70 return max(1, int(raw))
71
72
73def _max_pages() -> int:
74 raw = os.getenv("TANGLED_ISSUE_MAX_PAGES", "50").strip()
75 return max(1, int(raw))
76
77
78def _skip_existing() -> bool:
79 return os.getenv("TANGLED_ISSUE_REFRESH", "").strip().lower() not in (
80 "1",
81 "true",
82 "yes",
83 )
84
85
86def _all_users() -> bool:
87 return os.getenv("TANGLED_ISSUE_ALL_USERS", "1").strip().lower() not in (
88 "0",
89 "false",
90 "no",
91 )
92
93
94def _users_query(*, skip_existing: bool, user_limit: int | None, all_users: bool) -> str:
95 skip_clause = ""
96 if skip_existing:
97 skip_clause = """
98 and not exists (
99 select 1 from tangled_issue_user_sync s where s.user_did = u.did
100 )
101 """
102 pds_union = ""
103 if all_users:
104 pds_union = """
105 union all
106 select did, handle, pds_host from tangled_pds_accounts
107 """
108 query = f"""
109 select distinct on (u.did) u.did, u.handle, u.pds_host
110 from (
111 select did, handle, pds_host from tangled_identities
112 union all
113 select owner_did as did,
114 max(owner_handle) as handle,
115 null::text as pds_host
116 from tangled_repos
117 where owner_did is not null
118 group by owner_did
119 {pds_union}
120 ) u
121 where u.did is not null
122 {skip_clause}
123 order by u.did, u.pds_host nulls last, u.handle nulls last
124 """
125 if user_limit:
126 query += f" limit {user_limit}"
127 return query
128
129
130def _total_users_sql(*, all_users: bool) -> str:
131 pds_union = ""
132 if all_users:
133 pds_union = "union select did from tangled_pds_accounts"
134 return f"""
135 select count(*) as n from (
136 select did from tangled_identities
137 union
138 select owner_did from tangled_repos where owner_did is not null
139 {pds_union}
140 ) x
141 """
142
143
144def _rkey_from_uri(uri: str) -> str:
145 return uri.rsplit("/", 1)[-1]
146
147
148def _parse_repo_refs(value: dict[str, Any]) -> tuple[str | None, str | None]:
149 repo = value.get("repo")
150 if isinstance(repo, str):
151 if repo.startswith("did:"):
152 return repo, None
153 if repo.startswith("at://"):
154 return _repo_did_from_at_uri(repo), repo
155 return None, repo if isinstance(repo, str) else None
156
157
158def _repo_did_from_at_uri(uri: str) -> str | None:
159 if not uri.startswith("at://"):
160 return None
161 parts = uri.removeprefix("at://").split("/")
162 return parts[0] if parts and parts[0].startswith("did:") else None
163
164
def _list_all_records(
    client: httpx.Client,
    pds_host: str,
    user_did: str,
    collection: str,
    *,
    max_pages: int,
) -> list[dict[str, Any]]:
    """Page through ``list_records`` and collect every dict record.

    At most *max_pages* pages of up to 100 records are requested. Paging stops
    early when a page is empty, when no cursor is returned, when the cursor is
    not a string, or when a cursor repeats.

    Returns:
        The dict-typed records of all fetched pages, in fetch order.
    """
    collected: list[dict[str, Any]] = []
    visited: set[str] = set()
    cursor: str | None = None

    for _page_no in range(max_pages):
        payload = list_records(
            client, pds_host, user_did, collection, cursor=cursor, limit=100
        )
        batch = payload.get("records") or []
        for item in batch:
            if isinstance(item, dict):
                collected.append(item)

        upcoming = payload.get("cursor")
        if not batch or not upcoming:
            break
        # A repeated cursor would fetch the same page again, so stop there.
        if not isinstance(upcoming, str) or upcoming in visited:
            break
        visited.add(upcoming)
        cursor = upcoming

    return collected
191
192
193def _state_map(states: list[dict[str, Any]]) -> dict[str, str]:
194 mapping: dict[str, str] = {}
195 for rec in states:
196 value = rec.get("value")
197 if not isinstance(value, dict):
198 continue
199 issue_uri = value.get("issue")
200 state = value.get("state")
201 if not isinstance(state, str):
202 continue
203 if state == STATE_CLOSED:
204 normalized = "closed"
205 elif state == STATE_OPEN:
206 normalized = "open"
207 else:
208 normalized = "open"
209 if isinstance(issue_uri, str) and issue_uri:
210 mapping[issue_uri] = normalized
211 else:
212 rkey = _rkey_from_uri(rec["uri"]) if isinstance(rec.get("uri"), str) else None
213 if rkey:
214 mapping[f"rkey:{rkey}"] = normalized
215 return mapping
216
217
218def _issue_state(uri: str, rkey: str, states: dict[str, str]) -> str:
219 if uri in states:
220 return states[uri]
221 return states.get(f"rkey:{rkey}", "open")
222
223
224def _optional_timestamp(value: Any) -> str | None:
225 if not isinstance(value, str):
226 return None
227 value = value.strip()
228 return value if value else None
229
230
def upsert_issue(
    conn,
    *,
    record: dict[str, Any],
    author_did: str,
    author_handle: str | None,
    state: str,
) -> None:
    """Insert or update one row of ``tangled_issues`` from a listed record.

    On a URI conflict the row is overwritten, except that ``repo_did`` and
    ``repo_uri`` keep their stored value when the new one is null. The
    embedding columns are reset to null whenever title or body changed.

    Args:
        conn: Open database connection; the statement is executed on it and
            not committed here.
        record: Listed record; must carry a ``uri`` and a dict ``value``.
        author_did: DID of the user whose repository held the record.
        author_handle: Handle of that user, if known.
        state: Normalised issue state stored in the ``state`` column.
    """

    def text_or_none(candidate: Any) -> str | None:
        # Non-string values are stored as NULL instead of being coerced.
        return candidate if isinstance(candidate, str) else None

    uri = record["uri"]
    payload = record["value"]
    repo_did, repo_uri = _parse_repo_refs(payload)

    params = (
        uri,
        author_did,
        author_handle,
        _rkey_from_uri(uri),
        repo_did,
        repo_uri,
        text_or_none(payload.get("title")),
        text_or_none(payload.get("body")),
        state,
        _optional_timestamp(payload.get("createdAt")),
        text_or_none(record.get("cid")),
        json.dumps(payload),
    )

    conn.execute(
        """
        insert into tangled_issues (
            uri, author_did, author_handle, rkey, repo_did, repo_uri,
            title, body, state, issue_created_at, cid, record_raw, fetched_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::timestamptz, %s, %s::jsonb, now())
        on conflict (uri) do update set
            author_did = excluded.author_did,
            author_handle = excluded.author_handle,
            rkey = excluded.rkey,
            repo_did = coalesce(excluded.repo_did, tangled_issues.repo_did),
            repo_uri = coalesce(excluded.repo_uri, tangled_issues.repo_uri),
            title = excluded.title,
            body = excluded.body,
            state = excluded.state,
            issue_created_at = excluded.issue_created_at,
            cid = excluded.cid,
            record_raw = excluded.record_raw,
            fetched_at = now(),
            embedding = case
                when tangled_issues.title is distinct from excluded.title
                  or tangled_issues.body is distinct from excluded.body
                then null else tangled_issues.embedding end,
            embedding_model = case
                when tangled_issues.title is distinct from excluded.title
                  or tangled_issues.body is distinct from excluded.body
                then null else tangled_issues.embedding_model end,
            embedded_at = case
                when tangled_issues.title is distinct from excluded.title
                  or tangled_issues.body is distinct from excluded.body
                then null else tangled_issues.embedded_at end
        """,
        params,
    )
295
296
def _mark_user_synced(
    conn,
    *,
    user_did: str,
    issue_count: int,
    status: str,
    error_message: str | None = None,
) -> None:
    """Record the outcome of scanning one user in ``tangled_issue_user_sync``.

    An existing row for *user_did* is overwritten and its ``synced_at`` is
    refreshed. The statement is executed on *conn* and not committed here.
    """
    upsert_sql = """
        insert into tangled_issue_user_sync (
            user_did, issue_count, synced_at, status, error_message
        )
        values (%s, %s, now(), %s, %s)
        on conflict (user_did) do update set
            issue_count = excluded.issue_count,
            synced_at = now(),
            status = excluded.status,
            error_message = excluded.error_message
        """
    params = (user_did, issue_count, status, error_message)
    conn.execute(upsert_sql, params)
319
320
def _fetch_user_issues(
    user_did: str,
    handle: str | None,
    pds_host: str | None,
    cache: _PdsCache,
    max_pages: int,
) -> UserIssueResult:
    """Fetch one user's issue records (and their state records) from the PDS.

    A dedicated ``httpx.Client`` is opened for the call. State records are
    only listed when the user has at least one issue record.

    Args:
        user_did: DID of the user to scan.
        handle: Handle copied into the result for logging and storage.
        pds_host: Known PDS host, or ``None`` to resolve it through *cache*.
        cache: Shared DID -> PDS host cache.
        max_pages: Page cap applied to each collection listing.

    Returns:
        A result with ``status == "ok"`` and the listed records, or
        ``status == "error"`` with ``error`` set. No ``Exception`` escapes:
        every failure is reported through the result.
    """
    result = UserIssueResult(user_did=user_did, handle=handle, status="error")
    try:
        with httpx.Client(timeout=HTTP_TIMEOUT, follow_redirects=True) as client:
            pds = cache.resolve(client, user_did, pds_host)
            if not pds:
                result.error = "could not resolve PDS"
                return result
            issues = _list_all_records(
                client, pds, user_did, ISSUE_COLLECTION, max_pages=max_pages
            )
            states: list[dict[str, Any]] = []
            if issues:
                states = _list_all_records(
                    client, pds, user_did, STATE_COLLECTION, max_pages=max_pages
                )
    except httpx.TimeoutException:
        result.error = "PDS timeout"
    except Exception as exc:
        # Covers httpx.HTTPError too, which was handled identically before.
        # Some exceptions stringify to "", which used to be stored as an empty
        # error and logged as "unknown"; fall back to the class name instead.
        result.error = str(exc)[:200] or type(exc).__name__
    else:
        result.issues = issues
        result.states = states
        result.status = "ok"
    return result
356
357
def _heartbeat_loop(
    *,
    done: list[int],
    total: int,
    inflight: list[int],
    last_done_at: list[float],
    stop: threading.Event,
) -> None:
    """Log a progress line every ``HEARTBEAT_SEC`` seconds until *stop* is set.

    The single-element lists are shared with the caller, which updates them
    while this loop only reads them.

    Args:
        done: ``done[0]`` is the number of users processed so far.
        total: Number of users in this run.
        inflight: ``inflight[0]`` is the number reported as in-flight.
        last_done_at: ``last_done_at[0]`` is the ``time.monotonic()`` value
            taken when the most recent user finished.
        stop: Event that ends the loop.
    """
    while True:
        # wait() returns True as soon as the event is set, ending the thread.
        if stop.wait(HEARTBEAT_SEC):
            return
        finished = done[0]
        remaining = total - finished
        running = inflight[0]
        since_last = time.monotonic() - last_done_at[0]
        log(
            "issues",
            f"… heartbeat {finished}/{total} done ({running} in-flight, "
            f"{remaining} pending, last +{since_last:.0f}s)",
        )
376
377
def _store_user_result(
    conn,
    result: UserIssueResult,
    stats: dict[str, int],
    label: str,
) -> str:
    """Persist one user's fetch outcome, update *stats*, return the log line."""
    if result.status != "ok":
        _mark_user_synced(
            conn,
            user_did=result.user_did,
            issue_count=0,
            status="error",
            error_message=result.error,
        )
        stats["errors"] += 1
        return f"ERROR {label} {result.error or 'unknown'}"

    states = _state_map(result.states)
    upserted = 0
    open_n = 0
    for rec in result.issues:
        uri = rec.get("uri")
        # Records without a string URI or a dict value cannot be stored.
        if not isinstance(uri, str) or not isinstance(rec.get("value"), dict):
            continue
        state = _issue_state(uri, _rkey_from_uri(uri), states)
        upsert_issue(
            conn,
            record=rec,
            author_did=result.user_did,
            author_handle=result.handle,
            state=state,
        )
        upserted += 1
        if state == "open":
            open_n += 1

    _mark_user_synced(
        conn,
        user_did=result.user_did,
        issue_count=upserted,
        status="ok",
    )
    stats["users_scanned"] += 1
    stats["issues_upserted"] += upserted
    stats["open_issues"] += open_n
    return f"OK {label} {upserted} issue(s) ({open_n} open)"


def run_fetch_issues(dsn: str) -> dict[str, int]:
    """Scan every selected user's PDS for issues and store them.

    Users are fetched concurrently by a thread pool while this thread writes
    the results to the database, committing every 25 processed users. The
    crawl state under ``CRAWL_KEY`` is set to "running" at the start and to
    "complete" at the end. Behaviour is configured through the
    ``TANGLED_ISSUE_*`` environment variables read at the top of the function.

    Args:
        dsn: Connection string passed to ``connect``.

    Returns:
        Counters with the keys ``users_scanned``, ``issues_upserted``,
        ``open_issues``, ``already_synced`` and ``errors``.
    """
    workers = concurrency_env("TANGLED_ISSUE_CONCURRENCY", default=10)
    user_limit = _user_limit()
    skip_existing = _skip_existing()
    all_users = _all_users()
    max_pages = _max_pages()

    # Report the configured read timeout rather than a hard-coded copy of it.
    read_timeout = HTTP_TIMEOUT.read
    timeout_label = "none" if read_timeout is None else f"{read_timeout:g}s"
    scope_label = (
        "all known DIDs (+ tngl PDS accounts)" if all_users else "identities + repo owners"
    )

    banner("ISSUES — scrape sh.tangled.repo.issue from user PDSes")
    log("issues", f"Concurrency: {workers} PDS read timeout: {timeout_label}")
    log("issues", f"Max listRecords pages/user/collection: {max_pages}")
    log("issues", f"User scope: {scope_label}")
    if user_limit:
        log("issues", f"User limit: {user_limit}")
    if skip_existing:
        log("issues", "Skip existing: on (set TANGLED_ISSUE_REFRESH=1 to re-scan all)")
    else:
        log("issues", "Skip existing: off — re-scanning every user (daily sync)")

    with connect(dsn) as conn:
        users = conn.execute(
            _users_query(skip_existing=skip_existing, user_limit=user_limit, all_users=all_users)
        ).fetchall()
        total_users = conn.execute(_total_users_sql(all_users=all_users)).fetchone()["n"]
        already_synced = 0
        if skip_existing:
            already_synced = conn.execute(
                "select count(*) as n from tangled_issue_user_sync"
            ).fetchone()["n"]

    stats = {
        "users_scanned": 0,
        "issues_upserted": 0,
        "open_issues": 0,
        "already_synced": already_synced,
        "errors": 0,
    }

    if not users:
        log("issues", "Nothing to fetch — all users already scanned.")
        return stats

    metric("Known users", total_users)
    if skip_existing:
        metric("Already synced (skipped)", already_synced)
    metric("To scan", len(users))

    # Single-element lists shared with the heartbeat thread. Only this thread
    # writes them; the heartbeat thread just reads them for its log line.
    done_box = [0]
    inflight_box = [0]
    last_done_at = [time.monotonic()]
    pds_cache = _PdsCache()

    phase(1, f"Parallel PDS listRecords ({workers} workers)")
    log("issues", f"Progress every {LOG_EVERY} users + heartbeat every {HEARTBEAT_SEC}s")

    stop_heartbeat = threading.Event()
    heartbeat = threading.Thread(
        target=_heartbeat_loop,
        kwargs={
            "done": done_box,
            "total": len(users),
            "inflight": inflight_box,
            "last_done_at": last_done_at,
            "stop": stop_heartbeat,
        },
        daemon=True,
    )
    heartbeat.start()

    try:
        with connect(dsn) as conn:
            set_crawl_state(
                conn,
                key=CRAWL_KEY,
                status="running",
                meta={"user_count": len(users), "workers": workers},
            )
            conn.commit()

            user_iter = iter(users)
            pending_futures: dict[Any, dict[str, Any]] = {}

            def submit_more(pool: ThreadPoolExecutor) -> None:
                # Keep at most INFLIGHT_CHUNK futures outstanding so results
                # are consumed while the remaining users are still queued.
                while len(pending_futures) < INFLIGHT_CHUNK:
                    try:
                        row = next(user_iter)
                    except StopIteration:
                        break
                    fut = pool.submit(
                        _fetch_user_issues,
                        row["did"],
                        row.get("handle"),
                        row.get("pds_host"),
                        pds_cache,
                        max_pages,
                    )
                    pending_futures[fut] = row
                inflight_box[0] = len(pending_futures)

            with ThreadPoolExecutor(max_workers=workers) as pool:
                try:
                    submit_more(pool)

                    while pending_futures:
                        done_set, _ = wait(
                            pending_futures,
                            timeout=HEARTBEAT_SEC,
                            return_when=FIRST_COMPLETED,
                        )
                        if not done_set:
                            continue

                        for future in done_set:
                            row = pending_futures.pop(future)
                            label = row.get("handle") or row["did"][:20]

                            try:
                                result = future.result()
                            except Exception as exc:
                                result = UserIssueResult(
                                    user_did=row["did"],
                                    handle=row.get("handle"),
                                    status="error",
                                    error=str(exc)[:200],
                                )

                            done_box[0] += 1
                            n = done_box[0]
                            last_done_at[0] = time.monotonic()

                            msg = _store_user_result(conn, result, stats, label)

                            if n <= 10 or n % LOG_EVERY == 0 or result.issues:
                                step("issues", n, len(users), msg)

                            if n % 25 == 0:
                                conn.commit()

                        submit_more(pool)
                except BaseException:
                    # Without this, leaving the executor's ``with`` block would
                    # first run every queued fetch to completion before the
                    # error (or Ctrl-C) could propagate. Drop the queued work;
                    # only the fetches already running are waited for.
                    pool.shutdown(wait=False, cancel_futures=True)
                    raise

            set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
            conn.commit()
    finally:
        stop_heartbeat.set()
        heartbeat.join(timeout=1)

    summary_block(
        "Issues fetch complete",
        [
            f"Users scanned: {stats['users_scanned']}",
            f"Issues upserted: {stats['issues_upserted']}",
            f"Open (this run): {stats['open_issues']}",
            f"Already synced: {stats['already_synced']}",
            f"Errors: {stats['errors']}",
            "",
            "Query open issues:",
            " select count(*) from tangled_open_issues;",
        ],
    )
    return stats
580
581
def main() -> None:
    """Load configuration, make sure the schema exists, and run the fetch.

    The first existing ``.env`` among the repository root and this file's
    directory is loaded; when neither exists ``load_dotenv()`` is called
    without a path. Exits with status 1 when ``DB_CONNECTION_STRING`` is
    missing or blank.
    """
    env_candidates = (REPO_ROOT / ".env", Path(__file__).parent / ".env")
    env_file = next((path for path in env_candidates if path.exists()), None)
    if env_file is not None:
        load_dotenv(env_file)
    else:
        load_dotenv()

    dsn = os.getenv("DB_CONNECTION_STRING", "").strip()
    if not dsn:
        print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr)
        raise SystemExit(1)

    init_schema(dsn)
    run_fetch_issues(dsn)
597
598
if __name__ == "__main__":
    # Ctrl-C ends the run with a short notice instead of a traceback.
    # 130 is the conventional shell exit status for that case (128 + SIGINT).
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)