This repository has no description
1#!/usr/bin/env python3
2"""Fetch collaborator lists for all repos via knot listCollaborators."""
3
4from __future__ import annotations
5
6import os
7import sys
8import threading
9from concurrent.futures import ThreadPoolExecutor, as_completed
10from dataclasses import dataclass, field
11from pathlib import Path
12from typing import Any
13
14import httpx
15from dotenv import load_dotenv
16
17from db import connect, init_schema, set_crawl_state
18from parallel import concurrency_env
19from pds_client import knot_xrpc
20from progress import banner, log, metric, phase, step, summary_block
21
# Directory two levels above this file; main() looks for a .env file here first.
REPO_ROOT: Path = Path(__file__).resolve().parent.parent
# Key passed to set_crawl_state when recording this job's status.
CRAWL_KEY: str = "collaborators:fetch"
# Sent as the "limit" parameter on every listCollaborators request.
PAGE_LIMIT: int = 1000
25
26
@dataclass
class CollabFetchResult:
    """Outcome of one repo's collaborator fetch attempt.

    A result starts out with whatever ``status`` the creator passes and is
    then filled in: ``collaborators`` receives the raw items returned by the
    knot on success, while ``error`` receives a human-readable reason when
    the repo was skipped or the fetch failed.
    """

    # Identity of the repo the result belongs to.
    repo_did: str
    repo_uri: str | None
    knot_hostname: str
    # One of: ok | skipped_knot | error
    status: str
    # Raw listCollaborators items; a fresh list per instance.
    collaborators: list[dict[str, Any]] = field(default_factory=list)
    error: str | None = None
35
36
def _repo_limit() -> int | None:
    """Return the optional cap on how many repos this run should process.

    Reads ``TANGLED_COLLAB_REPO_LIMIT``. An unset or blank variable means
    "no cap" and yields ``None``. Any integer below 1 is clamped to 1.

    Raises:
        ValueError: if the variable is set to something that is not an
            integer. The message names the variable so the operator can see
            which setting is wrong.
    """
    raw = os.getenv("TANGLED_COLLAB_REPO_LIMIT", "").strip()
    if not raw:
        return None
    try:
        limit = int(raw)
    except ValueError:
        # Re-raise with context; the bare int() message does not say which
        # environment variable held the bad value.
        raise ValueError(
            f"TANGLED_COLLAB_REPO_LIMIT must be an integer, got {raw!r}"
        ) from None
    return max(1, limit)
42
43
def _skip_existing() -> bool:
    """Tell whether repos that already have a sync row should be skipped.

    Skipping is the default. It is turned off only when
    ``TANGLED_COLLAB_REFRESH`` is set to ``1``, ``true`` or ``yes``
    (case-insensitive, surrounding whitespace ignored).
    """
    flag = os.getenv("TANGLED_COLLAB_REFRESH", "").strip().lower()
    refresh_requested = flag in ("1", "true", "yes")
    return not refresh_requested
50
51
def fetch_repo_collaborators(
    client: httpx.Client,
    *,
    knot_hostname: str,
    repo_did: str,
) -> list[dict[str, Any]]:
    """Collect every collaborator item a knot reports for one repo.

    Calls ``sh.tangled.repo.listCollaborators`` through ``knot_xrpc``
    repeatedly, passing back the ``cursor`` from each response, until the
    response has no cursor or no items.

    Args:
        client: HTTP client handed through to ``knot_xrpc``.
        knot_hostname: Knot to query.
        repo_did: Repo DID, sent as the ``subject`` parameter.

    Returns:
        The dict entries from every page's ``items`` list, in response
        order. Non-dict entries are dropped.

    Raises:
        RuntimeError: if a response is not HTTP 200 with a dict payload, or
            if the knot hands back a cursor it already issued (which would
            otherwise make this loop run forever).
    """
    items: list[dict[str, Any]] = []
    cursor: str | None = None
    seen_cursors: set[str] = set()

    while True:
        params: dict[str, Any] = {
            "subject": repo_did,
            "limit": PAGE_LIMIT,
        }
        if cursor:
            params["cursor"] = cursor

        status, payload = knot_xrpc(
            client,
            knot_hostname,
            "sh.tangled.repo.listCollaborators",
            params,
        )
        if status != 200 or not isinstance(payload, dict):
            raise RuntimeError(f"listCollaborators HTTP {status}")

        page = payload.get("items") or []
        if isinstance(page, list):
            items.extend(item for item in page if isinstance(item, dict))

        cursor = payload.get("cursor")
        if not cursor or not page:
            break

        # A repeated cursor means the server is not advancing. Fail the
        # fetch instead of looping forever or storing a partial list.
        if cursor in seen_cursors:
            raise RuntimeError("listCollaborators cursor did not advance")
        seen_cursors.add(cursor)

    return items
