This repository has no description
1from __future__ import annotations
2
3import json
4from contextlib import contextmanager
5from pathlib import Path
6from typing import Any, Iterator
7
8import psycopg
9from psycopg.rows import dict_row
10
# SQL migration directory: from the directory containing this module, go one
# level up, then into supabase/migrations. Resolved so the result is absolute.
MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.joinpath(
    "supabase", "migrations"
)
14
15
def register_pgvector(conn: psycopg.Connection) -> None:
    """Best-effort registration of pgvector type adapters on ``conn``.

    Does nothing (returns quietly) when the optional ``pgvector`` package
    cannot be imported, or when registration raises
    ``psycopg.ProgrammingError`` (presumably because the database has no
    ``vector`` type installed -- confirm against pgvector's behavior).
    """
    try:
        # Imported lazily so pgvector stays an optional dependency.
        from pgvector.psycopg import register_vector as _register

        _register(conn)
    except (psycopg.ProgrammingError, ImportError):
        return
23
24
@contextmanager
def connect(dsn: str) -> Iterator[psycopg.Connection]:
    """Yield a psycopg connection to ``dsn`` configured for this module.

    Rows are returned as dicts (``dict_row``) and pgvector adapters are
    registered on a best-effort basis via ``register_pgvector``. Lifetime is
    delegated to psycopg's own connection context manager, which closes the
    connection when the ``with`` block exits.
    """
    conn_manager = psycopg.connect(dsn, row_factory=dict_row)
    with conn_manager as conn:
        register_pgvector(conn)
        yield conn
30
31
def init_schema(dsn: str) -> None:
    """Apply every ``*.sql`` file in ``MIGRATIONS_DIR`` in filename order.

    All files are executed on one connection and committed once at the end.
    If any file fails, the exception propagates out of the connection context
    before ``commit()`` runs (unless a migration file issues its own COMMIT).

    Args:
        dsn: libpq connection string for the target database.

    Raises:
        RuntimeError: if ``MIGRATIONS_DIR`` contains no ``.sql`` files.
    """
    # All paths share one directory, so sorting them sorts by filename.
    paths = sorted(MIGRATIONS_DIR.glob("*.sql"))
    if not paths:
        raise RuntimeError(f"No migrations found in {MIGRATIONS_DIR}")
    with connect(dsn) as conn:
        for path in paths:
            # Decode explicitly: read_text() without an encoding uses the
            # locale's preferred encoding, which misreads non-ASCII SQL on
            # non-UTF-8 systems.
            conn.execute(path.read_text(encoding="utf-8"))
        conn.commit()
40
41
def upsert_lexicon(
    conn: psycopg.Connection,
    *,
    nsid: str,
    lexicon_type: str,
    definition: dict[str, Any],
    source_path: str,
) -> None:
    """Insert or refresh one lexicon row in ``tangled_lexicons``, keyed by ``nsid``.

    ``definition`` is JSON-encoded and cast to jsonb. On conflict the type,
    definition and source path are overwritten, and ``fetched_at`` is set
    to ``now()``. Does not commit.
    """
    encoded_definition = json.dumps(definition)
    statement = """
        insert into tangled_lexicons (nsid, lexicon_type, definition, source_path, fetched_at)
        values (%s, %s, %s::jsonb, %s, now())
        on conflict (nsid) do update set
            lexicon_type = excluded.lexicon_type,
            definition = excluded.definition,
            source_path = excluded.source_path,
            fetched_at = now()
        """
    conn.execute(statement, (nsid, lexicon_type, encoded_definition, source_path))
