an app to share curated trails sidetrail.app
1

Configure Feed

Select the types of activity you want to include in your feed.

import { eq } from "drizzle-orm";
import type { NodePgDatabase } from "drizzle-orm/node-postgres";
import { trails, walks, completions, accounts } from "@sidetrail/db";
import type { JetstreamEvent, AccountEvent } from "./jetstream.js";
import { validateRecord, type IndexedCollection } from "./lexicons.js";

/** Console logger that is replaced by a no-op when NODE_ENV is "test". */
const log: (...args: unknown[]) => void =
  process.env.NODE_ENV === "test" ? () => {} : console.log;

/** Record collections handled by `handleEvent`; commits for any other collection are ignored. */
export const COLLECTIONS = [
  "app.sidetrail.trail",
  "app.sidetrail.walk",
  "app.sidetrail.completion",
];

/** Database handle accepted by every function in this module. */
export type IngesterDb = NodePgDatabase;

// ============================================================================
// Trail operations
// ============================================================================

/**
 * Inserts a trail row, or refreshes the existing row with the same `uri`.
 *
 * On conflict only `cid`, `record` and `indexedAt` are rewritten; the
 * `authorDid`, `rkey` and `createdAt` stored by the first insert are kept.
 *
 * @param db - Database handle.
 * @param uri - Record URI; the conflict target.
 * @param cid - Content identifier of this version of the record.
 * @param authorDid - DID of the repo that owns the record.
 * @param rkey - Record key within the collection.
 * @param record - Record body, stored as-is (cast to the column's insert type).
 * @param createdAt - Timestamp string passed to `new Date(...)`.
 */
export async function upsertTrail(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof trails.$inferInsert.record;
  await db
    .insert(trails)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt: new Date(),
    })
    .onConflictDoUpdate({
      target: trails.uri,
      set: {
        cid,
        record: typedRecord,
        indexedAt: new Date(),
      },
    });
}

/** Deletes the trail row whose `uri` matches; a no-op when no row matches. */
export async function deleteTrail(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(trails).where(eq(trails.uri, uri));
}

// ============================================================================
// Walk operations
// ============================================================================

/**
 * Inserts a walk row, or refreshes the existing row with the same `uri`.
 *
 * On conflict `cid`, `trailUri`, `record` and `indexedAt` are rewritten; the
 * `authorDid`, `rkey` and `createdAt` stored by the first insert are kept.
 *
 * @param db - Database handle.
 * @param uri - Record URI; the conflict target.
 * @param cid - Content identifier of this version of the record.
 * @param authorDid - DID of the repo that owns the record.
 * @param rkey - Record key within the collection.
 * @param trailUri - URI of the trail this walk refers to.
 * @param record - Record body, stored as-is (cast to the column's insert type).
 * @param createdAt - Timestamp string passed to `new Date(...)`.
 */
export async function upsertWalk(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  trailUri: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof walks.$inferInsert.record;
  await db
    .insert(walks)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      trailUri,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt: new Date(),
    })
    .onConflictDoUpdate({
      target: walks.uri,
      set: {
        cid,
        // Keep the denormalized column in step with the record body being
        // replaced; otherwise the two can disagree after an update.
        trailUri,
        record: typedRecord,
        indexedAt: new Date(),
      },
    });
}

/** Deletes the walk row whose `uri` matches; a no-op when no row matches. */
export async function deleteWalk(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(walks).where(eq(walks.uri, uri));
}

// ============================================================================
// Completion operations
// ============================================================================

/**
 * Inserts a completion row, or refreshes the existing row with the same `uri`.
 *
 * On conflict `cid`, `trailUri`, `record` and `indexedAt` are rewritten; the
 * `authorDid`, `rkey` and `createdAt` stored by the first insert are kept.
 *
 * @param db - Database handle.
 * @param uri - Record URI; the conflict target.
 * @param cid - Content identifier of this version of the record.
 * @param authorDid - DID of the repo that owns the record.
 * @param rkey - Record key within the collection.
 * @param trailUri - URI of the trail this completion refers to.
 * @param record - Record body, stored as-is (cast to the column's insert type).
 * @param createdAt - Timestamp string passed to `new Date(...)`.
 */
export async function upsertCompletion(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  trailUri: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof completions.$inferInsert.record;
  await db
    .insert(completions)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      trailUri,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt: new Date(),
    })
    .onConflictDoUpdate({
      target: completions.uri,
      set: {
        cid,
        // Keep the denormalized column in step with the record body being
        // replaced; otherwise the two can disagree after an update.
        trailUri,
        record: typedRecord,
        indexedAt: new Date(),
      },
    });
}

/** Deletes the completion row whose `uri` matches; a no-op when no row matches. */
export async function deleteCompletion(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(completions).where(eq(completions.uri, uri));
}

// ============================================================================
// Account operations
// ============================================================================

/**
 * Deletes every trail, walk and completion authored by `did`.
 *
 * The three deletes are issued together and awaited with `Promise.all`, so a
 * failure in one rejects the returned promise while the others may still have
 * been applied.
 */
