This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 12 kB View raw
#!/usr/bin/env python3
"""Fetch and store README files from knot git for all scraped repos.

For every ``tangled_repos`` row with a ``repo_did`` and ``knot_hostname``,
list the repo's tree at ``HEAD`` on its knot, pick the first README-like
entry, fetch its blob, and upsert the outcome into ``tangled_readmes``.

Environment:
    DB_CONNECTION_STRING        Database DSN (required).
    TANGLED_README_CONCURRENCY  Worker thread count (default 20).
    TANGLED_README_REPO_LIMIT   Optional cap on repos processed per run.
    TANGLED_README_REFRESH      1/true/yes to re-fetch repos that already
                                have a found/missing row.
"""

from __future__ import annotations

import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import httpx
from dotenv import load_dotenv

from db import connect, init_schema, set_crawl_state
from parallel import concurrency_env
from pds_client import knot_xrpc
from progress import banner, log, metric, phase, step, summary_block

REPO_ROOT = Path(__file__).resolve().parent.parent
CRAWL_KEY = "readmes:check"
README_NAMES = frozenset(
    {"readme.md", "readme", "readme.markdown", "readme.mdown", "readme.mkd"}
)
# ``mode`` values on a tree entry that mark it as a regular file.
_FILE_MODES = ("100644", "100755", "blob")

# SQL predicate: the tangled_repos row can be checked at all.
_ELIGIBLE = "repo_did is not null and knot_hostname is not null"
# SQL predicate: the tangled_repos row already has a definitive README row.
# Shared by the work query and the "already in DB" count so they partition
# the eligible repos exactly.
_HAS_README_ROW = """
    exists (
        select 1 from tangled_readmes t
        where t.repo_did = tangled_repos.repo_did
          and t.status in ('found', 'missing')
    )
"""


@dataclass
class ReadmeResult:
    """Outcome of checking one repo's README; one row of ``tangled_readmes``."""

    repo_did: str
    repo_uri: str | None
    owner_handle: str | None
    repo_name: str | None
    knot_hostname: str
    # One of "found", "missing", "error", or "skipped" (knot not reachable).
    status: str
    readme_path: str | None = None
    content: str | None = None
    # Length of ``content`` in UTF-8 bytes.
    size_bytes: int | None = None
    error_message: str | None = None


def _repo_limit() -> int | None:
    """Return the per-run repo cap from TANGLED_README_REPO_LIMIT, or None.

    Values below 1 are clamped to 1; a non-integer value raises ValueError.
    """
    raw = os.getenv("TANGLED_README_REPO_LIMIT", "").strip()
    if not raw:
        return None
    return max(1, int(raw))


def _skip_existing() -> bool:
    """Return False only when TANGLED_README_REFRESH is 1/true/yes."""
    return os.getenv("TANGLED_README_REFRESH", "").strip().lower() not in (
        "1",
        "true",
        "yes",
    )


def _repos_query(*, skip_existing: bool, repo_limit: int | None) -> str:
    """Build the SQL selecting repos to check, ordered by ``uri``.

    Args:
        skip_existing: exclude repos that already have a found/missing row.
        repo_limit: optional LIMIT applied to the result.
    """
    skip_clause = f"and not {_HAS_README_ROW}" if skip_existing else ""
    query = f"""
        select repo_did, uri, owner_handle, name, knot_hostname
        from tangled_repos
        where {_ELIGIBLE}
        {skip_clause}
        order by uri
    """
    if repo_limit:
        # int() keeps the interpolated LIMIT strictly numeric.
        query += f" limit {int(repo_limit)}"
    return query


def _count_already_checked(conn) -> int:
    """Count eligible repos that already have a found/missing README row."""
    return conn.execute(
        f"select count(*) as n from tangled_repos where {_ELIGIBLE} and {_HAS_README_ROW}"
    ).fetchone()["n"]


def _find_readme_in_tree(tree: dict[str, Any]) -> str | None:
    """Return the name of the first README-like entry in a tree listing.

    Reads ``tree["files"]``; non-dict entries are ignored. Names match
    README_NAMES case-insensitively. Returns None when nothing matches.
    """
    for entry in tree.get("files") or []:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name")
        if not isinstance(name, str) or name.lower() not in README_NAMES:
            continue
        # The listing may omit ``type`` for files, so anything not marked as
        # a directory counts; a regular-file ``mode`` is accepted regardless.
        if entry.get("type") != "dir" or entry.get("mode") in _FILE_MODES:
            return name
    return None