87
88
def _text_or_none(value: Any) -> str | None:
    """Return *value* unchanged when it is a string, otherwise ``None``."""
    return value if isinstance(value, str) else None


def upsert_collaborators(
    conn,
    *,
    repo_did: str,
    collaborators: list[dict[str, Any]],
) -> int:
    """Replace the stored collaborator rows for one repo.

    Deletes the repo's existing rows in ``tangled_repo_collaborators``,
    inserts one row per distinct collaborator DID, and records the resulting
    count in ``tangled_repo_collaborators_sync``. Does not commit; the
    caller owns the transaction.

    Args:
        conn: Database connection exposing ``execute(sql, params)``.
        repo_did: DID of the repo whose collaborators are being replaced.
        collaborators: Raw items; each item's ``subject`` is the
            collaborator DID. Items whose ``subject`` is not a string
            starting with ``did:`` are ignored.

    Returns:
        The number of distinct collaborators stored.
    """
    conn.execute(
        "delete from tangled_repo_collaborators where repo_did = %s",
        (repo_did,),
    )

    # Key by DID so a collaborator repeated in the response is written and
    # counted once. The last occurrence wins, which is the row the ON
    # CONFLICT clause would have left behind anyway.
    by_did: dict[str, dict[str, Any]] = {}
    for item in collaborators:
        collab_did = item.get("subject")
        if isinstance(collab_did, str) and collab_did.startswith("did:"):
            by_did[collab_did] = item

    for collab_did, item in by_did.items():
        conn.execute(
            """
            insert into tangled_repo_collaborators (
                repo_did, collaborator_did, added_by, record_uri, record_cid,
                created_at, last_synced_at
            )
            values (%s, %s, %s, %s, %s, %s::timestamptz, now())
            on conflict (repo_did, collaborator_did) do update set
                added_by = excluded.added_by,
                record_uri = excluded.record_uri,
                record_cid = excluded.record_cid,
                created_at = excluded.created_at,
                last_synced_at = now()
            """,
            (
                repo_did,
                collab_did,
                _text_or_none(item.get("addedBy")),
                _text_or_none(item.get("uri")),
                _text_or_none(item.get("cid")),
                _text_or_none(item.get("createdAt")),
            ),
        )

    stored = len(by_did)
    conn.execute(
        """
        insert into tangled_repo_collaborators_sync (repo_did, collaborator_count, synced_at)
        values (%s, %s, now())
        on conflict (repo_did) do update set
            collaborator_count = excluded.collaborator_count,
            synced_at = now()
        """,
        (repo_did, stored),
    )
    return stored
141
142
def _fetch_one(repo: dict[str, Any], reachable: set[str]) -> CollabFetchResult:
    """Fetch one repo's collaborators and report the outcome without raising.

    Intended to run as a thread-pool task: every failure is captured in the
    returned result instead of propagating.

    Args:
        repo: Repo row; reads ``repo_did`` (required), ``knot_hostname`` and
            ``uri``.
        reachable: Hostnames of knots considered reachable.

    Returns:
        A ``CollabFetchResult`` whose status is ``"skipped_knot"`` when the
        repo has no knot or its knot is not in *reachable*, ``"ok"`` with
        the fetched items on success, and ``"error"`` otherwise.
    """
    repo_did = repo["repo_did"]
    knot = repo.get("knot_hostname") or ""
    result = CollabFetchResult(
        repo_did=repo_did,
        repo_uri=repo.get("uri"),
        knot_hostname=knot,
        status="error",
    )

    if not knot or knot not in reachable:
        result.status = "skipped_knot"
        result.error = f"knot not reachable: {knot or 'missing'}"
        return result

    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            result.collaborators = fetch_repo_collaborators(
                client, knot_hostname=knot, repo_did=repo_did
            )
    except Exception as exc:
        # Worker boundary: record the failure instead of raising. Some
        # exceptions carry no message, so fall back to the class name to
        # keep the logged reason informative.
        result.error = str(exc) or type(exc).__name__
    else:
        # Only mark success once the client has also closed cleanly.
        result.status = "ok"
    return result
