This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

// jetstream-consumer.js
// Script to consume Bluesky firehose via Jetstream and save records to Supabase

import WebSocket from 'ws';
import { createClient } from '@supabase/supabase-js';
import dotenv from 'dotenv';
import fs from 'fs';
import path from 'path';
import https from 'https';
import { promisify } from 'util';

// Load environment variables
dotenv.config();

// Configure Supabase client
const supabaseUrl = process.env.SUPABASE_URL;
const supabaseKey = process.env.SUPABASE_KEY;
const supabase = createClient(supabaseUrl, supabaseKey);

// Configure Jetstream connection
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
const WANTED_COLLECTION = 'im.flushing.right.now';
const CURSOR_FILE_PATH = path.join(process.cwd(), 'cursor.txt');

// Delay before a dropped connection is re-established.
const RECONNECT_DELAY_MS = 5000;
// Interval between WebSocket pings while the connection is open.
const HEARTBEAT_INTERVAL_MS = 30000;
// How far the saved cursor is rewound on reconnect (5 seconds, in microseconds).
const CURSOR_REWIND_US = 5000000;

/**
 * Read the last saved cursor from CURSOR_FILE_PATH.
 *
 * @returns {string|null} The trimmed file contents, or null when the file
 *   does not exist or cannot be read.
 */
function loadCursor() {
  try {
    if (fs.existsSync(CURSOR_FILE_PATH)) {
      const cursor = fs.readFileSync(CURSOR_FILE_PATH, 'utf8').trim();
      console.log(`Loaded cursor: ${cursor}`);
      return cursor;
    }
  } catch (error) {
    console.error('Error loading cursor:', error);
  }
  return null;
}

/**
 * Persist the cursor to CURSOR_FILE_PATH. Failures are logged, not thrown,
 * so a disk error never interrupts event processing.
 *
 * @param {number|string} cursor - Value to store (written as a string).
 */
function saveCursor(cursor) {
  try {
    fs.writeFileSync(CURSOR_FILE_PATH, String(cursor));
  } catch (error) {
    console.error('Error saving cursor:', error);
  }
}

/**
 * Build the options object passed to https.get for a URL, adding request
 * headers (User-Agent, Accept) and a 10 second socket timeout.
 *
 * @param {string} url - Absolute URL to request.
 * @returns {{hostname: string, path: string, headers: object, timeout: number}}
 */
function getRequestOptions(url) {
  const parsedUrl = new URL(url);
  return {
    hostname: parsedUrl.hostname,
    path: parsedUrl.pathname + parsedUrl.search,
    headers: {
      'User-Agent': 'FlushingRecorder/1.0 (https://example.com/)',
      'Accept': 'application/json'
    },
    timeout: 10000
  };
}

/**
 * Resolve a DID to a handle by trying, in order: the Bluesky API, the PLC
 * directory, and the handle resolver. The first non-empty result wins.
 *
 * @param {string} did - DID to resolve; must start with "did:".
 * @returns {Promise<string|null>} The handle, or null when the DID is
 *   malformed or every method fails.
 */
async function resolveDIDToHandle(did) {
  console.log(`Attempting to resolve DID: ${did}`);

  // Make sure the DID is properly formatted (a non-string would make
  // startsWith throw, so it is rejected here as well)
  if (typeof did !== 'string' || !did.startsWith('did:')) {
    console.error(`Invalid DID format: ${did}`);
    return null;
  }

  // Method 1: Try the Bluesky API (most reliable)
  try {
    console.log(`Trying Bluesky API method for ${did}`);
    const handle = await resolveDIDWithBskyAPI(did);
    if (handle) {
      console.log(`Bluesky API resolved ${did} to ${handle}`);
      return handle;
    }
  } catch (error) {
    console.error(`Bluesky API method failed for ${did}:`, error);
  }

  // Method 2: Try the PLC directory
  try {
    console.log(`Trying PLC directory method for ${did}`);
    const handle = await resolveDIDWithPLC(did);
    if (handle) {
      console.log(`PLC directory resolved ${did} to ${handle}`);
      return handle;
    }
  } catch (error) {
    console.error(`PLC directory method failed for ${did}:`, error);
  }

  // Method 3: Try handle resolver (unlikely to work for DIDs, but worth a try)
  try {
    console.log(`Trying handle resolver method for ${did}`);
    const handle = await resolveDIDWithHandleResolver(did);
    if (handle) {
      console.log(`Handle resolver resolved ${did} to ${handle}`);
      return handle;
    }
  } catch (error) {
    console.error(`Handle resolver method failed for ${did}:`, error);
  }

  console.log(`All resolution methods failed for ${did}`);
  return null;
}

