This repository has no description
1"""Export embeddings from the shared Postgres into the embeddings git repo.
2
3This is the "transfer" step that publishes the Discover engine's embeddings to the
4network: it reads the precomputed vectors from Postgres (READ-ONLY) and writes the
5files consumed by `tangled-discover-embeddings` (a knot-hosted git repo) — a single
6`.npy` matrix + a `.jsonl` sidecar per section, plus a manifest. Commit + push that
7repo afterwards (the push emits `sh.tangled.git.refUpdate`, the consumers' re-pull
8signal).
9
10This is the canonical, pipeline-wireable copy. An identical-logic, self-contained
11copy also lives in the embeddings repo at `scripts/export_embeddings.py`; the only
12difference here is that the OUTPUT directory is configurable (this script lives in the
13backend repo, not in the embeddings repo).
14
15 # writes into ../tangled-discover-embeddings by default:
16 python scraper/export_embeddings.py
17 # or point it anywhere:
18 EMBEDDINGS_REPO_DIR=/path/to/tangled-discover-embeddings python scraper/export_embeddings.py
19 python scraper/export_embeddings.py /path/to/tangled-discover-embeddings
20
21Vectors read as pgvector text literals ('[v1,...]') exactly like recommendation/app/db.py
22and scraper/seed_user.py; they are already 1536-d and L2-normalized. No DB writes.
23"""
24
25from __future__ import annotations
26
27import datetime as dt
28import hashlib
29import json
30import os
31import sys
32from pathlib import Path
33
34import numpy as np
35import psycopg
36from psycopg.rows import dict_row
37
try:
    from dotenv import load_dotenv
except ImportError:  # dotenv optional if the var is already in env
    def load_dotenv(*_args, **_kwargs):  # type: ignore
        """Stand-in used when python-dotenv is not installed.

        Accepts and ignores any arguments, loads nothing, and returns False.
        """
        return False
43
# Root of the backend checkout: two directory levels above this script file.
BACKEND_ROOT = Path(__file__).resolve().parent.parent  # the sunsteadhack repo
# Number of components every exported vector must have; _parse_vec rejects any other length.
DIM = 1536
# Embedding model name written into the manifest's "model" field.
MODEL = "gemini-embedding-001"
47
48
def _out_dir() -> Path:
    """Resolve the directory that receives the embeddings repo files.

    Precedence: the first command-line argument, then the EMBEDDINGS_REPO_DIR
    environment variable (when non-empty), then the default sibling checkout
    ``../tangled-discover-embeddings`` next to the backend repo.

    Returns:
        An absolute, resolved path. ``~`` is expanded for the argv and env forms.
    """
    cli_args = sys.argv[1:]
    if cli_args:
        chosen = cli_args[0]
    else:
        chosen = os.environ.get("EMBEDDINGS_REPO_DIR")

    if not chosen:
        # Neither argv nor a non-empty env var: fall back to the sibling repo.
        sibling = BACKEND_ROOT.parent / "tangled-discover-embeddings"
        return sibling.resolve()

    return Path(chosen).expanduser().resolve()
