This repository has no description
1#!/usr/bin/env python3
2"""Full ingest for one Tangled handle: identity → repos → READMEs + embeddings → issues + embeddings.
3
4Onboards a single user for recommendations/testing without a network-wide crawl.
5
6Usage (from scraper/, with repo-root .env):
7 python ingest_handle.py arsenii.tngl.sh
8 python ingest_handle.py did:plc:abc123
9 python ingest_handle.py arsenii.tngl.sh --skip-issues
10 python ingest_handle.py arsenii.tngl.sh --force-embed
11
12Requires: DB_CONNECTION_STRING, GEMINI_API_KEY (for embeddings).
13"""
14
15from __future__ import annotations
16
17import argparse
18import json
19import os
20import sys
21from pathlib import Path
22
23import httpx
24from dotenv import load_dotenv
25
26from db import connect, init_schema, register_pgvector
27from embeddings import (
28 batch_size,
29 embed_texts,
30 embedding_model,
31 gemini_api_key,
32 truncate,
33)
34from fetch_issues import (
35 UserIssueResult,
36 _fetch_user_issues,
37 _issue_state,
38 _mark_user_synced,
39 _PdsCache,
40 _rkey_from_uri,
41 _state_map,
42 upsert_issue,
43)
44from progress import banner, log, summary_block
45
# Directory two levels above this file; load_env() looks for a .env here first.
REPO_ROOT = Path(__file__).resolve().parents[1]

# AT Protocol collection (NSID) listed on the user's PDS to discover repos.
REPO_COLLECTION = "sh.tangled.repo"

# Hosts queried, in order, via com.atproto.identity.resolveHandle.
RESOLVE_PDS = (
    "https://tngl.sh",
    "https://bsky.social",
    "https://public.api.bsky.app",
)
53
54
def load_env() -> None:
    """Load environment variables from the first ``.env`` file that exists.

    The repo-root ``.env`` is preferred, then one next to this script. When
    neither exists, ``load_dotenv()`` is called with no path so python-dotenv
    applies its own default lookup.
    """
    candidates = (REPO_ROOT / ".env", Path(__file__).parent / ".env")
    dotenv_path = next((path for path in candidates if path.exists()), None)
    if dotenv_path is None:
        load_dotenv()
    else:
        load_dotenv(dotenv_path)
61
62
def normalize_handle(raw: str) -> str:
    """Return *raw* with surrounding whitespace and any leading ``@`` removed."""
    trimmed = raw.strip()
    return trimmed.lstrip("@")
65
66
def resolve_handle_http(client: httpx.Client, handle: str) -> str | None:
    """Resolve *handle* to a DID by querying each host in ``RESOLVE_PDS``.

    Hosts are tried in order via ``com.atproto.identity.resolveHandle``. A host
    is skipped when the request fails, the status is not 200, the body is not
    valid JSON, or the JSON does not carry a string ``did`` starting with
    ``"did:"``.

    Returns:
        The first DID found, or ``None`` when no host produced one.
    """
    for base in RESOLVE_PDS:
        try:
            resp = client.get(
                f"{base.rstrip('/')}/xrpc/com.atproto.identity.resolveHandle",
                params={"handle": handle},
                timeout=20.0,
            )
            if resp.status_code != 200:
                continue
            # A 200 with a non-JSON body (e.g. an HTML error page) raises
            # ValueError here; treat it like any other failed resolver.
            payload = resp.json()
        except (httpx.HTTPError, ValueError):
            continue
        # Guard against a JSON array/scalar, which has no .get().
        did = payload.get("did") if isinstance(payload, dict) else None
        if isinstance(did, str) and did.startswith("did:"):
            return did
    return None
