Sunstead trust scoring project
1"""Embeddings via the Featherless API (OpenAI-compatible) using Qwen3-Embedding-4B.
2
3Feeds the diff-embedding / slop-similarity path (PRD 6.x): embed a PR diff, then
4cosine-k-NN it against known-bad diffs. Returns None when no FEATHERLESS_API_KEY
5is set — exactly like review.py, so the caller treats it as "signal unavailable"
6rather than crashing.
7
8OpenAI-compatible, so this is a thin POST to /embeddings; no openai SDK needed.
9"""
10
11from __future__ import annotations
12
13import math
14import os
15
16from .config import CFG
17
18
def _key() -> str | None:
    """Return the embedding API key read from the environment, or None if unset.

    The environment variable to read is named by CFG.embed.api_key_env.
    """
    env_name = CFG.embed.api_key_env
    return os.environ.get(env_name)
21
22
def embed(texts: str | list[str], model: str | None = None) -> list[list[float]] | None:
    """Embed text(s) -> list of float vectors, one per input, input order preserved.

    texts: a single string (treated as a one-element list) or a list of strings.
    model: embedding model name; falls back to CFG.embed.model when omitted.

    Returns None if no API key is configured, [] for an empty input list,
    otherwise one vector per input. Each input is truncated to
    CFG.embed.max_chars and inputs are sent in batches of CFG.embed.batch.

    Raises httpx.HTTPStatusError when a batch gets an error status, and
    RuntimeError when a batch response does not carry exactly one embedding per
    input sent -- appending it anyway would shift every later vector onto the
    wrong input.
    """
    # Read the key once so the value that gates the call is the value we send.
    api_key = _key()
    if api_key is None:
        return None
    if isinstance(texts, str):
        texts = [texts]
    if not texts:
        return []

    # Imported here, after the early returns, so those paths never touch httpx.
    import httpx

    model = model or CFG.embed.model
    batch_size = CFG.embed.batch
    max_chars = CFG.embed.max_chars
    dimensions = CFG.embed.dimensions

    out: list[list[float]] = []
    with httpx.Client(base_url=CFG.embed.base_url, timeout=CFG.embed.timeout,
                      headers={"Authorization": f"Bearer {api_key}"}) as client:
        for start in range(0, len(texts), batch_size):
            chunk = [t[:max_chars] for t in texts[start : start + batch_size]]
            body: dict = {"model": model, "input": chunk}
            if dimensions:
                body["dimensions"] = dimensions  # MRL truncation, if the server honors it
            r = client.post("/embeddings", json=body)
            r.raise_for_status()
            # /embeddings does not guarantee order; sort by the returned index.
            data = sorted(r.json()["data"], key=lambda d: d["index"])
            if len(data) != len(chunk):
                # Positions in `out` are the only link back to the inputs, so a
                # short or long batch must fail loudly instead of misaligning.
                raise RuntimeError(
                    f"/embeddings returned {len(data)} vectors for {len(chunk)} inputs "
                    f"(batch starting at input {start})"
                )
            out.extend(d["embedding"] for d in data)
    return out
