···11+# Supabase configuration
22+SUPABASE_URL=your-supabase-url
33+SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
44+55+# Bluesky Jetstream configuration
66+JETSTREAM_URL=wss://jetstream2.us-west.bsky.network/subscribe
77+FLUSHING_STATUS_NSID=im.flushing.right.now
88+99+# Optional: Bluesky API configuration
1010+# Only needed if you want to authenticate with the Bluesky API
1111+# BLUESKY_API_USERNAME=your-bluesky-username
1212+# BLUESKY_API_PASSWORD=your-bluesky-password
+5-1
app/.env.example
···11# Supabase configuration
22-NEXT_PUBLIC_SUPABASE_URL=your-supabase-url
22+SUPABASE_URL=your-supabase-url
33SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
44+55+# Bluesky Jetstream configuration
66+JETSTREAM_URL=wss://jetstream2.us-west.bsky.network/subscribe
77+FLUSHING_STATUS_NSID=im.flushing.right.now
4859# Optional: Bluesky API configuration
610# Only needed if you want to authenticate with the Bluesky API
···11-const WebSocket = require('ws');
22-const cbor = require('cbor-web');
33-const { createClient } = require('@supabase/supabase-js');
44-require('dotenv').config();
11+const WebSocket = require("ws");
22+const { createClient } = require("@supabase/supabase-js");
33+const path = require("path");
44+require("dotenv").config({ path: path.resolve(__dirname, "../../.env") }); // Load environment variables from .env file in app root
5566-// Constants
77-const FIREHOSE_URL = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos';
88-const FLUSHING_STATUS_NSID = 'im.flushing.right.now';
66+const JETSTREAM_URL = "wss://jetstream2.us-west.bsky.network/subscribe";
77+const FLUSHING_STATUS_NSID = "im.flushing.right.now";
981010-// Supabase setup - ensure you have these set in your .env file
99+// Supabase setup from .env file
1110const supabaseUrl = process.env.SUPABASE_URL;
1211const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY;
1313-const supabase = createClient(supabaseUrl, supabaseKey);
14121515-// Reconnection parameters
1616-const MAX_RECONNECT_DELAY = 30000; // 30 seconds
1717-let reconnectAttempts = 0;
1818-let ws = null;
1313+if (!supabaseUrl || !supabaseKey) {
1414+ console.error("Missing Supabase credentials. Add SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY to your .env file");
1515+ process.exit(1);
1616+}
19172020-// Connect to the firehose
2121-function connectToFirehose() {
2222- console.log('Connecting to Bluesky firehose...');
2323-2424- ws = new WebSocket(FIREHOSE_URL);
2525-2626- ws.on('open', () => {
2727- console.log('Connected to firehose.');
2828- // Reset reconnect counter on successful connection
2929- reconnectAttempts = 0;
3030- });
3131-3232- ws.on('message', async (data) => {
1818+const supabase = createClient(supabaseUrl, supabaseKey);
1919+2020+// Ensure the table exists
2121+async function setupDatabase() {
3322 try {
3434- // In a real implementation, parse CBOR data to extract repo operations
3535- // For now, log the message to track activity
3636- console.log('Received message from firehose');
3737-3838- // Decode the CBOR message (this is a simplified version)
3939- // The actual implementation would need to handle the header and payload separately
4040- const decoded = cbor.decode(data);
4141-4242- // Process the message if it's a commit
4343- if (decoded.op === 1 && decoded.t === '#commit') {
4444- // Process repo commit
4545- const commit = decoded.payload;
2323+ console.log("Setting up database...");
46244747- // Check if this commit contains a flushing record
4848- const flushingOps = commit.ops.filter(op => {
4949- return op.path.startsWith(FLUSHING_STATUS_NSID) && op.action === 'create';
5050- });
2525+ // Check if the table already exists
2626+ const { error: queryError } = await supabase
2727+ .from('flushing_records')
2828+ .select('id', { count: 'exact', head: true });
51295252- if (flushingOps.length > 0) {
5353- console.log(`Found ${flushingOps.length} flushing records in commit from ${commit.repo}`);
5454-5555- // Process each flushing record
5656- for (const op of flushingOps) {
5757- await processFlushingRecord(commit.repo, op.path, op.cid, commit.blocks);
5858- }
3030+ // If no error, table exists
3131+ if (!queryError) {
3232+ console.log("Table 'flushing_records' already exists");
3333+ return;
3434+ }
3535+3636+ // Create the table using SQL
3737+ const { error: sqlError } = await supabase.sql`
3838+ CREATE TABLE IF NOT EXISTS flushing_records (
3939+ id SERIAL PRIMARY KEY,
4040+ did TEXT NOT NULL,
4141+ collection TEXT NOT NULL,
4242+ type TEXT NOT NULL,
4343+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
4444+ emoji TEXT,
4545+ text TEXT,
4646+ cid TEXT NOT NULL,
4747+ uri TEXT UNIQUE NOT NULL,
4848+ indexed_at TIMESTAMP WITH TIME ZONE DEFAULT now()
4949+ );
5050+5151+ CREATE INDEX IF NOT EXISTS flushing_records_did_idx ON flushing_records(did);
5252+ `;
5353+5454+ if (sqlError) {
5555+ console.error("Error creating table:", sqlError);
5656+ process.exit(1);
5957 }
6060- }
6161- } catch (error) {
6262- console.error('Error processing message:', error);
5858+ console.log("Table created successfully");
5959+ } catch (err) {
6060+ console.error("Error setting up database:", err);
6161+ process.exit(1);
6362 }
6464- });
6565-6666- ws.on('error', (error) => {
6767- console.error('WebSocket error:', error);
6868- });
6969-7070- ws.on('close', (code, reason) => {
7171- console.log(`Connection closed: ${code} - ${reason}`);
7272-7373- // Implement exponential backoff for reconnection
7474- reconnectAttempts++;
7575- const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), MAX_RECONNECT_DELAY);
7676-7777- console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts})...`);
7878- setTimeout(connectToFirehose, delay);
7979- });
8063}
81648282-// Process a flushing record and store it in Supabase
8383-async function processFlushingRecord(authorDid, recordPath, cid, blocks) {
8484- try {
8585- // Extract the record data from the blocks (simplified)
8686- // In a real implementation, you would need to properly decode the IPLD blocks
8787- const recordData = blocks[cid];
8888-8989- if (!recordData) {
9090- console.error('Record data not found in blocks');
9191- return;
9292- }
9393-9494- // Extract the record URI
9595- const uri = `at://${authorDid}/${recordPath}`;
9696-9797- // Check if we already have this record
9898- const { data: existingRecord } = await supabase
9999- .from('flushing_entries')
100100- .select('id')
101101- .eq('uri', uri)
102102- .single();
103103-104104- if (existingRecord) {
105105- console.log('Record already exists, skipping');
106106- return;
107107- }
108108-109109- // Create a new entry in Supabase
110110- const newEntry = {
111111- uri,
112112- cid,
113113- author_did: authorDid,
114114- text: recordData.text,
115115- emoji: recordData.emoji,
116116- created_at: recordData.createdAt
117117- };
118118-119119- const { error } = await supabase
120120- .from('flushing_entries')
121121- .insert(newEntry);
122122-123123- if (error) {
124124- console.error('Error inserting record:', error);
125125- } else {
126126- console.log('Successfully stored new flushing record');
127127- }
128128-129129- // Also try to resolve the author's handle if we don't have it
130130- const { data: authorData } = await supabase
131131- .from('users')
132132- .select('handle')
133133- .eq('did', authorDid)
134134- .single();
135135-136136- if (!authorData || !authorData.handle) {
137137- // TODO: Use the Bluesky API to resolve the handle from the DID
138138- // This would be done with the BskyAgent or direct API call
139139- console.log('Need to resolve handle for DID:', authorDid);
140140- }
141141- } catch (error) {
142142- console.error('Error processing flushing record:', error);
143143- }
144144-}
6565+let messageCount = 0;
6666+let flushingFoundCount = 0;
6767+6868+function connect() {
6969+ console.log("Connecting to Jetstream");
7070+7171+ const wsUrl = JETSTREAM_URL + "?wantedCollections=" + FLUSHING_STATUS_NSID;
7272+ const ws = new WebSocket(wsUrl);
7373+7474+ ws.on("open", () => {
7575+ console.log("Connected to Jetstream");
7676+ });
7777+7878+ ws.on("message", async (data) => {
7979+ messageCount++;
8080+ if (messageCount % 1000 === 0) {
8181+ console.log("Messages:", messageCount);
8282+ }
8383+8484+ try {
8585+ const message = JSON.parse(data.toString());
8686+8787+ if (message.kind === "commit" &&
8888+ message.commit &&
8989+ message.commit.collection === FLUSHING_STATUS_NSID) {
9090+9191+ flushingFoundCount++;
9292+ console.log("Found flushing record:", flushingFoundCount);
9393+ console.log(JSON.stringify(message, null, 2));
9494+9595+ const recordPath = message.commit.collection + "/" + message.commit.rkey;
9696+ const authorDid = message.did;
9797+ const cid = message.commit.cid || "cid_" + Date.now();
9898+9999+ let recordText = "No text found";
100100+ let recordEmoji = "🚽";
101101+ let recordCreatedAt = new Date().toISOString();
102102+ let recordType = FLUSHING_STATUS_NSID;
103103+104104+ if (message.commit.record) {
105105+ if (message.commit.record.text) {
106106+ recordText = message.commit.record.text;
107107+ }
108108+ if (message.commit.record.emoji) {
109109+ recordEmoji = message.commit.record.emoji;
110110+ }
111111+ if (message.commit.record.createdAt) {
112112+ recordCreatedAt = message.commit.record.createdAt;
113113+ }
114114+ if (message.commit.record.$type) {
115115+ recordType = message.commit.record.$type;
116116+ }
117117+ }
118118+119119+ console.log("Author:", authorDid);
120120+ console.log("Path:", recordPath);
121121+ console.log("Text:", recordText);
122122+ console.log("Emoji:", recordEmoji);
123123+ console.log("Created at:", recordCreatedAt);
124124+125125+ const uri = "at://" + authorDid + "/" + recordPath;
126126+ console.log("URI:", uri);
145127146146-// Create the necessary tables if they don't exist
147147-async function setupDatabase() {
148148- try {
149149- // Create flushing_entries table
150150- const { error: entriesError } = await supabase.rpc('create_flushing_entries_table_if_not_exists');
151151- if (entriesError) {
152152- console.error('Error creating flushing_entries table:', entriesError);
153153- }
154154-155155- // Create users table
156156- const { error: usersError } = await supabase.rpc('create_users_table_if_not_exists');
157157- if (usersError) {
158158- console.error('Error creating users table:', usersError);
159159- }
160160- } catch (error) {
161161- console.error('Error setting up database:', error);
162162- }
128128+ // Save to Supabase
129129+ try {
130130+ // Check if record already exists
131131+ const { data: existingData, error: checkError } = await supabase
132132+ .from("flushing_records")
133133+ .select("id")
134134+ .eq("uri", uri)
135135+ .limit(1);
136136+137137+ if (checkError) {
138138+ console.error("Error checking for existing record:", checkError.message);
139139+ return;
140140+ }
141141+142142+ if (existingData && existingData.length > 0) {
143143+ console.log("Record already exists, skipping");
144144+ return;
145145+ }
146146+147147+ // Insert new record
148148+ const newRecord = {
149149+ did: authorDid,
150150+ collection: message.commit.collection,
151151+ type: recordType,
152152+ created_at: recordCreatedAt,
153153+ emoji: recordEmoji,
154154+ text: recordText,
155155+ cid: cid,
156156+ uri: uri
157157+ };
158158+159159+ const { error: insertError } = await supabase
160160+ .from("flushing_records")
161161+ .insert(newRecord);
162162+163163+ if (insertError) {
164164+ console.error("Error saving record:", insertError.message);
165165+ } else {
166166+ console.log("Record saved successfully");
167167+ }
168168+ } catch (err) {
169169+ console.error("Error interacting with database:", err.message);
170170+ }
171171+ }
172172+ } catch (err) {
173173+ console.error("Error processing message:", err.message);
174174+ }
175175+ });
176176+177177+ ws.on("error", (error) => {
178178+ console.error("WebSocket error:", error.message);
179179+ });
180180+181181+ ws.on("close", () => {
182182+ console.log("Connection closed, reconnecting in 5s");
183183+ setTimeout(connect, 5000);
184184+ });
163185}
164186165187// Start the worker
166188async function start() {
167167- console.log('Starting firehose worker...');
168168-169169- // Check if we have the required environment variables
170170- if (!supabaseUrl || !supabaseKey) {
171171- console.error('Missing Supabase credentials. Please set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY environment variables.');
172172- process.exit(1);
173173- }
174174-175175- // Setup the database
176176- await setupDatabase();
177177-178178- // Connect to the firehose
179179- connectToFirehose();
189189+ await setupDatabase();
190190+ connect();
180191}
181192182182-// Handle process termination
183183-process.on('SIGINT', () => {
184184- console.log('Shutting down...');
185185- if (ws) {
186186- ws.close();
187187- }
188188- process.exit(0);
189189-});
193193+// Run the worker
194194+start();
190195191191-// Start the worker
192192-start();196196+process.on("SIGINT", () => {
197197+ console.log("Shutting down");
198198+ process.exit(0);
199199+});
+92-31
app/src/app/api/bluesky/feed/route.ts
···3636 if (supabaseUrl && supabaseKey) {
3737 const supabase = createClient(supabaseUrl, supabaseKey);
38383939- // Fetch the latest entries from Supabase
4040- const { data: entries, error } = await supabase
4141- .from('flushing_entries')
4242- .select(`
4343- id,
4444- uri,
4545- cid,
4646- author_did,
4747- author_handle,
4848- text,
4949- emoji,
5050- created_at
5151- `)
5252- .order('created_at', { ascending: false })
5353- .limit(MAX_ENTRIES);
3939+ // First check if we're using the new flushing_records table
4040+ const { data: recordsExists, error: checkError } = await supabase
4141+ .from('flushing_records')
4242+ .select('id', { count: 'exact', head: true });
4343+4444+ let entries;
4545+ let error;
4646+4747+ if (!checkError) {
4848+ // Use the new flushing_records table
4949+ console.log('Using flushing_records table');
5050+ ({ data: entries, error } = await supabase
5151+ .from('flushing_records')
5252+ .select(`
5353+ id,
5454+ uri,
5555+ cid,
5656+ did,
5757+ text,
5858+ emoji,
5959+ created_at
6060+ `)
6161+ .order('created_at', { ascending: false })
6262+ .limit(MAX_ENTRIES));
6363+ } else {
6464+ // Fall back to the old flushing_entries table
6565+ console.log('Falling back to flushing_entries table');
6666+ ({ data: entries, error } = await supabase
6767+ .from('flushing_entries')
6868+ .select(`
6969+ id,
7070+ uri,
7171+ cid,
7272+ author_did as did,
7373+ author_handle,
7474+ text,
7575+ emoji,
7676+ created_at
7777+ `)
7878+ .order('created_at', { ascending: false })
7979+ .limit(MAX_ENTRIES));
8080+ }
54815582 if (error) {
5683 throw new Error(`Supabase error: ${error.message}`);
5784 }
58855986 // Transform the data to match our client-side model
6060- const processedEntries = (entries || []).map(entry => ({
6161- id: entry.id,
6262- uri: entry.uri,
6363- cid: entry.cid,
6464- authorDid: entry.author_did,
6565- authorHandle: entry.author_handle || 'unknown',
6666- text: entry.text,
6767- emoji: entry.emoji,
6868- createdAt: entry.created_at
8787+ const processedEntries = await Promise.all((entries || []).map(async entry => {
8888+ // For the new table, we need to resolve handles from DIDs
8989+ // For the old table, we might already have handles
9090+ const authorDid = entry.did;
9191+ let authorHandle = entry.author_handle || null;
9292+9393+ // If we don't have a handle (which will always be the case for the new table), resolve it
9494+ if (!authorHandle) {
9595+ const resolvedHandle = await resolveDidToHandle(authorDid);
9696+ authorHandle = resolvedHandle || 'unknown';
9797+ }
9898+9999+ return {
100100+ id: entry.id,
101101+ uri: entry.uri,
102102+ cid: entry.cid,
103103+ authorDid: authorDid,
104104+ authorHandle: authorHandle,
105105+ text: entry.text,
106106+ emoji: entry.emoji,
107107+ createdAt: entry.created_at
108108+ };
69109 }));
7011071111 // Update the cache
···76116 } else {
77117 // If no Supabase credentials, fall back to mock data
78118 console.log('No Supabase credentials, using mock data');
7979- const mockEntries = await getMockEntries();
119119+ const mockEntries = getMockEntries();
8012081121 // Update cache
82122 cachedEntries = mockEntries;
···9513596136// Function to generate mock entries for testing
97137// This is used when Supabase is not configured
9898-async function getMockEntries() {
138138+function getMockEntries() {
99139 // Create some mock entries for testing
100140 const mockEntries = [
101141 {
···133173 return mockEntries;
134174}
135175136136-// Function to attempt to resolve a DID to a handle using the Bluesky API
137137-// This is used when we have a record with an author_did but no author_handle
176176+// Function to attempt to resolve a DID to a handle
177177+// First tries PLC directory, then falls back to Bluesky API if needed
138178async function resolveDidToHandle(did: string): Promise<string | null> {
139179 try {
140140- await agent.login({ identifier: 'user.bsky.social', password: 'none' });
141141- const response = await agent.getProfile({ actor: did });
142142- return response.data.handle;
180180+ // Try PLC directory first (faster and doesn't require auth)
181181+ if (did && did.startsWith('did:plc:')) {
182182+ const plcResponse = await fetch(`https://plc.directory/${did}/data`);
183183+ if (plcResponse.ok) {
184184+ const plcData = await plcResponse.json();
185185+ if (plcData && plcData.alsoKnownAs && plcData.alsoKnownAs.length > 0) {
186186+ // alsoKnownAs contains values like 'at://user.bsky.social'
187187+ const handle = plcData.alsoKnownAs[0].split('//')[1];
188188+ if (handle) return handle;
189189+ }
190190+ }
191191+ }
192192+193193+ // Fall back to Bluesky API
194194+ console.log(`Falling back to Bluesky API for DID: ${did}`);
195195+ try {
196196+ // Try to resolve DID directly with Bluesky API
197197+ await agent.login({ identifier: 'user.bsky.social', password: 'none' });
198198+ const response = await agent.getProfile({ actor: did });
199199+ return response.data.handle;
200200+ } catch (apiError) {
201201+ console.error(`Failed to resolve handle with Bluesky API for DID ${did}:`, apiError);
202202+ return null;
203203+ }
143204 } catch (error) {
144205 console.error(`Failed to resolve handle for DID ${did}:`, error);
145206 return null;
+232
contextual info for claude/bluesky_jetstream.md
···11+# Jetstream
22+33+Jetstream is a streaming service that consumes an ATProto `com.atproto.sync.subscribeRepos` stream and converts it into lightweight, friendly JSON.
44+55+Jetstream converts the CBOR-encoded MST blocks produced by the ATProto firehose and translates them into JSON objects that are easier to interface with using standard tooling available in programming languages.
66+77+### Public Instances
88+99+As of writing, there are 4 official public Jetstream instances operated by Bluesky.
1010+1111+| Hostname | Region |
1212+| --------------------------------- | ------- |
1313+| `jetstream1.us-east.bsky.network` | US-East |
1414+| `jetstream2.us-east.bsky.network` | US-East |
1515+| `jetstream1.us-west.bsky.network` | US-West |
1616+| `jetstream2.us-west.bsky.network` | US-West |
1717+1818+Connect to these instances over WSS: `wss://jetstream2.us-west.bsky.network/subscribe`
1919+2020+We will monitor and operate these instances and do our best to keep them available for public use by developers.
2121+2222+Feel free to have multiple connections to Jetstream instances if needed. We encourage you to make use of Jetstream wherever you may consider using the `com.atproto.sync.subscribeRepos` firehose if you don't need the features of the full sync protocol.
2323+2424+Because cursors for Jetstream are time-based (unix microseconds), you can use the same cursor for multiple instances to get roughly the same data.
2525+2626+When switching between instances, it may be prudent to rewind your cursor a few seconds for gapless playback if you process events idempotently.
2727+2828+## Running Jetstream
2929+3030+To run Jetstream, make sure you have docker and docker compose installed and run `make up` in the repo root.
3131+3232+This will pull the latest built image from GHCR and start a Jetstream instance at `http://localhost:6008`
3333+3434+- To build Jetstream from source via Docker and start it up, run `make rebuild`
3535+3636+Once started, you can connect to the event stream at: `ws://localhost:6008/subscribe`
3737+3838+Prometheus metrics are exposed at `http://localhost:6009/metrics`
3939+4040+A [Grafana Dashboard](#dashboard-preview) for Jetstream is available at `./grafana-dashboard.json` and should be easy to copy/paste into Grafana's dashboard import prompt.
4141+4242+- This dashboard has a few device-specific graphs for disk and network usage that require NodeExporter and may need to be tuned to your setup.
4343+4444+## Consuming Jetstream
4545+4646+To consume Jetstream you can use any websocket client
4747+4848+Connect to `ws://localhost:6008/subscribe` to start the stream
4949+5050+The following Query Parameters are supported:
5151+5252+- `wantedCollections` - An array of [Collection NSIDs](https://atproto.com/specs/nsid) to filter which records you receive on your stream (default empty = all collections)
5353+ - `wantedCollections` supports NSID path prefixes i.e. `app.bsky.graph.*`, or `app.bsky.*`. The prefix before the `.*` must pass NSID validation and Jetstream **does not** support incomplete prefixes i.e. `app.bsky.graph.fo*`.
5454+ - Regardless of desired collections, all subscribers recieve Account and Identity events.
5555+ - You can specify at most 100 wanted collections/prefixes.
5656+- `wantedDids` - An array of Repo DIDs to filter which records you receive on your stream (Default empty = all repos)
5757+ - You can specify at most 10,000 wanted DIDs.
5858+- `maxMessageSizeBytes` - The maximum size of a payload that this client would like to receive. Zero means no limit, negative values are treated as zero. (Default "0" or empty = no maximum size)
5959+- `cursor` - A unix microseconds timestamp cursor to begin playback from
6060+ - An absent cursor or a cursor from the future will result in live-tail operation
6161+ - When reconnecting, use the `time_us` from your most recently processed event and maybe provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback
6262+- `compress` - Set to `true` to enable `zstd` [compression](#compression)
6363+- `requireHello` - Set to `true` to pause replay/live-tail until the server recevies a [`SubscriberOptionsUpdatePayload`](#options-updates) over the socket in a [Subscriber Sourced Message](#subscriber-sourced-messages)
6464+6565+### Examples
6666+6767+A simple example that hits the public instance looks like:
6868+6969+```bash
7070+$ websocat wss://jetstream2.us-east.bsky.network/subscribe\?wantedCollections=app.bsky.feed.post
7171+```
7272+7373+A maximal example using all parameters looks like:
7474+7575+```bash
7676+$ websocat "ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow&wantedDids=did:plc:q6gjnaw2blty4crticxkmujt&cursor=1725519626134432"
7777+```
7878+7979+### Example events:
8080+8181+Jetstream events have 3 `kinds`s (so far):
8282+8383+- `commit`: a Commit to a repo which involves either a create, update, or delete of a record
8484+- `identity`: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle
8585+- `account`: an Account event that indicates a change in account status i.e. from `active` to `deactivated`, or to `takendown` if the PDS has taken down the repo.
8686+8787+Jetstream Commits have 3 `operations`:
8888+8989+- `create`: Create a new record with the contents provided
9090+- `update`: Update an existing record and replace it with the contents provided
9191+- `delete`: Delete an existing record with the DID, Collection, and RKey provided
9292+9393+#### A like committed to a repo
9494+9595+```json
9696+{
9797+ "did": "did:plc:eygmaihciaxprqvxpfvl6flk",
9898+ "time_us": 1725911162329308,
9999+ "kind": "commit",
100100+ "commit": {
101101+ "rev": "3l3qo2vutsw2b",
102102+ "operation": "create",
103103+ "collection": "app.bsky.feed.like",
104104+ "rkey": "3l3qo2vuowo2b",
105105+ "record": {
106106+ "$type": "app.bsky.feed.like",
107107+ "createdAt": "2024-09-09T19:46:02.102Z",
108108+ "subject": {
109109+ "cid": "bafyreidc6sydkkbchcyg62v77wbhzvb2mvytlmsychqgwf2xojjtirmzj4",
110110+ "uri": "at://did:plc:wa7b35aakoll7hugkrjtf3xf/app.bsky.feed.post/3l3pte3p2e325"
111111+ }
112112+ },
113113+ "cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
114114+ }
115115+}
116116+```
117117+118118+#### A deleted follow record
119119+120120+```json
121121+{
122122+ "did": "did:plc:rfov6bpyztcnedeyyzgfq42k",
123123+ "time_us": 1725516666833633,
124124+ "kind": "commit",
125125+ "commit": {
126126+ "rev": "3l3f6nzl3cv2s",
127127+ "operation": "delete",
128128+ "collection": "app.bsky.graph.follow",
129129+ "rkey": "3l3dn7tku762u"
130130+ }
131131+}
132132+```
133133+134134+#### An identity update
135135+136136+```json
137137+{
138138+ "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
139139+ "time_us": 1725516665234703,
140140+ "kind": "identity",
141141+ "identity": {
142142+ "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
143143+ "handle": "yohenrique.bsky.social",
144144+ "seq": 1409752997,
145145+ "time": "2024-09-05T06:11:04.870Z"
146146+ }
147147+}
148148+```
149149+150150+#### An account becoming active
151151+152152+```json
153153+{
154154+ "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
155155+ "time_us": 1725516665333808,
156156+ "kind": "account",
157157+ "account": {
158158+ "active": true,
159159+ "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
160160+ "seq": 1409753013,
161161+ "time": "2024-09-05T06:11:04.870Z"
162162+ }
163163+}
164164+```
165165+166166+### Compression
167167+168168+Jetstream supports `zstd`-based compression of messages. Jetstream uses a custom dictionary for compression that can be found in `pkg/models/zstd_dictionary` and is required to decode compressed messages from the server.
169169+170170+`zstd` compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose.
171171+172172+The provided client library uses compression by default, using an embedded copy of the Dictionary from the `models` package.
173173+174174+To request a compressed stream, pass the `Socket-Encoding: zstd` header through when initiating the websocket _or_ pass `compress=true` in the query string.
175175+176176+### Subscriber Sourced messages
177177+178178+Subscribers can send Text messages to Jetstream over the websocket using the `SubscriberSourcedMessage` framing below:
179179+180180+```go
181181+type SubscriberSourcedMessage struct {
182182+ Type string `json:"type"`
183183+ Payload json.RawMessage `json:"payload"`
184184+}
185185+```
186186+187187+The supported message types are as follows:
188188+189189+- `options_update`
190190+191191+#### Options Updates
192192+193193+A client can update their `wantedCollections` and `wantedDids` after connecting to the socket by sending a Subscriber Sourced Message.
194194+195195+To send an Options Update, provide the string `options_update` in the `type` field and a `SubscriberOptionsUpdatePayload` in the `payload` field.
196196+197197+The shape for a `SubscriberOptionsUpdatePayload` is as follows:
198198+199199+```go
200200+type SubscriberOptionsUpdateMsg struct {
201201+ WantedCollections []string `json:"wantedCollections"`
202202+ WantedDIDs []string `json:"wantedDids"`
203203+ MaxMessageSizeBytes int `json:"maxMessageSizeBytes"`
204204+}
205205+```
206206+207207+If either array is empty, the relevant filter will be disabled (i.e. sending empty `wantedDids` will mean a client gets messages for all DIDs again).
208208+209209+Some limitations apply around the size of the message: right now the message can be at most 10MB in size and can contain up to 100 collection filters _and_ up to 10,000 DID filters.
210210+211211+Additionally, a client can connect with `?requireHello=true` in the query params to pause replay/live-tail until the first Options Update message is sent by the client over the socket.
212212+213213+Invalid Options Updates in `requireHello` mode or normal operating mode will result in the client being disconnected.
214214+215215+An example Subscriber Sourced Message with an Options Update payload is as follows:
216216+217217+```json
218218+{
219219+ "type": "options_update",
220220+ "payload": {
221221+ "wantedCollections": ["app.bsky.feed.post"],
222222+ "wantedDids": ["did:plc:q6gjnaw2blty4crticxkmujt"],
223223+ "maxMessageSizeBytes": 1000000
224224+ }
225225+}
226226+```
227227+228228+The above payload will filter such that a client receives only posts, and only from a the specified DID.
229229+230230+### Dashboard Preview
231231+232232+