82
83
def resolve_did(client: httpx.Client, conn, handle_or_did: str) -> str:
    """Turn a handle or DID string into a DID.

    Input that already starts with ``did:`` is returned as-is (stripped).
    Otherwise the normalized handle is resolved over HTTP first, then looked
    up in the ``tangled_identities`` table.

    Raises:
        SystemExit: when neither HTTP resolution nor the table yields a DID.
    """
    candidate = handle_or_did.strip()
    if candidate.startswith("did:"):
        return candidate

    handle = normalize_handle(candidate)
    resolved = resolve_handle_http(client, handle)
    if resolved:
        return resolved

    # Fall back to an identity stored by an earlier ingest.
    cached = conn.execute(
        "select did from tangled_identities where handle = %s limit 1",
        (handle,),
    ).fetchone()
    if not cached:
        raise SystemExit(
            f"ERROR: could not resolve handle {handle!r} "
            f"(tried {', '.join(RESOLVE_PDS)} and tangled_identities)"
        )
    return cached["did"]
102
103
def resolve_identity(client: httpx.Client, did: str) -> tuple[str, str | None]:
    """Return (pds_endpoint, handle) from the PLC DID document.

    The DID document is fetched from ``https://plc.directory/{did}``. The PDS
    endpoint is the ``serviceEndpoint`` of the service whose id is
    ``#atproto_pds``, with any trailing slash removed. The handle is the first
    ``alsoKnownAs`` entry starting with ``at://``, or ``None`` if there is none.

    Raises:
        httpx.HTTPStatusError: when the directory returns an error status.
        SystemExit: when the document has no usable ``#atproto_pds`` endpoint.
    """
    resp = client.get(f"https://plc.directory/{did}", timeout=20.0)
    resp.raise_for_status()
    doc = resp.json()
    if not isinstance(doc, dict):
        doc = {}

    services = doc.get("service")
    if not isinstance(services, list):
        services = []
    # Use a default so a missing service does not leak a bare StopIteration.
    pds = next(
        (
            s.get("serviceEndpoint")
            for s in services
            if isinstance(s, dict) and s.get("id") == "#atproto_pds"
        ),
        None,
    )
    if not isinstance(pds, str) or not pds:
        raise SystemExit(
            f"ERROR: DID document for {did} has no #atproto_pds service endpoint"
        )

    handle = None
    # `or []` tolerates an explicit null alsoKnownAs.
    for aka in doc.get("alsoKnownAs") or []:
        if isinstance(aka, str) and aka.startswith("at://"):
            handle = aka.removeprefix("at://")
            break
    return pds.rstrip("/"), handle
120
121
def list_repos(client: httpx.Client, pds: str, did: str) -> list[dict]:
    """Collect every ``REPO_COLLECTION`` record for *did* from its PDS.

    Pages through ``com.atproto.repo.listRecords`` 100 records at a time and
    keeps only entries that are dicts. Paging stops when a response has no
    cursor or an empty page.

    Raises:
        httpx.HTTPStatusError: when any page request returns an error status.
    """
    endpoint = f"{pds}/xrpc/com.atproto.repo.listRecords"
    collected: list[dict] = []
    next_cursor: str | None = None
    while True:
        query: dict[str, str | int] = {
            "repo": did,
            "collection": REPO_COLLECTION,
            "limit": 100,
        }
        if next_cursor:
            query["cursor"] = next_cursor
        resp = client.get(endpoint, params=query, timeout=30.0)
        resp.raise_for_status()
        payload = resp.json()
        batch = payload.get("records") or []
        for item in batch:
            if isinstance(item, dict):
                collected.append(item)
        next_cursor = payload.get("cursor")
        if not (next_cursor and batch):
            return collected
