an app to share curated trails
sidetrail.app
1/**
2 * Sync the index with the live network.
3 *
4 * The jetstream ingester only sees records created while it's running, so a
5 * fresh database (or one whose ingester was down longer than jetstream's
6 * replay window) drifts from reality. This script reconciles: after it runs,
7 * the index mirrors the lexicon-valid sidetrail records currently on the
8 * network.
9 *
10 * - Discovers repos via the relay (plus authors already in the DB)
11 * - Fetches their records from each PDS and upserts the lexicon-valid ones,
12 * with the same semantics as ingester/src/handler.ts
13 * - Deletes local rows that are no longer on the network or fail validation
14 * - Never deletes rows belonging to a repo it failed to reach
15 *
16 * Idempotent and safe to rerun anytime.
17 *
18 * Usage:
19 * npm run sync # uses DATABASE_URL from .env
20 * DATABASE_URL=postgres://... npm run sync
21 */
22import fs from "node:fs";
23import pg from "pg";
24import { validateRecord, type IndexedCollection } from "../ingester/src/lexicons.js";
25
/** Relay queried to discover which repos hold records of a given collection. */
const RELAY = "https://relay1.us-east.bsky.network";

/** Local table that stores the rows of each indexed collection. */
const TABLE_FOR: Record<IndexedCollection, string> = {
  "app.sidetrail.trail": "trails",
  "app.sidetrail.walk": "walks",
  "app.sidetrail.completion": "completions",
};

/**
 * Every indexed collection, in processing order (trail, walk, completion).
 * Taken from the keys of TABLE_FOR so the two can never disagree.
 */
const COLLECTIONS = Object.keys(TABLE_FOR) as IndexedCollection[];

// Error names that definitively mean "this repo has no content to index".
// Anything else is treated as a transient failure, which must never trigger
// pruning.
const REPO_GONE = /RepoNotFound|RepoDeactivated|RepoTakendown|RepoSuspended/;
41
// Connection string for the index database. DATABASE_URL from the environment
// wins; otherwise it is read from the `.env` file one directory above this
// script (reading throws if that file does not exist). The value is
// `undefined` when the file has no DATABASE_URL line.
const databaseUrl =
  process.env.DATABASE_URL ??
  fs
    .readFileSync(new URL("../.env", import.meta.url), "utf8")
    // Stop at CR as well as LF: with a CRLF-terminated .env the capture would
    // otherwise end in "\r" and corrupt the connection string.
    .match(/^DATABASE_URL="?([^"\r\n]+)/m)?.[1];
47
/** One entry of a `com.atproto.repo.listRecords` page, as consumed by this script. */
type PdsRecord = {
  /** Record URI; its last path segment is used as the rkey. */
  uri: string;
  /** CID stored alongside the record. */
  cid: string;
  /** Record body; validated against the lexicon before it is stored. */
  value: Record<string, unknown>;
};
49
/**
 * GET a URL and parse the response body as JSON.
 *
 * The request is aborted after 15 seconds. A non-2xx response is turned into
 * an Error whose message is "<status> <body> <url>"; the body is included so
 * callers can look for error names in it, and is replaced by an empty string
 * if it cannot be read.
 */
async function getJson(url: URL | string): Promise<any> {
  const response = await fetch(url, { signal: AbortSignal.timeout(15000) });
  if (response.ok) return response.json();

  const body = await response.text().catch(() => "");
  throw new Error(`${response.status} ${body} ${url}`);
}
55
/**
 * Ask the relay for every repo that holds at least one record of `collection`.
 *
 * Pages through `com.atproto.sync.listReposByCollection` 500 repos at a time
 * until the relay stops returning a cursor.
 *
 * @returns The distinct DIDs reported by the relay.
 */
async function listReposByCollection(collection: string): Promise<Set<string>> {
  const found = new Set<string>();
  let cursor: string | undefined;
  for (;;) {
    const pageUrl = new URL(`${RELAY}/xrpc/com.atproto.sync.listReposByCollection`);
    pageUrl.searchParams.set("collection", collection);
    pageUrl.searchParams.set("limit", "500");
    if (cursor) pageUrl.searchParams.set("cursor", cursor);

    const page = await getJson(pageUrl);
    for (const repo of page.repos) found.add(repo.did);

    cursor = page.cursor;
    if (!cursor) break; // no cursor: that was the last page
  }
  return found;
}
70
/**
 * Resolve a DID to the base URL of the PDS that hosts its repo.
 *
 * `did:web` documents are fetched from the DID's own host at
 * `/.well-known/did.json`; every other DID is looked up on plc.directory.
 *
 * @param did DID to resolve.
 * @returns The `serviceEndpoint` of the document's `#atproto_pds` service,
 *   with any trailing slashes removed.
 * @throws If the DID document cannot be fetched, the did:web identifier is
 *   not validly percent-encoded, or the document declares no PDS endpoint.
 */
