This repository has no description
1#!/usr/bin/env python3
2"""Fetch and store README files from knot git for all scraped repos."""
3
4from __future__ import annotations
5
6import os
7import sys
8import threading
9from concurrent.futures import ThreadPoolExecutor, as_completed
10from dataclasses import dataclass
11from pathlib import Path
12from typing import Any
13
14import httpx
15from dotenv import load_dotenv
16
17from db import connect, init_schema, set_crawl_state
18from parallel import concurrency_env
19from pds_client import knot_xrpc
20from progress import banner, log, metric, phase, step, summary_block
21
# Directory two levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Key passed to set_crawl_state when this job records its progress.
CRAWL_KEY = "readmes:check"
# File names (compared lower-cased) that count as a repository README.
README_NAMES = frozenset(
    (
        "readme",
        "readme.md",
        "readme.markdown",
        "readme.mdown",
        "readme.mkd",
    )
)
27
28
@dataclass
class ReadmeResult:
    """Outcome of one README lookup, shaped like a ``tangled_readmes`` row.

    The first six fields are required and positional; the remaining four
    default to ``None`` and are filled in depending on how the lookup ended.
    """

    # Identity of the repo the lookup was made for.
    repo_did: str
    repo_uri: str | None
    owner_handle: str | None
    repo_name: str | None
    # Knot host the repo is served from.
    knot_hostname: str
    # Values used in this file: "found", "missing", "error", "skipped".
    status: str
    # Name of the README entry as listed in the tree, when one was located.
    readme_path: str | None = None
    # README text, set only when status is "found".
    content: str | None = None
    # Length of ``content`` in UTF-8 bytes.
    size_bytes: int | None = None
    # Human-readable reason for an "error" or "skipped" outcome.
    error_message: str | None = None
41
42
def _repo_limit() -> int | None:
    """Return the optional cap on how many repos to process.

    The value is read from ``TANGLED_README_REPO_LIMIT``. An unset or blank
    variable means "no limit" and yields ``None``. Integers below 1 are
    clamped up to 1.

    Raises:
        ValueError: if the variable is set to something that is not an
            integer. The message names the variable so the misconfiguration
            is easy to spot.
    """
    raw = os.getenv("TANGLED_README_REPO_LIMIT", "").strip()
    if not raw:
        return None
    try:
        limit = int(raw)
    except ValueError:
        # Keep the exception type callers would already see from int(), but
        # say which setting is wrong instead of only echoing the bad literal.
        raise ValueError(
            f"TANGLED_README_REPO_LIMIT must be an integer, got {raw!r}"
        ) from None
    return max(1, limit)
48
49
def _skip_existing() -> bool:
    """Report whether repos that were already checked should be left alone.

    Returns ``False`` only when ``TANGLED_README_REFRESH`` is set to ``1``,
    ``true`` or ``yes`` (case-insensitive, surrounding whitespace ignored);
    every other value, including unset, returns ``True``.
    """
    flag = os.getenv("TANGLED_README_REFRESH", "").strip().lower()
    refresh_requested = flag in ("1", "true", "yes")
    return not refresh_requested
56
57
def _repos_query(*, skip_existing: bool, repo_limit: int | None) -> str:
    """Build the SQL that selects the repos whose README should be fetched.

    Args:
        skip_existing: when true, leave out repos that already have a
            ``tangled_readmes`` row with status ``found`` or ``missing``.
        repo_limit: maximum number of rows to return; a falsy value
            (``None`` or ``0``) adds no ``limit`` clause.

    Returns:
        A ``select`` over ``tangled_repos`` ordered by ``uri``.
    """
    already_checked_filter = (
        """
        and not exists (
            select 1 from tangled_readmes t
            where t.repo_did = tangled_repos.repo_did
              and t.status in ('found', 'missing')
        )
        """
        if skip_existing
        else ""
    )
    sql = f"""
        select repo_did, uri, owner_handle, name, knot_hostname
        from tangled_repos
        where repo_did is not null
          and knot_hostname is not null
          {already_checked_filter}
        order by uri
    """
    if not repo_limit:
        return sql
    return sql + f" limit {repo_limit}"