146
147
def fetch_readme(
    client: httpx.Client, knot: str, repo_did: str
) -> tuple[str | None, str | None]:
    """Fetch a repo's README from its knot via ``sh.tangled.repo.tree``.

    Requests the root tree (``path=""``) and reads the ``readme`` object from
    the response.

    Returns:
        ``(filename, contents)`` when a non-blank README is present; the
        filename is ``None`` if it is not a string. ``(None, None)`` when the
        status is not 200, the body is not a JSON object, or there is no
        non-blank README.

    Raises:
        httpx.HTTPError: transport failures are not caught here.
    """
    resp = client.get(
        f"https://{knot}/xrpc/sh.tangled.repo.tree",
        params={"repo": repo_did, "path": ""},
        timeout=30.0,
    )
    if resp.status_code != 200:
        return None, None
    try:
        payload = resp.json()
    except ValueError:
        # 200 with a non-JSON body: treat as "no README" rather than crash.
        return None, None
    readme = payload.get("readme") if isinstance(payload, dict) else None
    if not isinstance(readme, dict):
        return None, None
    contents = readme.get("contents")
    if not isinstance(contents, str) or not contents.strip():
        return None, None
    filename = readme.get("filename")
    return (filename if isinstance(filename, str) else None), contents
166
167
def vector_literal(vec: list[float]) -> str:
    """Format *vec* as a bracketed, comma-separated literal, e.g. ``[0.5,1.0]``.

    Each element is rendered with ``repr``; callers cast the result with
    ``::vector`` in SQL.
    """
    body = ",".join(map(repr, vec))
    return f"[{body}]"
