···
1
1
# Sidetrail
2
2
3
3
-
[Sidetrail](https://sidetrail.app/) is an app to create and share "trails". Create sequential paths with 2-12 stops, walk through them one step at a time, and share them with others.
3
3
+
[Sidetrail](https://sidetrail.app/) is an app to create and share "trails". Create sequential paths with 2-24 stops, walk through them one step at a time, and share them with others.
4
4
5
5
Built on [AT](https://atproto.com/) protocol: trails, walks, and completions are stored in users' repositories and synced via Jetstream.
6
6
···
26
26
# Install dependencies
27
27
npm install
28
28
29
29
+
# Start Redis (Homebrew on macOS; on other platforms start it with your service manager)
30
30
+
brew services start redis
31
31
+
32
32
+
# Create the database
33
33
+
createdb sidetrail
34
34
+
29
35
# Copy environment file and configure DATABASE_URL and REDIS_URL
30
36
cp .env.example .env
31
37
32
38
# Push database schema
33
39
npm run db:push
40
40
+
41
41
+
# Populate the database from the live network (see Syncing below)
42
42
+
npm run sync
34
43
35
44
# Start all services (run in separate terminals)
36
45
npm run dev # Next.js app on :3000
···
38
47
npm run dev:realtime # Realtime server
39
48
```
40
49
41
41
-
Then open `127.0.0.1:300` (it has to be `127.0.0.1`, not `localhost`).
50
50
+
Then open `127.0.0.1:3000` (it has to be `127.0.0.1`, not `localhost`).
51
51
+
52
52
+
### Syncing
53
53
+
54
54
+
The ingester only indexes records as they stream past on Jetstream, and
55
55
+
Jetstream's replay window is only a few days. Any fresh database — or one
56
56
+
whose ingester was down for longer than the window — has drifted from the
57
57
+
network.
58
58
+
59
59
+
`npm run sync` reconciles: it asks the relay which repos contain
60
60
+
`app.sidetrail.*` records, fetches them from each user's PDS, and makes the
61
61
+
index mirror the lexicon-valid records currently on the network — adding
62
62
+
what's missing and pruning what was deleted or doesn't validate. Repos it
63
63
+
fails to reach are left untouched. The sync is idempotent and safe to rerun anytime
64
64
+
(including after changing a lexicon).
65
65
+
66
66
+
```bash
67
67
+
npm run sync # local DB (.env)
68
68
+
DATABASE_URL=postgres://... npm run sync # any other DB
69
69
+
```
42
70
43
71
### Environment Variables
44
72
···
82
110
83
111
npm run test # Run tests
84
112
npm run check # Lint + typecheck
113
113
+
npm run sync # Sync the index with the live network
85
114
```
86
115
87
116
## Deployment
88
117
89
89
-
Uses [Railway](https://railway.app/) with three services. Deploy with:
118
118
+
Uses [Railway](https://railway.app/) with three services. One-time setup:
119
119
+
120
120
+
```bash
121
121
+
railway link --project sidetrail
122
122
+
```
123
123
+
124
124
+
Then deploy with:
90
125
91
126
```bash
92
127
npm run deploy:app
93
128
npm run deploy:ingester
94
129
npm run deploy:realtime
130
130
+
npm run deploy:all # all three, sequentially
95
131
```
96
132
97
133
## License
···
4
4
5
5
## `app.sidetrail.trail`
6
6
7
7
-
A trail with 2-12 embedded stops.
7
7
+
A trail with 2-24 embedded stops.
8
8
9
9
```json
10
10
{
···
8
8
"realtime"
9
9
],
10
10
"scripts": {
11
11
-
"dev": "next dev --turbopack",
11
11
+
"dev": "next dev",
12
12
"dev:ingester": "npm run -w sidetrail-ingester dev",
13
13
"dev:realtime": "npm run -w sidetrail-realtime dev",
14
14
"build": "rm -rf .next/cache/* && next build --webpack",
···
23
23
"lex:build": "lex build --lexicons ./lexicons --out ./lib/lexicons --clear --importExt \"\"",
24
24
"db:push": "drizzle-kit push",
25
25
"db:studio": "drizzle-kit studio",
26
26
+
"sync": "tsx scripts/sync.ts",
26
27
"test": "npx vitest run",
27
28
"test:watch": "npx vitest",
28
29
"deploy:app": "railway up --service sidetrail --ci",
···
1
1
+
/**
2
2
+
* Sync the index with the live network.
3
3
+
*
4
4
+
* The jetstream ingester only sees records created while it's running, so a
5
5
+
* fresh database (or one whose ingester was down longer than jetstream's
6
6
+
* replay window) drifts from reality. This script reconciles: after it runs,
7
7
+
* the index mirrors the lexicon-valid sidetrail records currently on the
8
8
+
* network.
9
9
+
*
10
10
+
* - Discovers repos via the relay (plus authors already in the DB)
11
11
+
* - Fetches their records from each PDS and upserts the lexicon-valid ones,
12
12
+
* with the same semantics as ingester/src/handler.ts
13
13
+
* - Deletes local rows that are no longer on the network or fail validation
14
14
+
* - Never deletes rows belonging to a repo it failed to reach
15
15
+
*
16
16
+
* Idempotent and safe to rerun anytime.
17
17
+
*
18
18
+
* Usage:
19
19
+
* npm run sync # uses DATABASE_URL from .env
20
20
+
* DATABASE_URL=postgres://... npm run sync
21
21
+
*/
22
22
+
import fs from "node:fs";
23
23
+
import pg from "pg";
24
24
+
import { validateRecord, type IndexedCollection } from "../ingester/src/lexicons.js";
25
25
+
26
26
+
/** Relay that is asked which repos contain records of a given collection. */
const RELAY = "https://relay1.us-east.bsky.network";

/** Local table that stores the rows for each indexed collection. */
const TABLE_FOR: Record<IndexedCollection, string> = {
  "app.sidetrail.trail": "trails",
  "app.sidetrail.walk": "walks",
  "app.sidetrail.completion": "completions",
};

/** Collections to sync, in processing order (trail, walk, completion). */
const COLLECTIONS = Object.keys(TABLE_FOR) as IndexedCollection[];

// PDS errors that definitively mean "this repo has no content to index"
// (as opposed to transient failures, which must not trigger pruning)
const REPO_GONE = /RepoNotFound|RepoDeactivated|RepoTakendown|RepoSuspended/;
41
41
+
42
42
+
/**
 * Determine which database to sync.
 *
 * Uses the DATABASE_URL environment variable when set; otherwise reads the
 * DATABASE_URL entry from the repo-level `.env` file next to this script's
 * parent directory.
 *
 * @throws Error if neither source yields a connection string. This script
 *   deletes rows, so it must never fall through to whatever default
 *   connection the Postgres client would pick on its own.
 */
function loadDatabaseUrl(): string {
  const fromEnv = process.env.DATABASE_URL;
  if (fromEnv) return fromEnv;

  const envFile = new URL("../.env", import.meta.url);
  let envText: string;
  try {
    envText = fs.readFileSync(envFile, "utf8");
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    throw new Error(`DATABASE_URL is not set and ${envFile.pathname} could not be read: ${reason}`);
  }

  // Optional opening double quote; stop at the closing quote or end of line.
  // `\r` is excluded so CRLF files don't leak a carriage return into the URL.
  const fromFile = envText.match(/^DATABASE_URL="?([^"\r\n]+)/m)?.[1];
  if (!fromFile) {
    throw new Error(`DATABASE_URL is not set and ${envFile.pathname} does not define it`);
  }
  return fromFile;
}

const databaseUrl = loadDatabaseUrl();
47
47
+
48
48
+
/** One entry of a `com.atproto.repo.listRecords` page, as consumed by this script. */
type PdsRecord = {
  /** Record URI; its last path segment is used as the rkey. */
  uri: string;
  /** Content identifier, stored alongside the record. */
  cid: string;
  /** The record body, validated against the lexicon before being stored. */
  value: Record<string, unknown>;
};
49
49
+
50
50
+
/**
 * GET `url` and return its parsed JSON body.
 *
 * The request is aborted after 15 seconds. A non-OK response rejects with an
 * Error whose message is `<status> <response body> <url>`; the body is
 * included so callers can match error names inside it.
 */
async function getJson(url: URL | string): Promise<any> {
  const response = await fetch(url, { signal: AbortSignal.timeout(15000) });
  if (response.ok) return response.json();

  // Reading the error body is best-effort: fall back to an empty string.
  const body = await response.text().catch(() => "");
  throw new Error(`${response.status} ${body} ${url}`);
}
55
55
+
56
56
+
/**
 * Ask the relay for every repo that contains records of `collection`.
 *
 * Pages through `com.atproto.sync.listReposByCollection` 500 repos at a time,
 * following the returned cursor until the relay stops sending one.
 *
 * @returns the distinct DIDs reported by the relay
 */
async function listReposByCollection(collection: string): Promise<Set<string>> {
  const found = new Set<string>();
  let cursor: string | undefined;
  for (;;) {
    const endpoint = new URL(`${RELAY}/xrpc/com.atproto.sync.listReposByCollection`);
    endpoint.searchParams.set("collection", collection);
    endpoint.searchParams.set("limit", "500");
    if (cursor) endpoint.searchParams.set("cursor", cursor);

    const page = await getJson(endpoint);
    for (const repo of page.repos) found.add(repo.did);

    cursor = page.cursor;
    if (!cursor) return found;
  }
}
70
70
+
71
71
+
/**
 * Resolve a DID to the base URL of the PDS hosting its repo.
 *
 * `did:web:` identifiers are resolved from the host's `/.well-known/did.json`;
 * everything else is looked up on plc.directory. The PDS is the service entry
 * whose id is `#atproto_pds`, either relative or fully qualified as
 * `<did>#atproto_pds`.
 *
 * @returns the service endpoint without trailing slashes, so callers can
 *   append `/xrpc/...` directly
 * @throws Error if the DID document cannot be fetched or declares no usable
 *   PDS endpoint
 */
async function resolvePds(did: string): Promise<string> {
  const webPrefix = "did:web:";
  let doc: any;
  if (did.startsWith(webPrefix)) {
    // did:web percent-encodes the colon of a port (e.g. `localhost%3A3000`).
    const host = decodeURIComponent(did.slice(webPrefix.length));
    doc = await getJson(`https://${host}/.well-known/did.json`);
  } else {
    doc = await getJson(`https://plc.directory/${did}`);
  }

  const services: unknown = doc?.service;
  const pds = Array.isArray(services)
    ? services.find(
        (s: any) => typeof s?.id === "string" && (s.id as string).endsWith("#atproto_pds"),
      )
    : undefined;
  const endpoint: unknown = pds?.serviceEndpoint;
  if (typeof endpoint !== "string" || endpoint === "") {
    throw new Error(`no PDS endpoint for ${did}`);
  }
  return endpoint.replace(/\/+$/, "");
}
79
79
+
80
80
+
/**
 * Fetch every record of `collection` from the repo `did` on the given PDS.
 *
 * Pages through `com.atproto.repo.listRecords` 100 records at a time,
 * following the returned cursor until the PDS stops sending one.
 *
 * @param pdsEndpoint base URL of the PDS; `/xrpc/...` is appended to it
 * @returns all records, in the order the PDS returned them
 */
async function listRecords(
  pdsEndpoint: string,
  did: string,
  collection: string,
): Promise<PdsRecord[]> {
  const collected: PdsRecord[] = [];
  let cursor: string | undefined;
  for (;;) {
    const endpoint = new URL(`${pdsEndpoint}/xrpc/com.atproto.repo.listRecords`);
    const params: Array<[string, string]> = [
      ["repo", did],
      ["collection", collection],
      ["limit", "100"],
    ];
    if (cursor) params.push(["cursor", cursor]);
    for (const [key, value] of params) endpoint.searchParams.set(key, value);

    const page = await getJson(endpoint);
    collected.push(...page.records);

    cursor = page.cursor;
    if (!cursor) return collected;
  }
}
99
99
+
100
100
+
/**
 * Insert one network record into the table for its collection, or refresh the
 * existing row with the same uri.
 *
 * On conflict only `cid`, `record` and `indexed_at` are updated. Walks and
 * completions additionally store the uri of the trail they reference (empty
 * string when the record has none).
 *
 * @param did author of the record, stored as `author_did`
 */
async function upsertRecord(
  db: pg.Client,
  collection: IndexedCollection,
  did: string,
  { uri, cid, value }: PdsRecord,
): Promise<void> {
  const table = TABLE_FOR[collection];
  // The rkey is the last path segment of the record uri.
  const rkey = uri.split("/").pop();
  // Fall back to "now" when the record carries no createdAt.
  const createdAt = new Date((value.createdAt as string) ?? Date.now());
  const recordJson = JSON.stringify(value);

  if (table === "trails") {
    await db.query(
      `INSERT INTO trails (uri, cid, author_did, rkey, record, created_at, indexed_at)
       VALUES ($1, $2, $3, $4, $5, $6, NOW())
       ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $5, indexed_at = NOW()`,
      [uri, cid, did, rkey, recordJson, createdAt],
    );
    return;
  }

  const trailUri = (value.trail as { uri?: string } | undefined)?.uri ?? "";
  await db.query(
    `INSERT INTO ${table} (uri, cid, author_did, rkey, trail_uri, record, created_at, indexed_at)
     VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
     ON CONFLICT (uri) DO UPDATE SET cid = $2, record = $6, indexed_at = NOW()`,
    [uri, cid, did, rkey, trailUri, recordJson, createdAt],
  );
}
133
133
+
134
134
+
/**
 * Reconcile the local index with the records currently on the network.
 *
 * 1. Collect the repos to sync: every repo the relay lists for any indexed
 *    collection plus every author already in the local tables, minus
 *    accounts marked inactive locally.
 * 2. With 8 concurrent workers, resolve each repo's PDS, list its records
 *    per collection, upsert the lexicon-valid ones and remember their uris.
 * 3. Delete local rows of successfully synced repos whose uri was not seen.
 *
 * Repos that fail for a reason other than a REPO_GONE error are reported,
 * never pruned, and make the process exit with code 1.
 */
async function main(): Promise<void> {
  const db = new pg.Client({ connectionString: databaseUrl });
  await db.connect();

  try {
    // Repos to sync: everything the relay knows about, plus authors already
    // indexed locally (so records deleted from the network get pruned even if
    // their author no longer appears in the relay's listing)
    const [relayDidSets, localDids, inactive] = await Promise.all([
      Promise.all(COLLECTIONS.map(listReposByCollection)),
      db.query(
        `SELECT DISTINCT author_did AS did FROM trails
         UNION SELECT DISTINCT author_did FROM walks
         UNION SELECT DISTINCT author_did FROM completions`,
      ),
      db.query(`SELECT did FROM accounts WHERE active = 0`),
    ]);
    const skip = new Set<string>(inactive.rows.map((r) => r.did));
    const dids = [
      ...new Set<string>([
        ...relayDidSets.flatMap((s) => [...s]),
        ...localDids.rows.map((r) => r.did),
      ]),
    ].filter((did) => !skip.has(did));
    console.log(`syncing ${dids.length} repos`);

    // uris seen on the network and valid, per collection; only repos in
    // `fetchedOk` participate in pruning
    const keep: Record<IndexedCollection, Set<string>> = {
      "app.sidetrail.trail": new Set(),
      "app.sidetrail.walk": new Set(),
      "app.sidetrail.completion": new Set(),
    };
    const fetchedOk = new Set<string>();
    const failures: string[] = [];
    let rejected = 0;

    const queue = [...dids];
    let started = 0;

    // Each worker drains the shared queue until it is empty.
    const worker = async (): Promise<void> => {
      let did: string | undefined;
      while ((did = queue.shift())) {
        // Capture this repo's position now: `started` keeps advancing while
        // this worker is awaiting, so reading it later would mislabel the log.
        const progress = `[${++started}/${dids.length}]`;
        try {
          let pdsEndpoint: string;
          try {
            pdsEndpoint = await resolvePds(did);
          } catch (err) {
            if (REPO_GONE.test(String(err))) {
              fetchedOk.add(did); // identity gone: prune everything local
              console.log(`${progress} ${did}: repo gone`);
              continue;
            }
            throw err;
          }

          const counts: string[] = [];
          for (const collection of COLLECTIONS) {
            let records: PdsRecord[];
            try {
              records = await listRecords(pdsEndpoint, did, collection);
            } catch (err) {
              if (REPO_GONE.test(String(err))) {
                records = []; // repo gone: nothing to keep, prune local rows
              } else {
                throw err;
              }
            }
            for (const record of records) {
              const validation = validateRecord(collection, record.value);
              if (!validation.success) {
                rejected++;
                console.log(`  rejecting ${collection} ${record.uri}: ${validation.reason}`);
                continue;
              }
              await upsertRecord(db, collection, did, record);
              keep[collection].add(record.uri);
            }
            if (records.length > 0) {
              counts.push(`${collection.split(".").pop()}=${records.length}`);
            }
          }

          // Only now is the repo eligible for pruning: every collection was
          // listed without a transient failure.
          fetchedOk.add(did);
          await db.query(
            `INSERT INTO accounts (did, active, seq, updated_at) VALUES ($1, 1, 0, NOW())
             ON CONFLICT (did) DO NOTHING`,
            [did],
          );
          console.log(`${progress} ${did}:`, counts.join(" ") || "no records");
        } catch (err) {
          failures.push(did);
          const message = err instanceof Error ? err.message : String(err);
          console.error(`${progress} ${did}: FAILED - ${message}`);
        }
      }
    };
    await Promise.all(Array.from({ length: 8 }, () => worker()));

    // Prune: rows from successfully-synced repos that aren't in the keep set
    // (deleted from the network, or no longer lexicon-valid)
    for (const collection of COLLECTIONS) {
      const table = TABLE_FOR[collection];
      const { rows } = await db.query(
        `DELETE FROM ${table}
         WHERE author_did = ANY($1) AND NOT (uri = ANY($2))
         RETURNING uri`,
        [[...fetchedOk], [...keep[collection]]],
      );
      for (const row of rows) console.log(`pruned ${row.uri}`);
      const { rows: count } = await db.query(`SELECT COUNT(*) FROM ${table}`);
      console.log(`${table}: ${count[0].count} rows (${rows.length} pruned)`);
    }
    console.log(`${rejected} records rejected by lexicon validation`);
    if (failures.length > 0) {
      console.error(
        `\n${failures.length} repos unreachable, left untouched (rerun to retry): ${failures.join(", ")}`,
      );
      process.exitCode = 1;
    }
  } finally {
    // Always release the connection, including when discovery or pruning throws.
    await db.end();
  }
}

main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});