169
170
def _load_pending_repos(
    dsn: str,
    *,
    skip_existing: bool,
    repo_limit: int | None,
) -> tuple[set[str], list[Any], int, int]:
    """Read the work list and summary counts for one run.

    Returns ``(reachable, repos, synced_count, total_eligible)``:
    hostnames of knots flagged reachable, the repo rows to fetch (ordered by
    uri, optionally excluding repos that already have a sync row and capped
    at *repo_limit*), the number of sync rows (0 unless *skip_existing*),
    and the number of repos having both a DID and a knot hostname.
    """
    skip_clause = ""
    if skip_existing:
        skip_clause = """
            and not exists (
                select 1 from tangled_repo_collaborators_sync s
                where s.repo_did = tangled_repos.repo_did
            )
        """
    query = f"""
        select uri, repo_did, knot_hostname, owner_handle, name
        from tangled_repos
        where repo_did is not null
          and knot_hostname is not null
          {skip_clause}
        order by uri
    """
    if repo_limit:
        # repo_limit is an int, so interpolating it cannot inject SQL.
        query += f" limit {repo_limit}"

    with connect(dsn) as conn:
        reachable = {
            row["hostname"]
            for row in conn.execute(
                "select hostname from tangled_knots where reachable = true"
            ).fetchall()
        }
        repos = conn.execute(query).fetchall()
        synced_count = 0
        if skip_existing:
            synced_count = conn.execute(
                "select count(*) as n from tangled_repo_collaborators_sync"
            ).fetchone()["n"]
        total_eligible = conn.execute(
            """
            select count(*) as n from tangled_repos
            where repo_did is not null and knot_hostname is not null
            """
        ).fetchone()["n"]

    return reachable, repos, synced_count, total_eligible


def _record_fetch_result(
    conn,
    result: CollabFetchResult,
    *,
    label: str,
    position: int,
    total: int,
    stats: dict[str, int],
) -> None:
    """Persist one fetch result, update *stats*, and emit a throttled progress line."""
    if result.status == "ok":
        count = upsert_collaborators(
            conn,
            repo_did=result.repo_did,
            collaborators=result.collaborators,
        )
        stats["repos_fetched"] += 1
        stats["collaborator_edges"] += count
        # Always show repos that actually have collaborators; otherwise
        # only the first few results and every 100th.
        if position <= 10 or position % 100 == 0 or count > 0:
            step("collab", position, total, f"OK {label} {count} collaborator(s)")
    elif result.status == "skipped_knot":
        stats["skipped_knot"] += 1
        if position <= 10 or position % 200 == 0:
            step("collab", position, total, f"SKIP {label} {result.error}")
    else:
        stats["errors"] += 1
        if position <= 10 or position % 100 == 0:
            step(
                "collab",
                position,
                total,
                f"ERROR {label} {result.error or 'unknown'}",
            )