170
171
def ingest_repos_and_readmes(
    conn,
    *,
    http: httpx.Client,
    did: str,
    handle: str | None,
    pds: str,
    api_key: str,
    model: str,
    force_embed: bool,
) -> dict[str, int]:
    """Ingest one user's repos and READMEs, embedding READMEs as needed.

    Steps:
      1. Upsert the user's row in ``tangled_identities``.
      2. List the user's repo records from the PDS and fetch each README from
         the repo's knot. Records without a string ``uri``, a dict ``value``,
         or a ``repoDid``/``knot`` are skipped.
      3. Embed READMEs that were found. Without ``force_embed`` only those
         with no stored embedding are embedded.
      4. Upsert every repo into ``tangled_repos`` and ``tangled_readmes``.

    ``model`` is recorded next to each stored vector and shown in the log; it
    is not passed to ``embed_texts``.

    Returns:
        Counters: ``repos``, ``readmes_found``, ``readmes_embedded``,
        ``readmes_missing``.
    """
    stats = {"repos": 0, "readmes_found": 0, "readmes_embedded": 0, "readmes_missing": 0}

    conn.execute(
        """
        insert into tangled_identities (did, handle, pds_host, last_synced_at)
        values (%s, %s, %s, now())
        on conflict (did) do update set
            handle = coalesce(excluded.handle, tangled_identities.handle),
            pds_host = coalesce(excluded.pds_host, tangled_identities.pds_host),
            last_synced_at = now()
        """,
        (did, handle, pds),
    )

    records = list_repos(http, pds, did)
    log("repos", f"Found {len(records)} sh.tangled.repo record(s) on PDS")

    ingested: list[dict] = []
    for rec in records:
        uri = rec.get("uri")
        value = rec.get("value")
        # Skip malformed records instead of raising KeyError mid-ingest.
        if not isinstance(uri, str) or not isinstance(value, dict):
            continue
        rkey = uri.rsplit("/", 1)[-1]
        repo_did = value.get("repoDid")
        knot = value.get("knot")
        name = value.get("name") or rkey
        if not repo_did or not knot:
            log("repos", f" SKIP {name}: missing repoDid/knot")
            continue
        path, content = fetch_readme(http, knot, repo_did)
        status = "found" if content else "missing"
        if status == "found":
            stats["readmes_found"] += 1
        else:
            stats["readmes_missing"] += 1
        log(
            "repos",
            f" {name:20} readme={status}"
            + (f" ({len(content)} chars)" if content else ""),
        )
        ingested.append(
            {
                "uri": uri,
                "value": value,
                "rkey": rkey,
                "repo_did": repo_did,
                "knot": knot,
                "name": name,
                "cid": rec.get("cid"),
                "readme_path": path,
                "content": content,
                "status": status,
            }
        )
        stats["repos"] += 1

    found_rows = [r for r in ingested if r["status"] == "found"]
    if force_embed:
        to_embed = found_rows
    else:
        dids = [r["repo_did"] for r in found_rows]
        if dids:
            existing = {
                row["repo_did"]
                for row in conn.execute(
                    "select repo_did from tangled_readmes "
                    "where repo_did = any(%s) and embedding is not null",
                    (dids,),
                ).fetchall()
            }
        else:
            existing = set()
        to_embed = [r for r in found_rows if r["repo_did"] not in existing]

    vectors: dict[str, str] = {}
    if to_embed:
        vecs = embed_texts(
            http,
            api_key=api_key,
            texts=[truncate(r["content"]) for r in to_embed],
        )
        vectors = {
            r["repo_did"]: vector_literal(v)
            for r, v in zip(to_embed, vecs, strict=True)
        }
        stats["readmes_embedded"] = len(vectors)
        log("embed", f"Embedded {len(vectors)} README(s) ({model}, 1536-d, L2)")

    for r in ingested:
        conn.execute(
            """
            insert into tangled_repos (
                uri, owner_did, owner_handle, rkey, repo_did, name, knot_hostname,
                cid, record_raw, discovered_via, last_synced_at
            )
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, 'ingest_handle', now())
            on conflict (uri) do update set
                owner_did = excluded.owner_did,
                owner_handle = excluded.owner_handle,
                repo_did = coalesce(excluded.repo_did, tangled_repos.repo_did),
                name = coalesce(excluded.name, tangled_repos.name),
                knot_hostname = coalesce(excluded.knot_hostname, tangled_repos.knot_hostname),
                cid = excluded.cid,
                record_raw = excluded.record_raw,
                last_synced_at = now()
            """,
            (
                r["uri"],
                did,
                handle,
                r["rkey"],
                r["repo_did"],
                r["name"],
                r["knot"],
                r["cid"],
                json.dumps(r["value"]),
            ),
        )

        # None when this README was not (re-)embedded in this run.
        vec = vectors.get(r["repo_did"])
        # On conflict the embedding columns are only overwritten when a new
        # vector was produced. A README that was skipped because it already
        # has an embedding keeps its stored vector/model/timestamp; writing
        # excluded.embedding unconditionally would reset them to NULL on
        # every run without --force-embed. A README that is now missing
        # (content is null) still has its embedding cleared.
        conn.execute(
            """
            insert into tangled_readmes (
                repo_did, repo_uri, owner_handle, repo_name, knot_hostname,
                readme_path, status, content, size_bytes, fetched_at,
                embedding, embedding_model, embedded_at
            )
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, now(),
                    %s::vector, %s, case when %s::text is null then null else now() end)
            on conflict (repo_did) do update set
                repo_uri = excluded.repo_uri,
                owner_handle = excluded.owner_handle,
                repo_name = excluded.repo_name,
                knot_hostname = excluded.knot_hostname,
                readme_path = excluded.readme_path,
                status = excluded.status,
                content = excluded.content,
                size_bytes = excluded.size_bytes,
                fetched_at = now(),
                embedding = case
                    when excluded.embedding is not null then excluded.embedding
                    when excluded.content is null then null
                    else tangled_readmes.embedding
                end,
                embedding_model = case
                    when excluded.embedding is not null then excluded.embedding_model
                    when excluded.content is null then null
                    else tangled_readmes.embedding_model
                end,
                embedded_at = case
                    when excluded.embedding is not null then excluded.embedded_at
                    when excluded.content is null then null
                    else tangled_readmes.embedded_at
                end
            """,
            (
                r["repo_did"],
                r["uri"],
                handle,
                r["name"],
                r["knot"],
                r["readme_path"],
                r["status"],
                r["content"],
                len(r["content"].encode()) if r["content"] else None,
                vec,
                model if vec else None,
                vec,
            ),
        )

    return stats
