This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 17 kB View raw
1#!/usr/bin/env python3 2"""Full ingest for one Tangled handle: identity → repos → READMEs + embeddings → issues + embeddings. 3 4Onboards a single user for recommendations/testing without a network-wide crawl. 5 6Usage (from scraper/, with repo-root .env): 7 python ingest_handle.py arsenii.tngl.sh 8 python ingest_handle.py did:plc:abc123 9 python ingest_handle.py arsenii.tngl.sh --skip-issues 10 python ingest_handle.py arsenii.tngl.sh --force-embed 11 12Requires: DB_CONNECTION_STRING, GEMINI_API_KEY (for embeddings). 13""" 14 15from __future__ import annotations 16 17import argparse 18import json 19import os 20import sys 21from pathlib import Path 22 23import httpx 24from dotenv import load_dotenv 25 26from db import connect, init_schema, register_pgvector 27from embeddings import ( 28 batch_size, 29 embed_texts, 30 embedding_model, 31 gemini_api_key, 32 truncate, 33) 34from fetch_issues import ( 35 UserIssueResult, 36 _fetch_user_issues, 37 _issue_state, 38 _mark_user_synced, 39 _PdsCache, 40 _rkey_from_uri, 41 _state_map, 42 upsert_issue, 43) 44from progress import banner, log, summary_block 45 46REPO_ROOT = Path(__file__).resolve().parent.parent 47REPO_COLLECTION = "sh.tangled.repo" 48RESOLVE_PDS = ( 49 "https://tngl.sh", 50 "https://bsky.social", 51 "https://public.api.bsky.app", 52) 53 54 55def load_env() -> None: 56 for candidate in (REPO_ROOT / ".env", Path(__file__).parent / ".env"): 57 if candidate.exists(): 58 load_dotenv(candidate) 59 return 60 load_dotenv() 61 62 63def normalize_handle(raw: str) -> str: 64 return raw.strip().lstrip("@") 65 66 67def resolve_handle_http(client: httpx.Client, handle: str) -> str | None: 68 for base in RESOLVE_PDS: 69 try: 70 resp = client.get( 71 f"{base.rstrip('/')}/xrpc/com.atproto.identity.resolveHandle", 72 params={"handle": handle}, 73 timeout=20.0, 74 ) 75 if resp.status_code == 200: 76 did = resp.json().get("did") 77 if isinstance(did, str) and did.startswith("did:"): 78 return did 79 except httpx.HTTPError: 80 continue 81 return None 82 83 84def resolve_did(client: httpx.Client, conn, handle_or_did: str) -> str: 85 raw = handle_or_did.strip() 86 if raw.startswith("did:"): 87 return raw 88 handle = normalize_handle(raw) 89 did = resolve_handle_http(client, handle) 90 if did: 91 return did 92 row = conn.execute( 93 "select did from tangled_identities where handle = %s limit 1", 94 (handle,), 95 ).fetchone() 96 if row: 97 return row["did"] 98 raise SystemExit( 99 f"ERROR: could not resolve handle {handle!r} " 100 f"(tried {', '.join(RESOLVE_PDS)} and tangled_identities)" 101 ) 102 103 104def resolve_identity(client: httpx.Client, did: str) -> tuple[str, str | None]: 105 """Return (pds_endpoint, handle) from the PLC DID document.""" 106 resp = client.get(f"https://plc.directory/{did}", timeout=20.0) 107 resp.raise_for_status() 108 doc = resp.json() 109 pds = next( 110 s["serviceEndpoint"] 111 for s in doc["service"] 112 if s.get("id") == "#atproto_pds" 113 ) 114 handle = None 115 for aka in doc.get("alsoKnownAs", []): 116 if isinstance(aka, str) and aka.startswith("at://"): 117 handle = aka.removeprefix("at://") 118 break 119 return pds.rstrip("/"), handle 120 121 122def list_repos(client: httpx.Client, pds: str, did: str) -> list[dict]: 123 records: list[dict] = [] 124 cursor: str | None = None 125 while True: 126 params: dict[str, str | int] = { 127 "repo": did, 128 "collection": REPO_COLLECTION, 129 "limit": 100, 130 } 131 if cursor: 132 params["cursor"] = cursor 133 resp = client.get( 134 f"{pds}/xrpc/com.atproto.repo.listRecords", 135 params=params, 136 timeout=30.0, 137 ) 138 resp.raise_for_status() 139 data = resp.json() 140 page = data.get("records") or [] 141 records.extend(rec for rec in page if isinstance(rec, dict)) 142 cursor = data.get("cursor") 143 if not cursor or not page: 144 break 145 return records 146 147 148def fetch_readme( 149 client: httpx.Client, knot: str, repo_did: str 150) -> tuple[str | None, str | None]: 151 resp = client.get( 152 f"https://{knot}/xrpc/sh.tangled.repo.tree", 153 params={"repo": repo_did, "path": ""}, 154 timeout=30.0, 155 ) 156 if resp.status_code != 200: 157 return None, None 158 readme = (resp.json() or {}).get("readme") 159 if not isinstance(readme, dict): 160 return None, None 161 contents = readme.get("contents") 162 if not isinstance(contents, str) or not contents.strip(): 163 return None, None 164 filename = readme.get("filename") 165 return (filename if isinstance(filename, str) else None), contents 166 167 168def vector_literal(vec: list[float]) -> str: 169 return "[" + ",".join(repr(x) for x in vec) + "]" 170 171 172def ingest_repos_and_readmes( 173 conn, 174 *, 175 http: httpx.Client, 176 did: str, 177 handle: str | None, 178 pds: str, 179 api_key: str, 180 model: str, 181 force_embed: bool, 182) -> dict[str, int]: 183 stats = {"repos": 0, "readmes_found": 0, "readmes_embedded": 0, "readmes_missing": 0} 184 185 conn.execute( 186 """ 187 insert into tangled_identities (did, handle, pds_host, last_synced_at) 188 values (%s, %s, %s, now()) 189 on conflict (did) do update set 190 handle = coalesce(excluded.handle, tangled_identities.handle), 191 pds_host = coalesce(excluded.pds_host, tangled_identities.pds_host), 192 last_synced_at = now() 193 """, 194 (did, handle, pds), 195 ) 196 197 records = list_repos(http, pds, did) 198 log("repos", f"Found {len(records)} sh.tangled.repo record(s) on PDS") 199 200 ingested: list[dict] = [] 201 for rec in records: 202 uri = rec["uri"] 203 value = rec["value"] 204 if not isinstance(value, dict): 205 continue 206 rkey = uri.rsplit("/", 1)[-1] 207 repo_did = value.get("repoDid") 208 knot = value.get("knot") 209 name = value.get("name") or rkey 210 if not repo_did or not knot: 211 log("repos", f" SKIP {name}: missing repoDid/knot") 212 continue 213 path, content = fetch_readme(http, knot, repo_did) 214 status = "found" if content else "missing" 215 if status == "found": 216 stats["readmes_found"] += 1 217 else: 218 stats["readmes_missing"] += 1 219 log( 220 "repos", 221 f" {name:20} readme={status}" 222 + (f" ({len(content)} chars)" if content else ""), 223 ) 224 ingested.append( 225 { 226 "uri": uri, 227 "value": value, 228 "rkey": rkey, 229 "repo_did": repo_did, 230 "knot": knot, 231 "name": name, 232 "cid": rec.get("cid"), 233 "readme_path": path, 234 "content": content, 235 "status": status, 236 } 237 ) 238 stats["repos"] += 1 239 240 found_rows = [r for r in ingested if r["status"] == "found"] 241 if force_embed: 242 to_embed = found_rows 243 else: 244 dids = [r["repo_did"] for r in found_rows] 245 if dids: 246 existing = { 247 row["repo_did"] 248 for row in conn.execute( 249 "select repo_did from tangled_readmes " 250 "where repo_did = any(%s) and embedding is not null", 251 (dids,), 252 ).fetchall() 253 } 254 else: 255 existing = set() 256 to_embed = [r for r in found_rows if r["repo_did"] not in existing] 257 vectors: dict[str, str] = {} 258 if to_embed: 259 vecs = embed_texts( 260 http, 261 api_key=api_key, 262 texts=[truncate(r["content"]) for r in to_embed], 263 ) 264 vectors = {r["repo_did"]: vector_literal(v) for r, v in zip(to_embed, vecs, strict=True)} 265 stats["readmes_embedded"] = len(vectors) 266 log("embed", f"Embedded {len(vectors)} README(s) ({model}, 1536-d, L2)") 267 268 for r in ingested: 269 conn.execute( 270 """ 271 insert into tangled_repos ( 272 uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname, 273 cid, record_raw, discovered_via, last_synced_at 274 ) 275 values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, 'ingest_handle', now()) 276 on conflict (uri) do update set 277 owner_did = excluded.owner_did, 278 owner_handle = excluded.owner_handle, 279 repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did), 280 name = coalesce(excluded.name, tangled_repos.name), 281 knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname), 282 cid = excluded.cid, 283 record_raw = excluded.record_raw, 284 last_synced_at = now() 285 """, 286 ( 287 r["uri"], 288 did, 289 handle, 290 r["rkey"], 291 r["repo_did"], 292 r["name"], 293 r["knot"], 294 r["cid"], 295 json.dumps(r["value"]), 296 ), 297 ) 298 299 vec = vectors.get(r["repo_did"]) 300 conn.execute( 301 """ 302 insert into tangled_readmes ( 303 repo_did, repo_uri, owner_handle, repo_name, knot_hostname, 304 readme_path, status, content, size_bytes, fetched_at, 305 embedding, embedding_model, embedded_at 306 ) 307 values (%s, %s, %s, %s, %s, %s, %s, %s, %s, now(), 308 %s::vector, %s, case when %s::text is null then null else now() end) 309 on conflict (repo_did) do update set 310 repo_uri = excluded.repo_uri, 311 owner_handle = excluded.owner_handle, 312 repo_name = excluded.repo_name, 313 knot_hostname = excluded.knot_hostname, 314 readme_path = excluded.readme_path, 315 status = excluded.status, 316 content = excluded.content, 317 size_bytes = excluded.size_bytes, 318 fetched_at = now(), 319 embedding = excluded.embedding, 320 embedding_model = excluded.embedding_model, 321 embedded_at = excluded.embedded_at 322 """, 323 ( 324 r["repo_did"], 325 r["uri"], 326 handle, 327 r["name"], 328 r["knot"], 329 r["readme_path"], 330 r["status"], 331 r["content"], 332 len(r["content"].encode()) if r["content"] else None, 333 vec, 334 model if vec else None, 335 vec, 336 ), 337 ) 338 339 return stats 340 341 342def ingest_issues( 343 conn, 344 *, 345 did: str, 346 handle: str | None, 347 pds: str, 348 max_pages: int, 349) -> dict[str, int]: 350 stats = {"issues": 0, "open": 0, "errors": 0} 351 cache = _PdsCache() 352 result: UserIssueResult = _fetch_user_issues( 353 did, handle, pds, cache, max_pages=max_pages 354 ) 355 if result.status != "ok": 356 stats["errors"] = 1 357 log("issues", f"ERROR fetching issues: {result.error}") 358 _mark_user_synced( 359 conn, 360 user_did=did, 361 issue_count=0, 362 status="error", 363 error_message=result.error, 364 ) 365 return stats 366 367 states = _state_map(result.states) 368 for rec in result.issues: 369 if not isinstance(rec.get("uri"), str) or not isinstance(rec.get("value"), dict): 370 continue 371 rkey = _rkey_from_uri(rec["uri"]) 372 state = _issue_state(rec["uri"], rkey, states) 373 upsert_issue( 374 conn, 375 record=rec, 376 author_did=did, 377 author_handle=handle, 378 state=state, 379 ) 380 stats["issues"] += 1 381 if state == "open": 382 stats["open"] += 1 383 384 _mark_user_synced(conn, user_did=did, issue_count=stats["issues"], status="ok") 385 log("issues", f"Upserted {stats['issues']} issue(s) ({stats['open']} open)") 386 return stats 387 388 389def embed_user_issues( 390 conn, 391 *, 392 http: httpx.Client, 393 did: str, 394 api_key: str, 395 model: str, 396 force: bool, 397) -> int: 398 where = "repo_did in (select repo_did from tangled_repos where owner_did = %s)" 399 params: list = [did] 400 if not force: 401 where += " and embedding is null" 402 rows = conn.execute( 403 f""" 404 select uri, title, body 405 from tangled_issues 406 where {where} 407 and coalesce(nullif(trim(title), ''), nullif(trim(body), '')) is not null 408 order by fetched_at desc 409 """, 410 params, 411 ).fetchall() 412 if not rows: 413 log("embed-issues", "No issues to embed for this user") 414 return 0 415 416 bs = batch_size() 417 embedded = 0 418 for start in range(0, len(rows), bs): 419 batch = rows[start : start + bs] 420 texts = [ 421 truncate("\n\n".join(p for p in (r.get("title"), r.get("body")) if p and p.strip())) 422 for r in batch 423 ] 424 vectors = embed_texts(http, api_key=api_key, texts=texts) 425 for row, vec in zip(batch, vectors, strict=True): 426 conn.execute( 427 """ 428 update tangled_issues 429 set embedding = %s::vector, 430 embedding_model = %s, 431 embedded_at = now() 432 where uri = %s 433 """, 434 (vector_literal(vec), model, row["uri"]), 435 ) 436 embedded += len(batch) 437 log("embed-issues", f"Embedded {embedded} issue(s)") 438 return embedded 439 440 441def run( 442 handle_or_did: str, 443 *, 444 skip_issues: bool, 445 force_embed: bool, 446 max_pages: int, 447 init_db: bool, 448) -> int: 449 load_env() 450 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 451 if not dsn: 452 print("ERROR: DB_CONNECTION_STRING is not set", file=sys.stderr) 453 return 1 454 455 api_key = gemini_api_key() 456 model = embedding_model() 457 458 banner(f"INGEST HANDLE — {handle_or_did}") 459 if init_db: 460 log("setup", "Applying migrations…") 461 init_schema(dsn) 462 463 repo_stats: dict[str, int] = {} 464 issue_stats: dict[str, int] = {} 465 issues_embedded = 0 466 467 with httpx.Client(timeout=60.0, follow_redirects=True) as http, connect(dsn) as conn: 468 did = resolve_did(http, conn, handle_or_did) 469 pds, handle = resolve_identity(http, did) 470 log("identity", f"DID={did}") 471 log("identity", f"handle={handle} pds={pds}") 472 473 repo_stats = ingest_repos_and_readmes( 474 conn, 475 http=http, 476 did=did, 477 handle=handle, 478 pds=pds, 479 api_key=api_key, 480 model=model, 481 force_embed=force_embed, 482 ) 483 484 if not skip_issues: 485 issue_stats = ingest_issues( 486 conn, did=did, handle=handle, pds=pds, max_pages=max_pages 487 ) 488 issues_embedded = embed_user_issues( 489 conn, 490 http=http, 491 did=did, 492 api_key=api_key, 493 model=model, 494 force=force_embed, 495 ) 496 497 conn.commit() 498 499 summary_block( 500 f"Ingest complete — {handle or did}", 501 [ 502 f"DID: {did}", 503 f"Handle: {handle or '(unknown)'}", 504 f"Repos: {repo_stats.get('repos', 0)}", 505 f"READMEs found: {repo_stats.get('readmes_found', 0)}", 506 f"READMEs embedded: {repo_stats.get('readmes_embedded', 0)}", 507 f"READMEs missing: {repo_stats.get('readmes_missing', 0)}", 508 f"Issues upserted: {issue_stats.get('issues', 0)}", 509 f"Open issues: {issue_stats.get('open', 0)}", 510 f"Issues embedded: {issues_embedded}", 511 "", 512 "Test recommendations:", 513 f" curl 'http://localhost:8000/recommendations?handle={did}'", 514 ], 515 ) 516 return 0 517 518 519def main(argv: list[str] | None = None) -> int: 520 parser = argparse.ArgumentParser( 521 description="Ingest one Tangled user by handle: repos, README embeddings, issues." 522 ) 523 parser.add_argument("handle", help="Handle (e.g. arsenii.tngl.sh) or did:plc:…") 524 parser.add_argument( 525 "--skip-issues", 526 action="store_true", 527 help="Only ingest repos + README embeddings", 528 ) 529 parser.add_argument( 530 "--force-embed", 531 action="store_true", 532 help="Re-embed READMEs and issues even if vectors already exist", 533 ) 534 parser.add_argument( 535 "--max-pages", 536 type=int, 537 default=int(os.getenv("TANGLED_ISSUE_MAX_PAGES", "50")), 538 help="Max listRecords pages per issue collection (default: 50)", 539 ) 540 parser.add_argument( 541 "--init-db", 542 action="store_true", 543 help="Run supabase migrations before ingest", 544 ) 545 args = parser.parse_args(argv) 546 return run( 547 args.handle, 548 skip_issues=args.skip_issues, 549 force_embed=args.force_embed, 550 max_pages=max(1, args.max_pages), 551 init_db=args.init_db, 552 ) 553 554 555if __name__ == "__main__": 556 raise SystemExit(main())