This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.8 kB View raw
from __future__ import annotations

import json
import os
from typing import Any

import httpx

from db import connect, set_crawl_state, upsert_atproto_record, upsert_xrpc_snapshot
from pds_client import (
    DEFAULT_PDS,
    describe_repo_on_knot,
    knot_xrpc,
    list_records,
    params_hash,
    pds_host_for_did,
)
from progress import banner, log, metric, phase, step, summary_block

# Key under which this stage's status/results are written via set_crawl_state().
CRAWL_KEY = "stage4:repo_metadata"

# Knot XRPC methods fetched per repo (deeper than Stage 2 metadata record alone).
# Each entry: (XRPC method, name of the query param carrying the repo DID,
# extra query params or None).
KNOT_METHODS: list[tuple[str, str, dict[str, Any] | None]] = [
    ("sh.tangled.repo.getDefaultBranch", "repo", None),
    ("sh.tangled.repo.languages", "repo", None),
    ("sh.tangled.repo.branches", "repo", {"limit": 100}),
    ("sh.tangled.repo.tags", "repo", {"limit": 100}),
]

COLLABORATOR_COLLECTION = "sh.tangled.repo.collaborator"
LIST_COLLABORATORS_METHOD = "sh.tangled.repo.listCollaborators"


def _env_int(name: str) -> int | None:
    """Return environment variable *name* parsed as an int, or None if unset/blank.

    Raises:
        ValueError: if the variable is set to something that is not an integer.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        return None
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from err


def _repo_limit() -> int | None:
    """Max repos to enrich (TANGLED_STAGE4_REPO_LIMIT, floored at 1); None means all."""
    value = _env_int("TANGLED_STAGE4_REPO_LIMIT")
    return None if value is None else max(1, value)


def _branch_limit() -> int:
    """Branch page size (TANGLED_STAGE4_BRANCH_LIMIT, default 100, minimum 1)."""
    value = _env_int("TANGLED_STAGE4_BRANCH_LIMIT")
    return max(1, 100 if value is None else value)


def _collab_page_limit() -> int:
    """listCollaborators page size (TANGLED_STAGE4_COLLAB_LIMIT, default 100, clamped to 1..1000)."""
    value = _env_int("TANGLED_STAGE4_COLLAB_LIMIT")
    return max(1, min(1000, 100 if value is None else value))


def _rkey_from_uri(uri: str) -> str:
    """Return the final "/"-separated segment of *uri*, used as the record key (rkey)."""
    return uri.rsplit("/", 1)[-1]


def _store_snapshot(
    conn,
    *,
    method: str,
    repo_did: str,
    params: dict[str, Any],
    payload: Any,
) -> bool:
    """Upsert one raw XRPC response via upsert_xrpc_snapshot.

    Only JSON objects/arrays (dict/list) are stored; any other payload is
    ignored. The params are also passed as ``params_hash(params)``
    (presumably the upsert's conflict key, so re-runs overwrite rather than
    duplicate — TODO confirm in db.upsert_xrpc_snapshot).

    Returns:
        True if a snapshot was written, False if *payload* was skipped.
    """
    if not isinstance(payload, (dict, list)):
        return False
    ph = params_hash(params)
    upsert_xrpc_snapshot(
        conn,
        method=method,
        repo_did=repo_did,
        params=params,
        params_hash=ph,
        payload=payload,
    )
    return True


def _fetch_knot_method(
    client: httpx.Client,
    conn,
    *,
    knot_hostname: str,
    repo_did: str,
    method: str,
    param_key: str,
    extra: dict[str, Any] | None,
) -> tuple[bool, str | None]:
    """Call one knot XRPC *method* for *repo_did* and store the response.

    Returns:
        (stored, error). ``error`` describes a non-200 status or an error
        payload; it is None on success and when the payload was simply not
        storable (see _store_snapshot).
    """
    params: dict[str, Any] = {param_key: repo_did}
    if extra:
        params.update(extra)
    if method == "sh.tangled.repo.branches":
        # Env-configurable override of the static limit in KNOT_METHODS.
        params["limit"] = _branch_limit()

    status, payload = knot_xrpc(client, knot_hostname, method, params)
    if status != 200:
        return False, f"HTTP {status}"

    if isinstance(payload, dict) and payload.get("error"):
        return False, str(payload.get("body", payload))

    ok = _store_snapshot(conn, method=method, repo_did=repo_did, params=params, payload=payload)
    return ok, None


def _fetch_collaborators(
    client: httpx.Client,
    conn,
    *,
    knot_hostname: str,
    repo_did: str,
) -> int:
    """Paginate sh.tangled.repo.listCollaborators (subject=repo_did) on the knot.

    Each page is stored as its own snapshot (its params include the cursor).
    Pagination stops on a non-200 status, a non-dict payload, an empty page,
    a missing/non-string cursor, or a cursor already followed. The last check
    prevents an endless loop when a knot keeps returning the same cursor.

    Returns:
        Number of pages stored.
    """
    page_limit = _collab_page_limit()
    stored = 0
    cursor: str | None = None
    seen_cursors: set[str] = set()

    while True:
        params: dict[str, Any] = {"subject": repo_did, "limit": page_limit}
        if cursor:
            params["cursor"] = cursor

        status, payload = knot_xrpc(client, knot_hostname, LIST_COLLABORATORS_METHOD, params)
        if status != 200 or not isinstance(payload, dict):
            break

        if _store_snapshot(
            conn,
            method=LIST_COLLABORATORS_METHOD,
            repo_did=repo_did,
            params=params,
            payload=payload,
        ):
            stored += 1

        next_cursor = payload.get("cursor")
        items = payload.get("items") or []
        if (
            not items
            or not isinstance(next_cursor, str)
            or not next_cursor
            or next_cursor in seen_cursors
        ):
            break
        seen_cursors.add(next_cursor)
        cursor = next_cursor

    return stored


def _list_owner_collaborator_records(
    client: httpx.Client, owner_did: str
) -> tuple[list[Any], bool]:
    """List every COLLABORATOR_COLLECTION record in *owner_did*'s repo on their PDS.

    The PDS host comes from pds_host_for_did, falling back to DEFAULT_PDS.
    Any error from that lookup propagates to the caller.

    Returns:
        (records, complete). ``complete`` is False when pagination stopped
        early because of an httpx.HTTPError. The records gathered up to that
        point are still returned (best-effort).
    """
    pds = pds_host_for_did(client, owner_did) or DEFAULT_PDS
    records: list[Any] = []
    cursor: str | None = None
    seen_cursors: set[str] = set()

    while True:
        try:
            data = list_records(
                client,
                pds,
                owner_did,
                COLLABORATOR_COLLECTION,
                cursor=cursor,
                limit=100,
            )
        except httpx.HTTPError:
            return records, False

        page = data.get("records") or []
        records.extend(page)

        next_cursor = data.get("cursor")
        # Stop on an empty page, no cursor, or a cursor already followed
        # (a PDS echoing the same cursor would otherwise loop forever).
        if (
            not page
            or not isinstance(next_cursor, str)
            or not next_cursor
            or next_cursor in seen_cursors
        ):
            return records, True
        seen_cursors.add(next_cursor)
        cursor = next_cursor


def _fetch_pds_collaborator_records(
    client: httpx.Client,
    conn,
    *,
    owner_did: str,
    repo_did: str,
    owner_records_cache: dict[str, list[Any]] | None = None,
) -> int:
    """Upsert the owner's collaborator records whose ``repo`` field equals *repo_did*.

    The records are read from the repo owner's PDS, not the knot. The owner's
    whole COLLABORATOR_COLLECTION is listed and then filtered.

    Args:
        owner_records_cache: optional per-run cache keyed by owner DID. When
            given, each owner's PDS is resolved and listed once instead of
            once per repo. Only fully paginated listings are cached, so a
            transient HTTP failure is retried for that owner's next repo.

    Returns:
        Number of records upserted (stored in tangled_atproto_records).
    """
    if owner_records_cache is not None and owner_did in owner_records_cache:
        owner_records = owner_records_cache[owner_did]
    else:
        owner_records, complete = _list_owner_collaborator_records(client, owner_did)
        if complete and owner_records_cache is not None:
            owner_records_cache[owner_did] = owner_records

    stored = 0
    for rec in owner_records:
        if not isinstance(rec, dict):
            continue
        uri = rec.get("uri")
        value = rec.get("value")
        if not isinstance(uri, str) or not isinstance(value, dict):
            continue
        if value.get("repo") != repo_did:
            continue

        cid = rec.get("cid")
        upsert_atproto_record(
            conn,
            uri=uri,
            author_did=owner_did,
            collection=COLLABORATOR_COLLECTION,
            rkey=_rkey_from_uri(uri),
            payload=value,
            cid=cid if isinstance(cid, str) else None,
            repo_did=repo_did,
        )
        stored += 1

    return stored


def _load_targets(dsn: str, repo_limit: int | None) -> tuple[set[str], list[Any]]:
    """Return (reachable knot hostnames, repos that have a repo_did, ordered by uri)."""
    query = """
        select uri, owner_did, repo_did, knot_hostname, name, record_raw
        from tangled_repos
        where repo_did is not null
        order by uri
    """
    if repo_limit:
        # repo_limit is always an int (see _repo_limit), so interpolation is safe.
        query += f" limit {repo_limit}"

    with connect(dsn) as conn:
        reachable = {
            row["hostname"]
            for row in conn.execute(
                "select hostname from tangled_knots where reachable = true"
            ).fetchall()
        }
        repos = conn.execute(query).fetchall()
    return reachable, repos


def _enrich_repo(
    client: httpx.Client,
    conn,
    repo: Any,
    *,
    stats: dict[str, int],
    log_method_errors: bool,
    owner_records_cache: dict[str, list[Any]],
) -> int:
    """Run every Stage 4 fetch for one repo whose knot is reachable.

    *stats* is updated as each step finishes. If an httpx.HTTPError is raised
    partway through, counts from the steps that already completed are kept.
    The caller counts the error itself.

    Returns:
        Number of PDS collaborator records stored for this repo.
    """
    repo_did = repo["repo_did"]
    knot = repo["knot_hostname"]

    # describeRepo → tangled_repos.describe_raw
    describe = describe_repo_on_knot(client, knot, repo_did)
    if describe:
        conn.execute(
            """
            update tangled_repos
            set describe_raw = %s::jsonb, last_synced_at = now()
            where uri = %s
            """,
            (json.dumps(describe), repo["uri"]),
        )
        stats["describe_repo_updated"] += 1

    # Knot XRPC snapshots
    for method, param_key, extra in KNOT_METHODS:
        ok, err = _fetch_knot_method(
            client,
            conn,
            knot_hostname=knot,
            repo_did=repo_did,
            method=method,
            param_key=param_key,
            extra=extra,
        )
        if ok:
            stats["xrpc_snapshots"] += 1
        elif err and log_method_errors:
            log("stage 4", f" {method}: {err}")

    stats["xrpc_snapshots"] += _fetch_collaborators(
        client, conn, knot_hostname=knot, repo_did=repo_did
    )

    # PDS collaborator records
    collab_n = _fetch_pds_collaborator_records(
        client,
        conn,
        owner_did=repo["owner_did"],
        repo_did=repo_did,
        owner_records_cache=owner_records_cache,
    )
    stats["collaborator_records"] += collab_n
    return collab_n


def run_stage4(dsn: str) -> dict[str, Any]:
    """Enrich every repo in tangled_repos with knot metadata and collaborator records.

    Repos whose knot is missing or not marked reachable in tangled_knots are
    skipped. An httpx.HTTPError for a repo is counted and logged, and the run
    continues with the next repo. Work is committed every 25 repos and at the
    end. Crawl state under CRAWL_KEY is set to "running" and then "complete".

    Returns:
        The stats dict (also stored as the final crawl-state meta).

    Raises:
        RuntimeError: if no repos with a repo_did exist (Stage 2 not run).
    """
    banner("STAGE 4 — Deeper repo metadata")
    log("stage 4", "Enriches each repo with knot git stats + collaborators.")
    log("stage 4", "Stores raw XRPC JSON in tangled_xrpc_snapshots.")
    log("stage 4", "Stores collaborator records in tangled_atproto_records.")

    repo_limit = _repo_limit()
    if repo_limit:
        log("stage 4", f"Repo limit: {repo_limit} (unset TANGLED_STAGE4_REPO_LIMIT for all)")

    reachable, repos = _load_targets(dsn, repo_limit)
    if not repos:
        raise RuntimeError("No repos with repo_did in tangled_repos. Run stage2-repos first.")

    total = len(repos)
    log("stage 4", f"Found {total} repos to enrich.")

    stats = {
        "repos_processed": 0,
        "repos_skipped_knot": 0,
        "describe_repo_updated": 0,
        "xrpc_snapshots": 0,
        "collaborator_records": 0,
        "errors": 0,
    }

    phase(1, "Knot metadata (branches, tags, languages, collaborators)")
    phase(2, "Owner PDS collaborator records")

    # Owner DID -> that owner's full collaborator-record listing, shared across repos.
    owner_records_cache: dict[str, list[Any]] = {}

    with httpx.Client(timeout=60.0, follow_redirects=True) as client, connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY, status="running", meta={"repo_count": total})
        conn.commit()

        for i, repo in enumerate(repos, start=1):
            knot = repo["knot_hostname"]
            label = repo["name"] or repo["repo_did"]

            if not knot or knot not in reachable:
                stats["repos_skipped_knot"] += 1
                if i <= 5 or i % 50 == 0:
                    step("stage 4", i, total, f"SKIP {label} — knot unreachable ({knot})")
            else:
                try:
                    collab_n = _enrich_repo(
                        client,
                        conn,
                        repo,
                        stats=stats,
                        log_method_errors=i <= 3,
                        owner_records_cache=owner_records_cache,
                    )
                except httpx.HTTPError as exc:
                    stats["errors"] += 1
                    step("stage 4", i, total, f"ERROR {label}: {exc}")
                else:
                    stats["repos_processed"] += 1
                    step("stage 4", i, total, f"{label} snapshots+ collab_records={collab_n}")

            # Periodic commit runs for skipped repos too (a `continue` used to bypass it).
            if i % 25 == 0:
                conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "Stage 4 complete",
        [
            f"Repos processed: {stats['repos_processed']}",
            f"Skipped (bad knot): {stats['repos_skipped_knot']}",
            f"describeRepo updated: {stats['describe_repo_updated']}",
            f"XRPC snapshots stored: {stats['xrpc_snapshots']}",
            f"Collaborator records: {stats['collaborator_records']}",
            f"Errors: {stats['errors']}",
        ],
    )
    return stats