This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 6.8 kB View raw
// Embed all unembedded READMEs in tangled_readmes using Google Gemini embeddings.
//
// - Reads the worklist (status='found' AND content IS NOT NULL AND embedding IS NULL),
//   the exact predicate behind tangled_readmes_unembedded_idx.
// - Embeds doc = "# <name>\n\n<description>\n\n<README>" with gemini-embedding-001 at
//   outputDimensionality=1536 (matches the vector(1536) column), task RETRIEVAL_DOCUMENT.
// - L2-normalizes (sub-3072 MRL dims aren't auto-normalized) so the HNSW cosine index is happy.
// - UPDATEs only the embedding columns, only where embedding IS NULL → idempotent / re-runnable.
//
// Env: DB_CONNECTION_STRING (or ../.env), GEMINI_API_KEY (required).
// Optional: LIMIT (0=all), CONCURRENCY (default 4), DRY_RUN=1 (count only), MAX_CHARS (default 8000).

import pg from "pg";
import { readFileSync } from "node:fs";

/**
 * Look up `key` in the first readable .env-style file that defines it.
 *
 * Candidate paths are tried in order, relative to the current working directory.
 *
 * @param {string} key - Variable name to look for (`KEY=value` on its own line).
 * @returns {string|undefined} The value, trimmed and with one surrounding quote
 *   stripped from each end, or undefined when no candidate file defines the key.
 */
