This repository has no description
1from __future__ import annotations
2
3import json
4import os
5from typing import Any
6
7import httpx
8
9from db import connect, set_crawl_state, upsert_atproto_record, upsert_xrpc_snapshot
10from pds_client import (
11 DEFAULT_PDS,
12 describe_repo_on_knot,
13 knot_xrpc,
14 list_records,
15 params_hash,
16 pds_host_for_did,
17)
18from progress import banner, log, metric, phase, step, summary_block
19
# Key under which this stage records its progress via set_crawl_state().
CRAWL_KEY = "stage4:repo_metadata"

# Knot XRPC methods fetched per repo (deeper than the Stage 2 metadata record
# alone). Each entry is:
#   (method name, query parameter that carries the repo DID, extra query
#    parameters or None)
# The static "limit" on the branches entry is replaced at request time by the
# environment-driven value from _branch_limit().
KNOT_METHODS: list[tuple[str, str, dict[str, Any] | None]] = [
    ("sh.tangled.repo.getDefaultBranch", "repo", None),
    ("sh.tangled.repo.languages", "repo", None),
    ("sh.tangled.repo.branches", "repo", {"limit": 100}),
    ("sh.tangled.repo.tags", "repo", {"limit": 100}),
]

# Collection listed from the repo owner's PDS when gathering collaborator
# records.
COLLABORATOR_COLLECTION = "sh.tangled.repo.collaborator"
31
32
33def _repo_limit() -> int | None:
34 raw = os.getenv("TANGLED_STAGE4_REPO_LIMIT", "").strip()
35 if not raw:
36 return None
37 return max(1, int(raw))
38
39
40def _branch_limit() -> int:
41 return max(1, int(os.getenv("TANGLED_STAGE4_BRANCH_LIMIT", "100")))
42
43
44def _collab_page_limit() -> int:
45 return max(1, min(1000, int(os.getenv("TANGLED_STAGE4_COLLAB_LIMIT", "100"))))
46
47
48def _rkey_from_uri(uri: str) -> str:
49 return uri.rsplit("/", 1)[-1]
50
51
52def _store_snapshot(
53 conn,
54 *,
55 method: str,
56 repo_did: str,
57 params: dict[str, Any],
58 payload: Any,
59) -> bool:
60 if not isinstance(payload, (dict, list)):
61 return False
62 ph = params_hash(params)
63 upsert_xrpc_snapshot(
64 conn,
65 method=method,
66 repo_did=repo_did,
67 params=params,
68 params_hash=ph,
69 payload=payload,
70 )
71 return True
72
73
def _fetch_knot_method(
    client: httpx.Client,
    conn,
    *,
    knot_hostname: str,
    repo_did: str,
    method: str,
    param_key: str,
    extra: dict[str, Any] | None,
) -> tuple[bool, str | None]:
    """Call one knot XRPC method for a repo and store the response.

    Args:
        client: HTTP client passed through to ``knot_xrpc``.
        conn: Database connection used to store the snapshot.
        knot_hostname: Knot host to query.
        repo_did: DID of the repo; sent as the value of *param_key*.
        method: XRPC method name.
        param_key: Name of the query parameter that carries *repo_did*.
        extra: Additional query parameters merged into the request, or
            ``None``.

    Returns:
        ``(True, None)`` when a snapshot was stored, otherwise
        ``(False, reason)`` where *reason* describes the failure.
    """
    params: dict[str, Any] = {param_key: repo_did}
    if extra:
        params.update(extra)
    if method == "sh.tangled.repo.branches":
        # The environment-configured limit wins over the static one in `extra`.
        params["limit"] = _branch_limit()

    status, payload = knot_xrpc(client, knot_hostname, method, params)
    if status != 200:
        return False, f"HTTP {status}"

    if isinstance(payload, dict) and payload.get("error"):
        return False, str(payload.get("body", payload))

    stored = _store_snapshot(
        conn, method=method, repo_did=repo_did, params=params, payload=payload
    )
    if not stored:
        # Previously this path returned (False, None), so callers that only
        # report failures carrying a reason never surfaced it.
        return False, f"unexpected payload type {type(payload).__name__}"
    return True, None
