an app to share curated trails sidetrail.app
1

Configure Feed

Select the types of activity you want to include in your feed.

import { eq } from "drizzle-orm";
import type { NodePgDatabase } from "drizzle-orm/node-postgres";
import { trails, walks, completions, accounts } from "@sidetrail/db";
import type { JetstreamEvent, AccountEvent } from "./jetstream.js";

/** Logger that becomes a no-op when NODE_ENV is "test". */
const log = process.env.NODE_ENV === "test" ? () => {} : console.log;

/** Record collections this module indexes; commits for anything else are ignored by `handleEvent`. */
export const COLLECTIONS = [
  "app.sidetrail.trail",
  "app.sidetrail.walk",
  "app.sidetrail.completion",
];

/** Database handle accepted by every operation in this module. */
export type IngesterDb = NodePgDatabase;

// ============================================================================
// Internal helpers
// ============================================================================

/**
 * Returns a timestamp string that is safe to pass to `new Date(...)`.
 *
 * Record payloads come from the network, so `createdAt` may be missing,
 * non-string, or unparseable. Any such value is replaced by the current time
 * instead of producing an Invalid Date further down.
 */
function resolveCreatedAt(value: unknown): string {
  if (typeof value === "string" && !Number.isNaN(Date.parse(value))) {
    return value;
  }
  return new Date().toISOString();
}

/**
 * Reads `record.trail.uri`, returning "" when the reference is absent or is
 * not a string.
 */
function extractTrailUri(record: Record<string, unknown>): string {
  const trailRef = record.trail;
  if (typeof trailRef !== "object" || trailRef === null) {
    return "";
  }
  const uri = (trailRef as { uri?: unknown }).uri;
  return typeof uri === "string" ? uri : "";
}

// ============================================================================
// Trail operations
// ============================================================================

/**
 * Inserts a trail row, or refreshes `cid`, `record` and `indexedAt` when a row
 * with the same `uri` already exists. `createdAt`, `authorDid` and `rkey` are
 * only written on first insert.
 *
 * @param createdAt Timestamp string; it is converted with `new Date(createdAt)`.
 */
export async function upsertTrail(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof trails.$inferInsert.record;
  const indexedAt = new Date();

  await db
    .insert(trails)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt,
    })
    .onConflictDoUpdate({
      target: trails.uri,
      set: { cid, record: typedRecord, indexedAt },
    });
}

/** Deletes the trail row with the given `uri`, if any. */
export async function deleteTrail(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(trails).where(eq(trails.uri, uri));
}

// ============================================================================
// Walk operations
// ============================================================================

/**
 * Inserts a walk row, or refreshes `cid`, `record` and `indexedAt` when a row
 * with the same `uri` already exists.
 *
 * NOTE(review): `trailUri` is not part of the conflict update, so if an updated
 * record points at a different trail the column keeps the old value. Confirm
 * this is intended.
 *
 * @param createdAt Timestamp string; it is converted with `new Date(createdAt)`.
 */
export async function upsertWalk(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  trailUri: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof walks.$inferInsert.record;
  const indexedAt = new Date();

  await db
    .insert(walks)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      trailUri,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt,
    })
    .onConflictDoUpdate({
      target: walks.uri,
      set: { cid, record: typedRecord, indexedAt },
    });
}

/** Deletes the walk row with the given `uri`, if any. */
export async function deleteWalk(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(walks).where(eq(walks.uri, uri));
}

// ============================================================================
// Completion operations
// ============================================================================

/**
 * Inserts a completion row, or refreshes `cid`, `record` and `indexedAt` when a
 * row with the same `uri` already exists.
 *
 * NOTE(review): as with walks, `trailUri` is not refreshed on conflict.
 *
 * @param createdAt Timestamp string; it is converted with `new Date(createdAt)`.
 */
export async function upsertCompletion(
  db: IngesterDb,
  uri: string,
  cid: string,
  authorDid: string,
  rkey: string,
  trailUri: string,
  record: unknown,
  createdAt: string,
): Promise<void> {
  const typedRecord = record as typeof completions.$inferInsert.record;
  const indexedAt = new Date();

  await db
    .insert(completions)
    .values({
      uri,
      cid,
      authorDid,
      rkey,
      trailUri,
      record: typedRecord,
      createdAt: new Date(createdAt),
      indexedAt,
    })
    .onConflictDoUpdate({
      target: completions.uri,
      set: { cid, record: typedRecord, indexedAt },
    });
}

/** Deletes the completion row with the given `uri`, if any. */
export async function deleteCompletion(db: IngesterDb, uri: string): Promise<void> {
  await db.delete(completions).where(eq(completions.uri, uri));
}

// ============================================================================
// Account operations
// ============================================================================

/**
 * Deletes every trail, walk and completion authored by `did`.
 *
 * The three deletes are issued together through `Promise.all` and are not
 * wrapped in a transaction, so a failure in one does not undo the others.
 */