export async function deleteAllContentByDid(db: IngesterDb, did: string): Promise<void> {
  await Promise.all([
    db.delete(trails).where(eq(trails.authorDid, did)),
    db.delete(walks).where(eq(walks.authorDid, did)),
    db.delete(completions).where(eq(completions.authorDid, did)),
  ]);
}

/**
 * Creates an account row for `did` (active, `seq` 0) if none exists yet.
 * An existing row is left untouched.
 */
export async function ensureAccount(db: IngesterDb, did: string): Promise<void> {
  await db
    .insert(accounts)
    .values({ did, active: 1, seq: 0, updatedAt: new Date() })
    .onConflictDoNothing();
}

/**
 * Applies an account status event to the `accounts` table.
 *
 * Events for DIDs without an account row are ignored, as are events whose
 * `seq` is not strictly greater than the stored one. When the account becomes
 * inactive with status "takendown" or "deleted", all of its indexed content is
 * removed.
 *
 * @param db - Database handle.
 * @param evt - The `account` payload of a Jetstream account event.
 */
export async function handleAccountEvent(
  db: IngesterDb,
  evt: AccountEvent["account"],
): Promise<void> {
  const { did, active, seq, status } = evt;

  const [existing] = await db
    .select({ seq: accounts.seq })
    .from(accounts)
    .where(eq(accounts.did, did))
    .limit(1);

  // No row for this DID: nothing to update.
  if (!existing) {
    return;
  }

  // Stale or duplicate event: the stored state is at least as new.
  if (existing.seq >= seq) {
    return;
  }

  await db
    .update(accounts)
    .set({
      active: active ? 1 : 0,
      // Always write an explicit value. An `undefined` here would be dropped
      // from the SET clause, leaving the previous status in place when an
      // inactive event carries no status.
      status: active ? null : (status ?? null),
      seq,
      updatedAt: new Date(),
    })
    .where(eq(accounts.did, did));

  if (!active && (status === "takendown" || status === "deleted")) {
    log(`Account ${did} ${status} - deleting all content`);
    await deleteAllContentByDid(db, did);
  } else if (!active) {
    log(`Account ${did} ${status ?? "inactive"} - marking inactive`);
  } else {
    log(`Account ${did} reactivated`);
  }
}

// ============================================================================
// Event handler
// ============================================================================

/**
 * Routes a single Jetstream event to the matching database operation.
 *
 * - `account` events go to `handleAccountEvent`.
 * - `identity` events and any other non-`commit` kinds are ignored.
 * - `commit` events are processed only for collections in `COLLECTIONS`, and
 *   only when the author's account row (if one exists) is active.
 *   Deletes remove the row; other operations validate the record and upsert it.
 *
 * @param db - Database handle.
 * @param evt - The event to process.
 */
export async function handleEvent(db: IngesterDb, evt: JetstreamEvent): Promise<void> {
  if (evt.kind === "account") {
    await handleAccountEvent(db, evt.account);
    return;
  }

  if (evt.kind === "identity") return;
  if (evt.kind !== "commit") return;

  const { commit } = evt;
  const { collection, rkey } = commit;
  if (!COLLECTIONS.includes(collection)) return;

  // ATProto spec: commits from inactive accounts should be ignored
  const [accountStatus] = await db
    .select({ active: accounts.active })
    .from(accounts)
    .where(eq(accounts.did, evt.did))
    .limit(1);

  // A missing row does not block the commit; only a known-inactive account does.
  if (accountStatus && !accountStatus.active) {
    return;
  }

  const uri = `at://${evt.did}/${collection}/${rkey}`;

  if (commit.operation === "delete") {
    switch (collection) {
      case "app.sidetrail.trail":
        await deleteTrail(db, uri);
        break;
      case "app.sidetrail.walk":
        await deleteWalk(db, uri);
        break;
      case "app.sidetrail.completion":
        await deleteCompletion(db, uri);
        break;
    }
    return;
  }

  const record = commit.record as Record<string, unknown>;

  const validation = validateRecord(collection as IndexedCollection, record);
  if (!validation.success) {
    log(`Rejecting invalid ${collection} ${uri}: ${validation.reason}`);
    return;
  }

  await ensureAccount(db, evt.did);

  // Fall back to the ingestion time when the record has no (or an empty) createdAt.
  const createdAt = (record.createdAt as string) || new Date().toISOString();

  switch (collection) {
    case "app.sidetrail.trail":
      await upsertTrail(db, uri, commit.cid, evt.did, rkey, record, createdAt);
      break;

    case "app.sidetrail.walk": {
      const trailRef = record.trail as { uri: string } | undefined;
      const trailUri = trailRef?.uri || "";
      await upsertWalk(db, uri, commit.cid, evt.did, rkey, trailUri, record, createdAt);
      break;
    }

    case "app.sidetrail.completion": {
      const trailRef = record.trail as { uri: string } | undefined;
      const trailUri = trailRef?.uri || "";
      await upsertCompletion(db, uri, commit.cid, evt.did, rkey, trailUri, record, createdAt);
      break;
    }
  }
}