99
100
def _fetch_collaborators(
    client: httpx.Client,
    conn,
    *,
    knot_hostname: str,
    repo_did: str,
) -> int:
    """Paginate sh.tangled.repo.listCollaborators (subject=repo_did).

    Each page that comes back as a JSON object is stored as its own snapshot
    (the cursor is part of the stored params). Paging stops on the first
    non-200 status or non-dict payload, when the response has no cursor or no
    items, or when the server hands back the cursor that was just used.

    Args:
        client: HTTP client passed through to ``knot_xrpc``.
        conn: Database connection used to store snapshots.
        knot_hostname: Knot host to query.
        repo_did: DID of the repo, sent as ``subject``.

    Returns:
        Number of pages stored as snapshots.
    """
    method = "sh.tangled.repo.listCollaborators"
    # Read the page size once per repo instead of on every page.
    page_limit = _collab_page_limit()
    stored = 0
    cursor: str | None = None

    while True:
        params: dict[str, Any] = {"subject": repo_did, "limit": page_limit}
        if cursor:
            params["cursor"] = cursor

        status, payload = knot_xrpc(client, knot_hostname, method, params)
        if status != 200 or not isinstance(payload, dict):
            break

        if _store_snapshot(
            conn,
            method=method,
            repo_did=repo_did,
            params=params,
            payload=payload,
        ):
            stored += 1

        next_cursor = payload.get("cursor")
        if not next_cursor or not payload.get("items"):
            break
        if next_cursor == cursor:
            # The server did not advance; requesting the same page again
            # would loop forever.
            break
        cursor = next_cursor

    return stored
143
144
def _fetch_pds_collaborator_records(
    client: httpx.Client,
    conn,
    *,
    owner_did: str,
    repo_did: str,
) -> int:
    """Store the owner's collaborator records that point at *repo_did*.

    Pages through ``COLLABORATOR_COLLECTION`` on the owner's PDS (falling
    back to ``DEFAULT_PDS`` when no host is resolved) and upserts every record
    whose ``value["repo"]`` equals *repo_did*. The fetch is best-effort: an
    ``httpx.HTTPError`` or a malformed response ends paging and keeps
    whatever was stored so far.

    Args:
        client: HTTP client used for PDS resolution and record listing.
        conn: Database connection used for the upserts.
        owner_did: DID whose repository is listed.
        repo_did: Repo DID the records must reference.

    Returns:
        Number of records upserted.
    """
    pds = pds_host_for_did(client, owner_did) or DEFAULT_PDS
    stored = 0
    cursor: str | None = None

    while True:
        try:
            data = list_records(
                client,
                pds,
                owner_did,
                COLLABORATOR_COLLECTION,
                cursor=cursor,
                limit=100,
            )
        except httpx.HTTPError:
            # Best-effort: a failed page ends the listing without failing
            # the repo.
            break

        # The response comes from a remote server; a body of the wrong shape
        # must not raise AttributeError out of this function.
        if not isinstance(data, dict):
            break
        records = data.get("records") or []
        if not isinstance(records, list):
            break

        for rec in records:
            if not isinstance(rec, dict):
                continue
            uri = rec.get("uri")
            value = rec.get("value")
            if not isinstance(uri, str) or not isinstance(value, dict):
                continue
            if value.get("repo") != repo_did:
                continue

            cid = rec.get("cid")
            upsert_atproto_record(
                conn,
                uri=uri,
                author_did=owner_did,
                collection=COLLABORATOR_COLLECTION,
                rkey=_rkey_from_uri(uri),
                payload=value,
                cid=cid if isinstance(cid, str) else None,
                repo_did=repo_did,
            )
            stored += 1

        next_cursor = data.get("cursor")
        if not next_cursor or not records:
            break
        if next_cursor == cursor:
            # The server did not advance; stop rather than loop forever.
            break
        cursor = next_cursor

    return stored
195
196
def _enrich_repo(
    client: httpx.Client,
    conn,
    repo,
    stats: dict[str, Any],
    *,
    log_errors: bool,
) -> int:
    """Run every Stage 4 fetch for one repo, updating *stats* in place.

    Returns the number of PDS collaborator records stored for the repo.
    ``httpx.HTTPError`` from the underlying calls is left to the caller.
    """
    repo_did = repo["repo_did"]
    knot = repo["knot_hostname"]

    # describeRepo → tangled_repos.describe_raw
    describe = describe_repo_on_knot(client, knot, repo_did)
    if describe:
        conn.execute(
            """
            update tangled_repos
            set describe_raw = %s::jsonb, last_synced_at = now()
            where uri = %s
            """,
            (json.dumps(describe), repo["uri"]),
        )
        stats["describe_repo_updated"] += 1

    # Knot XRPC snapshots
    for method, param_key, extra in KNOT_METHODS:
        ok, err = _fetch_knot_method(
            client,
            conn,
            knot_hostname=knot,
            repo_did=repo_did,
            method=method,
            param_key=param_key,
            extra=extra,
        )
        if ok:
            stats["xrpc_snapshots"] += 1
        elif err and log_errors:
            log("stage 4", f" {method}: {err}")

    stats["xrpc_snapshots"] += _fetch_collaborators(
        client, conn, knot_hostname=knot, repo_did=repo_did
    )

    # PDS collaborator records
    collab_n = _fetch_pds_collaborator_records(
        client, conn, owner_did=repo["owner_did"], repo_did=repo_did
    )
    stats["collaborator_records"] += collab_n
    return collab_n