async function resolvePds(did: string): Promise<string> {
  const webPrefix = "did:web:";
  // did:web percent-encodes the colon in front of a port ("host%3A3000"), so
  // the identifier is decoded before being used as a URL authority.
  const docUrl = did.startsWith(webPrefix)
    ? `https://${decodeURIComponent(did.slice(webPrefix.length))}/.well-known/did.json`
    : `https://plc.directory/${did}`;
  const doc = await getJson(docUrl);

  // Service ids may be relative ("#atproto_pds") or fully qualified
  // ("<did>#atproto_pds"); accept both forms.
  const services: any[] = Array.isArray(doc?.service) ? doc.service : [];
  const pds = services.find(
    (s) => s?.id === "#atproto_pds" || s?.id === `${did}#atproto_pds`,
  );
  const endpoint: unknown = pds?.serviceEndpoint;
  if (typeof endpoint !== "string" || endpoint === "") {
    throw new Error(`no PDS endpoint for ${did}`);
  }

  // Callers append "/xrpc/..." to the result; drop trailing slashes so that
  // never produces a "//xrpc" path.
  let end = endpoint.length;
  while (end > 0 && endpoint[end - 1] === "/") end--;
  return endpoint.slice(0, end);
}
79
/**
 * Fetch every record of one collection from a repo.
 *
 * Pages through `com.atproto.repo.listRecords` on the given PDS 100 records
 * at a time until the server stops returning a cursor.
 *
 * @param pdsEndpoint Base URL of the PDS hosting the repo.
 * @param did Repo to read.
 * @param collection Collection NSID to list.
 * @returns All records, in the order the PDS returned them.
 */
async function listRecords(
  pdsEndpoint: string,
  did: string,
  collection: string,
): Promise<PdsRecord[]> {
  const collected: PdsRecord[] = [];
  let cursor: string | undefined;
  for (;;) {
    const pageUrl = new URL(`${pdsEndpoint}/xrpc/com.atproto.repo.listRecords`);
    pageUrl.searchParams.set("repo", did);
    pageUrl.searchParams.set("collection", collection);
    pageUrl.searchParams.set("limit", "100");
    if (cursor) pageUrl.searchParams.set("cursor", cursor);

    const page = await getJson(pageUrl);
    for (const record of page.records) collected.push(record);

    cursor = page.cursor;
    if (!cursor) break; // no cursor: that was the last page
  }
  return collected;
}
99
/**
 * Insert one record into the table for its collection, or refresh the stored
 * copy if the URI is already present.
 *
 * On conflict only `cid`, `record` and `indexed_at` are overwritten. Walks
 * and completions additionally store the URI of the trail they reference
 * (`value.trail.uri`, or an empty string when absent).
 *
 * @param db Connected database client.
 * @param collection Collection the record belongs to; selects the table.
 * @param did Author of the record, stored as `author_did`.
 */
async function upsertRecord(
  db: pg.Client,
  collection: IndexedCollection,
  did: string,
  { uri, cid, value }: PdsRecord,
): Promise<void> {
  // The rkey is whatever follows the last "/" of the record URI.
  const rkey = uri.slice(uri.lastIndexOf("/") + 1);
  // Fall back to the current time when the record carries no createdAt.
  const createdAt = new Date((value.createdAt ?? Date.now()) as string | number);
  const payload = JSON.stringify(value);
  const table = TABLE_FOR[collection];

  if (table === "trails") {
    await db.query(
      `INSERT INTO trails (uri, cid, author_did, rkey, record, created_at, indexed_at)
       VALUES ($1, $2, $3, $4, $5, $6, NOW())
       ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $5, indexed_at = NOW()`,
      [uri, cid, did, rkey, payload, createdAt],
    );
    return;
  }

  const trailRef = value.trail as { uri?: string } | undefined;
  const trailUri = trailRef?.uri ?? "";
  await db.query(
    `INSERT INTO ${table} (uri, cid, author_did, rkey, trail_uri, record, created_at, indexed_at)
     VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
     ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $6, indexed_at = NOW()`,
    [uri, cid, did, rkey, trailUri, payload, createdAt],
  );
}
133
/**
 * Reconcile the local index with the network.
 *
 * 1. Builds the list of repos to visit: every DID the relay lists for an
 *    indexed collection plus every author already present locally, minus
 *    accounts marked inactive in `accounts`.
 * 2. Visits those repos with 8 concurrent workers: resolves the PDS, lists
 *    each collection, upserts the lexicon-valid records and remembers their
 *    URIs.
 * 3. Deletes local rows whose author was synced successfully but whose URI
 *    was not seen (or was not valid) during this run.
 *
 * Repos that could not be synced are never pruned; they are listed at the end
 * and the process exit code is set to 1. The database connection is closed
 * on every path, including when a step throws.
 */