58
59
# Repos query (one row per embedded README in tangled_readmes).
# Mirrors the recommendation/app/db.py joins so description/topics/created_at/handle
# resolve the same way the engine sees them. r.content stays in the DB — we ship only its
# trimmed length (for the min-chars gate) and md5(first 500 chars) (for fork dedup).
# The vector is selected as text (etext); rows without an embedding are excluded and the
# result is ordered by repo_did.
_REPOS_SQL = """
    select r.repo_did,
           r.repo_uri,
           coalesce(r.owner_handle, ti.handle) as owner_handle,
           r.repo_name,
           tr.record_raw->>'description' as description,
           tr.record_raw->'topics' as topics,
           tr.record_raw->>'createdAt' as created_at,
           length(trim(coalesce(r.content, ''))) as content_len,
           md5(substring(coalesce(r.content, '') for 500)) as content_sha500,
           r.embedding_model,
           r.embedded_at,
           r.embedding::text as etext
    from tangled_readmes r
    left join tangled_repos tr
           on coalesce(tr.repo_did, tr.record_raw->>'repoDid') = r.repo_did
    left join tangled_identities ti
           on ti.did = split_part(replace(r.repo_uri, 'at://', ''), '/', 1)
    where r.embedding is not null
    order by r.repo_did
"""
84
# Issues query (one row per embedded open issue in tangled_open_issues).
# Only issues whose identity fully resolves are returned (same inner joins as
# _KNN_ISSUES_SQL), i.e. exactly the set the engine can emit: the owner DID taken from
# repo_uri must match a tangled_identities row with a handle and a tangled_repos row
# (by owner_did + rkey) with a name. The vector is selected as text (etext); the result
# is ordered by uri.
_ISSUES_SQL = """
    select i.uri,
           i.rkey,
           i.repo_did,
           i.repo_uri,
           i.author_did,
           i.title,
           i.body,
           ti.handle as owner_handle,
           tr.name as repo_name,
           tr.record_raw->>'description' as repo_description,
           i.issue_created_at as created_at,
           i.embedding_model,
           i.embedding::text as etext
    from tangled_open_issues i
    join tangled_identities ti
           on ti.did = split_part(replace(i.repo_uri, 'at://', ''), '/', 1)
    join tangled_repos tr
           on tr.owner_did = split_part(replace(i.repo_uri, 'at://', ''), '/', 1)
           and tr.rkey = split_part(i.repo_uri, '/', 5)
    where i.embedding is not null
      and i.repo_uri is not null
      and ti.handle is not null
      and tr.name is not null
    order by i.uri
"""
113
114
def _dsn() -> str:
    """Return the Postgres connection string, adding ``sslmode=require`` when unset.

    Loads the first existing ``.env`` among the backend root, ``recommendation/`` and
    ``scraper/`` (falling back to ``load_dotenv()``'s default lookup when none exists),
    then reads ``DB_CONNECTION_STRING`` from the environment.

    Returns:
        The connection string. If it contains no ``sslmode=`` setting, one is appended
        in the syntax matching the string's form: ``?sslmode=require`` or
        ``&sslmode=require`` for a ``postgres://`` / ``postgresql://`` URI, and
        `` sslmode=require`` for a libpq keyword/value string.

    Raises:
        SystemExit: If ``DB_CONNECTION_STRING`` is missing or blank.
    """
    env_files = (
        BACKEND_ROOT / ".env",
        BACKEND_ROOT / "recommendation" / ".env",
        BACKEND_ROOT / "scraper" / ".env",
    )
    for candidate in env_files:
        if candidate.exists():
            load_dotenv(candidate)
            break
    else:
        # No known .env file: let load_dotenv() do its default lookup.
        load_dotenv()

    conn = os.environ.get("DB_CONNECTION_STRING", "").strip()
    if not conn:
        raise SystemExit("DB_CONNECTION_STRING not set (env or .env)")

    if "sslmode=" not in conn:  # Cloud SQL public IP, self-signed cert
        if conn.startswith(("postgres://", "postgresql://")):
            # URI form: sslmode is a query parameter.
            conn += ("&" if "?" in conn else "?") + "sslmode=require"
        else:
            # Keyword/value form ("host=... dbname=..."): parameters are space-separated.
            # Appending "?sslmode=require" here would corrupt the last value.
            conn += " sslmode=require"
    return conn
128
129
def _parse_vec(etext: str) -> np.ndarray:
    """Parse a pgvector text literal (``'[v1,v2,...]'``) into a 1-D float32 array.

    Args:
        etext: The vector rendered as text; surrounding whitespace is ignored.

    Returns:
        A float32 array with exactly ``DIM`` components.

    Raises:
        ValueError: If the text is not wrapped in square brackets, or if the number
            of parsed components differs from ``DIM``.
    """
    text = etext.strip()
    # Check the delimiters before slicing: blindly dropping the first and last character
    # of an unbracketed string would eat digits and could still yield a vector of the
    # right length with wrong values.
    if len(text) < 2 or text[0] != "[" or text[-1] != "]":
        raise ValueError(f"expected a pgvector literal like '[v1,...]', got {text[:32]!r}")
    v = np.fromstring(text[1:-1], sep=",", dtype=np.float32)
    if v.shape[0] != DIM:
        raise ValueError(f"expected dim {DIM}, got {v.shape[0]}")
    return v
135
136
def _json_default(o):
    """``json.dumps`` fallback for values the encoder cannot serialize natively.

    Dates and datetimes become ISO-8601 strings; anything else is passed through ``str()``.
    """
    # dt.datetime is a subclass of dt.date, so a single check covers both.
    is_temporal = isinstance(o, dt.date)
    return o.isoformat() if is_temporal else str(o)