340
341
def ingest_issues(
    conn,
    *,
    did: str,
    handle: str | None,
    pds: str,
    max_pages: int,
) -> dict[str, int]:
    """Fetch the user's issue records and upsert them.

    On a fetch whose status is not ``"ok"`` the user is marked synced with
    status ``"error"`` and ``errors`` is set to 1. Otherwise each record with
    a string ``uri`` and dict ``value`` is upserted with its derived state,
    and the user is marked synced with status ``"ok"``.

    Returns:
        Counters: ``issues`` (upserted), ``open`` (state == "open"), ``errors``.
    """
    stats = {"issues": 0, "open": 0, "errors": 0}
    fetched: UserIssueResult = _fetch_user_issues(
        did, handle, pds, _PdsCache(), max_pages=max_pages
    )

    if fetched.status != "ok":
        stats["errors"] = 1
        log("issues", f"ERROR fetching issues: {fetched.error}")
        _mark_user_synced(
            conn,
            user_did=did,
            issue_count=0,
            status="error",
            error_message=fetched.error,
        )
        return stats

    state_lookup = _state_map(fetched.states)
    for record in fetched.issues:
        uri = record.get("uri")
        well_formed = isinstance(uri, str) and isinstance(record.get("value"), dict)
        if not well_formed:
            continue
        issue_state = _issue_state(uri, _rkey_from_uri(uri), state_lookup)
        upsert_issue(
            conn,
            record=record,
            author_did=did,
            author_handle=handle,
            state=issue_state,
        )
        stats["issues"] += 1
        if issue_state == "open":
            stats["open"] += 1

    _mark_user_synced(conn, user_did=did, issue_count=stats["issues"], status="ok")
    log("issues", f"Upserted {stats['issues']} issue(s) ({stats['open']} open)")
    return stats
387
388
def embed_user_issues(
    conn,
    *,
    http: httpx.Client,
    did: str,
    api_key: str,
    model: str,
    force: bool,
) -> int:
    """Embed issues filed on repos owned by *did* and store the vectors.

    Selects ``tangled_issues`` rows whose ``repo_did`` belongs to a repo owned
    by *did* and that have a non-blank title or body. Without ``force`` only
    rows with a null embedding are selected. Texts are embedded in batches of
    ``batch_size()``.

    Returns:
        The number of issues embedded (0 when nothing matched).
    """
    # NOTE(review): this filters by repo owner, while ingest_issues() upserts
    # issues with author_did=did -- confirm the two are meant to differ.
    conditions = [
        "repo_did in (select repo_did from tangled_repos where owner_did = %s)"
    ]
    if not force:
        conditions.append("embedding is null")
    where = " and ".join(conditions)
    params: list = [did]

    rows = conn.execute(
        f"""
        select uri, title, body
        from tangled_issues
        where {where}
        and coalesce(nullif(trim(title), ''), nullif(trim(body), '')) is not null
        order by fetched_at desc
        """,
        params,
    ).fetchall()
    if not rows:
        log("embed-issues", "No issues to embed for this user")
        return 0

    chunk_size = batch_size()
    total = 0
    for offset in range(0, len(rows), chunk_size):
        group = rows[offset : offset + chunk_size]
        texts = []
        for issue in group:
            # Join the non-blank title/body parts with a blank line between them.
            parts = [
                part
                for part in (issue.get("title"), issue.get("body"))
                if part and part.strip()
            ]
            texts.append(truncate("\n\n".join(parts)))
        vectors = embed_texts(http, api_key=api_key, texts=texts)
        for issue, vec in zip(group, vectors, strict=True):
            conn.execute(
                """
                update tangled_issues
                set embedding = %s::vector,
                    embedding_model = %s,
                    embedded_at = now()
                where uri = %s
                """,
                (vector_literal(vec), model, issue["uri"]),
            )
        total += len(group)
    log("embed-issues", f"Embedded {total} issue(s)")
    return total
