an app to share curated trails sidetrail.app
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.1 kB View raw
/**
 * Sync the index with the live network.
 *
 * The jetstream ingester only sees records created while it's running, so a
 * fresh database (or one whose ingester was down longer than jetstream's
 * replay window) drifts from reality. This script reconciles: after it runs,
 * the index mirrors the lexicon-valid sidetrail records currently on the
 * network.
 *
 * - Discovers repos via the relay (plus authors already in the DB)
 * - Fetches their records from each PDS and upserts the lexicon-valid ones,
 *   with the same semantics as ingester/src/handler.ts
 * - Deletes local rows that are no longer on the network or fail validation
 * - Never deletes rows belonging to a repo it failed to reach
 *
 * Idempotent and safe to rerun anytime.
 *
 * Usage:
 *   npm run sync                              # uses DATABASE_URL from .env
 *   DATABASE_URL=postgres://... npm run sync
 */
import fs from "node:fs";
import pg from "pg";
import { validateRecord, type IndexedCollection } from "../ingester/src/lexicons.js";

const RELAY = "https://relay1.us-east.bsky.network";
const COLLECTIONS: IndexedCollection[] = [
  "app.sidetrail.trail",
  "app.sidetrail.walk",
  "app.sidetrail.completion",
];
const TABLE_FOR: Record<IndexedCollection, string> = {
  "app.sidetrail.trail": "trails",
  "app.sidetrail.walk": "walks",
  "app.sidetrail.completion": "completions",
};

// Number of repos synced concurrently.
const WORKERS = 8;

// PDS errors that definitively mean "this repo has no content to index"
// (as opposed to transient failures, which must not trigger pruning)
const REPO_GONE = /RepoNotFound|RepoDeactivated|RepoTakendown|RepoSuspended/;

// Prefer the environment; otherwise read DATABASE_URL out of ../.env
// (an optional opening double quote around the value is skipped).
const databaseUrl =
  process.env.DATABASE_URL ??
  fs
    .readFileSync(new URL("../.env", import.meta.url), "utf8")
    .match(/^DATABASE_URL="?([^"\n]+)/m)?.[1];

type PdsRecord = { uri: string; cid: string; value: Record<string, unknown> };

/**
 * GET `url` and parse the body as JSON, aborting after 15 seconds.
 *
 * @throws Error containing the status code, response body and URL when the
 *   response is not 2xx. Callers match REPO_GONE against that text.
 */
async function getJson(url: URL | string): Promise<any> {
  const res = await fetch(url, { signal: AbortSignal.timeout(15000) });
  if (!res.ok) throw new Error(`${res.status} ${await res.text().catch(() => "")} ${url}`);
  return res.json();
}

/**
 * Page through the relay's listReposByCollection endpoint and return the
 * DIDs of every repo it lists for `collection`.
 */
async function listReposByCollection(collection: string): Promise<Set<string>> {
  const dids = new Set<string>();
  let cursor: string | undefined;
  do {
    const url = new URL(`${RELAY}/xrpc/com.atproto.sync.listReposByCollection`);
    url.searchParams.set("collection", collection);
    url.searchParams.set("limit", "500");
    if (cursor) url.searchParams.set("cursor", cursor);
    const page = await getJson(url);
    for (const r of page.repos) dids.add(r.did);
    cursor = page.cursor;
  } while (cursor);
  return dids;
}

/**
 * Resolve a DID to its PDS service endpoint by fetching the DID document
 * (did:web from the host's /.well-known/did.json, anything else from
 * plc.directory).
 *
 * @throws Error when the document cannot be fetched or has no `#atproto_pds`
 *   service entry.
 */
async function resolvePds(did: string): Promise<string> {
  const doc = did.startsWith("did:web:")
    ? // did:web percent-encodes the port separator (e.g. localhost%3A3000);
      // decode it so the host is a valid URL authority.
      await getJson(
        `https://${decodeURIComponent(did.slice("did:web:".length))}/.well-known/did.json`,
      )
    : await getJson(`https://plc.directory/${did}`);
  const endpoint = doc.service?.find((s: any) => s.id === "#atproto_pds")?.serviceEndpoint;
  if (!endpoint) throw new Error(`no PDS endpoint for ${did}`);
  return endpoint;
}

/**
 * Fetch every record of `collection` in repo `did` from its PDS, following
 * the cursor until the PDS stops returning one.
 */
async function listRecords(
  pdsEndpoint: string,
  did: string,
  collection: string,
): Promise<PdsRecord[]> {
  const records: PdsRecord[] = [];
  let cursor: string | undefined;
  do {
    const url = new URL(`${pdsEndpoint}/xrpc/com.atproto.repo.listRecords`);
    url.searchParams.set("repo", did);
    url.searchParams.set("collection", collection);
    url.searchParams.set("limit", "100");
    if (cursor) url.searchParams.set("cursor", cursor);
    const page = await getJson(url);
    records.push(...page.records);
    cursor = page.cursor;
  } while (cursor);
  return records;
}

/**
 * Insert one record into the table for `collection`, or on a uri conflict
 * refresh its cid, record JSON and indexed_at.
 *
 * Walks and completions additionally store the uri of the trail they point
 * at ("" when the record carries none).
 */