141
142
def _export_section(conn, data_dir: Path, name: str, sql: str, meta_fields: list[str]) -> dict:
    """Export one section (e.g. "repos" or "issues") as a vector matrix plus JSONL sidecar.

    Runs ``sql`` on ``conn``, parses each row's ``etext`` column into a vector, and writes:
      * ``<data_dir>/<name>.f32.npy`` — the stacked float32 matrix, one row per result row;
      * ``<data_dir>/<name>.jsonl``   — one JSON object per row with ``row`` (the matrix
        row index), ``subject_uri`` (the row's ``uri`` if present, else ``repo_uri``)
        and every column listed in ``meta_fields``.

    Args:
        conn: Open database connection whose rows support key lookup by column name.
        data_dir: Existing directory to write both files into.
        name: Section name, used for file names and log output.
        sql: Query to execute; must return an ``etext`` column.
        meta_fields: Column names copied into each sidecar record.

    Returns:
        The manifest entry for this section: row count, relative file paths and the
        SHA-256 of the ``.npy`` file.

    Raises:
        SystemExit: If the query returns no rows.
    """
    records = conn.execute(sql).fetchall()
    if len(records) == 0:
        raise SystemExit(f"{name}: no embedded rows found")

    vectors = [_parse_vec(record["etext"]) for record in records]
    matrix = np.vstack(vectors).astype(np.float32)

    npy_path = data_dir / f"{name}.f32.npy"
    jsonl_path = data_dir / f"{name}.jsonl"
    np.save(npy_path, matrix)

    with open(jsonl_path, "w", encoding="utf-8") as sidecar:
        for index, record in enumerate(records):
            # Issues expose "uri"; repos only have "repo_uri".
            subject = record["uri"] if "uri" in record else record["repo_uri"]
            entry = {"row": index, "subject_uri": subject}
            for field in meta_fields:
                entry[field] = record[field]
            line = json.dumps(entry, default=_json_default, ensure_ascii=False)
            sidecar.write(line + "\n")

    # Hash the file as written to disk so consumers can verify the vectors they pulled.
    digest = hashlib.sha256(npy_path.read_bytes()).hexdigest()
    print(f" {name}: {matrix.shape[0]} vectors -> {npy_path} ({npy_path.stat().st_size // 1024} KiB)")

    section = {}
    section["count"] = int(matrix.shape[0])
    section["vectors"] = f"data/{name}.f32.npy"
    section["meta"] = f"data/{name}.jsonl"
    section["sha256"] = digest
    return section
166
167
def main() -> int:
    """Export both sections and the manifest into the embeddings repo directory.

    Creates ``<out>/data`` if needed, exports the "repos" and "issues" sections over a
    single database connection, writes ``<out>/manifest.json`` and prints the follow-up
    git steps.

    Returns:
        0 on success (used as the process exit code).
    """
    out = _out_dir()
    data_dir = out / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    print(f"exporting embeddings (read-only) -> {out}")

    # Columns copied into each section's JSONL sidecar.
    repo_fields = [
        "repo_did", "repo_name", "owner_handle", "description", "topics",
        "created_at", "content_len", "content_sha500", "embedding_model", "embedded_at",
    ]
    issue_fields = [
        "repo_did", "rkey", "repo_uri", "author_did", "title", "body",
        "owner_handle", "repo_name", "repo_description", "created_at", "embedding_model",
    ]

    with psycopg.connect(_dsn(), row_factory=dict_row) as conn:
        repos = _export_section(conn, data_dir, "repos", _REPOS_SQL, repo_fields)
        issues = _export_section(conn, data_dir, "issues", _ISSUES_SQL, issue_fields)

    generated_at = dt.datetime.now(dt.timezone.utc).isoformat()
    manifest = {
        "schema_version": 1,
        "model": MODEL,
        "dim": DIM,
        "metric": "cosine",
        "normalized": True,
        "task_type": "RETRIEVAL_DOCUMENT",
        "generated_at": generated_at,
        "sections": {"repos": repos, "issues": issues},
    }
    manifest_path = out / "manifest.json"
    manifest_path.write_text(json.dumps(manifest, indent=2) + "\n")

    print(f"wrote {manifest_path} (repos={repos['count']}, issues={issues['count']})")
    print("next: cd into the embeddings repo, then git add -A && git commit && git push")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())