This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 7.9 kB View raw
1"""Export embeddings from the shared Postgres into the embeddings git repo. 2 3This is the "transfer" step that publishes the Discover engine's embeddings to the 4network: it reads the precomputed vectors from Postgres (READ-ONLY) and writes the 5files consumed by `tangled-discover-embeddings` (a knot-hosted git repo) — a single 6`.npy` matrix + a `.jsonl` sidecar per section, plus a manifest. Commit + push that 7repo afterwards (the push emits `sh.tangled.git.refUpdate`, the consumers' re-pull 8signal). 9 10This is the canonical, pipeline-wireable copy. An identical-logic, self-contained 11copy also lives in the embeddings repo at `scripts/export_embeddings.py`; the only 12difference here is that the OUTPUT directory is configurable (this script lives in the 13backend repo, not in the embeddings repo). 14 15 # writes into ../tangled-discover-embeddings by default: 16 python scraper/export_embeddings.py 17 # or point it anywhere: 18 EMBEDDINGS_REPO_DIR=/path/to/tangled-discover-embeddings python scraper/export_embeddings.py 19 python scraper/export_embeddings.py /path/to/tangled-discover-embeddings 20 21Vectors read as pgvector text literals ('[v1,...]') exactly like recommendation/app/db.py 22and scraper/seed_user.py; they are already 1536-d and L2-normalized. No DB writes. 23""" 24 25from __future__ import annotations 26 27import datetime as dt 28import hashlib 29import json 30import os 31import sys 32from pathlib import Path 33 34import numpy as np 35import psycopg 36from psycopg.rows import dict_row 37 38try: 39 from dotenv import load_dotenv 40except ImportError: # dotenv optional if the var is already in env 41 def load_dotenv(*_a, **_k): # type: ignore 42 return False 43 44BACKEND_ROOT = Path(__file__).resolve().parent.parent # the sunsteadhack repo 45DIM = 1536 46MODEL = "gemini-embedding-001" 47 48 49def _out_dir() -> Path: 50 """Where to write the embeddings repo files. 
Precedence: argv[1] > env > default 51 sibling repo (../tangled-discover-embeddings).""" 52 if len(sys.argv) > 1: 53 return Path(sys.argv[1]).expanduser().resolve() 54 env = os.environ.get("EMBEDDINGS_REPO_DIR") 55 if env: 56 return Path(env).expanduser().resolve() 57 return (BACKEND_ROOT.parent / "tangled-discover-embeddings").resolve() 58 59 60# Repos: mirror recommendation/app/db.py joins so description/topics/created_at/handle 61# resolve the same way the engine sees them. content stays in the DB — we ship only its 62# length (for the min-chars gate) and md5(first 500 chars) (for fork dedup). 63_REPOS_SQL = """ 64 select r.repo_did, 65 r.repo_uri, 66 coalesce(r.owner_handle, ti.handle) as owner_handle, 67 r.repo_name, 68 tr.record_raw->>'description' as description, 69 tr.record_raw->'topics' as topics, 70 tr.record_raw->>'createdAt' as created_at, 71 length(trim(coalesce(r.content, ''))) as content_len, 72 md5(substring(coalesce(r.content, '') for 500)) as content_sha500, 73 r.embedding_model, 74 r.embedded_at, 75 r.embedding::text as etext 76 from tangled_readmes r 77 left join tangled_repos tr 78 on coalesce(tr.repo_did, tr.record_raw->>'repoDid') = r.repo_did 79 left join tangled_identities ti 80 on ti.did = split_part(replace(r.repo_uri, 'at://', ''), '/', 1) 81 where r.embedding is not null 82 order by r.repo_did 83""" 84 85# Issues: only those whose identity fully resolves (same inner joins as _KNN_ISSUES_SQL), 86# i.e. exactly the set the engine can emit. 
_ISSUES_SQL = """
    select i.uri,
           i.rkey,
           i.repo_did,
           i.repo_uri,
           i.author_did,
           i.title,
           i.body,
           ti.handle as owner_handle,
           tr.name as repo_name,
           tr.record_raw->>'description' as repo_description,
           i.issue_created_at as created_at,
           i.embedding_model,
           i.embedding::text as etext
      from tangled_open_issues i
      join tangled_identities ti
        on ti.did = split_part(replace(i.repo_uri, 'at://', ''), '/', 1)
      join tangled_repos tr
        on tr.owner_did = split_part(replace(i.repo_uri, 'at://', ''), '/', 1)
       and tr.rkey = split_part(i.repo_uri, '/', 5)
     where i.embedding is not null
       and i.repo_uri is not null
       and ti.handle is not null
       and tr.name is not null
     order by i.uri