53
54
def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of a and b; 0.0 when either vector has zero norm
    (which includes empty vectors).

    The dot product pairs elements with zip, so when the lengths differ only the
    common prefix contributes to it, while each norm covers its whole vector.
    """
    dot = sum([p * q for p, q in zip(a, b)])
    norm_a = math.sqrt(sum([p * p for p in a]))
    norm_b = math.sqrt(sum([q * q for q in b]))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)
61
62
def _max_cosine(qv: list[float], vecs: list[list[float]]) -> float | None:
    """Highest cosine similarity between qv and any vector in vecs, clamped to
    [0,1]. Returns None when vecs is empty."""
    best = None
    for candidate in vecs:
        sim = cosine(qv, candidate)
        # Strict '>' keeps the first maximal value, as max() does.
        if best is None or sim > best:
            best = sim
    if best is None:
        return None
    capped = min(1.0, best)
    return max(0.0, capped)
67
68
69# --- diff/slop-similarity path (PRD 6.12) ---------------------------------
70# Embed every scraped PR diff once into diff_vectors; "known-bad" is decided at
71# query time by joining pr_labels (clean_merge=0), so re-labelling never needs a
72# re-embed. Search is a cosine scan in Python. ponytail: linear scan -- swap to
73# DuckDB's list_cosine_similarity / VSS HNSW index if the corpus grows large.
74
75
def index_diffs(limit: int = 256) -> int:
    """Embed up to `limit` PR diffs that have no row in diff_vectors yet (one pass).

    Only pull_requests rows with a non-empty diff_text whose pr_id is absent from
    diff_vectors are selected, so a re-run picks up only diffs not yet embedded.
    Returns the number of rows selected and embedded in this pass; 0 when there
    is nothing to embed or no API key is set.

    Uses two separate short-lived connections: a read-only one to fetch the
    batch, closed before embed() makes its network call, and a writable one
    opened only for the insert -- so no connection is held while waiting on the
    embedding service.
    """
    if _key() is None:
        return 0
    from .db import connection

    pending_sql = (
        "SELECT pr_id, diff_text FROM pull_requests "
        "WHERE diff_text IS NOT NULL AND length(diff_text) > 0 "
        "AND pr_id NOT IN (SELECT pr_id FROM diff_vectors) LIMIT ?"
    )
    upsert_sql = (
        "INSERT INTO diff_vectors (pr_id, label, embedding) VALUES (?, 'pr', ?) "
        "ON CONFLICT (pr_id) DO UPDATE SET embedding = excluded.embedding"
    )

    with connection(read_only=True) as reader:
        pending = reader.execute(pending_sql, [limit]).fetchall()
    if not pending:
        return 0

    # Network call happens here, between the two connections.
    vectors = embed([diff_text for _, diff_text in pending])
    if vectors is None:
        return 0

    with connection(read_only=False) as writer:
        writer.executemany(
            upsert_sql,
            [[pr_id, vec] for (pr_id, _), vec in zip(pending, vectors)],
        )
    return len(pending)
108
109
def slop_score(con, diff: str, exclude_pr_id: str | None = None) -> float | None:
    """Similarity in [0,1] between `diff` and its nearest embedded diff whose
    pr_labels row currently has clean_merge = 0.

    Returns None if the diff is empty, if embed() yields nothing (no key or an
    empty result), or if no matching vector exists. When exclude_pr_id is given,
    that PR's own vector is left out so a PR cannot match itself.
    Per the original author's note this is an advisory signal only.
    """
    if not diff:
        return None
    embedded = embed(diff)
    if not embedded:  # None (no key) or empty
        return None

    base_sql = ("SELECT d.embedding FROM diff_vectors d JOIN pr_labels l USING (pr_id) "
                "WHERE l.clean_merge = 0")
    if exclude_pr_id:
        sql, params = base_sql + " AND d.pr_id <> ?", [exclude_pr_id]
    else:
        sql, params = base_sql, []

    known_bad = [row[0] for row in con.execute(sql, params).fetchall()]
    return _max_cosine(embedded[0], known_bad)
127
128
def demo() -> None:
    """Self-check. Offline part: cosine and nearest-neighbour identities on fixed
    vectors. Live part (only when an API key is set): two near-duplicate code
    snippets must embed closer to each other than code does to unrelated prose."""
    probe = [1.0, 2.0, 3.0]
    self_sim = cosine(probe, probe)
    assert abs(self_sim - 1.0) < 1e-9, "cosine(v,v) must be 1"
    orthogonal_sim = cosine([1.0, 0.0], [0.0, 1.0])
    assert abs(orthogonal_sim) < 1e-9, "orthogonal -> 0"

    # Nearest-neighbour ranking: a query close to one corpus axis scores ~1,
    # a query orthogonal to every corpus vector scores low.
    axes = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]
    assert _max_cosine([1.0, 0.05, 0.0], axes) > 0.99, "near-dup -> ~1"
    assert _max_cosine([0.0, 0.0, 1.0], axes) < 0.2, "unrelated -> low"
    assert _max_cosine([1.0], []) is None, "empty corpus -> None"

    if _key() is None:
        print(f"cosine ok; no {CFG.embed.api_key_env} -> live embedding skipped")
        return

    samples = [
        "def add(x, y): return x + y",
        "def sum_two(p, q): return p + q",
        "the cat sat quietly on the warm windowsill",
    ]
    a, b, c = embed(samples)
    assert len(a) > 0, "empty embedding"
    assert cosine(a, b) > cosine(a, c), "near-duplicate code should embed closer than prose"
    print(f"dim={len(a)} sim(code,code)={cosine(a, b):.3f} > sim(code,prose)={cosine(a, c):.3f} ok")
153
154
def main() -> None:
    """Command-line entry point.

    Modes, chosen from the parsed arguments:
      * --build: ensure the schema exists, then call index_diffs() in passes of
        at most --chunk diffs until nothing is left (or, with --watch, keep
        polling every --interval seconds; or, with --limit, stop once that many
        have been embedded this run). Prints a summary when the loop ends.
      * no positional text: run demo().
      * positional text: embed each string and print its dimension and first
        four components; exits with a message if no API key is set.
    """
    import argparse
    import sys

    ap = argparse.ArgumentParser(description="Featherless/Qwen embeddings of scraped Tangled diffs")
    ap.add_argument("text", nargs="*", help="strings to embed; runs the self-check if omitted")
    ap.add_argument("--build", action="store_true",
                    help="embed all PR diffs into diff_vectors (idempotent; safe to re-run as the scrape fills)")
    ap.add_argument("--chunk", type=int, default=256, help="diffs per pass / write-lock window")
    ap.add_argument("--limit", type=int, default=None, help="stop after this many this run (quick test)")
    ap.add_argument("--watch", action="store_true",
                    help="keep embedding new diffs as the scraper adds them (sleep when caught up)")
    ap.add_argument("--interval", type=float, default=10.0, help="--watch poll seconds")
    args = ap.parse_args()
    if args.build:
        import time

        from .db import connection, ensure_schema

        ensure_schema()
        if _key() is None:
            print(f"[embed] no {CFG.embed.api_key_env} -> nothing embedded (slop signal stays absent)")
            return
        total = 0
        while True:
            # Cap each pass at what is left of the --limit budget; otherwise a
            # single pass of --chunk diffs could overshoot a smaller --limit.
            # The loop breaks as soon as total >= limit, so the remainder here
            # is always positive.
            per_pass = args.chunk
            if args.limit and args.limit > 0:
                per_pass = min(per_pass, args.limit - total)
            # index_diffs manages its own short-lived connections (read -> embed off-lock -> write)
            n = index_diffs(limit=per_pass)
            if n:
                total += n
                print(f"[embed] {total} diffs embedded", flush=True)
                if args.limit and total >= args.limit:
                    break
                continue
            if args.watch:  # caught up; wait for the scraper to add more
                time.sleep(args.interval)
                continue
            break
        with connection(read_only=True) as con:
            done, remaining = con.execute(
                "SELECT (SELECT count(*) FROM diff_vectors), "
                "(SELECT count(*) FROM pull_requests WHERE diff_text IS NOT NULL "
                " AND length(diff_text) > 0 AND pr_id NOT IN (SELECT pr_id FROM diff_vectors))"
            ).fetchone()
        print(f"[embed] done: +{total} this run; {done} embedded, {remaining} remaining ({CFG.embed.model})")
        return
    if not args.text:
        demo()
        return
    vecs = embed(args.text)
    if vecs is None:
        sys.exit(f"set {CFG.embed.api_key_env} to embed")
    for t, v in zip(args.text, vecs):
        print(f"[{len(v)}d] {v[:4]}... :: {t[:60]}")
208
209
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()