/**
 * Resolve a DID via the PLC directory (second method tried by
 * resolveDIDToHandle). Reads the first "at://" entry of the document's
 * alsoKnownAs array; if the body is not valid JSON or has no such entry,
 * falls back to a regex scan of the raw body.
 *
 * @param {string} did - DID to look up.
 * @returns {Promise<string|null>} The handle, or null on any failure
 *   (non-200 status, network error, timeout, no handle found). Never rejects.
 */
async function resolveDIDWithPLC(did) {
  return new Promise((resolve) => {
    const url = `https://plc.directory/${encodeURIComponent(did)}`;
    console.log(`Making PLC directory request to: ${url}`);

    const options = getRequestOptions(url);

    const req = https.get(options, (res) => {
      let data = '';

      // Log response status
      console.log(`PLC Directory response status: ${res.statusCode}`);

      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        try {
          console.log(`PLC raw response for ${did}: ${data.substring(0, 300)}...`);

          if (res.statusCode !== 200) {
            console.warn(`Failed to resolve DID ${did} with PLC: HTTP ${res.statusCode}`);
            resolve(null);
            return;
          }

          // Try to parse as JSON first
          try {
            const didDoc = JSON.parse(data);

            // Extract handle from alsoKnownAs
            if (didDoc.alsoKnownAs && Array.isArray(didDoc.alsoKnownAs) && didDoc.alsoKnownAs.length > 0) {
              console.log(`Found alsoKnownAs entries: ${JSON.stringify(didDoc.alsoKnownAs)}`);

              // Look for value starting with "at://" (non-string entries are skipped)
              const atValue = didDoc.alsoKnownAs.find(
                (value) => typeof value === 'string' && value.startsWith('at://')
              );
              if (atValue) {
                const handle = atValue.slice('at://'.length);
                console.log(`Successfully resolved ${did} to handle: ${handle}`);
                resolve(handle);
                return;
              } else {
                console.warn(`No 'at://' prefix found in alsoKnownAs for ${did}`);
              }
            } else {
              console.warn(`No alsoKnownAs property found in DID document for ${did}`);
            }
          } catch (jsonError) {
            console.log(`JSON parsing failed, trying regex: ${jsonError.message}`);
          }

          // If JSON parsing fails or doesn't find handle, try regex as fallback.
          // The class excludes quotes, backslash and whitespace. (Writing `\\s`
          // here would exclude the letter "s" and truncate handles such as
          // "alice.bsky.social" at the first "s".)
          const atMatch = data.match(/at:\/\/([^"'\\\s]+)/);
          if (atMatch && atMatch[1]) {
            const handle = atMatch[1];
            console.log(`Regex extracted handle for ${did}: ${handle}`);
            resolve(handle);
            return;
          }

          resolve(null); // No handle found
        } catch (error) {
          console.error(`Error parsing PLC directory response for ${did}:`, error);
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching PLC document for ${did}:`, error);
      resolve(null);
    });

    req.on('timeout', () => {
      console.error(`PLC request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}

/**
 * Resolve a DID via the Bluesky API's com.atproto.repo.describeRepo endpoint
 * (first method tried by resolveDIDToHandle), reading the `handle` field of
 * the JSON response.
 *
 * @param {string} did - DID to look up.
 * @returns {Promise<string|null>} The handle, or null on any failure
 *   (non-200 status, invalid JSON, network error, timeout). Never rejects.
 */
async function resolveDIDWithBskyAPI(did) {
  return new Promise((resolve) => {
    // The Bluesky API endpoint for DID-to-handle resolution
    const url = `https://public.api.bsky.app/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`;
    console.log(`Making Bluesky API request to: ${url}`);

    const options = getRequestOptions(url);

    const req = https.get(options, (res) => {
      let data = '';

      // Log response status
      console.log(`Bluesky API response status: ${res.statusCode}`);

      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        try {
          if (res.statusCode !== 200) {
            console.warn(`Failed to resolve DID ${did} with Bluesky API: HTTP ${res.statusCode}`);
            resolve(null);
            return;
          }

          const repoInfo = JSON.parse(data);

          if (repoInfo && repoInfo.handle) {
            const handle = repoInfo.handle;
            console.log(`Successfully resolved ${did} to handle: ${handle} using Bluesky API`);
            resolve(handle);
            return;
          }

          resolve(null); // No handle found
        } catch (error) {
          console.error(`Error parsing Bluesky API response for ${did}:`, error);
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching from Bluesky API for ${did}:`, error);
      resolve(null);
    });

    req.on('timeout', () => {
      console.error(`Bluesky API request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}

/**
 * Last-resort resolver (third method tried by resolveDIDToHandle). If the
 * input already looks like a handle it is returned unchanged. Otherwise the
 * input is sent to com.atproto.identity.resolveHandle; that endpoint maps
 * handle -> DID, which is the opposite direction, so for DID input this
 * always yields null after logging the response status.
 *
 * @param {string} did - DID (or handle) to check.
 * @returns {Promise<string|null>} The input when it is already a handle,
 *   otherwise null. Never rejects.
 */
async function resolveDIDWithHandleResolver(did) {
  try {
    // First check if this is already a handle format (user.bsky.social)
    if (did.includes('.') && !did.startsWith('did:')) {
      console.log(`Input appears to be a handle already: ${did}`);
      return did;
    }

    return new Promise((resolve) => {
      const url = `https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(did)}`;
      console.log(`Making handle resolver request to: ${url}`);

      const options = getRequestOptions(url);

      const req = https.get(options, (res) => {
        let data = '';

        // Log response status
        console.log(`Handle resolver response status: ${res.statusCode}`);

        res.on('data', (chunk) => {
          data += chunk;
        });

        res.on('end', () => {
          try {
            if (res.statusCode !== 200) {
              console.warn(`Failed to resolve ${did} with handle resolver: HTTP ${res.statusCode}`);
              resolve(null);
              return;
            }

            // Parsed only so that a malformed body is still reported below.
            // Even a well-formed answer maps a handle to a DID, but we want
            // the opposite, so there is never a handle to return.
            JSON.parse(data);
            resolve(null); // No handle found
          } catch (error) {
            console.error(`Error parsing handle resolver response for ${did}:`, error);
            resolve(null);
          }
        });
      });

      req.on('error', (error) => {
        console.error(`Error fetching from handle resolver for ${did}:`, error);
        resolve(null);
      });

      req.on('timeout', () => {
        console.error(`Handle resolver request timeout for ${did}`);
        req.destroy();
        resolve(null);
      });
    });
  } catch (error) {
    console.error(`Exception in handle resolver for ${did}:`, error);
    return null;
  }
}

/**
 * Handle one Jetstream event: save its cursor, then, for create/update
 * commits in WANTED_COLLECTION, resolve the author's handle and insert or
 * update the matching row in the `flushing_records` table.
 *
 * Handle lookup order: `record.handle`, resolveDIDToHandle, the most recent
 * non-'unknown' handle stored for the DID, and finally the literal 'unknown'.
 *
 * @param {object} event - Parsed Jetstream message.
 * @returns {Promise<void>} Errors are logged rather than thrown.
 */
async function processEvent(event) {
  try {
    // Save the cursor for each event we process (skipped when the event
    // carries no time_us, so the cursor file never holds "undefined")
    if (event.time_us != null) {
      saveCursor(event.time_us);
    }

    // Only process commit events
    if (event.kind !== 'commit' || !event.commit) {
      // Don't log skipped events to reduce noise
      return;
    }

    // Only process commits for our target collection
    if (event.commit.collection !== WANTED_COLLECTION) {
      // Don't log skipped collections to reduce noise
      return;
    }

    // Now we can log since we know it's relevant
    console.log(`Processing event: ${JSON.stringify(event).substring(0, 500)}...`);

    // Extract record data
    const { did } = event;
    const { operation, collection, rkey, record, cid } = event.commit;

    console.log(`Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`);

    // Skip delete operations
    if (operation === 'delete') {
      console.log(`Skipping delete operation`);
      return;
    }

    // Try different approaches to get a handle

    // Approach 1: Check if handle is already in the record
    let handle = null;
    if (record && record.handle) {
      console.log(`Found handle in record: ${record.handle}`);
      handle = record.handle;
    }

    // Approach 2: Try to resolve via APIs
    if (!handle) {
      console.log(`Resolving handle for DID: ${did}`);
      handle = await resolveDIDToHandle(did);

      if (handle) {
        console.log(`Successfully resolved handle: ${handle}`);
      } else {
        console.log(`Failed to resolve handle for DID: ${did}`);

        // Check existing records in database for this DID
        try {
          const { data, error } = await supabase
            .from('flushing_records')
            .select('handle')
            .eq('did', did)
            .not('handle', 'is', null)
            .not('handle', 'eq', 'unknown')
            .order('indexed_at', { ascending: false })
            .limit(1);

          if (!error && data && data.length > 0 && data[0].handle) {
            handle = data[0].handle;
            console.log(`Found handle in database for DID ${did}: ${handle}`);
          } else {
            console.log(`No existing handle found in database for DID: ${did}`);
            handle = 'unknown'; // Set explicitly to unknown
          }
        } catch (dbError) {
          console.error(`Error checking database for existing handle: ${dbError.message}`);
          handle = 'unknown'; // Set explicitly if DB query fails
        }
      }
    }

    // Double-check that we have a handle, default to 'unknown' if not
    if (!handle) {
      console.log(`No handle could be resolved for DID ${did}, using 'unknown'`);
      handle = 'unknown';
    }

    // Prepare data for insertion - DO NOT include id field at all
    const recordData = {
      did,
      collection,
      type: record?.$type,
      created_at: record?.createdAt || new Date().toISOString(),
      emoji: record?.emoji,
      text: record?.text,
      cid,
      uri: `at://${did}/${collection}/${rkey}`,
      indexed_at: new Date().toISOString(),
      handle: handle // This will never be null or undefined now
    };

    console.log(`Preparing to insert/update record with handle '${recordData.handle}'`);

    // First check if the record already exists
    const { data: existingData, error: checkError } = await supabase
      .from('flushing_records')
      .select('id, handle')
      .eq('uri', recordData.uri)
      .limit(1);

    let result;

    if (checkError) {
      console.error(`Error checking if record exists: ${checkError.message}`);
      return;
    }

    // If record exists, update it
    if (existingData && existingData.length > 0) {
      console.log(`Record with URI ${recordData.uri} already exists, updating`);

      // If existing record has a valid handle and current handle is 'unknown', use the existing handle
      if (existingData[0].handle && existingData[0].handle !== 'unknown' && recordData.handle === 'unknown') {
        console.log(`Keeping existing handle '${existingData[0].handle}' instead of replacing with 'unknown'`);
        recordData.handle = existingData[0].handle;
      }

      const { data, error } = await supabase
        .from('flushing_records')
        .update(recordData)
        .eq('uri', recordData.uri);

      result = { data, error };
    }
    // Otherwise insert a new record
    else {
      console.log(`Record with URI ${recordData.uri} doesn't exist, inserting with handle: ${recordData.handle}`);
      const { data, error } = await supabase
        .from('flushing_records')
        .insert(recordData);

      result = { data, error };
    }

    // Check the result of the operation
    if (result.error) {
      console.error(`Error saving record to Supabase: ${result.error.message}`);
      console.error(`Failed record data: ${JSON.stringify(recordData)}`);
    } else {
      console.log(`Successfully saved record: ${recordData.uri} (handle: ${recordData.handle})`);
    }

  } catch (error) {
    console.error(`Error processing event: ${error.message}`);
    console.error(error.stack);
  }
}

/**
 * Handle an 'identity' event: when it carries both a DID and a handle,
 * replace the 'unknown' handle on every stored record for that DID.
 *
 * @param {object} event - Parsed Jetstream message; ignored unless
 *   `event.kind === 'identity'` and `event.identity` is present.
 * @returns {Promise<void>} Errors are logged rather than thrown.
 */
async function processIdentityEvent(event) {
  try {
    if (event.kind !== 'identity' || !event.identity) {
      return;
    }

    const { did, handle } = event.identity;

    if (did && handle) {
      // Check if we have any records with this DID that have 'unknown' handles
      try {
        const { data, error } = await supabase
          .from('flushing_records')
          .select('uri')
          .eq('did', did)
          .eq('handle', 'unknown');

        if (!error && data && data.length > 0) {
          console.log(`Found ${data.length} records with DID ${did} and unknown handle. Updating to ${handle}...`);

          // Update all matching records with the new handle. The result
          // exposes `error` (as destructured elsewhere in this file); it is
          // renamed here so a failed update is actually reported.
          const { error: updateError } = await supabase
            .from('flushing_records')
            .update({ handle })
            .eq('did', did)
            .eq('handle', 'unknown');

          if (updateError) {
            console.error(`Error updating records with DID ${did}: ${updateError.message}`);
          } else {
            console.log(`Successfully updated handle for records with DID ${did} to ${handle}`);
          }
        }
      } catch (dbError) {
        console.error(`Error updating unknown handles: ${dbError.message}`);
      }
    }
  } catch (error) {
    console.error(`Error processing identity event: ${error.message}`);
  }
}

/**
 * Open a WebSocket to Jetstream filtered to WANTED_COLLECTION, resume from
 * the saved cursor (rewound by CURSOR_REWIND_US), dispatch each message to
 * the event processors, and reconnect after RECONNECT_DELAY_MS when the
 * socket errors or closes.
 */
function connectToJetstream() {
  const cursor = loadCursor();

  // Building the URL with query parameters.
  // NOTE(review): nothing here requests identity events explicitly; the
  // message handler relies on Jetstream delivering them alongside the
  // filtered commits - confirm against the Jetstream documentation.
  const url = new URL(JETSTREAM_URL);
  url.searchParams.set('wantedCollections', WANTED_COLLECTION);
  if (cursor) {
    const parsedCursor = Number.parseInt(cursor, 10);
    if (Number.isNaN(parsedCursor)) {
      // A corrupt cursor file must not end up as "cursor=NaN" in the URL
      console.warn(`Ignoring invalid cursor value: ${cursor}`);
    } else {
      // Subtract a few seconds (in microseconds) to ensure no gaps
      const rewindCursor = Math.max(0, parsedCursor - CURSOR_REWIND_US);
      url.searchParams.set('cursor', String(rewindCursor));
    }
  }

  console.log(`Connecting to Jetstream: ${url.href}`);

  const ws = new WebSocket(url.href);

  let heartbeat = null;
  let reconnectScheduled = false;

  const stopHeartbeat = () => {
    if (heartbeat !== null) {
      clearInterval(heartbeat);
      heartbeat = null;
    }
  };

  // 'error' and 'close' can both fire for the same failure; scheduling a
  // reconnect from each would open two new connections per drop.
  const scheduleReconnect = () => {
    stopHeartbeat();
    if (reconnectScheduled) {
      return;
    }
    reconnectScheduled = true;
    setTimeout(connectToJetstream, RECONNECT_DELAY_MS); // Reconnect after 5 seconds
  };

  ws.on('open', () => {
    console.log('Connected to Jetstream');

    // Heartbeat to keep the connection alive; started only once the socket
    // is open so a slow handshake cannot cancel it.
    heartbeat = setInterval(() => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.ping();
      } else {
        stopHeartbeat();
      }
    }, HEARTBEAT_INTERVAL_MS);
  });

  ws.on('message', async (data) => {
    try {
      const event = JSON.parse(data.toString());

      // Process identity events to keep our DID-to-handle mapping up to date
      if (event.kind === 'identity') {
        await processIdentityEvent(event);
      }

      // Process other events normally
      await processEvent(event);
    } catch (error) {
      console.error('Error parsing message:', error);
      // Don't log message data to reduce noise
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
    scheduleReconnect();
  });

  ws.on('close', () => {
    console.log('Connection closed. Attempting to reconnect...');
    scheduleReconnect();
  });
}

/**
 * Entry point: connect to Jetstream and exit cleanly on SIGINT.
 */
function start() {
  console.log('Starting Jetstream consumer...');
  connectToJetstream();

  // Handle process termination
  process.on('SIGINT', () => {
    console.log('Process terminated. Exiting...');
    process.exit(0);
  });
}

start();