79
80
def _find_readme_in_tree(tree: dict[str, Any]) -> str | None:
    """Return the name of the first README-like entry in a tree listing.

    Entries are read from ``tree["files"]``; anything that is not a dict, or
    whose ``name`` is not a string in ``README_NAMES`` (case-insensitive), is
    ignored. A matching entry is accepted unless it is typed ``"dir"``
    without one of the file modes listed below. The name is returned exactly
    as it appears in the listing, or ``None`` when nothing qualifies.
    """
    file_modes = ("100644", "100755", "blob")
    for item in tree.get("files") or []:
        if not isinstance(item, dict):
            continue
        candidate = item.get("name")
        if not isinstance(candidate, str):
            continue
        if candidate.lower() not in README_NAMES:
            continue
        kind = item.get("type")
        # Explicit files and entries carrying a file mode are accepted; so is
        # anything not explicitly typed "dir", because listings may carry
        # only a name for files.
        if kind == "file" or item.get("mode") in file_modes or kind != "dir":
            return candidate
    return None
97
98
def fetch_readme(
    client: httpx.Client,
    *,
    knot_hostname: str,
    repo_did: str,
) -> ReadmeResult:
    """Look up and download the README of one repo from its knot.

    Two ``knot_xrpc`` calls are made against ``HEAD``: a
    ``sh.tangled.repo.tree`` listing to locate the README, then a
    ``sh.tangled.repo.blob`` request for its content.

    Returns:
        A ``ReadmeResult`` whose ``status`` is ``"found"`` (content and
        UTF-8 size filled in), ``"missing"`` (no README entry in the tree),
        or ``"error"`` (non-200 response, unexpected payload type, or a blob
        without string ``content``). ``repo_uri``, ``owner_handle`` and
        ``repo_name`` are left as ``None`` for the caller to fill in.
    """
    outcome = ReadmeResult(
        repo_did=repo_did,
        repo_uri=None,
        owner_handle=None,
        repo_name=None,
        knot_hostname=knot_hostname,
        status="error",
    )

    def _failed(message: str, path: str | None = None) -> ReadmeResult:
        # Single place that shapes every error outcome.
        outcome.status = "error"
        outcome.readme_path = path
        outcome.error_message = message
        return outcome

    tree_status, tree_body = knot_xrpc(
        client,
        knot_hostname,
        "sh.tangled.repo.tree",
        {"repo": repo_did, "ref": "HEAD"},
    )
    if tree_status != 200 or not isinstance(tree_body, dict):
        return _failed(f"tree HTTP {tree_status}")

    path = _find_readme_in_tree(tree_body)
    if not path:
        outcome.status = "missing"
        return outcome

    blob_status, blob_body = knot_xrpc(
        client,
        knot_hostname,
        "sh.tangled.repo.blob",
        {"repo": repo_did, "ref": "HEAD", "path": path},
    )
    if blob_status != 200 or not isinstance(blob_body, dict):
        return _failed(f"blob HTTP {blob_status}", path)

    text = blob_body.get("content")
    if not isinstance(text, str):
        return _failed("blob response missing content", path)

    outcome.status = "found"
    outcome.readme_path = path
    outcome.content = text
    outcome.size_bytes = len(text.encode("utf-8"))
    return outcome
154
155
def upsert_readme(conn, row: ReadmeResult) -> None:
    """Insert or refresh the ``tangled_readmes`` row for ``row.repo_did``.

    On a ``repo_did`` conflict every stored column is overwritten and
    ``fetched_at`` is reset to ``now()``. The embedding columns
    (``embedding``, ``embedding_model``, ``embedded_at``) are kept only when
    the content is unchanged; otherwise they are set to null.

    The statement is executed on ``conn`` but not committed here.
    """
    statement = """
        insert into tangled_readmes (
            repo_did, repo_uri, owner_handle, repo_name, knot_hostname,
            readme_path, status, content, size_bytes, error_message, fetched_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
        on conflict (repo_did) do update set
            repo_uri = excluded.repo_uri,
            owner_handle = excluded.owner_handle,
            repo_name = excluded.repo_name,
            knot_hostname = excluded.knot_hostname,
            readme_path = excluded.readme_path,
            status = excluded.status,
            content = excluded.content,
            size_bytes = excluded.size_bytes,
            error_message = excluded.error_message,
            fetched_at = now(),
            embedding = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedding
            end,
            embedding_model = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedding_model
            end,
            embedded_at = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedded_at
            end
    """
    # Order must match the column list of the insert above.
    values = (
        row.repo_did,
        row.repo_uri,
        row.owner_handle,
        row.repo_name,
        row.knot_hostname,
        row.readme_path,
        row.status,
        row.content,
        row.size_bytes,
        row.error_message,
    )
    conn.execute(statement, values)