function fromEnvFile(key) {
  for (const p of ["../.env", ".env", "../../.env"]) {
    try {
      const m = readFileSync(p, "utf8").match(new RegExp(`^\\s*${key}\\s*=\\s*(.+)\\s*$`, "m"));
      if (m) return m[1].trim().replace(/^["']|["']$/g, "");
    } catch {
      // Deliberate best-effort: a missing or unreadable candidate file just
      // means "try the next location".
    }
  }
  return undefined;
}

/**
 * Read an integer setting from the process environment.
 *
 * An unset or blank variable yields `fallback`. A value that does not parse as
 * an integer, or that is below `min`, aborts the script with exit code 1: a bad
 * CONCURRENCY would otherwise start zero workers and a bad MAX_CHARS would
 * truncate every document to the empty string, both silently.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset or blank.
 * @param {number} min - Smallest accepted value.
 * @returns {number} The parsed integer.
 */
function intFromEnv(name, fallback, min) {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number.parseInt(raw, 10);
  if (Number.isNaN(value) || value < min) {
    console.error(`${name} must be an integer >= ${min} (got "${raw}")`);
    process.exit(1);
  }
  return value;
}

const CONN = process.env.DB_CONNECTION_STRING || fromEnvFile("DB_CONNECTION_STRING");
const API_KEY = process.env.GEMINI_API_KEY || fromEnvFile("GEMINI_API_KEY");
const MODEL = process.env.GEMINI_EMBED_MODEL || fromEnvFile("GEMINI_EMBED_MODEL") || "gemini-embedding-001";
const DIMS = 1536;
const LIMIT = intFromEnv("LIMIT", 0, 0);
const CONCURRENCY = intFromEnv("CONCURRENCY", 4, 1);
const MAX_CHARS = intFromEnv("MAX_CHARS", 8000, 1);
const DRY_RUN = process.env.DRY_RUN === "1";

if (!CONN) { console.error("DB_CONNECTION_STRING not set"); process.exit(1); }
if (!API_KEY && !DRY_RUN) { console.error("GEMINI_API_KEY not set (add it to recommendation/.env)"); process.exit(1); }

// NOTE(review): rejectUnauthorized=false turns off TLS certificate verification
// for the database connection. Left as-is because changing it could break
// existing deployments — confirm this is intentional for the target database.
const pool = new pg.Pool({ connectionString: CONN, ssl: { rejectUnauthorized: false }, max: 5 });
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

/**
 * Build the text that gets embedded for one worklist row.
 *
 * @param {{repo_name: ?string, description: ?string, content: string}} row
 * @returns {string} "# <name>", the trimmed description and the README joined
 *   by blank lines (empty parts omitted), cut to at most MAX_CHARS characters.
 */
function buildDoc({ repo_name, description, content }) {
  const parts = [];
  if (repo_name) parts.push(`# ${repo_name}`);
  if (description && description.trim()) parts.push(description.trim());
  parts.push(content);
  return parts.join("\n\n").slice(0, MAX_CHARS);
}

/**
 * Scale a vector to unit Euclidean length.
 *
 * @param {number[]} v
 * @returns {number[]} A new array; an all-zero input is returned unscaled
 *   (the zero norm is replaced by 1 to avoid dividing by zero).
 */
function l2normalize(v) {
  let s = 0;
  for (const x of v) s += x * x;
  const n = Math.sqrt(s) || 1;
  return v.map((x) => x / n);
}

/**
 * Issue a single embedContent request for `text`.
 *
 * @param {string} text - Document text to embed.
 * @param {number} dims - Requested outputDimensionality.
 * @returns {Promise<number[]>} The raw (un-normalized) embedding values.
 * @throws {Error} On a non-2xx response (the error carries a numeric `status`
 *   property), on a body that is not valid JSON, or when the response has no
 *   `embedding.values` array.
 */
async function embedOnce(text, dims) {
  const url = `https://generativelanguage.googleapis.com/v1beta/models/${MODEL}:embedContent`;
  const body = {
    model: `models/${MODEL}`,
    content: { parts: [{ text }] },
    taskType: "RETRIEVAL_DOCUMENT",
    outputDimensionality: dims,
  };
  const resp = await fetch(url, {
    method: "POST",
    headers: { "content-type": "application/json", "x-goog-api-key": API_KEY },
    body: JSON.stringify(body),
  });
  const txt = await resp.text();
  if (!resp.ok) {
    const err = new Error(`HTTP ${resp.status}: ${txt.slice(0, 200)}`);
    err.status = resp.status;
    throw err;
  }
  const j = JSON.parse(txt);
  const values = j?.embedding?.values;
  if (!Array.isArray(values)) throw new Error(`no embedding in response: ${txt.slice(0, 150)}`);
  return values;
}

/**
 * Embed `text` with retries and return the L2-normalized vector.
 *
 * - HTTP 400 (often too-long input): the input is halved and retried for as
 *   long as it is still longer than 2000 characters. Halving is bounded by the
 *   input length, so it does not use up the retry budget below.
 * - Any other 4xx except 429 is rethrown immediately.
 * - Everything else (429, 5xx, network or parse errors) is retried with
 *   exponential backoff (800 ms doubling, capped at 30 s); the error is
 *   rethrown on the fifth such failure.
 *
 * @param {string} text
 * @returns {Promise<number[]>}
 */
async function embedWithRetry(text) {
  let attempt = 0;
  let input = text;
  while (true) {
    try {
      const v = await embedOnce(input, DIMS);
      return l2normalize(v);
    } catch (e) {
      if (e.status === 400 && input.length > 2000) {
        input = input.slice(0, Math.floor(input.length / 2));
        continue;
      }
      attempt++;
      if (attempt >= 5 || (e.status && e.status >= 400 && e.status < 500 && e.status !== 429)) {
        throw e;
      }
      const backoff = Math.min(30000, 800 * 2 ** (attempt - 1));
      await sleep(backoff);
    }
  }
}

/**
 * Load the worklist, embed every row with CONCURRENCY parallel workers, write
 * each vector back, and print a summary. The pool is closed on every exit path.
 */
async function main() {
  // LIMIT is a validated non-negative integer, so interpolating it is safe.
  const worklistSql = `
    select r.repo_did, r.repo_name, r.content,
           coalesce(tr.record_raw->>'description', '') as description,
           length(r.content) as len
      from tangled_readmes r
      left join tangled_repos tr
        on coalesce(tr.repo_did, tr.record_raw->>'repoDid') = r.repo_did
     where r.status = 'found' and r.content is not null and r.embedding is null
     order by r.repo_did
     ${LIMIT > 0 ? `limit ${LIMIT}` : ""}`;

  try {
    // The three reads are independent, so run them concurrently.
    const [worklist, totalRes, embeddedRes] = await Promise.all([
      pool.query(worklistSql),
      pool.query(`select count(*)::int n from tangled_readmes`),
      pool.query(`select count(*)::int n from tangled_readmes where embedding is not null`),
    ]);
    const { rows } = worklist;
    const totalReadmes = totalRes.rows[0].n;
    const alreadyEmbedded = embeddedRes.rows[0].n;

    console.log(`tangled_readmes total=${totalReadmes} already embedded=${alreadyEmbedded}`);
    console.log(`worklist (to embed now)=${rows.length} model=${MODEL} dims=${DIMS} concurrency=${CONCURRENCY}${LIMIT ? ` limit=${LIMIT}` : ""}`);
    if (DRY_RUN) { console.log("\nDRY_RUN=1 → not embedding, not writing."); return; }
    if (rows.length === 0) { console.log("\nNothing to embed. ✔"); return; }

    let done = 0;
    let ok = 0;
    let skipped = 0;
    let failed = 0;
    const errors = [];
    // Shared cursor into `rows`. Workers only interleave at `await` points, so
    // the read-and-increment below cannot hand the same row to two workers.
    let next = 0;

    async function worker() {
      while (next < rows.length) {
        const r = rows[next++];
        try {
          const doc = buildDoc(r);
          const vec = await embedWithRetry(doc);
          const literal = `[${vec.join(",")}]`;
          const res = await pool.query(
            `update tangled_readmes
                set embedding = $1::vector, embedding_model = $2, embedded_at = now()
              where repo_did = $3 and embedding is null`,
            [literal, MODEL, r.repo_did],
          );
          // rowCount 0 means the `embedding is null` guard matched nothing,
          // i.e. the row was filled in after the worklist was read.
          if (res.rowCount > 0) ok++;
          else skipped++;
        } catch (e) {
          failed++;
          errors.push({ repo_did: r.repo_did, name: r.repo_name, err: e.message });
        }
        if (++done % 25 === 0 || done === rows.length) {
          process.stderr.write(`  ...${done}/${rows.length} (ok=${ok} fail=${failed})\n`);
        }
      }
    }

    const workerCount = Math.min(CONCURRENCY, rows.length);
    await Promise.all(Array.from({ length: workerCount }, () => worker()));

    console.log(`\n================ EMBEDDING DONE ================`);
    console.log(`embedded ok : ${ok}`);
    console.log(`failed      : ${failed}`);
    if (skipped > 0) console.log(`skipped     : ${skipped} (already embedded)`);
    if (errors.length) {
      console.log("\nfirst errors:");
      for (const e of errors.slice(0, 10)) console.log(`  ${e.name ?? e.repo_did}: ${e.err}`);
    }
    const remaining = (await pool.query(
      `select count(*)::int n from tangled_readmes where status='found' and content is not null and embedding is null`,
    )).rows[0].n;
    console.log(`\nremaining unembedded (status=found): ${remaining}`);
  } finally {
    await pool.end();
  }
}

main().catch((e) => { console.error("FATAL:", e); process.exit(1); });