"""


def _dsn() -> str:
    """Return the Postgres connection string, forcing ``sslmode=require`` if none is set.

    Loads the first ``.env`` that exists among the backend root, ``recommendation/``
    and ``scraper/``. If none exists, it calls ``load_dotenv()`` with no path. It then
    reads ``DB_CONNECTION_STRING``. Both URI DSNs (``postgresql://...``) and libpq
    keyword/value DSNs (``host=... dbname=...``) are supported.

    Raises:
        SystemExit: if ``DB_CONNECTION_STRING`` is unset or blank.
    """
    for candidate in (
        BACKEND_ROOT / ".env",
        BACKEND_ROOT / "recommendation" / ".env",
        BACKEND_ROOT / "scraper" / ".env",
    ):
        if candidate.exists():
            load_dotenv(candidate)
            break
    else:
        load_dotenv()
    conn = os.environ.get("DB_CONNECTION_STRING", "").strip()
    if not conn:
        raise SystemExit("DB_CONNECTION_STRING not set (env or .env)")
    if "sslmode=" not in conn:  # Cloud SQL public IP, self-signed cert
        if "://" in conn:
            # URI form: sslmode is a query parameter.
            conn += ("&" if "?" in conn else "?") + "sslmode=require"
        else:
            # Keyword/value form: parameters are whitespace-separated. A "?" suffix
            # would be glued onto the last value (e.g. dbname) and break it.
            conn += " sslmode=require"
    return conn


def _parse_vec(etext: str) -> np.ndarray:
    """Parse a pgvector text literal ``'[v1,v2,...]'`` into a 1-D float32 array.

    Raises:
        ValueError: if the parsed vector does not have exactly ``DIM`` components.
    """
    # Drop the surrounding brackets, then split the comma-separated floats.
    v = np.fromstring(etext.strip()[1:-1], sep=",", dtype=np.float32)
    if v.shape[0] != DIM:
        raise ValueError(f"expected dim {DIM}, got {v.shape[0]}")
    return v


def _json_default(o):
    """``json.dumps`` fallback: ISO-8601 for dates/datetimes, ``str()`` for anything else."""
    if isinstance(o, (dt.datetime, dt.date)):
        return o.isoformat()
    return str(o)


def _fetch_section(conn, name: str, sql: str) -> tuple[list[dict], np.ndarray]:
    """Run *sql* and return ``(rows, matrix)`` without writing anything.

    ``matrix`` has shape ``(len(rows), DIM)``, and its row ``i`` is the vector of ``rows[i]``.

    Raises:
        SystemExit: if the query returns no rows.
        ValueError: if any vector does not have ``DIM`` components.
    """
    rows = conn.execute(sql).fetchall()
    if not rows:
        raise SystemExit(f"{name}: no embedded rows found")
    # Each parsed vector is already float32; copy=False avoids a second full copy.
    matrix = np.vstack([_parse_vec(r["etext"]) for r in rows]).astype(np.float32, copy=False)
    return rows, matrix


def _write_section(
    data_dir: Path, name: str, rows: list[dict], matrix: np.ndarray, meta_fields: list[str]
) -> dict:
    """Write ``<name>.f32.npy`` and ``<name>.jsonl`` into *data_dir* and return the manifest entry.

    Line ``i`` of the JSONL sidecar describes row ``i`` of the matrix. Each line holds
    ``row``, ``subject_uri`` and the columns listed in *meta_fields*.
    """
    npy_path = data_dir / f"{name}.f32.npy"
    jsonl_path = data_dir / f"{name}.jsonl"
    np.save(npy_path, matrix)
    with open(jsonl_path, "w", encoding="utf-8") as fh:
        for i, r in enumerate(rows):
            # Issue rows select "uri"; repo rows do not, so they fall back to "repo_uri".
            rec = {"row": i, "subject_uri": r["uri"] if "uri" in r else r["repo_uri"]}
            rec.update({k: r[k] for k in meta_fields})
            fh.write(json.dumps(rec, default=_json_default, ensure_ascii=False) + "\n")

    sha = hashlib.sha256(npy_path.read_bytes()).hexdigest()
    print(f"  {name}: {matrix.shape[0]} vectors -> {npy_path} ({npy_path.stat().st_size // 1024} KiB)")
    return {
        "count": int(matrix.shape[0]),
        "vectors": f"data/{name}.f32.npy",
        "meta": f"data/{name}.jsonl",
        "sha256": sha,
    }


def _export_section(conn, data_dir: Path, name: str, sql: str, meta_fields: list[str]) -> dict:
    """Fetch one section and write it immediately; returns its manifest entry.

    This is the one-step form. ``main`` instead fetches every section before writing
    any, so a failure part-way through cannot leave a half-updated export behind.
    """
    rows, matrix = _fetch_section(conn, name, sql)
    return _write_section(data_dir, name, rows, matrix, meta_fields)


def main() -> int:
    """Export the repos and issues sections plus ``manifest.json``; return the exit code.

    Every section is queried and validated before any file is touched. An empty
    section, a malformed vector or a DB error therefore leaves the previous export
    intact, instead of pairing fresh ``repos`` files with stale ``issues`` files and a
    manifest whose sha256 no longer matches.

    NOTE(review): the module docstring says an identical-logic copy lives at
    ``scripts/export_embeddings.py`` in the embeddings repo; keep that copy in sync.
    """
    out = _out_dir()
    data_dir = out / "data"
    print(f"exporting embeddings (read-only) -> {out}")
    with psycopg.connect(_dsn(), row_factory=dict_row) as conn:
        # Make the read-only contract server-enforced: transactions on this connection
        # start READ ONLY, so the server rejects any accidental table write.
        conn.read_only = True
        repo_rows, repo_matrix = _fetch_section(conn, "repos", _REPOS_SQL)
        issue_rows, issue_matrix = _fetch_section(conn, "issues", _ISSUES_SQL)

    # Only now, with every section fetched and validated, touch the output repo.
    data_dir.mkdir(parents=True, exist_ok=True)
    repos = _write_section(
        data_dir, "repos", repo_rows, repo_matrix,
        ["repo_did", "repo_name", "owner_handle", "description", "topics",
         "created_at", "content_len", "content_sha500", "embedding_model", "embedded_at"],
    )
    issues = _write_section(
        data_dir, "issues", issue_rows, issue_matrix,
        ["repo_did", "rkey", "repo_uri", "author_did", "title", "body",
         "owner_handle", "repo_name", "repo_description", "created_at", "embedding_model"],
    )

    manifest = {
        "schema_version": 1,
        "model": MODEL,
        "dim": DIM,
        "metric": "cosine",
        "normalized": True,
        "task_type": "RETRIEVAL_DOCUMENT",
        "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        "sections": {"repos": repos, "issues": issues},
    }
    (out / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
    print(f"wrote {out / 'manifest.json'} (repos={repos['count']}, issues={issues['count']})")
    print("next: cd into the embeddings repo, then git add -A && git commit && git push")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())