def run_stage4(dsn: str) -> dict[str, Any]:
    """Enrich every known repo with knot metadata and collaborator records.

    Loads the reachable knot hostnames and the repos that have a ``repo_did``
    (optionally capped by ``TANGLED_STAGE4_REPO_LIMIT``). For each repo on a
    reachable knot it stores the describeRepo result, the XRPC snapshots
    listed in ``KNOT_METHODS``, the collaborator pages from the knot and the
    matching collaborator records from the owner's PDS. Work is committed
    every 25 repos and once more at the end.

    Args:
        dsn: Database connection string passed to ``connect``.

    Returns:
        Counters describing the run (processed, skipped, stored, errors).

    Raises:
        RuntimeError: If no repo with a ``repo_did`` exists yet.
    """
    banner("STAGE 4 — Deeper repo metadata")
    log("stage 4", "Enriches each repo with knot git stats + collaborators.")
    log("stage 4", "Stores raw XRPC JSON in tangled_xrpc_snapshots.")
    log("stage 4", "Stores collaborator records in tangled_atproto_records.")

    repo_limit = _repo_limit()
    if repo_limit:
        log("stage 4", f"Repo limit: {repo_limit} (unset TANGLED_STAGE4_REPO_LIMIT for all)")

    with connect(dsn) as conn:
        reachable = {
            row["hostname"]
            for row in conn.execute(
                "select hostname from tangled_knots where reachable = true"
            ).fetchall()
        }
        query = """
            select uri, owner_did, repo_did, knot_hostname, name, record_raw
            from tangled_repos
            where repo_did is not null
            order by uri
        """
        if repo_limit:
            # repo_limit is an int produced by _repo_limit(), so interpolating
            # it cannot inject SQL.
            query += f" limit {repo_limit}"
        repos = conn.execute(query).fetchall()

    if not repos:
        raise RuntimeError("No repos with repo_did in tangled_repos. Run stage2-repos first.")

    total = len(repos)
    log("stage 4", f"Found {total} repos to enrich.")

    stats = {
        "repos_processed": 0,
        "repos_skipped_knot": 0,
        "describe_repo_updated": 0,
        "xrpc_snapshots": 0,
        "collaborator_records": 0,
        "errors": 0,
    }

    phase(1, "Knot metadata (branches, tags, languages, collaborators)")
    phase(2, "Owner PDS collaborator records")

    with httpx.Client(timeout=60.0, follow_redirects=True) as client, connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY, status="running", meta={"repo_count": total})
        conn.commit()

        for i, repo in enumerate(repos, start=1):
            knot = repo["knot_hostname"]
            label = repo["name"] or repo["repo_did"]

            if not knot or knot not in reachable:
                stats["repos_skipped_knot"] += 1
                # Only report the first few skips and then every 50th repo.
                if i <= 5 or i % 50 == 0:
                    step("stage 4", i, total, f"SKIP {label} — knot unreachable ({knot})")
            else:
                try:
                    # Per-method failure reasons are logged for the first
                    # three repos only.
                    collab_n = _enrich_repo(
                        client, conn, repo, stats, log_errors=i <= 3
                    )
                except httpx.HTTPError as exc:
                    stats["errors"] += 1
                    step("stage 4", i, total, f"ERROR {label}: {exc}")
                else:
                    stats["repos_processed"] += 1
                    step(
                        "stage 4",
                        i,
                        total,
                        f"{label} snapshots+ collab_records={collab_n}",
                    )

            # Runs for skipped repos too: previously the skip branch jumped
            # past this check, so a skipped repo on a 25th iteration missed
            # the periodic commit.
            if i % 25 == 0:
                conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "Stage 4 complete",
        [
            f"Repos processed: {stats['repos_processed']}",
            f"Skipped (bad knot): {stats['repos_skipped_knot']}",
            f"describeRepo updated: {stats['describe_repo_updated']}",
            f"XRPC snapshots stored: {stats['xrpc_snapshots']}",
            f"Collaborator records: {stats['collaborator_records']}",
            f"Errors: {stats['errors']}",
        ],
    )
    return stats