62
63
def upsert_knot(
    conn: psycopg.Connection,
    *,
    hostname: str,
    reachable: bool,
    owner_did: str | None,
    version: str | None,
    capabilities: list[str] | None,
    version_raw: dict[str, Any] | None,
    owner_raw: dict[str, Any] | None,
    probe_error: str | None,
) -> None:
    """Record the latest probe result for a knot in ``tangled_knots``.

    The row is keyed by ``hostname``. ``capabilities``, ``version_raw`` and
    ``owner_raw`` are JSON-encoded and cast to jsonb, and ``None`` is stored
    as SQL NULL. On conflict every probed column is overwritten and
    ``last_probed_at`` is set to ``now()``. Does not commit.
    """

    def as_json(value: Any) -> str | None:
        # Map None to SQL NULL rather than the JSON literal 'null'.
        return None if value is None else json.dumps(value)

    sql = """
        insert into tangled_knots (
            hostname, reachable, owner_did, version, capabilities,
            version_raw, owner_raw, probe_error, last_probed_at
        )
        values (%s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s, now())
        on conflict (hostname) do update set
            reachable = excluded.reachable,
            owner_did = excluded.owner_did,
            version = excluded.version,
            capabilities = excluded.capabilities,
            version_raw = excluded.version_raw,
            owner_raw = excluded.owner_raw,
            probe_error = excluded.probe_error,
            last_probed_at = now()
        """
    params = (
        hostname,
        reachable,
        owner_did,
        version,
        as_json(capabilities),
        as_json(version_raw),
        as_json(owner_raw),
        probe_error,
    )
    conn.execute(sql, params)
104
105
def set_crawl_state(
    conn: psycopg.Connection,
    *,
    key: str,
    status: str,
    meta: dict[str, Any] | None = None,
    last_error: str | None = None,
) -> None:
    """Upsert the crawl-state row identified by ``key`` in ``tangled_crawl_state``.

    Every call overwrites ``status``, ``meta`` and ``last_error`` and sets
    ``updated_at`` to ``now()``. Omitted ``meta`` / ``last_error`` therefore
    clear any previous values. Does not commit.

    Args:
        conn: open connection; the caller owns the transaction.
        key: crawl-state identifier (conflict target).
        status: status string to record.
        meta: JSON-encoded and cast to jsonb. ``None`` stores SQL NULL; an
            empty dict stores ``'{}'``, matching the other upserts here.
        last_error: error text to record, or ``None``.
    """
    conn.execute(
        """
        insert into tangled_crawl_state (key, status, meta, last_error, updated_at)
        values (%s, %s, %s::jsonb, %s, now())
        on conflict (key) do update set
            status = excluded.status,
            meta = excluded.meta,
            last_error = excluded.last_error,
            updated_at = now()
        """,
        (
            key,
            status,
            # Test against None, not truthiness: {} is a real value and
            # must not be silently turned into NULL.
            json.dumps(meta) if meta is not None else None,
            last_error,
        ),
    )