439
440
def run(
    handle_or_did: str,
    *,
    skip_issues: bool,
    force_embed: bool,
    max_pages: int,
    init_db: bool,
) -> int:
    """Run the full ingest for one handle or DID and print a summary.

    Loads the environment, optionally applies migrations, resolves the
    identity, ingests repos and READMEs, and (unless ``skip_issues``) ingests
    and embeds issues. The transaction is committed once at the end.

    Returns:
        0 on success, 1 when ``DB_CONNECTION_STRING`` is not set.
    """
    load_env()
    dsn = os.getenv("DB_CONNECTION_STRING", "").strip()
    if not dsn:
        print("ERROR: DB_CONNECTION_STRING is not set", file=sys.stderr)
        return 1

    api_key = gemini_api_key()
    model = embedding_model()

    banner(f"INGEST HANDLE — {handle_or_did}")
    if init_db:
        log("setup", "Applying migrations…")
        init_schema(dsn)

    # Defaults keep the summary valid when the issue stage is skipped.
    repo_stats: dict[str, int] = {}
    issue_stats: dict[str, int] = {}
    issues_embedded = 0

    with httpx.Client(timeout=60.0, follow_redirects=True) as http:
        with connect(dsn) as conn:
            did = resolve_did(http, conn, handle_or_did)
            pds, handle = resolve_identity(http, did)
            log("identity", f"DID={did}")
            log("identity", f"handle={handle} pds={pds}")

            repo_stats = ingest_repos_and_readmes(
                conn,
                http=http,
                did=did,
                handle=handle,
                pds=pds,
                api_key=api_key,
                model=model,
                force_embed=force_embed,
            )

            if not skip_issues:
                issue_stats = ingest_issues(
                    conn, did=did, handle=handle, pds=pds, max_pages=max_pages
                )
                issues_embedded = embed_user_issues(
                    conn,
                    http=http,
                    did=did,
                    api_key=api_key,
                    model=model,
                    force=force_embed,
                )

            conn.commit()

    summary_lines = [
        f"DID: {did}",
        f"Handle: {handle or '(unknown)'}",
        f"Repos: {repo_stats.get('repos', 0)}",
        f"READMEs found: {repo_stats.get('readmes_found', 0)}",
        f"READMEs embedded: {repo_stats.get('readmes_embedded', 0)}",
        f"READMEs missing: {repo_stats.get('readmes_missing', 0)}",
        f"Issues upserted: {issue_stats.get('issues', 0)}",
        f"Open issues: {issue_stats.get('open', 0)}",
        f"Issues embedded: {issues_embedded}",
        "",
        "Test recommendations:",
        f" curl 'http://localhost:8000/recommendations?handle={did}'",
    ]
    summary_block(f"Ingest complete — {handle or did}", summary_lines)
    return 0
517
518
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and delegate to ``run``.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The exit code returned by ``run``.
    """
    # The --max-pages default can be overridden via TANGLED_ISSUE_MAX_PAGES.
    default_max_pages = int(os.getenv("TANGLED_ISSUE_MAX_PAGES", "50"))

    parser = argparse.ArgumentParser(
        description="Ingest one Tangled user by handle: repos, README embeddings, issues."
    )
    parser.add_argument("handle", help="Handle (e.g. arsenii.tngl.sh) or did:plc:…")
    boolean_flags = (
        ("--skip-issues", "Only ingest repos + README embeddings"),
        ("--force-embed", "Re-embed READMEs and issues even if vectors already exist"),
    )
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--max-pages",
        type=int,
        default=default_max_pages,
        help="Max listRecords pages per issue collection (default: 50)",
    )
    parser.add_argument(
        "--init-db",
        action="store_true",
        help="Run supabase migrations before ingest",
    )

    opts = parser.parse_args(argv)
    # Clamp to at least one page.
    page_limit = max(1, opts.max_pages)
    return run(
        opts.handle,
        skip_issues=opts.skip_issues,
        force_embed=opts.force_embed,
        max_pages=page_limit,
        init_db=opts.init_db,
    )
553
554
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())