201
202
def run_check_readmes(dsn: str) -> dict[str, int]:
    """Fetch the README of every selected repo and store the outcome.

    Steps:
      1. Read settings from the environment (worker count via
         ``TANGLED_README_CONCURRENCY``, optional repo limit, skip/refresh).
      2. Load the set of reachable knots, the number of eligible repos and
         the list of repos to process.
      3. Fetch READMEs on a thread pool; repos whose knot is not marked
         reachable are recorded as ``skipped`` without any network call.
      4. Upsert every result into ``tangled_readmes``, committing every 50
         results, and mark the crawl state ``running`` then ``complete``.

    Args:
        dsn: database connection string handed to ``connect``.

    Returns:
        Counters keyed ``found``, ``missing``, ``error``, ``skipped`` and
        ``already_in_db``.

    Raises:
        RuntimeError: when no repo is selected while skip-existing is off.
    """
    workers = concurrency_env("TANGLED_README_CONCURRENCY", default=20)
    repo_limit = _repo_limit()

    banner("README CHECK — fetch README from knot git for each repo")
    log("readmes", f"Concurrency: {workers}")
    if repo_limit:
        log("readmes", f"Repo limit: {repo_limit}")
    skip_existing = _skip_existing()
    if skip_existing:
        log(
            "readmes",
            "Skip existing: on — found/missing rows kept (set TANGLED_README_REFRESH=1 to re-fetch)",
        )
    else:
        log("readmes", "Skip existing: off — re-fetching all")

    with connect(dsn) as conn:
        reachable = {
            r["hostname"]
            for r in conn.execute(
                "select hostname from tangled_knots where reachable = true"
            ).fetchall()
        }
        total_eligible = conn.execute(
            """
            select count(*) as n from tangled_repos
            where repo_did is not null and knot_hostname is not null
            """
        ).fetchone()["n"]
        repos = conn.execute(
            _repos_query(skip_existing=skip_existing, repo_limit=repo_limit)
        ).fetchall()

        # With a repo limit, len(repos) understates how many repos are still
        # unchecked, so "eligible - len(repos)" would count the repos beyond
        # the limit as already stored. Count the pending set without the
        # limit whenever the limit may have truncated the selection.
        pending_total = len(repos)
        if skip_existing and repo_limit and len(repos) >= repo_limit:
            pending_total = conn.execute(
                "select count(*) as n from ("
                + _repos_query(skip_existing=True, repo_limit=None)
                + ") as pending"
            ).fetchone()["n"]

    if not repos:
        if skip_existing:
            log("readmes", "Nothing to fetch — all eligible repos already checked.")
            return {
                "found": 0,
                "missing": 0,
                "error": 0,
                "skipped": 0,
                "already_in_db": total_eligible,
            }
        raise RuntimeError("No repos with repo_did in tangled_repos.")

    already_in_db = total_eligible - pending_total if skip_existing else 0
    if skip_existing:
        metric("Eligible repos", total_eligible)
        metric("Already in DB (skipped)", already_in_db)
        metric("To fetch", len(repos))
    log("readmes", f"Checking READMEs for {len(repos)} repos …")

    # Both counters below are only touched by the thread that consumes
    # as_completed(); the workers never see them, so no locking is needed.
    stats = {
        "found": 0,
        "missing": 0,
        "error": 0,
        "skipped": 0,
        "already_in_db": already_in_db,
    }
    done = 0

    phase(1, "Parallel tree + blob fetch on knots")

    def work(repo: dict[str, Any]) -> ReadmeResult:
        """Fetch one repo's README, or mark it skipped if its knot is down."""
        knot = repo["knot_hostname"]
        repo_did = repo["repo_did"]
        if knot not in reachable:
            return ReadmeResult(
                repo_did=repo_did,
                repo_uri=repo.get("uri"),
                owner_handle=repo.get("owner_handle"),
                repo_name=repo.get("name"),
                knot_hostname=knot or "",
                status="skipped",
                error_message=f"knot not reachable: {knot}",
            )
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            result = fetch_readme(client, knot_hostname=knot, repo_did=repo_did)
        # fetch_readme leaves the descriptive fields empty; fill them from
        # the tangled_repos row.
        result.repo_uri = repo.get("uri")
        result.owner_handle = repo.get("owner_handle")
        result.repo_name = repo.get("name")
        return result

    with connect(dsn) as conn:
        # NOTE(review): if anything below raises, the crawl state row stays
        # "running" because only the success path rewrites it.
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={"repo_count": len(repos), "workers": workers},
        )
        conn.commit()

        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(work, dict(repo)): repo for repo in repos}

            for future in as_completed(futures):
                repo = futures[future]
                label = f"{repo.get('owner_handle') or '?'}/{repo.get('name') or repo['repo_did'][:16]}"

                try:
                    result = future.result()
                except Exception as exc:
                    # A failed worker becomes an "error" row so one bad repo
                    # does not abort the whole run.
                    result = ReadmeResult(
                        repo_did=repo["repo_did"],
                        repo_uri=repo.get("uri"),
                        owner_handle=repo.get("owner_handle"),
                        repo_name=repo.get("name"),
                        knot_hostname=repo.get("knot_hostname") or "",
                        status="error",
                        error_message=str(exc),
                    )

                upsert_readme(conn, result)

                # Any status without its own counter is tallied as an error.
                stats[result.status if result.status in stats else "error"] += 1
                done += 1

                # Progress output: the first 10 results, then every 50th
                # success or every 100th non-success.
                if result.status == "found":
                    if done <= 10 or done % 50 == 0:
                        step(
                            "readmes",
                            done,
                            len(repos),
                            f"OK {label} {result.readme_path} ({result.size_bytes} B)",
                        )
                elif done <= 10 or done % 100 == 0:
                    step(
                        "readmes",
                        done,
                        len(repos),
                        f"{result.status.upper()} {label} {result.error_message or ''}",
                    )

                # Commit in batches of 50 results.
                if done % 50 == 0:
                    conn.commit()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "README check complete",
        [
            f"Repos checked: {len(repos)}",
            f"Already in DB: {stats['already_in_db']}",
            f"Found README: {stats['found']}",
            f"Missing README: {stats['missing']}",
            f"Errors: {stats['error']}",
            f"Skipped knot: {stats['skipped']}",
            "",
            "Query: select status, count(*) from tangled_readmes group by 1;",
        ],
    )
    return stats
363
364
def main() -> None:
    """Load environment settings, prepare the schema and run the README check.

    The first existing file among ``REPO_ROOT/.env`` and the ``.env`` next to
    this script is loaded; when neither exists ``load_dotenv()`` is called
    with no path. Exits with status 1 if ``DB_CONNECTION_STRING`` is empty
    or unset.
    """
    candidates = (REPO_ROOT / ".env", Path(__file__).parent / ".env")
    env_file = next((path for path in candidates if path.exists()), None)
    if env_file is None:
        load_dotenv()
    else:
        load_dotenv(env_file)

    dsn = os.getenv("DB_CONNECTION_STRING", "").strip()
    if not dsn:
        print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr)
        raise SystemExit(1)

    init_schema(dsn)
    run_check_readmes(dsn)
380
381
if __name__ == "__main__":
    # Script entry point. Ctrl-C is reported with a short message and exit
    # status 130 (the conventional 128 + SIGINT) instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        raise SystemExit(130) from None