This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 7.3 kB View raw
from __future__ import annotations

import json
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterable, Iterator

import psycopg
from psycopg import sql
from psycopg.rows import dict_row

MIGRATIONS_DIR = (
    Path(__file__).resolve().parent.parent / "supabase" / "migrations"
)

# Tables reported by ``table_counts`` when no explicit list is passed.
COUNTED_TABLES: tuple[str, ...] = (
    "tangled_lexicons",
    "tangled_knots",
    "tangled_pds_accounts",
    "tangled_repos",
    "tangled_identities",
    "tangled_atproto_records",
    "tangled_backlinks",
    "tangled_xrpc_snapshots",
    "tangled_git_archives",
    "tangled_git_blobs",
    "tangled_readmes",
    "tangled_repo_collaborators",
    "tangled_issues",
)


def _jsonb_or_null(value: Any) -> str | None:
    """Serialise *value* for a ``%s::jsonb`` placeholder.

    ``None`` becomes SQL NULL; every other value, including empty
    containers, is JSON-encoded.
    """
    return None if value is None else json.dumps(value)


def _fetch_count(conn: psycopg.Connection, query: str | sql.Composable) -> int:
    """Run a query that selects a single ``n`` column and return it as an int.

    Returns 0 if the query yields no row.
    """
    row = conn.execute(query).fetchone()
    return int(row["n"]) if row else 0


def register_pgvector(conn: psycopg.Connection) -> None:
    """Register pgvector's type adapters on *conn* on a best-effort basis.

    This is a no-op when the ``pgvector`` package is not importable, or when
    registration raises ``psycopg.ProgrammingError`` (e.g. the database has
    no ``vector`` type).
    """
    try:
        from pgvector.psycopg import register_vector

        register_vector(conn)
    except (ImportError, psycopg.ProgrammingError):
        # Deliberately optional: code that never touches vector columns
        # must still be able to connect.
        pass


@contextmanager
def connect(dsn: str) -> Iterator[psycopg.Connection]:
    """Open a connection to *dsn* that returns rows as dicts.

    pgvector adapters are registered when available (see
    ``register_pgvector``). The connection is driven by psycopg's own
    context manager, which commits on a clean exit, rolls back if the block
    raises, and closes the connection either way.
    """
    with psycopg.connect(dsn, row_factory=dict_row) as conn:
        register_pgvector(conn)
        yield conn


def init_schema(dsn: str) -> None:
    """Execute every ``*.sql`` file in ``MIGRATIONS_DIR``, sorted by path.

    All files run on a single connection and are committed once at the end.
    No record of applied migrations is kept here, so every call re-executes
    every file.

    Raises:
        RuntimeError: if ``MIGRATIONS_DIR`` contains no ``.sql`` files.
    """
    paths = sorted(MIGRATIONS_DIR.glob("*.sql"))
    if not paths:
        raise RuntimeError(f"No migrations found in {MIGRATIONS_DIR}")
    with connect(dsn) as conn:
        for path in paths:
            # Decode explicitly so the result doesn't depend on the
            # platform's default (locale) encoding.
            conn.execute(path.read_text(encoding="utf-8"))
        conn.commit()


def upsert_lexicon(
    conn: psycopg.Connection,
    *,
    nsid: str,
    lexicon_type: str,
    definition: dict[str, Any],
    source_path: str,
) -> None:
    """Insert or update the ``tangled_lexicons`` row keyed by *nsid*.

    ``fetched_at`` is set to ``now()`` on both insert and update.
    """
    conn.execute(
        """
        insert into tangled_lexicons (nsid, lexicon_type, definition, source_path, fetched_at)
        values (%s, %s, %s::jsonb, %s, now())
        on conflict (nsid) do update set
            lexicon_type = excluded.lexicon_type,
            definition = excluded.definition,
            source_path = excluded.source_path,
            fetched_at = now()
        """,
        (nsid, lexicon_type, json.dumps(definition), source_path),
    )


def upsert_knot(
    conn: psycopg.Connection,
    *,
    hostname: str,
    reachable: bool,
    owner_did: str | None,
    version: str | None,
    capabilities: list[str] | None,
    version_raw: dict[str, Any] | None,
    owner_raw: dict[str, Any] | None,
    probe_error: str | None,
) -> None:
    """Insert or update the ``tangled_knots`` row keyed by *hostname*.

    JSON-typed arguments that are ``None`` are stored as SQL NULL.
    ``last_probed_at`` is set to ``now()`` on both insert and update.
    """
    conn.execute(
        """
        insert into tangled_knots (
            hostname, reachable, owner_did, version, capabilities,
            version_raw, owner_raw, probe_error, last_probed_at
        )
        values (%s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s, now())
        on conflict (hostname) do update set
            reachable = excluded.reachable,
            owner_did = excluded.owner_did,
            version = excluded.version,
            capabilities = excluded.capabilities,
            version_raw = excluded.version_raw,
            owner_raw = excluded.owner_raw,
            probe_error = excluded.probe_error,
            last_probed_at = now()
        """,
        (
            hostname,
            reachable,
            owner_did,
            version,
            _jsonb_or_null(capabilities),
            _jsonb_or_null(version_raw),
            _jsonb_or_null(owner_raw),
            probe_error,
        ),
    )