def fetch_readme(
    client: httpx.Client,
    *,
    knot_hostname: str,
    repo_did: str,
) -> ReadmeResult:
    """Look up and download the README at ``HEAD`` of one repo on its knot.

    Calls ``sh.tangled.repo.tree`` and then ``sh.tangled.repo.blob`` through
    ``knot_xrpc``; any exception it raises propagates to the caller. The
    ``repo_uri``/``owner_handle``/``repo_name`` fields are left None for the
    caller to fill in.

    Returns:
        A ReadmeResult whose status is "found" (path, content, size set),
        "missing" (tree listed but no README entry), or "error" (non-200 or
        malformed response; ``error_message`` set).
    """
    result = ReadmeResult(
        repo_did=repo_did,
        repo_uri=None,
        owner_handle=None,
        repo_name=None,
        knot_hostname=knot_hostname,
        status="error",
    )

    status, tree = knot_xrpc(
        client,
        knot_hostname,
        "sh.tangled.repo.tree",
        {"repo": repo_did, "ref": "HEAD"},
    )
    if status != 200 or not isinstance(tree, dict):
        result.error_message = f"tree HTTP {status}"
        return result

    readme_path = _find_readme_in_tree(tree)
    if not readme_path:
        result.status = "missing"
        return result

    result.readme_path = readme_path
    status, blob = knot_xrpc(
        client,
        knot_hostname,
        "sh.tangled.repo.blob",
        {"repo": repo_did, "ref": "HEAD", "path": readme_path},
    )
    if status != 200 or not isinstance(blob, dict):
        result.error_message = f"blob HTTP {status}"
        return result

    content = blob.get("content")
    if not isinstance(content, str):
        result.error_message = "blob response missing content"
        return result

    # PostgreSQL text values cannot contain NUL (U+0000); storing it would
    # make upsert_readme fail and abort the whole run, so drop it here.
    content = content.replace("\x00", "")
    result.status = "found"
    result.content = content
    result.size_bytes = len(content.encode("utf-8"))
    return result


def upsert_readme(conn, row: ReadmeResult) -> None:
    """Insert or update the ``tangled_readmes`` row for ``row.repo_did``.

    ``fetched_at`` is set to now(). The embedding columns are cleared only
    when the stored content actually changed, so unchanged READMEs keep
    their existing embeddings. Does not commit.
    """
    conn.execute(
        """
        insert into tangled_readmes (
            repo_did, repo_uri, owner_handle, repo_name, knot_hostname,
            readme_path, status, content, size_bytes, error_message, fetched_at
        )
        values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
        on conflict (repo_did) do update set
            repo_uri = excluded.repo_uri,
            owner_handle = excluded.owner_handle,
            repo_name = excluded.repo_name,
            knot_hostname = excluded.knot_hostname,
            readme_path = excluded.readme_path,
            status = excluded.status,
            content = excluded.content,
            size_bytes = excluded.size_bytes,
            error_message = excluded.error_message,
            fetched_at = now(),
            embedding = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedding
            end,
            embedding_model = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedding_model
            end,
            embedded_at = case
                when tangled_readmes.content is distinct from excluded.content then null
                else tangled_readmes.embedded_at
            end
        """,
        (
            row.repo_did,
            row.repo_uri,
            row.owner_handle,
            row.repo_name,
            row.knot_hostname,
            row.readme_path,
            row.status,
            row.content,
            row.size_bytes,
            row.error_message,
        ),
    )


def _result_from_row(
    repo: dict[str, Any], *, status: str, error_message: str
) -> ReadmeResult:
    """Build a content-less ReadmeResult for a repo row (skip/error cases)."""
    return ReadmeResult(
        repo_did=repo["repo_did"],
        repo_uri=repo.get("uri"),
        owner_handle=repo.get("owner_handle"),
        repo_name=repo.get("name"),
        knot_hostname=repo.get("knot_hostname") or "",
        status=status,
        error_message=error_message,
    )


def _report_progress(
    n: int, total: int, repo: dict[str, Any], result: ReadmeResult
) -> None:
    """Log a sampled progress line: the first 10 results, then every 50th
    "found" and every 100th other outcome."""
    label = f"{repo.get('owner_handle') or '?'}/{repo.get('name') or repo['repo_did'][:16]}"
    if result.status == "found":
        if n <= 10 or n % 50 == 0:
            step(
                "readmes",
                n,
                total,
                f"OK {label} {result.readme_path} ({result.size_bytes} B)",
            )
    elif n <= 10 or n % 100 == 0:
        step(
            "readmes",
            n,
            total,
            f"{result.status.upper()} {label} {result.error_message or ''}",
        )


class _ThreadClients:
    """Hand out one lazily created httpx.Client per worker thread.

    Reusing a client per thread keeps pooled connections to knots alive
    across repos instead of building (and tearing down) a client per repo.
    ``close()`` must only be called once no worker can still be using them.
    """

    def __init__(self) -> None:
        self._local = threading.local()
        self._lock = threading.Lock()
        self._clients: list[httpx.Client] = []

    def get(self) -> httpx.Client:
        client = getattr(self._local, "client", None)
        if client is None:
            client = httpx.Client(timeout=60.0, follow_redirects=True)
            self._local.client = client
            with self._lock:
                self._clients.append(client)
        return client

    def close(self) -> None:
        with self._lock:
            clients, self._clients = self._clients, []
        for client in clients:
            client.close()


