an app to share curated trails
sidetrail.app
1import { eq } from "drizzle-orm";
2
3const log = process.env.NODE_ENV === "test" ? () => {} : console.log;
4import type { NodePgDatabase } from "drizzle-orm/node-postgres";
5import { trails, walks, completions, accounts } from "@sidetrail/db";
6import type { JetstreamEvent, AccountEvent } from "./jetstream.js";
7
8export const COLLECTIONS = [
9 "app.sidetrail.trail",
10 "app.sidetrail.walk",
11 "app.sidetrail.completion",
12];
13
14export type IngesterDb = NodePgDatabase;
15
16// ============================================================================
17// Trail operations
18// ============================================================================
19
20export async function upsertTrail(
21 db: IngesterDb,
22 uri: string,
23 cid: string,
24 authorDid: string,
25 rkey: string,
26 record: unknown,
27 createdAt: string,
28): Promise<void> {
29 await db
30 .insert(trails)
31 .values({
32 uri,
33 cid,
34 authorDid,
35 rkey,
36 record: record as typeof trails.$inferInsert.record,
37 createdAt: new Date(createdAt),
38 indexedAt: new Date(),
39 })
40 .onConflictDoUpdate({
41 target: trails.uri,
42 set: {
43 cid,
44 record: record as typeof trails.$inferInsert.record,
45 indexedAt: new Date(),
46 },
47 });
48}
49
50export async function deleteTrail(db: IngesterDb, uri: string): Promise<void> {
51 await db.delete(trails).where(eq(trails.uri, uri));
52}
53
54// ============================================================================
55// Walk operations
56// ============================================================================
57
58export async function upsertWalk(
59 db: IngesterDb,
60 uri: string,
61 cid: string,
62 authorDid: string,
63 rkey: string,
64 trailUri: string,
65 record: unknown,
66 createdAt: string,
67): Promise<void> {
68 await db
69 .insert(walks)
70 .values({
71 uri,
72 cid,
73 authorDid,
74 rkey,
75 trailUri,
76 record: record as typeof walks.$inferInsert.record,
77 createdAt: new Date(createdAt),
78 indexedAt: new Date(),
79 })
80 .onConflictDoUpdate({
81 target: walks.uri,
82 set: {
83 cid,
84 record: record as typeof walks.$inferInsert.record,
85 indexedAt: new Date(),
86 },
87 });
88}
89
90export async function deleteWalk(db: IngesterDb, uri: string): Promise<void> {
91 await db.delete(walks).where(eq(walks.uri, uri));
92}
93
94// ============================================================================
95// Completion operations
96// ============================================================================
97
98export async function upsertCompletion(
99 db: IngesterDb,
100 uri: string,
101 cid: string,
102 authorDid: string,
103 rkey: string,
104 trailUri: string,
105 record: unknown,
106 createdAt: string,
107): Promise<void> {
108 await db
109 .insert(completions)
110 .values({
111 uri,
112 cid,
113 authorDid,
114 rkey,
115 trailUri,
116 record: record as typeof completions.$inferInsert.record,
117 createdAt: new Date(createdAt),
118 indexedAt: new Date(),
119 })
120 .onConflictDoUpdate({
121 target: completions.uri,
122 set: {
123 cid,
124 record: record as typeof completions.$inferInsert.record,
125 indexedAt: new Date(),
126 },
127 });
128}
129
130export async function deleteCompletion(db: IngesterDb, uri: string): Promise<void> {
131 await db.delete(completions).where(eq(completions.uri, uri));
132}
133
134// ============================================================================
135// Account operations
136// ============================================================================
137
138export async function deleteAllContentByDid(db: IngesterDb, did: string): Promise<void> {
139 await Promise.all([
140 db.delete(trails).where(eq(trails.authorDid, did)),
141 db.delete(walks).where(eq(walks.authorDid, did)),
142 db.delete(completions).where(eq(completions.authorDid, did)),
143 ]);
144}
145
146export async function ensureAccount(db: IngesterDb, did: string): Promise<void> {
147 await db
148 .insert(accounts)
149 .values({ did, active: 1, seq: 0, updatedAt: new Date() })
150 .onConflictDoNothing();
151}
152
153export async function handleAccountEvent(
154 db: IngesterDb,
155 evt: AccountEvent["account"],
156): Promise<void> {
157 const { did, active, seq, status } = evt;
158
159 const [existing] = await db
160 .select({ seq: accounts.seq })
161 .from(accounts)
162 .where(eq(accounts.did, did))
163 .limit(1);
164
165 if (!existing) {
166 return;
167 }
168
169 if (existing.seq >= seq) {
170 return;
171 }
172
173 await db
174 .update(accounts)
175 .set({
176 active: active ? 1 : 0,
177 status: active ? null : status,
178 seq,
179 updatedAt: new Date(),
180 })
181 .where(eq(accounts.did, did));
182
183 if (!active && (status === "takendown" || status === "deleted")) {
184 log(`Account ${did} ${status} - deleting all content`);
185 await deleteAllContentByDid(db, did);
186 } else if (!active) {
187 log(`Account ${did} ${status ?? "inactive"} - marking inactive`);
188 } else if (active) {
189 log(`Account ${did} reactivated`);
190 }
191}
192
193// ============================================================================
194// Event handler
195// ============================================================================
196
197export async function handleEvent(db: IngesterDb, evt: JetstreamEvent): Promise<void> {
198 if (evt.kind === "account") {
199 await handleAccountEvent(db, evt.account);
200 return;
201 }
202
203 if (evt.kind === "identity") return;
204 if (evt.kind !== "commit") return;
205
206 const { commit } = evt;
207 const { collection, rkey } = commit;
208 if (!COLLECTIONS.includes(collection)) return;
209
210 // ATProto spec: commits from inactive accounts should be ignored
211 const [accountStatus] = await db
212 .select({ active: accounts.active })
213 .from(accounts)
214 .where(eq(accounts.did, evt.did))
215 .limit(1);
216
217 if (accountStatus && !accountStatus.active) {
218 return;
219 }
220
221 const uri = `at://${evt.did}/${collection}/${rkey}`;
222
223 if (commit.operation === "delete") {
224 switch (collection) {
225 case "app.sidetrail.trail":
226 await deleteTrail(db, uri);
227 break;
228 case "app.sidetrail.walk":
229 await deleteWalk(db, uri);
230 break;
231 case "app.sidetrail.completion":
232 await deleteCompletion(db, uri);
233 break;
234 }
235 return;
236 }
237
238 const record = commit.record as Record<string, unknown>;
239 await ensureAccount(db, evt.did);
240
241 switch (collection) {
242 case "app.sidetrail.trail":
243 await upsertTrail(
244 db,
245 uri,
246 commit.cid,
247 evt.did,
248 rkey,
249 record,
250 (record.createdAt as string) || new Date().toISOString(),
251 );
252 break;
253
254 case "app.sidetrail.walk": {
255 const trailRef = record.trail as { uri: string } | undefined;
256 const trailUri = trailRef?.uri || "";
257 await upsertWalk(
258 db,
259 uri,
260 commit.cid,
261 evt.did,
262 rkey,
263 trailUri,
264 record,
265 (record.createdAt as string) || new Date().toISOString(),
266 );
267 break;
268 }
269
270 case "app.sidetrail.completion": {
271 const trailRef = record.trail as { uri: string } | undefined;
272 const trailUri = trailRef?.uri || "";
273 await upsertCompletion(
274 db,
275 uri,
276 commit.cid,
277 evt.did,
278 rkey,
279 trailUri,
280 record,
281 (record.createdAt as string) || new Date().toISOString(),
282 );
283 break;
284 }
285 }
286}