This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 11 kB View raw
1#!/usr/bin/env python3 2"""Fetch collaborator lists for all repos via knot listCollaborators.""" 3 4from __future__ import annotations 5 6import os 7import sys 8import threading 9from concurrent.futures import ThreadPoolExecutor, as_completed 10from dataclasses import dataclass, field 11from pathlib import Path 12from typing import Any 13 14import httpx 15from dotenv import load_dotenv 16 17from db import connect, init_schema, set_crawl_state 18from parallel import concurrency_env 19from pds_client import knot_xrpc 20from progress import banner, log, metric, phase, step, summary_block 21 22REPO_ROOT = Path(__file__).resolve().parent.parent 23CRAWL_KEY = "collaborators:fetch" 24PAGE_LIMIT = 1000 25 26 27@dataclass 28class CollabFetchResult: 29 repo_did: str 30 repo_uri: str | None 31 knot_hostname: str 32 status: str # ok | skipped_knot | error 33 collaborators: list[dict[str, Any]] = field(default_factory=list) 34 error: str | None = None 35 36 37def _repo_limit() -> int | None: 38 raw = os.getenv("TANGLED_COLLAB_REPO_LIMIT", "").strip() 39 if not raw: 40 return None 41 return max(1, int(raw)) 42 43 44def _skip_existing() -> bool: 45 return os.getenv("TANGLED_COLLAB_REFRESH", "").strip().lower() not in ( 46 "1", 47 "true", 48 "yes", 49 ) 50 51 52def fetch_repo_collaborators( 53 client: httpx.Client, 54 *, 55 knot_hostname: str, 56 repo_did: str, 57) -> list[dict[str, Any]]: 58 items: list[dict[str, Any]] = [] 59 cursor: str | None = None 60 61 while True: 62 params: dict[str, Any] = { 63 "subject": repo_did, 64 "limit": PAGE_LIMIT, 65 } 66 if cursor: 67 params["cursor"] = cursor 68 69 status, payload = knot_xrpc( 70 client, 71 knot_hostname, 72 "sh.tangled.repo.listCollaborators", 73 params, 74 ) 75 if status != 200 or not isinstance(payload, dict): 76 raise RuntimeError(f"listCollaborators HTTP {status}") 77 78 page = payload.get("items") or [] 79 if isinstance(page, list): 80 items.extend(item for item in page if isinstance(item, dict)) 81 82 cursor = payload.get("cursor") 83 if not cursor or not page: 84 break 85 86 return items 87 88 89def upsert_collaborators( 90 conn, 91 *, 92 repo_did: str, 93 collaborators: list[dict[str, Any]], 94) -> int: 95 conn.execute( 96 "delete from tangled_repo_collaborators where repo_did = %s", 97 (repo_did,), 98 ) 99 100 stored = 0 101 for item in collaborators: 102 collab_did = item.get("subject") 103 if not isinstance(collab_did, str) or not collab_did.startswith("did:"): 104 continue 105 conn.execute( 106 """ 107 insert into tangled_repo_collaborators ( 108 repo_did, collaborator_did, added_by, record_uri, record_cid, 109 created_at, last_synced_at 110 ) 111 values (%s, %s, %s, %s, %s, %s::timestamptz, now()) 112 on conflict (repo_did, collaborator_did) do update set 113 added_by = excluded.added_by, 114 record_uri = excluded.record_uri, 115 record_cid = excluded.record_cid, 116 created_at = excluded.created_at, 117 last_synced_at = now() 118 """, 119 ( 120 repo_did, 121 collab_did, 122 item.get("addedBy") if isinstance(item.get("addedBy"), str) else None, 123 item.get("uri") if isinstance(item.get("uri"), str) else None, 124 item.get("cid") if isinstance(item.get("cid"), str) else None, 125 item.get("createdAt") if isinstance(item.get("createdAt"), str) else None, 126 ), 127 ) 128 stored += 1 129 130 conn.execute( 131 """ 132 insert into tangled_repo_collaborators_sync (repo_did, collaborator_count, synced_at) 133 values (%s, %s, now()) 134 on conflict (repo_did) do update set 135 collaborator_count = excluded.collaborator_count, 136 synced_at = now() 137 """, 138 (repo_did, stored), 139 ) 140 return stored 141 142 143def _fetch_one(repo: dict[str, Any], reachable: set[str]) -> CollabFetchResult: 144 repo_did = repo["repo_did"] 145 knot = repo.get("knot_hostname") or "" 146 base = CollabFetchResult( 147 repo_did=repo_did, 148 repo_uri=repo.get("uri"), 149 knot_hostname=knot, 150 status="error", 151 ) 152 153 if not knot or knot not in reachable: 154 base.status = "skipped_knot" 155 base.error = f"knot not reachable: {knot or 'missing'}" 156 return base 157 158 try: 159 with httpx.Client(timeout=60.0, follow_redirects=True) as client: 160 collaborators = fetch_repo_collaborators( 161 client, knot_hostname=knot, repo_did=repo_did 162 ) 163 base.collaborators = collaborators 164 base.status = "ok" 165 return base 166 except Exception as exc: 167 base.error = str(exc) 168 return base 169 170 171def run_fetch_collaborators(dsn: str) -> dict[str, int]: 172 workers = concurrency_env("TANGLED_COLLAB_CONCURRENCY", default=20) 173 repo_limit = _repo_limit() 174 skip_existing = _skip_existing() 175 176 banner("COLLABORATORS — knot listCollaborators for every repo") 177 log("collab", f"Concurrency: {workers}") 178 if repo_limit: 179 log("collab", f"Repo limit: {repo_limit}") 180 if skip_existing: 181 log( 182 "collab", 183 "Skip existing: on (set TANGLED_COLLAB_REFRESH=1 to re-fetch all)", 184 ) 185 else: 186 log("collab", "Skip existing: off — refreshing every repo") 187 188 with connect(dsn) as conn: 189 reachable = { 190 row["hostname"] 191 for row in conn.execute( 192 "select hostname from tangled_knots where reachable = true" 193 ).fetchall() 194 } 195 skip_clause = "" 196 if skip_existing: 197 skip_clause = """ 198 and not exists ( 199 select 1 from tangled_repo_collaborators_sync s 200 where s.repo_did = tangled_repos.repo_did 201 ) 202 """ 203 query = f""" 204 select uri, repo_did, knot_hostname, owner_handle, name 205 from tangled_repos 206 where repo_did is not null 207 and knot_hostname is not null 208 {skip_clause} 209 order by uri 210 """ 211 if repo_limit: 212 query += f" limit {repo_limit}" 213 repos = conn.execute(query).fetchall() 214 synced_count = 0 215 if skip_existing: 216 synced_count = conn.execute( 217 "select count(*) as n from tangled_repo_collaborators_sync" 218 ).fetchone()["n"] 219 total_eligible = conn.execute( 220 """ 221 select count(*) as n from tangled_repos 222 where repo_did is not null and knot_hostname is not null 223 """ 224 ).fetchone()["n"] 225 226 if not repos: 227 log("collab", "Nothing to fetch — all eligible repos already synced.") 228 return { 229 "repos_fetched": 0, 230 "collaborator_edges": 0, 231 "already_synced": total_eligible, 232 "skipped_knot": 0, 233 "errors": 0, 234 } 235 236 already_synced = synced_count if skip_existing else 0 237 if skip_existing: 238 metric("Eligible repos", total_eligible) 239 metric("Already synced (skipped)", already_synced) 240 metric("To fetch", len(repos)) 241 242 stats = { 243 "repos_fetched": 0, 244 "collaborator_edges": 0, 245 "already_synced": already_synced, 246 "skipped_knot": 0, 247 "errors": 0, 248 } 249 done = 0 250 done_lock = threading.Lock() 251 252 phase(1, f"Parallel listCollaborators ({workers} workers)") 253 254 with connect(dsn) as conn: 255 set_crawl_state( 256 conn, 257 key=CRAWL_KEY, 258 status="running", 259 meta={"repo_count": len(repos), "workers": workers}, 260 ) 261 conn.commit() 262 263 with ThreadPoolExecutor(max_workers=workers) as pool: 264 futures = { 265 pool.submit(_fetch_one, dict(repo), reachable): repo for repo in repos 266 } 267 268 for future in as_completed(futures): 269 repo = futures[future] 270 label = f"{repo.get('owner_handle') or '?'}/{repo.get('name') or repo['repo_did'][:16]}" 271 272 try: 273 result = future.result() 274 except Exception as exc: 275 result = CollabFetchResult( 276 repo_did=repo["repo_did"], 277 repo_uri=repo.get("uri"), 278 knot_hostname=repo.get("knot_hostname") or "", 279 status="error", 280 error=str(exc), 281 ) 282 283 with done_lock: 284 done += 1 285 n = done 286 287 if result.status == "ok": 288 count = upsert_collaborators( 289 conn, 290 repo_did=result.repo_did, 291 collaborators=result.collaborators, 292 ) 293 stats["repos_fetched"] += 1 294 stats["collaborator_edges"] += count 295 if n <= 10 or n % 100 == 0 or count > 0: 296 step( 297 "collab", 298 n, 299 len(repos), 300 f"OK {label} {count} collaborator(s)", 301 ) 302 elif result.status == "skipped_knot": 303 stats["skipped_knot"] += 1 304 if n <= 10 or n % 200 == 0: 305 step("collab", n, len(repos), f"SKIP {label} {result.error}") 306 else: 307 stats["errors"] += 1 308 if n <= 10 or n % 100 == 0: 309 step( 310 "collab", 311 n, 312 len(repos), 313 f"ERROR {label} {result.error or 'unknown'}", 314 ) 315 316 if n % 50 == 0: 317 conn.commit() 318 319 set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats) 320 conn.commit() 321 322 summary_block( 323 "Collaborators fetch complete", 324 [ 325 f"Repos fetched: {stats['repos_fetched']}", 326 f"Collaborator edges: {stats['collaborator_edges']}", 327 f"Already synced: {stats['already_synced']}", 328 f"Skipped knot: {stats['skipped_knot']}", 329 f"Errors: {stats['errors']}", 330 "", 331 "Repos a user collaborates on:", 332 " select * from tangled_user_collaborations", 333 " where user_did = 'did:plc:...';", 334 ], 335 ) 336 return stats 337 338 339def main() -> None: 340 for candidate in (REPO_ROOT / ".env", Path(__file__).parent / ".env"): 341 if candidate.exists(): 342 load_dotenv(candidate) 343 break 344 else: 345 load_dotenv() 346 347 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 348 if not dsn: 349 print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr) 350 raise SystemExit(1) 351 352 init_schema(dsn) 353 run_fetch_collaborators(dsn) 354 355 356if __name__ == "__main__": 357 try: 358 main() 359 except KeyboardInterrupt: 360 print("\nInterrupted.", file=sys.stderr) 361 raise SystemExit(130) from None