export async function deleteAllContentByDid(db: IngesterDb, did: string): Promise<void> {
  await Promise.all([
    db.delete(trails).where(eq(trails.authorDid, did)),
    db.delete(walks).where(eq(walks.authorDid, did)),
    db.delete(completions).where(eq(completions.authorDid, did)),
  ]);
}

/**
 * Creates an account row for `did` (active, seq 0) unless one already exists;
 * an existing row is left untouched.
 */
export async function ensureAccount(db: IngesterDb, did: string): Promise<void> {
  await db
    .insert(accounts)
    .values({ did, active: 1, seq: 0, updatedAt: new Date() })
    .onConflictDoNothing();
}

/**
 * Applies an account status event to a known account.
 *
 * - Events for DIDs without an account row are ignored.
 * - Events whose `seq` is not strictly greater than the stored `seq` are
 *   ignored, so stale or replayed events cannot overwrite newer state.
 * - Otherwise the row is updated, and when the account became inactive with
 *   status "takendown" or "deleted" all of its indexed content is removed.
 *
 * NOTE(review): the row update and the content deletion are separate
 * statements. If the deletion fails after `seq` has been stored, a replay of
 * the same event is skipped by the seq check. Confirm whether that matters.
 */
export async function handleAccountEvent(
  db: IngesterDb,
  evt: AccountEvent["account"],
): Promise<void> {
  const { did, active, seq, status } = evt;

  const [existing] = await db
    .select({ seq: accounts.seq })
    .from(accounts)
    .where(eq(accounts.did, did))
    .limit(1);

  // Unknown account, or an event we have already seen / superseded.
  if (!existing || existing.seq >= seq) {
    return;
  }

  await db
    .update(accounts)
    .set({
      active: active ? 1 : 0,
      // A status is only stored while the account is inactive.
      status: active ? null : status,
      seq,
      updatedAt: new Date(),
    })
    .where(eq(accounts.did, did));

  if (!active && (status === "takendown" || status === "deleted")) {
    log(`Account ${did} ${status} - deleting all content`);
    await deleteAllContentByDid(db, did);
  } else if (!active) {
    log(`Account ${did} ${status ?? "inactive"} - marking inactive`);
  } else {
    log(`Account ${did} reactivated`);
  }
}

// ============================================================================
// Event handler
// ============================================================================

/**
 * Routes a single Jetstream event.
 *
 * Account events go to `handleAccountEvent`. Identity events and any other
 * non-commit kinds are ignored. Commits are ignored unless their collection is
 * in `COLLECTIONS` and the author is not marked inactive; deletes remove the
 * matching row, every other operation upserts it.
 */
export async function handleEvent(db: IngesterDb, evt: JetstreamEvent): Promise<void> {
  if (evt.kind === "account") {
    await handleAccountEvent(db, evt.account);
    return;
  }

  // Identity events (and anything else that is not a commit) carry nothing to index.
  if (evt.kind !== "commit") return;

  const { commit } = evt;
  const { collection, rkey } = commit;
  if (!COLLECTIONS.includes(collection)) return;

  // ATProto spec: commits from inactive accounts should be ignored
  const [accountStatus] = await db
    .select({ active: accounts.active })
    .from(accounts)
    .where(eq(accounts.did, evt.did))
    .limit(1);

  if (accountStatus && !accountStatus.active) {
    return;
  }

  const uri = `at://${evt.did}/${collection}/${rkey}`;

  if (commit.operation === "delete") {
    switch (collection) {
      case "app.sidetrail.trail":
        await deleteTrail(db, uri);
        break;
      case "app.sidetrail.walk":
        await deleteWalk(db, uri);
        break;
      case "app.sidetrail.completion":
        await deleteCompletion(db, uri);
        break;
    }
    return;
  }

  // The payload is untrusted network input: verify it is an object before
  // reading fields from it, and before creating an account row on its behalf.
  const rawRecord: unknown = commit.record;
  if (typeof rawRecord !== "object" || rawRecord === null) {
    log(`Ignoring commit for ${uri}: record payload is missing or not an object`);
    return;
  }
  const record = rawRecord as Record<string, unknown>;

  await ensureAccount(db, evt.did);

  // Missing or unparseable timestamps fall back to the time of indexing.
  const createdAt = resolveCreatedAt(record.createdAt);

  switch (collection) {
    case "app.sidetrail.trail":
      await upsertTrail(db, uri, commit.cid, evt.did, rkey, record, createdAt);
      break;

    case "app.sidetrail.walk":
      await upsertWalk(
        db,
        uri,
        commit.cid,
        evt.did,
        rkey,
        extractTrailUri(record),
        record,
        createdAt,
      );
      break;

    case "app.sidetrail.completion":
      await upsertCompletion(
        db,
        uri,
        commit.cid,
        evt.did,
        rkey,
        extractTrailUri(record),
        record,
        createdAt,
      );
      break;
  }
}