async function upsertRecord(
  db: pg.Client,
  collection: IndexedCollection,
  did: string,
  { uri, cid, value }: PdsRecord,
): Promise<void> {
  const rkey = uri.split("/").pop();
  // Falls back to "now" when the record has no createdAt.
  const createdAt = new Date((value.createdAt as string) ?? Date.now());
  const table = TABLE_FOR[collection];
  if (table === "trails") {
    await db.query(
      `INSERT INTO trails (uri, cid, author_did, rkey, record, created_at, indexed_at)
       VALUES ($1, $2, $3, $4, $5, $6, NOW())
       ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $5, indexed_at = NOW()`,
      [uri, cid, did, rkey, JSON.stringify(value), createdAt],
    );
  } else {
    await db.query(
      `INSERT INTO ${table} (uri, cid, author_did, rkey, trail_uri, record, created_at, indexed_at)
       VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
       ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $6, indexed_at = NOW()`,
      [
        uri,
        cid,
        did,
        rkey,
        (value.trail as { uri?: string } | undefined)?.uri ?? "",
        JSON.stringify(value),
        createdAt,
      ],
    );
  }
}

/**
 * Run one full reconciliation pass: discover repos, upsert their valid
 * records, then prune local rows that were not seen for repos that were
 * fetched successfully. Sets a non-zero exit code when any repo failed.
 */
async function main(): Promise<void> {
  const db = new pg.Client({ connectionString: databaseUrl });
  await db.connect();

  // try/finally so the connection is closed even when a step throws
  // (e.g. the relay listing fails).
  try {
    // Repos to sync: everything the relay knows about, plus authors already
    // indexed locally (so records deleted from the network get pruned even if
    // their author no longer appears in the relay's listing)
    const [relayDidSets, localDids, inactive] = await Promise.all([
      Promise.all(COLLECTIONS.map(listReposByCollection)),
      db.query(
        `SELECT DISTINCT author_did AS did FROM trails
         UNION SELECT DISTINCT author_did FROM walks
         UNION SELECT DISTINCT author_did FROM completions`,
      ),
      db.query(`SELECT did FROM accounts WHERE active = 0`),
    ]);
    const skip = new Set<string>(inactive.rows.map((r) => r.did));
    const dids = [
      ...new Set([...relayDidSets.flatMap((s) => [...s]), ...localDids.rows.map((r) => r.did)]),
    ].filter((did) => !skip.has(did));
    console.log(`syncing ${dids.length} repos`);

    // uris seen on the network and valid, per collection; only repos in
    // `fetchedOk` participate in pruning
    const keep: Record<IndexedCollection, Set<string>> = {
      "app.sidetrail.trail": new Set(),
      "app.sidetrail.walk": new Set(),
      "app.sidetrail.completion": new Set(),
    };
    const fetchedOk = new Set<string>();
    const failures: string[] = [];
    let rejected = 0;

    // Workers pull from a shared queue until it is empty.
    const queue = [...dids];
    let done = 0;
    await Promise.all(
      Array.from({ length: WORKERS }, async () => {
        let did: string | undefined;
        while ((did = queue.shift())) {
          // Capture this repo's position now: `done` is shared by all workers
          // and will have moved on by the time the awaits below resolve.
          const position = ++done;
          const tag = `[${position}/${dids.length}] ${did}`;
          try {
            let pdsEndpoint: string;
            try {
              pdsEndpoint = await resolvePds(did);
            } catch (err) {
              if (REPO_GONE.test(String(err))) {
                fetchedOk.add(did); // identity gone: prune everything local
                console.log(`${tag}: repo gone`);
                continue;
              }
              throw err;
            }
            const counts: string[] = [];
            for (const collection of COLLECTIONS) {
              let records: PdsRecord[];
              try {
                records = await listRecords(pdsEndpoint, did, collection);
              } catch (err) {
                if (REPO_GONE.test(String(err))) {
                  records = []; // repo gone: nothing to keep, prune local rows
                } else {
                  throw err;
                }
              }
              for (const record of records) {
                const validation = validateRecord(collection, record.value);
                if (!validation.success) {
                  rejected++;
                  console.log(`  rejecting ${collection} ${record.uri}: ${validation.reason}`);
                  continue;
                }
                await upsertRecord(db, collection, did, record);
                keep[collection].add(record.uri);
              }
              if (records.length > 0) {
                counts.push(`${collection.split(".").pop()}=${records.length}`);
              }
            }
            // Only reached when every collection was fetched, so a partial
            // failure never makes the repo eligible for pruning.
            fetchedOk.add(did);
            await db.query(
              `INSERT INTO accounts (did, active, seq, updated_at) VALUES ($1, 1, 0, NOW())
               ON CONFLICT (did) DO NOTHING`,
              [did],
            );
            console.log(`${tag}:`, counts.join(" ") || "no records");
          } catch (err) {
            failures.push(did);
            const message = err instanceof Error ? err.message : String(err);
            console.error(`${tag}: FAILED - ${message}`);
          }
        }
      }),
    );

    // Prune: rows from successfully-synced repos that aren't in the keep set
    // (deleted from the network, or no longer lexicon-valid)
    for (const collection of COLLECTIONS) {
      const table = TABLE_FOR[collection];
      const { rows } = await db.query(
        `DELETE FROM ${table}
         WHERE author_did = ANY($1) AND NOT (uri = ANY($2))
         RETURNING uri`,
        [[...fetchedOk], [...keep[collection]]],
      );
      for (const row of rows) console.log(`pruned ${row.uri}`);
      const { rows: count } = await db.query(`SELECT COUNT(*) FROM ${table}`);
      console.log(`${table}: ${count[0].count} rows (${rows.length} pruned)`);
    }
    console.log(`${rejected} records rejected by lexicon validation`);
    if (failures.length > 0) {
      console.error(
        `\n${failures.length} repos unreachable, left untouched (rerun to retry): ${failures.join(", ")}`,
      );
      process.exitCode = 1;
    }
  } finally {
    await db.end();
  }
}

// Report fatal errors explicitly instead of relying on the runtime's
// unhandled-rejection behavior, and make sure the exit code is non-zero.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});