def run_fetch_collaborators(dsn: str) -> dict[str, int]:
    """Fetch collaborator lists for all pending repos and store them.

    Configuration comes from the environment via ``concurrency_env``
    (``TANGLED_COLLAB_CONCURRENCY``), ``_repo_limit`` and ``_skip_existing``.
    Fetches run on a thread pool; database writes happen on the calling
    thread as results complete, with a commit every 50 results and a final
    commit when the crawl state is set to ``complete``.

    Args:
        dsn: Database connection string passed to ``connect``.

    Returns:
        Counters keyed ``repos_fetched``, ``collaborator_edges``,
        ``already_synced``, ``skipped_knot`` and ``errors``.
    """
    workers = concurrency_env("TANGLED_COLLAB_CONCURRENCY", default=20)
    repo_limit = _repo_limit()
    skip_existing = _skip_existing()

    banner("COLLABORATORS — knot listCollaborators for every repo")
    log("collab", f"Concurrency: {workers}")
    if repo_limit:
        log("collab", f"Repo limit: {repo_limit}")
    if skip_existing:
        log(
            "collab",
            "Skip existing: on (set TANGLED_COLLAB_REFRESH=1 to re-fetch all)",
        )
    else:
        log("collab", "Skip existing: off — refreshing every repo")

    reachable, repos, synced_count, total_eligible = _load_pending_repos(
        dsn,
        skip_existing=skip_existing,
        repo_limit=repo_limit,
    )

    if not repos:
        log("collab", "Nothing to fetch — all eligible repos already synced.")
        return {
            "repos_fetched": 0,
            "collaborator_edges": 0,
            "already_synced": total_eligible,
            "skipped_knot": 0,
            "errors": 0,
        }

    total = len(repos)
    already_synced = synced_count if skip_existing else 0
    if skip_existing:
        metric("Eligible repos", total_eligible)
        metric("Already synced (skipped)", already_synced)
    metric("To fetch", total)

    stats = {
        "repos_fetched": 0,
        "collaborator_edges": 0,
        "already_synced": already_synced,
        "skipped_knot": 0,
        "errors": 0,
    }

    phase(1, f"Parallel listCollaborators ({workers} workers)")

    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={"repo_count": total, "workers": workers},
        )
        conn.commit()

        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {
                pool.submit(_fetch_one, dict(repo), reachable): repo for repo in repos
            }

            # as_completed yields on this thread only, so a plain counter is
            # enough to number the results; no lock is required.
            for position, future in enumerate(as_completed(futures), start=1):
                repo = futures[future]
                label = f"{repo.get('owner_handle') or '?'}/{repo.get('name') or repo['repo_did'][:16]}"

                try:
                    result = future.result()
                except Exception as exc:
                    # Fallback for a worker that raised anyway; count it as
                    # an error rather than aborting the whole run.
                    result = CollabFetchResult(
                        repo_did=repo["repo_did"],
                        repo_uri=repo.get("uri"),
                        knot_hostname=repo.get("knot_hostname") or "",
                        status="error",
                        error=str(exc),
                    )

                _record_fetch_result(
                    conn,
                    result,
                    label=label,
                    position=position,
                    total=total,
                    stats=stats,
                )

                # Commit in batches so an interrupted run keeps most of its work.
                if position % 50 == 0:
                    conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "Collaborators fetch complete",
        [
            f"Repos fetched: {stats['repos_fetched']}",
            f"Collaborator edges: {stats['collaborator_edges']}",
            f"Already synced: {stats['already_synced']}",
            f"Skipped knot: {stats['skipped_knot']}",
            f"Errors: {stats['errors']}",
            "",
            "Repos a user collaborates on:",
            " select * from tangled_user_collaborations",
            " where user_did = 'did:plc:...';",
        ],
    )
    return stats
337
338
def main() -> None:
    """Load environment settings, prepare the schema, and run the fetch.

    A ``.env`` file is taken from the first existing location among the repo
    root and this file's directory; when neither exists ``load_dotenv`` is
    called without a path. Exits with status 1 when
    ``DB_CONNECTION_STRING`` is missing or blank.
    """
    candidates = (REPO_ROOT / ".env", Path(__file__).parent / ".env")
    env_file = next((path for path in candidates if path.exists()), None)
    if env_file is None:
        load_dotenv()
    else:
        load_dotenv(env_file)

    dsn = os.getenv("DB_CONNECTION_STRING", "").strip()
    if not dsn:
        print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr)
        raise SystemExit(1)

    init_schema(dsn)
    run_fetch_collaborators(dsn)
354
355
if __name__ == "__main__":
    # Script entry point. On Ctrl-C, print a short notice and exit with
    # status 130 (the status shells conventionally report for SIGINT)
    # instead of dumping a KeyboardInterrupt traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        raise SystemExit(130) from None
361 raise SystemExit(130) from None