async function main() {
  const db = new pg.Client({ connectionString: databaseUrl });
  await db.connect();
  try {
    // Repos to sync: everything the relay knows about, plus authors already
    // indexed locally (so records deleted from the network get pruned even if
    // their author no longer appears in the relay's listing)
    const [relayDidSets, localDids, inactive] = await Promise.all([
      Promise.all(COLLECTIONS.map(listReposByCollection)),
      db.query(
        `SELECT DISTINCT author_did AS did FROM trails
         UNION SELECT DISTINCT author_did FROM walks
         UNION SELECT DISTINCT author_did FROM completions`,
      ),
      db.query(`SELECT did FROM accounts WHERE active = 0`),
    ]);
    const skip = new Set<string>(inactive.rows.map((r) => r.did));
    const dids = [
      ...new Set([...relayDidSets.flatMap((s) => [...s]), ...localDids.rows.map((r) => r.did)]),
    ].filter((did) => !skip.has(did));
    console.log(`syncing ${dids.length} repos`);

    // uris seen on the network and valid, per collection; only repos in
    // `fetchedOk` participate in pruning
    const keep: Record<IndexedCollection, Set<string>> = {
      "app.sidetrail.trail": new Set(),
      "app.sidetrail.walk": new Set(),
      "app.sidetrail.completion": new Set(),
    };
    const fetchedOk = new Set<string>();
    const failures: string[] = [];
    let rejected = 0;

    const queue = [...dids];
    let done = 0;
    // Progress label for a repo's final log line. It counts finished repos at
    // the moment of logging, so the numbers increase monotonically even
    // though several workers run interleaved.
    const progress = () => `[${++done}/${dids.length}]`;

    // One worker: keep taking repos off the shared queue until it is empty.
    const worker = async (): Promise<void> => {
      for (let did = queue.shift(); did !== undefined; did = queue.shift()) {
        try {
          let pdsEndpoint: string;
          try {
            pdsEndpoint = await resolvePds(did);
          } catch (err) {
            if (REPO_GONE.test(String(err))) {
              fetchedOk.add(did); // identity gone: prune everything local
              console.log(`${progress()} ${did}: repo gone`);
              continue;
            }
            throw err;
          }
          const counts: string[] = [];
          for (const collection of COLLECTIONS) {
            let records: PdsRecord[];
            try {
              records = await listRecords(pdsEndpoint, did, collection);
            } catch (err) {
              if (REPO_GONE.test(String(err))) {
                records = []; // repo gone: nothing to keep, prune local rows
              } else {
                throw err;
              }
            }
            for (const record of records) {
              const validation = validateRecord(collection, record.value);
              if (!validation.success) {
                rejected++;
                console.log(` rejecting ${collection} ${record.uri}: ${validation.reason}`);
                continue;
              }
              await upsertRecord(db, collection, did, record);
              keep[collection].add(record.uri);
            }
            if (records.length > 0) counts.push(`${collection.split(".").pop()}=${records.length}`);
          }
          // Only now is the repo eligible for pruning: every collection was
          // listed and every valid record was written.
          fetchedOk.add(did);
          await db.query(
            `INSERT INTO accounts (did, active, seq, updated_at) VALUES ($1, 1, 0, NOW())
             ON CONFLICT (did) DO NOTHING`,
            [did],
          );
          console.log(`${progress()} ${did}:`, counts.join(" ") || "no records");
        } catch (err) {
          // Any other failure leaves the repo out of `fetchedOk`, so none of
          // its local rows are pruned.
          failures.push(did);
          const message = err instanceof Error ? err.message : String(err);
          console.error(`${progress()} ${did}: FAILED - ${message}`);
        }
      }
    };
    await Promise.all(Array.from({ length: 8 }, () => worker()));

    // Prune: rows from successfully-synced repos that aren't in the keep set
    // (deleted from the network, or no longer lexicon-valid)
    for (const collection of COLLECTIONS) {
      const table = TABLE_FOR[collection];
      const { rows } = await db.query(
        `DELETE FROM ${table}
         WHERE author_did = ANY($1) AND NOT (uri = ANY($2))
         RETURNING uri`,
        [[...fetchedOk], [...keep[collection]]],
      );
      for (const row of rows) console.log(`pruned ${row.uri}`);
      const { rows: count } = await db.query(`SELECT COUNT(*) FROM ${table}`);
      console.log(`${table}: ${count[0].count} rows (${rows.length} pruned)`);
    }
    console.log(`${rejected} records rejected by lexicon validation`);
    if (failures.length > 0) {
      console.error(
        `\n${failures.length} repos unreachable, left untouched (rerun to retry): ${failures.join(", ")}`,
      );
      process.exitCode = 1;
    }
  } finally {
    // Release the connection even when discovery, syncing or pruning throws.
    await db.end();
  }
}

// Report a fatal error and signal failure instead of leaving the rejection
// unhandled.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});