// An app to share curated trails.
// sidetrail.app
1import WebSocket from "ws";
2
/**
 * A single event received from the Jetstream WebSocket.
 *
 * Every event carries the common envelope fields below and is additionally
 * exactly one of the `kind`-tagged variants (commit, account or identity).
 *
 * - `did`: presumably the DID of the account the event concerns.
 * - `time_us`: the event timestamp; the consumer in this file uses it as the
 *   resume cursor (the name suggests microseconds).
 */
export type JetstreamEvent = (CommitEvent | AccountEvent | IdentityEvent) & {
  did: string;
  time_us: number;
};
7
/**
 * Event variant tagged `kind: "commit"`.
 *
 * `commit.operation` discriminates the payload: "create" and "update" carry
 * the `record` (left as `unknown`, so callers must validate it) together with
 * its `cid`, whereas "delete" only identifies the record that was removed.
 */
type CommitEvent = {
  kind: "commit";
  commit: {
    // Fields present on every commit, whatever the operation.
    rev: string;
    collection: string;
    rkey: string;
  } & (
    | {
        operation: "create" | "update";
        record: unknown;
        cid: string;
      }
    | {
        operation: "delete";
      }
  );
};
26
/**
 * Event variant tagged `kind: "identity"`.
 *
 * The `identity` payload pairs a `did` with a `handle`, plus a sequence
 * number and a `time` string (format not established in this file).
 */
type IdentityEvent = {
  kind: "identity";
  identity: {
    did: string;
    handle: string;
    seq: number;
    time: string;
  };
};
36
/**
 * Event variant tagged `kind: "account"`.
 *
 * `account.active` is a boolean flag for the account; `account.status` is
 * optional and, when present, is one of the listed states.
 */
export type AccountEvent = {
  kind: "account";
  account: {
    did: string;
    active: boolean;
    status?: "takendown" | "suspended" | "deleted" | "deactivated";
    seq: number;
    time: string;
  };
};
47
/** Options accepted by the `Jetstream` constructor. */
export interface JetstreamConfig {
  /** Base WebSocket URL; the query string is appended to it when connecting. */
  instanceUrl: string;
  /** Collections to subscribe to; each is sent as a `wantedCollections` query parameter. */
  wantedCollections: string[];

  /** Called (and awaited) for every successfully parsed event. */
  onEvent: (evt: JetstreamEvent) => Promise<void>;
  /** Called with socket errors and with anything thrown while handling a message. */
  onError: (err: unknown) => void;

  /** Optional starting cursor, sent as the `cursor` query parameter. */
  cursor?: number;
  /** Optional hook used to persist the latest cursor; invoked at most once every 30 seconds. */
  setCursor?: (cursorUs: number) => Promise<void>;
}
56
/**
 * Jetstream WebSocket consumer with automatic reconnection.
 *
 * Call `start()` to connect and `destroy()` to disconnect for good. While
 * running, every parsed message is handed to `config.onEvent`, and failures
 * are reported through `config.onError`. The `time_us` of the most recent
 * event is kept as a cursor so that a reconnect resumes from where the stream
 * left off; when `config.setCursor` is provided the cursor is also persisted,
 * throttled to one write per `CURSOR_WRITE_INTERVAL_MS`.
 */
export class Jetstream {
  /** Minimum time between two `setCursor` calls, in milliseconds. */
  private static readonly CURSOR_WRITE_INTERVAL_MS = 30000;
  /** Delay before reconnecting after an unexpected close, in milliseconds. */
  private static readonly RECONNECT_DELAY_MS = 5000;

  private ws?: WebSocket;
  private config: JetstreamConfig;
  private isStarted = false;
  private isDestroyed = false;
  private cursor?: number;
  private lastCursorWrite = 0;
  /** Pending reconnect timer, if a reconnect is currently scheduled. */
  private reconnectTimer?: ReturnType<typeof setTimeout>;

  /**
   * @param config Connection settings and callbacks; `config.cursor`, when
   *   set, is the position the first connection resumes from.
   */
  constructor(config: JetstreamConfig) {
    this.config = config;
    this.cursor = config.cursor;
  }

  /**
   * Builds the connection URL: one `wantedCollections` parameter per
   * configured collection, plus `cursor` when a cursor is known.
   */
  private buildUrl(): string {
    const params = new URLSearchParams();
    for (const collection of this.config.wantedCollections) {
      params.append("wantedCollections", collection);
    }
    if (this.cursor !== undefined) {
      params.append("cursor", this.cursor.toString());
    }
    // Do not emit a second "?" if the instance URL already has a query string.
    const separator = this.config.instanceUrl.includes("?") ? "&" : "?";
    return `${this.config.instanceUrl}${separator}${params.toString()}`;
  }

  /**
   * Parses one raw message, advances the cursor and dispatches the event.
   * Any failure is routed to `config.onError`.
   */
  private async handleMessage(data: { toString(): string }): Promise<void> {
    try {
      const event = JSON.parse(data.toString()) as JetstreamEvent;

      if (event.time_us !== undefined) {
        // Always track the position in memory so a reconnect resumes from
        // here even when no persistence hook is configured.
        this.cursor = event.time_us;

        const now = Date.now();
        if (
          this.config.setCursor &&
          now - this.lastCursorWrite >= Jetstream.CURSOR_WRITE_INTERVAL_MS
        ) {
          this.lastCursorWrite = now;
          console.log(`Saving cursor: ${event.time_us}`);
          try {
            await this.config.setCursor(event.time_us);
          } catch (err) {
            // A failed cursor write must not cause the event to be dropped.
            this.config.onError(err);
          }
        }
      }

      await this.config.onEvent(event);
    } catch (err) {
      this.config.onError(err);
    }
  }

  /**
   * Opens the WebSocket connection. Does nothing if already started.
   * Calling this after `destroy()` re-enables the instance.
   */
  start(): void {
    if (this.isStarted) return;
    this.isStarted = true;
    this.isDestroyed = false;

    // A manual start supersedes any reconnect that is still pending.
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }

    const url = this.buildUrl();
    console.log(`Connecting to Jetstream: ${url}`);
    const socket = new WebSocket(url);
    this.ws = socket;

    socket.on("open", () => {
      console.log("Jetstream connection opened");
    });

    socket.on("message", (data) => {
      void this.handleMessage(data);
    });

    socket.on("error", (err) => {
      this.config.onError(err);
    });

    socket.on("close", (code, reason) => {
      // Ignore a close from a socket that has already been replaced;
      // otherwise it would schedule a second, parallel connection.
      if (this.ws !== socket) return;

      this.isStarted = false;
      if (this.isDestroyed) return;

      console.log(
        `Jetstream closed (code: ${code}, reason: ${reason}). Reconnecting in ${
          Jetstream.RECONNECT_DELAY_MS / 1000
        }s...`,
      );
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = undefined;
        // start() clears isDestroyed, so check it here first.
        if (!this.isDestroyed) this.start();
      }, Jetstream.RECONNECT_DELAY_MS);
    });
  }

  /**
   * Closes the connection and cancels any scheduled reconnect. The instance
   * stays stopped until `start()` is called again.
   */
  destroy(): void {
    this.isDestroyed = true;

    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }

    if (this.ws) {
      this.ws.close();
      this.isStarted = false;
      console.log("Jetstream destroyed");
    }
  }
}