def run_check_readmes(dsn: str) -> dict[str, int]:
    """Fetch READMEs for eligible repos and upsert them into ``tangled_readmes``.

    Args:
        dsn: Connection string handed to ``db.connect``.

    Returns:
        Counters ``found``, ``missing``, ``error``, ``skipped`` (knot not
        reachable) and ``already_in_db`` (eligible repos left untouched
        because they already have a found/missing row; 0 in refresh mode).

    Raises:
        RuntimeError: in refresh mode, when no eligible repos exist.
    """
    workers = concurrency_env("TANGLED_README_CONCURRENCY", default=20)
    repo_limit = _repo_limit()

    banner("README CHECK — fetch README from knot git for each repo")
    log("readmes", f"Concurrency: {workers}")
    if repo_limit:
        log("readmes", f"Repo limit: {repo_limit}")
    skip_existing = _skip_existing()
    if skip_existing:
        log(
            "readmes",
            "Skip existing: on — found/missing rows kept (set TANGLED_README_REFRESH=1 to re-fetch)",
        )
    else:
        log("readmes", "Skip existing: off — re-fetching all")

    with connect(dsn) as conn:
        reachable = {
            r["hostname"]
            for r in conn.execute(
                "select hostname from tangled_knots where reachable = true"
            ).fetchall()
        }
        total_eligible = conn.execute(
            f"select count(*) as n from tangled_repos where {_ELIGIBLE}"
        ).fetchone()["n"]
        # Counted directly rather than as total_eligible - len(repos): with a
        # repo limit, len(repos) is capped, so that difference overstates it.
        already_in_db = _count_already_checked(conn) if skip_existing else 0
        repos = conn.execute(
            _repos_query(skip_existing=skip_existing, repo_limit=repo_limit)
        ).fetchall()

    if not repos:
        if skip_existing:
            log("readmes", "Nothing to fetch — all eligible repos already checked.")
            return {
                "found": 0,
                "missing": 0,
                "error": 0,
                "skipped": 0,
                "already_in_db": already_in_db,
            }
        raise RuntimeError("No repos with repo_did in tangled_repos.")

    total = len(repos)
    if skip_existing:
        metric("Eligible repos", total_eligible)
        metric("Already in DB (skipped)", already_in_db)
        metric("To fetch", total)
    log("readmes", f"Checking READMEs for {total} repos …")

    stats = {
        "found": 0,
        "missing": 0,
        "error": 0,
        "skipped": 0,
        "already_in_db": already_in_db,
    }
    clients = _ThreadClients()

    phase(1, "Parallel tree + blob fetch on knots")

    def work(repo: dict[str, Any]) -> ReadmeResult:
        """Check one repo on a worker thread (HTTP only, no DB access)."""
        knot = repo["knot_hostname"]
        if knot not in reachable:
            return _result_from_row(
                repo, status="skipped", error_message=f"knot not reachable: {knot}"
            )
        result = fetch_readme(
            clients.get(), knot_hostname=knot, repo_did=repo["repo_did"]
        )
        result.repo_uri = repo.get("uri")
        result.owner_handle = repo.get("owner_handle")
        result.repo_name = repo.get("name")
        return result

    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={"repo_count": total, "workers": workers},
        )
        conn.commit()

        try:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {pool.submit(work, dict(repo)): repo for repo in repos}
                # as_completed yields on this thread, so stats and the DB
                # connection are only touched here — no locking needed.
                for n, future in enumerate(as_completed(futures), start=1):
                    repo = futures[future]
                    try:
                        result = future.result()
                    except Exception as exc:
                        result = _result_from_row(
                            repo, status="error", error_message=str(exc)
                        )

                    upsert_readme(conn, result)
                    stats[result.status if result.status in stats else "error"] += 1
                    _report_progress(n, total, repo, result)

                    if n % 50 == 0:
                        conn.commit()
        finally:
            # The executor has joined its threads by now, so closing is safe.
            clients.close()

        set_crawl_state(conn, key=CRAWL_KEY, status="complete", meta=stats)
        conn.commit()

    summary_block(
        "README check complete",
        [
            f"Repos checked: {total}",
            f"Already in DB: {stats['already_in_db']}",
            f"Found README: {stats['found']}",
            f"Missing README: {stats['missing']}",
            f"Errors: {stats['error']}",
            f"Skipped knot: {stats['skipped']}",
            "",
            "Query: select status, count(*) from tangled_readmes group by 1;",
        ],
    )
    return stats


def main() -> None:
    """CLI entry point: load .env, ensure the schema, run the README check."""
    for candidate in (REPO_ROOT / ".env", Path(__file__).parent / ".env"):
        if candidate.exists():
            load_dotenv(candidate)
            break
    else:
        # Neither project .env exists; fall back to load_dotenv's own lookup.
        load_dotenv()

    dsn = os.getenv("DB_CONNECTION_STRING", "").strip()
    if not dsn:
        print("ERROR: DB_CONNECTION_STRING not set", file=sys.stderr)
        raise SystemExit(1)

    init_schema(dsn)
    run_check_readmes(dsn)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        raise SystemExit(130) from None