This repository has no description
// jetstream-consumer.js
// Script that consumes the Bluesky firehose via Jetstream and saves matching records to Supabase.
3
4import WebSocket from 'ws';
5import { createClient } from '@supabase/supabase-js';
6import dotenv from 'dotenv';
7import fs from 'fs';
8import path from 'path';
9import https from 'https';
10import { promisify } from 'util';
11
// --- Jetstream / local state ---------------------------------------------------
// Public Jetstream endpoint and the single record collection (NSID) we subscribe to.
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
const WANTED_COLLECTION = 'im.flushing.right.now';
// File (in the current working directory) where the last-seen event cursor is kept.
const CURSOR_FILE_PATH = path.join(process.cwd(), 'cursor.txt');

// --- Supabase -------------------------------------------------------------------
// Load variables from a .env file into process.env before the credentials are read.
dotenv.config();

const { SUPABASE_URL: supabaseUrl, SUPABASE_KEY: supabaseKey } = process.env;
const supabase = createClient(supabaseUrl, supabaseKey);
24
// Read the persisted Jetstream cursor (a microsecond timestamp) from disk.
//
// @param {string} [filePath=CURSOR_FILE_PATH] - File holding the cursor.
// @returns {string|null} The cursor as a string of decimal digits, or null when the
//   file is missing, empty, unreadable, or does not hold a plain non-negative
//   integer. Rejecting garbage here keeps callers from subscribing with `cursor=NaN`.
function loadCursor(filePath = CURSOR_FILE_PATH) {
  try {
    if (!fs.existsSync(filePath)) {
      return null;
    }
    const cursor = fs.readFileSync(filePath, 'utf8').trim();
    if (!/^\d+$/.test(cursor)) {
      console.warn(`Ignoring invalid cursor in ${filePath}: ${JSON.stringify(cursor)}`);
      return null;
    }
    console.log(`Loaded cursor: ${cursor}`);
    return cursor;
  } catch (error) {
    console.error('Error loading cursor:', error);
    return null;
  }
}
38
// Persist the latest Jetstream cursor so a restart can resume from it.
//
// The value is written to a temporary sibling file and then renamed over the real
// one. On POSIX filesystems the rename is atomic, so a crash mid-write cannot leave
// a truncated cursor behind. A null/undefined cursor (an event without `time_us`)
// is ignored rather than throwing and logging an error for every such event.
//
// @param {number|string|null|undefined} cursor - Cursor value to store.
// @param {string} [filePath=CURSOR_FILE_PATH] - Destination file.
function saveCursor(cursor, filePath = CURSOR_FILE_PATH) {
  if (cursor == null) {
    return;
  }
  const tmpPath = `${filePath}.tmp`;
  try {
    fs.writeFileSync(tmpPath, String(cursor));
    fs.renameSync(tmpPath, filePath);
  } catch (error) {
    console.error('Error saving cursor:', error);
  }
}
47
// Build `https.get` options for a JSON API request to `url`.
//
// Sends a descriptive User-Agent request header (so API operators can identify this
// client) and asks for JSON. `timeout` is the socket idle timeout in milliseconds;
// Node only emits a 'timeout' event, so callers must destroy the request themselves.
// An explicit port in `url` is preserved; without it https.get would use 443.
//
// @param {string} url - Absolute https URL.
// @returns {object} Options object for https.get / https.request.
function getRequestOptions(url) {
  const { hostname, port, pathname, search } = new URL(url);
  const options = {
    hostname,
    path: pathname + search,
    headers: {
      'User-Agent': 'FlushingRecorder/1.0 (https://example.com/)',
      Accept: 'application/json',
    },
    timeout: 10000,
  };
  if (port) {
    options.port = Number(port);
  }
  return options;
}
61
// Resolve a DID to its handle by trying each resolver in order and returning the
// first non-empty result.
//
// Default order: the Bluesky public API (com.atproto.repo.describeRepo), then the
// PLC directory. The handle-resolver lookup is intentionally not in the default list.
// com.atproto.identity.resolveHandle maps handle -> DID, so for a `did:` input it can
// never yield a handle and would only add an HTTP round-trip to every failure.
//
// @param {string} did - DID to resolve; anything not starting with "did:" is rejected.
// @param {Array<{name: string, resolve: (did: string) => Promise<string|null>}>} [resolvers]
//   Strategies to try, in order (overridable, e.g. for tests).
// @returns {Promise<string|null>} The handle, or null if every resolver failed.
async function resolveDIDToHandle(did, resolvers = [
  { name: 'Bluesky API', resolve: resolveDIDWithBskyAPI },
  { name: 'PLC directory', resolve: resolveDIDWithPLC },
]) {
  console.log(`Attempting to resolve DID: ${did}`);

  if (typeof did !== 'string' || !did.startsWith('did:')) {
    console.error(`Invalid DID format: ${did}`);
    return null;
  }

  for (const { name, resolve } of resolvers) {
    try {
      console.log(`Trying ${name} method for ${did}`);
      const handle = await resolve(did);
      if (handle) {
        console.log(`${name} resolved ${did} to ${handle}`);
        return handle;
      }
    } catch (error) {
      // Resolvers are expected to resolve(null) on failure; this is a safety net.
      console.error(`${name} method failed for ${did}:`, error);
    }
  }

  console.log(`All resolution methods failed for ${did}`);
  return null;
}
111
// Resolve a DID to a handle via the PLC directory (GET https://plc.directory/<did>).
//
// Always resolves and never rejects. An HTTP error status, network error, timeout,
// or unusable body yields null so the caller can fall back to another method.
//
// @param {string} did - DID to look up (URL-encoded into the request path).
// @returns {Promise<string|null>} The handle, or null.
async function resolveDIDWithPLC(did) {
  return new Promise((resolve) => {
    const url = `https://plc.directory/${encodeURIComponent(did)}`;
    console.log(`Making PLC directory request to: ${url}`);

    const req = https.get(getRequestOptions(url), (res) => {
      let data = '';
      console.log(`PLC Directory response status: ${res.statusCode}`);

      // Decode at the stream level. `data += chunk` on raw Buffers would mangle a
      // multi-byte UTF-8 character that is split across two chunks.
      res.setEncoding('utf8');
      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        try {
          console.log(`PLC raw response for ${did}: ${data.substring(0, 300)}...`);

          if (res.statusCode !== 200) {
            console.warn(`Failed to resolve DID ${did} with PLC: HTTP ${res.statusCode}`);
            resolve(null);
            return;
          }

          resolve(extractHandleFromPlcDocument(data, did));
        } catch (error) {
          console.error(`Error parsing PLC directory response for ${did}:`, error);
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching PLC document for ${did}:`, error);
      resolve(null);
    });

    // Node only signals socket inactivity; the request must be torn down here.
    req.on('timeout', () => {
      console.error(`PLC request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}

// Extract a handle from a PLC directory response body (normally a DID document).
//
// Prefers the first string `alsoKnownAs` entry of the form "at://<handle>". If the
// body is not JSON, or has no such entry, it falls back to scanning the raw text
// for "at://<handle>". The character class uses `\s`; the escaped form `\\s` would
// exclude the letter "s" and truncate "alice.bsky.social" to "alice.b".
//
// @param {string} body - Raw response text.
// @param {string} [did] - DID being resolved, used only in log messages.
// @returns {string|null} The handle, or null when none can be found.
function extractHandleFromPlcDocument(body, did = '') {
  try {
    const aliases = JSON.parse(body)?.alsoKnownAs;
    if (Array.isArray(aliases) && aliases.length > 0) {
      console.log(`Found alsoKnownAs entries: ${JSON.stringify(aliases)}`);
      const atValue = aliases.find(
        (value) => typeof value === 'string' && value.startsWith('at://'),
      );
      if (atValue) {
        const handle = atValue.slice('at://'.length);
        console.log(`Successfully resolved ${did} to handle: ${handle}`);
        return handle;
      }
      console.warn(`No 'at://' prefix found in alsoKnownAs for ${did}`);
    } else {
      console.warn(`No alsoKnownAs property found in DID document for ${did}`);
    }
  } catch (jsonError) {
    console.log(`JSON parsing failed, trying regex: ${jsonError.message}`);
  }

  const atMatch = body.match(/at:\/\/([^"'\s]+)/);
  if (atMatch && atMatch[1]) {
    console.log(`Regex extracted handle for ${did}: ${atMatch[1]}`);
    return atMatch[1];
  }
  return null;
}
194
// Resolve a DID to a handle using the public Bluesky API endpoint
// com.atproto.repo.describeRepo, reading the `handle` field of its JSON response.
//
// Always resolves and never rejects. HTTP errors, network errors, timeouts,
// unparsable bodies, and a missing or empty `handle` all yield null, so the caller
// can try another method.
//
// @param {string} did - DID to look up (URL-encoded into the query string).
// @returns {Promise<string|null>} The handle, or null.
async function resolveDIDWithBskyAPI(did) {
  return new Promise((resolve) => {
    const url = `https://public.api.bsky.app/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`;
    console.log(`Making Bluesky API request to: ${url}`);

    const req = https.get(getRequestOptions(url), (res) => {
      let data = '';
      console.log(`Bluesky API response status: ${res.statusCode}`);

      // Stream-level UTF-8 decoding avoids corrupting multi-byte characters that
      // straddle chunk boundaries.
      res.setEncoding('utf8');
      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        if (res.statusCode !== 200) {
          console.warn(`Failed to resolve DID ${did} with Bluesky API: HTTP ${res.statusCode}`);
          resolve(null);
          return;
        }

        try {
          const handle = JSON.parse(data)?.handle;
          if (typeof handle === 'string' && handle !== '') {
            console.log(`Successfully resolved ${did} to handle: ${handle} using Bluesky API`);
            resolve(handle);
            return;
          }
          resolve(null); // No usable handle in the response
        } catch (error) {
          console.error(`Error parsing Bluesky API response for ${did}:`, error);
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching from Bluesky API for ${did}:`, error);
      resolve(null);
    });

    // Node only signals socket inactivity; the request must be torn down here.
    req.on('timeout', () => {
      console.error(`Bluesky API request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}
251
// Fallback "resolver" that only recognizes inputs that are already handles.
//
// com.atproto.identity.resolveHandle maps a handle to a DID, so it cannot turn a DID
// into a handle. Every branch of handling its response produced null anyway, so no
// request is made and this costs no network round-trip.
//
// @param {string} did - A DID or (possibly) a handle.
// @returns {Promise<string|null>} The input itself if it looks like a handle
//   (contains "." and does not start with "did:"), otherwise null.
async function resolveDIDWithHandleResolver(did) {
  if (typeof did !== 'string') {
    console.error(`Exception in handle resolver for ${did}: expected a string`);
    return null;
  }

  if (did.includes('.') && !did.startsWith('did:')) {
    console.log(`Input appears to be a handle already: ${did}`);
    return did;
  }

  console.log(`Handle resolver cannot map ${did} to a handle (resolveHandle only maps handle -> DID)`);
  return null;
}
317
// Process one Jetstream event.
//
// Every event's `time_us` is persisted as the resume cursor. Create/update commits to
// WANTED_COLLECTION are then written to the `flushing_records` table, keyed by the
// record's AT-URI (at://<did>/<collection>/<rkey>). Deletes and all other events are
// skipped. Never throws; failures are logged.
//
// The stored handle is always derived from the DID: live resolution first, then the
// last handle stored for that DID, then 'unknown'. A `handle` field inside the record
// is deliberately NOT trusted. The record is written by the account itself, so that
// field is self-asserted and could name any other user.
async function processEvent(event) {
  try {
    saveCursor(event.time_us);

    // Non-commit events and commits to other collections are skipped quietly (noise).
    if (event.kind !== 'commit' || event.commit?.collection !== WANTED_COLLECTION) {
      return;
    }

    console.log(`Processing event: ${JSON.stringify(event).substring(0, 500)}...`);

    const { did } = event;
    const { operation, collection, rkey, record, cid } = event.commit;
    console.log(`Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`);

    if (operation === 'delete') {
      console.log(`Skipping delete operation`);
      return;
    }

    if (record?.handle) {
      console.log(`Ignoring self-asserted handle in record: ${record.handle}`);
    }
    const handle = await resolveEventHandle(did);

    // `id` is intentionally omitted (presumably generated by the database).
    const recordData = {
      did,
      collection,
      type: record?.$type,
      created_at: record?.createdAt || new Date().toISOString(),
      emoji: record?.emoji,
      text: record?.text,
      cid,
      uri: `at://${did}/${collection}/${rkey}`,
      indexed_at: new Date().toISOString(),
      handle, // never empty: resolveEventHandle falls back to 'unknown'
    };

    console.log(`Preparing to insert/update record with handle '${recordData.handle}'`);

    const result = await saveFlushingRecord(recordData);
    if (!result) {
      return; // existence check failed; already logged
    }

    if (result.error) {
      console.error(`Error saving record to Supabase: ${result.error.message}`);
      console.error(`Failed record data: ${JSON.stringify(recordData)}`);
    } else {
      console.log(`Successfully saved record: ${recordData.uri} (handle: ${recordData.handle})`);
    }
  } catch (error) {
    console.error(`Error processing event: ${error.message}`);
    console.error(error.stack);
  }
}

// Pick the handle to store for `did`: live resolution, else the most recent
// known-good handle already stored for that DID, else the literal 'unknown'.
async function resolveEventHandle(did) {
  console.log(`Resolving handle for DID: ${did}`);
  const resolved = await resolveDIDToHandle(did);
  if (resolved) {
    console.log(`Successfully resolved handle: ${resolved}`);
    return resolved;
  }
  console.log(`Failed to resolve handle for DID: ${did}`);

  const stored = await lookupStoredHandle(did);
  if (stored) {
    console.log(`Found handle in database for DID ${did}: ${stored}`);
    return stored;
  }
  console.log(`No existing handle found in database for DID: ${did}`);
  return 'unknown';
}

// Return the newest non-null, non-'unknown' handle stored for `did` in
// flushing_records (ordered by indexed_at), or null if none exists or the query fails.
async function lookupStoredHandle(did) {
  try {
    const { data, error } = await supabase
      .from('flushing_records')
      .select('handle')
      .eq('did', did)
      .not('handle', 'is', null)
      .not('handle', 'eq', 'unknown')
      .order('indexed_at', { ascending: false })
      .limit(1);

    if (error) {
      console.error(`Error checking database for existing handle: ${error.message}`);
      return null;
    }
    return data?.[0]?.handle || null;
  } catch (dbError) {
    console.error(`Error checking database for existing handle: ${dbError.message}`);
    return null;
  }
}

// Insert `recordData`, or update the existing row with the same `uri`. When updating,
// a known handle already on the row is never downgraded to 'unknown'; in that case
// recordData.handle is replaced in place with the stored handle.
// Returns the Supabase { data, error } result, or null if the existence check failed.
async function saveFlushingRecord(recordData) {
  const { data: existingData, error: checkError } = await supabase
    .from('flushing_records')
    .select('id, handle')
    .eq('uri', recordData.uri)
    .limit(1);

  if (checkError) {
    console.error(`Error checking if record exists: ${checkError.message}`);
    return null;
  }

  const existing = existingData?.[0];
  if (!existing) {
    console.log(`Record with URI ${recordData.uri} doesn't exist, inserting with handle: ${recordData.handle}`);
    const { data, error } = await supabase.from('flushing_records').insert(recordData);
    return { data, error };
  }

  console.log(`Record with URI ${recordData.uri} already exists, updating`);
  if (existing.handle && existing.handle !== 'unknown' && recordData.handle === 'unknown') {
    console.log(`Keeping existing handle '${existing.handle}' instead of replacing with 'unknown'`);
    recordData.handle = existing.handle;
  }
  const { data, error } = await supabase
    .from('flushing_records')
    .update(recordData)
    .eq('uri', recordData.uri);
  return { data, error };
}
471
// Backfill handles from a Jetstream `identity` event.
//
// Every flushing_records row for the event's DID whose handle is still 'unknown' is
// updated to the handle carried by the event. Other event kinds, and identity events
// without both a DID and a handle, are ignored. Never throws; failures are logged.
async function processIdentityEvent(event) {
  try {
    if (event.kind !== 'identity' || !event.identity) {
      return;
    }

    const { did, handle } = event.identity;
    if (!did || !handle) {
      return;
    }

    try {
      const { data, error } = await supabase
        .from('flushing_records')
        .select('uri')
        .eq('did', did)
        .eq('handle', 'unknown');

      if (error) {
        console.error(`Error looking up records with DID ${did}: ${error.message}`);
        return;
      }
      if (!data?.length) {
        return;
      }

      console.log(`Found ${data.length} records with DID ${did} and unknown handle. Updating to ${handle}...`);

      // Supabase query results have the shape { data, error }; alias `error` so a
      // failed update is actually detected instead of being logged as success.
      const { error: updateError } = await supabase
        .from('flushing_records')
        .update({ handle })
        .eq('did', did)
        .eq('handle', 'unknown');

      if (updateError) {
        console.error(`Error updating records with DID ${did}: ${updateError.message}`);
      } else {
        console.log(`Successfully updated handle for records with DID ${did} to ${handle}`);
      }
    } catch (dbError) {
      console.error(`Error updating unknown handles: ${dbError.message}`);
    }
  } catch (error) {
    console.error(`Error processing identity event: ${error.message}`);
  }
}
514
// Open a Jetstream WebSocket subscription for WANTED_COLLECTION and keep it alive.
//
// - Resumes 5 s before the persisted cursor (if any), so events near the saved
//   position are replayed rather than skipped.
// - Identity events are forwarded to processIdentityEvent to backfill handles.
//   Jetstream is documented to send them regardless of the wantedCollections filter.
// - Messages are handled one at a time, in arrival order. This keeps cursor writes
//   from going backwards and prevents processEvent's per-URI check-then-insert from
//   racing with itself.
// - Reconnects exactly once, 5 s after 'close'. The ws library emits 'close' after
//   'error', so also reconnecting from 'error' would open two sockets per failure
//   and multiply live connections over time.
function connectToJetstream() {
  const url = new URL(JETSTREAM_URL);
  url.searchParams.set('wantedCollections', WANTED_COLLECTION);

  const cursor = loadCursor();
  if (cursor) {
    // Cursors are microsecond timestamps; rewind 5 seconds to avoid gaps.
    const rewindCursor = Number.parseInt(cursor, 10) - 5 * 1000 * 1000;
    if (Number.isFinite(rewindCursor)) {
      url.searchParams.set('cursor', String(rewindCursor));
    } else {
      console.warn(`Ignoring unparsable cursor: ${cursor}`);
    }
  }

  console.log(`Connecting to Jetstream: ${url}`);
  const ws = new WebSocket(url.href);

  // Parse and dispatch one raw message. It never rejects, so the chain below survives errors.
  const handleMessage = async (data) => {
    try {
      const event = JSON.parse(data.toString());
      if (event.kind === 'identity') {
        await processIdentityEvent(event);
      }
      // processEvent also persists the cursor for non-commit events.
      await processEvent(event);
    } catch (error) {
      console.error('Error parsing message:', error);
    }
  };

  let pending = Promise.resolve();
  ws.on('message', (data) => {
    pending = pending.then(() => handleMessage(data));
  });

  ws.on('open', () => {
    console.log('Connected to Jetstream');
  });

  // Heartbeat to keep the connection alive; stopped when the socket closes.
  const heartbeat = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, 30000);

  ws.on('error', (error) => {
    // Reconnection is left to the 'close' handler, which always follows.
    console.error('WebSocket error:', error);
  });

  ws.on('close', () => {
    clearInterval(heartbeat);
    console.log('Connection closed. Attempting to reconnect...');
    setTimeout(connectToJetstream, 5000); // Reconnect after 5 seconds
  });
}
572
/**
 * Application entry point. Logs a banner and opens the Jetstream subscription. It
 * then installs a SIGINT (Ctrl-C) handler that logs and exits with status 0.
 */
function start() {
  const exitOnInterrupt = () => {
    console.log('Process terminated. Exiting...');
    process.exit(0);
  };

  console.log('Starting Jetstream consumer...');
  connectToJetstream();
  process.on('SIGINT', exitOnInterrupt);
}

// Kick everything off as soon as the module is loaded.
start();