126
127
def count_lexicons(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_lexicons`` (0 if no row is fetched)."""
    result = conn.execute("select count(*) as n from tangled_lexicons").fetchone()
    if not result:
        return 0
    return int(result["n"])
131
132
def count_knots(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_knots`` (0 if no row is fetched)."""
    result = conn.execute("select count(*) as n from tangled_knots").fetchone()
    if not result:
        return 0
    return int(result["n"])
136
137
def count_pds_accounts(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_pds_accounts`` (0 if no row is fetched)."""
    result = conn.execute("select count(*) as n from tangled_pds_accounts").fetchone()
    if not result:
        return 0
    return int(result["n"])
141
142
def count_repos(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_repos`` (0 if no row is fetched)."""
    result = conn.execute("select count(*) as n from tangled_repos").fetchone()
    if not result:
        return 0
    return int(result["n"])
146
147
def count_accounts_with_repos(conn: psycopg.Connection) -> int:
    """Return how many ``tangled_pds_accounts`` rows have ``repo_record_count > 0``.

    Returns 0 if the query yields no row.
    """
    query = "select count(*) as n from tangled_pds_accounts where repo_record_count > 0"
    result = conn.execute(query).fetchone()
    if not result:
        return 0
    return int(result["n"])
153
154
def upsert_xrpc_snapshot(
    conn: psycopg.Connection,
    *,
    method: str,
    repo_did: str | None,
    params: dict[str, Any],
    params_hash: str,
    payload: dict[str, Any] | list[Any] | None,
    payload_encoding: str = "application/json",
) -> None:
    """Store the latest response of an XRPC call in ``tangled_xrpc_snapshots``.

    The row is identified by ``(method, repo_did, params_hash)``. ``params``
    and ``payload`` are JSON-encoded and cast to jsonb; a ``None`` payload is
    stored as SQL NULL. On conflict ``params``, ``payload`` and
    ``payload_encoding`` are overwritten and ``fetched_at`` is set to
    ``now()``. Does not commit.
    """
    # NOTE(review): repo_did may be None. Postgres treats NULLs as distinct in
    # unique constraints by default, so unless the constraint behind this
    # conflict target is declared NULLS NOT DISTINCT, rows with a NULL
    # repo_did never conflict and will accumulate duplicates -- confirm
    # against the migration.
    encoded_payload = None if payload is None else json.dumps(payload)
    row = (
        method,
        repo_did,
        json.dumps(params),
        params_hash,
        encoded_payload,
        payload_encoding,
    )
    conn.execute(
        """
        insert into tangled_xrpc_snapshots (
            method, repo_did, params, params_hash, payload, payload_encoding, fetched_at
        )
        values (%s, %s, %s::jsonb, %s, %s::jsonb, %s, now())
        on conflict (method, repo_did, params_hash) do update set
            params = excluded.params,
            payload = excluded.payload,
            payload_encoding = excluded.payload_encoding,
            fetched_at = now()
        """,
        row,
    )
186
187
def upsert_atproto_record(
    conn: psycopg.Connection,
    *,
    uri: str,
    author_did: str,
    collection: str,
    rkey: str,
    payload: dict[str, Any],
    cid: str | None = None,
    repo_did: str | None = None,
    subject_uri: str | None = None,
) -> None:
    """Insert or refresh one AT Protocol record in ``tangled_atproto_records``.

    The row is keyed by ``uri``. ``payload`` is JSON-encoded and cast to
    jsonb. On conflict only ``cid``, ``payload``, ``repo_did`` and
    ``subject_uri`` are overwritten and ``fetched_at`` is set to ``now()``;
    ``author_did``, ``collection`` and ``rkey`` are written only on first
    insert. Does not commit.
    """
    insert_sql = """
        insert into tangled_atproto_records (
            uri, author_did, collection, rkey, cid, payload, repo_did, subject_uri, fetched_at
        )
        values (%s, %s, %s, %s, %s, %s::jsonb, %s, %s, now())
        on conflict (uri) do update set
            cid = excluded.cid,
            payload = excluded.payload,
            repo_did = excluded.repo_did,
            subject_uri = excluded.subject_uri,
            fetched_at = now()
        """
    # Positional values, in the same order as the column list above.
    values = (uri, author_did, collection, rkey, cid, json.dumps(payload), repo_did, subject_uri)
    conn.execute(insert_sql, values)
224
225
def count_xrpc_snapshots(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_xrpc_snapshots`` (0 if no row is fetched)."""
    result = conn.execute("select count(*) as n from tangled_xrpc_snapshots").fetchone()
    if not result:
        return 0
    return int(result["n"])
229
230
def table_counts(conn: psycopg.Connection) -> dict[str, int]:
    """Return ``{table_name: row_count}`` for each known ``tangled_*`` table.

    Issues one ``count(*)`` query per table, in the fixed order below, and
    the returned dict preserves that order. A table that yields no row counts
    as 0. Every listed table must exist, otherwise the query raises.
    """

    def _count(table: str) -> int:
        # Interpolating the name is safe only because it comes from the
        # hard-coded tuple below, never from caller input.
        found = conn.execute(f"select count(*) as n from {table}").fetchone()
        return int(found["n"]) if found else 0

    return {
        name: _count(name)
        for name in (
            "tangled_lexicons",
            "tangled_knots",
            "tangled_pds_accounts",
            "tangled_repos",
            "tangled_identities",
            "tangled_atproto_records",
            "tangled_backlinks",
            "tangled_xrpc_snapshots",
            "tangled_git_archives",
            "tangled_git_blobs",
            "tangled_readmes",
            "tangled_repo_collaborators",
            "tangled_issues",
        )
    }
251 return counts