def set_crawl_state(
    conn: psycopg.Connection,
    *,
    key: str,
    status: str,
    meta: dict[str, Any] | None = None,
    last_error: str | None = None,
) -> None:
    """Insert or update the ``tangled_crawl_state`` row keyed by *key*.

    *meta* is stored as jsonb. ``None`` is stored as NULL, while an empty
    dict is stored as ``{}``; previously ``{}`` was also coerced to NULL,
    unlike every other jsonb column in this module. ``updated_at`` is set
    to ``now()``.
    """
    conn.execute(
        """
        insert into tangled_crawl_state (key, status, meta, last_error, updated_at)
        values (%s, %s, %s::jsonb, %s, now())
        on conflict (key) do update set
            status = excluded.status,
            meta = excluded.meta,
            last_error = excluded.last_error,
            updated_at = now()
        """,
        (key, status, _jsonb_or_null(meta), last_error),
    )


def count_lexicons(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_lexicons``."""
    return _fetch_count(conn, "select count(*) as n from tangled_lexicons")


def count_knots(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_knots``."""
    return _fetch_count(conn, "select count(*) as n from tangled_knots")


def count_pds_accounts(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_pds_accounts``."""
    return _fetch_count(conn, "select count(*) as n from tangled_pds_accounts")


def count_repos(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_repos``."""
    return _fetch_count(conn, "select count(*) as n from tangled_repos")


def count_accounts_with_repos(conn: psycopg.Connection) -> int:
    """Return the number of PDS accounts whose ``repo_record_count`` is > 0."""
    return _fetch_count(
        conn,
        "select count(*) as n from tangled_pds_accounts where repo_record_count > 0",
    )


def upsert_xrpc_snapshot(
    conn: psycopg.Connection,
    *,
    method: str,
    repo_did: str | None,
    params: dict[str, Any],
    params_hash: str,
    payload: dict[str, Any] | list[Any] | None,
    payload_encoding: str = "application/json",
) -> None:
    """Insert or update a cached XRPC response.

    Rows are keyed by ``(method, repo_did, params_hash)``. A ``None``
    *payload* is stored as SQL NULL. ``fetched_at`` is set to ``now()``.
    """
    # NOTE(review): when repo_did is None, a plain unique constraint treats
    # NULLs as distinct, so ON CONFLICT never matches and each call inserts
    # a new row. Confirm the migration declares the constraint
    # NULLS NOT DISTINCT (or uses an equivalent index).
    conn.execute(
        """
        insert into tangled_xrpc_snapshots (
            method, repo_did, params, params_hash, payload, payload_encoding, fetched_at
        )
        values (%s, %s, %s::jsonb, %s, %s::jsonb, %s, now())
        on conflict (method, repo_did, params_hash) do update set
            params = excluded.params,
            payload = excluded.payload,
            payload_encoding = excluded.payload_encoding,
            fetched_at = now()
        """,
        (
            method,
            repo_did,
            json.dumps(params),
            params_hash,
            _jsonb_or_null(payload),
            payload_encoding,
        ),
    )


def upsert_atproto_record(
    conn: psycopg.Connection,
    *,
    uri: str,
    author_did: str,
    collection: str,
    rkey: str,
    payload: dict[str, Any],
    cid: str | None = None,
    repo_did: str | None = None,
    subject_uri: str | None = None,
) -> None:
    """Insert or update the ``tangled_atproto_records`` row keyed by *uri*.

    On conflict only ``cid``, ``payload``, ``repo_did``, ``subject_uri`` and
    ``fetched_at`` are updated. ``author_did``, ``collection`` and ``rkey``
    keep their first-inserted values.
    """
    conn.execute(
        """
        insert into tangled_atproto_records (
            uri, author_did, collection, rkey, cid, payload, repo_did, subject_uri, fetched_at
        )
        values (%s, %s, %s, %s, %s, %s::jsonb, %s, %s, now())
        on conflict (uri) do update set
            cid = excluded.cid,
            payload = excluded.payload,
            repo_did = excluded.repo_did,
            subject_uri = excluded.subject_uri,
            fetched_at = now()
        """,
        (
            uri,
            author_did,
            collection,
            rkey,
            cid,
            json.dumps(payload),
            repo_did,
            subject_uri,
        ),
    )


def count_xrpc_snapshots(conn: psycopg.Connection) -> int:
    """Return the number of rows in ``tangled_xrpc_snapshots``."""
    return _fetch_count(conn, "select count(*) as n from tangled_xrpc_snapshots")


def table_counts(
    conn: psycopg.Connection, tables: Iterable[str] | None = None
) -> dict[str, int]:
    """Return ``{table_name: row_count}`` for each table, in iteration order.

    Args:
        conn: An open connection whose rows are dict-like (see ``connect``).
        tables: Unqualified table names to count; defaults to
            ``COUNTED_TABLES``. Each name is quoted with ``sql.Identifier``,
            so it is matched exactly and can't inject SQL.
    """
    names = COUNTED_TABLES if tables is None else tables
    template = sql.SQL("select count(*) as n from {}")
    return {
        table: _fetch_count(conn, template.format(sql.Identifier(table)))
        for table in names
    }