an app to share curated trails
sidetrail.app
1import { eq } from "drizzle-orm";
2
3const log = process.env.NODE_ENV === "test" ? () => {} : console.log;
4import type { NodePgDatabase } from "drizzle-orm/node-postgres";
5import { trails, walks, completions, accounts } from "@sidetrail/db";
6import type { JetstreamEvent, AccountEvent } from "./jetstream.js";
7import { validateRecord, type IndexedCollection } from "./lexicons.js";
8
9export const COLLECTIONS = [
10 "app.sidetrail.trail",
11 "app.sidetrail.walk",
12 "app.sidetrail.completion",
13];
14
15export type IngesterDb = NodePgDatabase;
16
17// ============================================================================
18// Trail operations
19// ============================================================================
20
21export async function upsertTrail(
22 db: IngesterDb,
23 uri: string,
24 cid: string,
25 authorDid: string,
26 rkey: string,
27 record: unknown,
28 createdAt: string,
29): Promise<void> {
30 await db
31 .insert(trails)
32 .values({
33 uri,
34 cid,
35 authorDid,
36 rkey,
37 record: record as typeof trails.$inferInsert.record,
38 createdAt: new Date(createdAt),
39 indexedAt: new Date(),
40 })
41 .onConflictDoUpdate({
42 target: trails.uri,
43 set: {
44 cid,
45 record: record as typeof trails.$inferInsert.record,
46 indexedAt: new Date(),
47 },
48 });
49}
50
51export async function deleteTrail(db: IngesterDb, uri: string): Promise<void> {
52 await db.delete(trails).where(eq(trails.uri, uri));
53}
54
55// ============================================================================
56// Walk operations
57// ============================================================================
58
59export async function upsertWalk(
60 db: IngesterDb,
61 uri: string,
62 cid: string,
63 authorDid: string,
64 rkey: string,
65 trailUri: string,
66 record: unknown,
67 createdAt: string,
68): Promise<void> {
69 await db
70 .insert(walks)
71 .values({
72 uri,
73 cid,
74 authorDid,
75 rkey,
76 trailUri,
77 record: record as typeof walks.$inferInsert.record,
78 createdAt: new Date(createdAt),
79 indexedAt: new Date(),
80 })
81 .onConflictDoUpdate({
82 target: walks.uri,
83 set: {
84 cid,
85 record: record as typeof walks.$inferInsert.record,
86 indexedAt: new Date(),
87 },
88 });
89}
90
91export async function deleteWalk(db: IngesterDb, uri: string): Promise<void> {
92 await db.delete(walks).where(eq(walks.uri, uri));
93}
94
95// ============================================================================
96// Completion operations
97// ============================================================================
98
99export async function upsertCompletion(
100 db: IngesterDb,
101 uri: string,
102 cid: string,
103 authorDid: string,
104 rkey: string,
105 trailUri: string,
106 record: unknown,
107 createdAt: string,
108): Promise<void> {
109 await db
110 .insert(completions)
111 .values({
112 uri,
113 cid,
114 authorDid,
115 rkey,
116 trailUri,
117 record: record as typeof completions.$inferInsert.record,
118 createdAt: new Date(createdAt),
119 indexedAt: new Date(),
120 })
121 .onConflictDoUpdate({
122 target: completions.uri,
123 set: {
124 cid,
125 record: record as typeof completions.$inferInsert.record,
126 indexedAt: new Date(),
127 },
128 });
129}
130
131export async function deleteCompletion(db: IngesterDb, uri: string): Promise<void> {
132 await db.delete(completions).where(eq(completions.uri, uri));
133}
134
135// ============================================================================
136// Account operations
137// ============================================================================
138
139export async function deleteAllContentByDid(db: IngesterDb, did: string): Promise<void> {
140 await Promise.all([
141 db.delete(trails).where(eq(trails.authorDid, did)),
142 db.delete(walks).where(eq(walks.authorDid, did)),
143 db.delete(completions).where(eq(completions.authorDid, did)),
144 ]);
145}
146
147export async function ensureAccount(db: IngesterDb, did: string): Promise<void> {
148 await db
149 .insert(accounts)
150 .values({ did, active: 1, seq: 0, updatedAt: new Date() })
151 .onConflictDoNothing();
152}
153
154export async function handleAccountEvent(
155 db: IngesterDb,
156 evt: AccountEvent["account"],
157): Promise<void> {
158 const { did, active, seq, status } = evt;
159
160 const [existing] = await db
161 .select({ seq: accounts.seq })
162 .from(accounts)
163 .where(eq(accounts.did, did))
164 .limit(1);
165
166 if (!existing) {
167 return;
168 }
169
170 if (existing.seq >= seq) {
171 return;
172 }
173
174 await db
175 .update(accounts)
176 .set({
177 active: active ? 1 : 0,
178 status: active ? null : status,
179 seq,
180 updatedAt: new Date(),
181 })
182 .where(eq(accounts.did, did));
183
184 if (!active && (status === "takendown" || status === "deleted")) {
185 log(`Account ${did} ${status} - deleting all content`);
186 await deleteAllContentByDid(db, did);
187 } else if (!active) {
188 log(`Account ${did} ${status ?? "inactive"} - marking inactive`);
189 } else if (active) {
190 log(`Account ${did} reactivated`);
191 }
192}
193
194// ============================================================================
195// Event handler
196// ============================================================================
197
198export async function handleEvent(db: IngesterDb, evt: JetstreamEvent): Promise<void> {
199 if (evt.kind === "account") {
200 await handleAccountEvent(db, evt.account);
201 return;
202 }
203
204 if (evt.kind === "identity") return;
205 if (evt.kind !== "commit") return;
206
207 const { commit } = evt;
208 const { collection, rkey } = commit;
209 if (!COLLECTIONS.includes(collection)) return;
210
211 // ATProto spec: commits from inactive accounts should be ignored
212 const [accountStatus] = await db
213 .select({ active: accounts.active })
214 .from(accounts)
215 .where(eq(accounts.did, evt.did))
216 .limit(1);
217
218 if (accountStatus && !accountStatus.active) {
219 return;
220 }
221
222 const uri = `at://${evt.did}/${collection}/${rkey}`;
223
224 if (commit.operation === "delete") {
225 switch (collection) {
226 case "app.sidetrail.trail":
227 await deleteTrail(db, uri);
228 break;
229 case "app.sidetrail.walk":
230 await deleteWalk(db, uri);
231 break;
232 case "app.sidetrail.completion":
233 await deleteCompletion(db, uri);
234 break;
235 }
236 return;
237 }
238
239 const record = commit.record as Record<string, unknown>;
240
241 const validation = validateRecord(collection as IndexedCollection, record);
242 if (!validation.success) {
243 log(`Rejecting invalid ${collection} ${uri}: ${validation.reason}`);
244 return;
245 }
246
247 await ensureAccount(db, evt.did);
248
249 switch (collection) {
250 case "app.sidetrail.trail":
251 await upsertTrail(
252 db,
253 uri,
254 commit.cid,
255 evt.did,
256 rkey,
257 record,
258 (record.createdAt as string) || new Date().toISOString(),
259 );
260 break;
261
262 case "app.sidetrail.walk": {
263 const trailRef = record.trail as { uri: string } | undefined;
264 const trailUri = trailRef?.uri || "";
265 await upsertWalk(
266 db,
267 uri,
268 commit.cid,
269 evt.did,
270 rkey,
271 trailUri,
272 record,
273 (record.createdAt as string) || new Date().toISOString(),
274 );
275 break;
276 }
277
278 case "app.sidetrail.completion": {
279 const trailRef = record.trail as { uri: string } | undefined;
280 const trailUri = trailRef?.uri || "";
281 await upsertCompletion(
282 db,
283 uri,
284 commit.cid,
285 evt.did,
286 rkey,
287 trailUri,
288 record,
289 (record.createdAt as string) || new Date().toISOString(),
290 );
291 break;
292 }
293 }
294}