an app to share curated trails sidetrail.app
1

Configure Feed

Select the types of activity you want to include in your feed.

1import WebSocket from "ws"; 2 3export type JetstreamEvent = { 4 did: string; 5 time_us: number; 6} & (CommitEvent | AccountEvent | IdentityEvent); 7 8type CommitEvent = { 9 kind: "commit"; 10 commit: 11 | { 12 operation: "create" | "update"; 13 record: unknown; 14 rev: string; 15 collection: string; 16 rkey: string; 17 cid: string; 18 } 19 | { 20 operation: "delete"; 21 rev: string; 22 collection: string; 23 rkey: string; 24 }; 25}; 26 27type IdentityEvent = { 28 kind: "identity"; 29 identity: { 30 did: string; 31 handle: string; 32 seq: number; 33 time: string; 34 }; 35}; 36 37export type AccountEvent = { 38 kind: "account"; 39 account: { 40 active: boolean; 41 did: string; 42 seq: number; 43 time: string; 44 status?: "takendown" | "suspended" | "deleted" | "deactivated"; 45 }; 46}; 47 48export interface JetstreamConfig { 49 instanceUrl: string; 50 wantedCollections: string[]; 51 cursor?: number; 52 onEvent: (evt: JetstreamEvent) => Promise<void>; 53 onError: (err: unknown) => void; 54 setCursor?: (cursorUs: number) => Promise<void>; 55} 56 57export class Jetstream { 58 private ws?: WebSocket; 59 private config: JetstreamConfig; 60 private isStarted = false; 61 private isDestroyed = false; 62 private cursor?: number; 63 private lastCursorWrite = 0; 64 65 constructor(config: JetstreamConfig) { 66 this.config = config; 67 this.cursor = config.cursor; 68 } 69 70 private buildUrl(): string { 71 const params = new URLSearchParams(); 72 for (const collection of this.config.wantedCollections) { 73 params.append("wantedCollections", collection); 74 } 75 if (this.cursor !== undefined) { 76 params.append("cursor", this.cursor.toString()); 77 } 78 return `${this.config.instanceUrl}?${params.toString()}`; 79 } 80 81 start(): void { 82 if (this.isStarted) return; 83 this.isStarted = true; 84 this.isDestroyed = false; 85 86 const url = this.buildUrl(); 87 console.log(`Connecting to Jetstream: ${url}`); 88 this.ws = new WebSocket(url); 89 90 this.ws.on("open", () => { 91 console.log("Jetstream connection opened"); 92 }); 93 94 this.ws.on("message", async (data) => { 95 try { 96 const event: JetstreamEvent = JSON.parse(data.toString()); 97 98 // Update cursor (throttled to every 30s) 99 if (event.time_us !== undefined && this.config.setCursor) { 100 this.cursor = event.time_us; 101 const now = Date.now(); 102 if (now - this.lastCursorWrite >= 30000) { 103 this.lastCursorWrite = now; 104 console.log(`Saving cursor: ${event.time_us}`); 105 await this.config.setCursor(event.time_us); 106 } 107 } 108 109 await this.config.onEvent(event); 110 } catch (err) { 111 this.config.onError(err); 112 } 113 }); 114 115 this.ws.on("error", (err) => { 116 this.config.onError(err); 117 }); 118 119 this.ws.on("close", (code, reason) => { 120 this.isStarted = false; 121 if (!this.isDestroyed) { 122 console.log(`Jetstream closed (code: ${code}, reason: ${reason}). Reconnecting in 5s...`); 123 setTimeout(() => this.start(), 5000); 124 } 125 }); 126 } 127 128 destroy(): void { 129 this.isDestroyed = true; 130 if (this.ws) { 131 this.ws.close(); 132 this.isStarted = false; 133 console.log("Jetstream destroyed"); 134 } 135 } 136}