···11-const WebSocket = require("ws");
22-const path = require("path");
33-const fs = require("fs");
44-require("dotenv").config({ path: path.resolve(__dirname, "../../.env") }); // Load environment variables from .env file in app root
11+// jetstream-consumer.js
22+// Script to consume Bluesky firehose via Jetstream and save records to Supabase
5366-const JETSTREAM_URL = "wss://jetstream2.us-west.bsky.network/subscribe";
77-const FLUSHING_STATUS_NSID = "im.flushing.right.now";
88-const LOG_FILE = path.resolve(__dirname, "flushing-logs.jsonl");
44+import WebSocket from 'ws';
55+import { createClient } from '@supabase/supabase-js';
66+import dotenv from 'dotenv';
77+import fs from 'fs';
88+import path from 'path';
99+import https from 'https';
1010+import { promisify } from 'util';
9111010-console.log("Starting firehose worker with file storage");
1111-console.log(`Will save records to: ${LOG_FILE}`);
1212+// Load environment variables
1313+dotenv.config();
12141313-// Create log directory if needed
1414-const logDir = path.dirname(LOG_FILE);
1515-if (!fs.existsSync(logDir)) {
1616- fs.mkdirSync(logDir, { recursive: true });
1717-}
1515+// Configure Supabase client
1616+const supabaseUrl = process.env.SUPABASE_URL;
1717+const supabaseKey = process.env.SUPABASE_KEY;
1818+const supabase = createClient(supabaseUrl, supabaseKey);
18191919-// Function to append records to a log file
2020-function saveRecord(record) {
2121- return new Promise((resolve, reject) => {
2222- const logEntry = JSON.stringify(record) + "\n";
2323- fs.appendFile(LOG_FILE, logEntry, (err) => {
2424- if (err) {
2525- console.error("Error writing to log file:", err);
2626- reject(err);
2727- } else {
2828- resolve({ success: true });
2929- }
3030- });
3131- });
3232-}
2020+// Configure Jetstream connection
2121+const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
2222+const WANTED_COLLECTION = 'im.flushing.right.now';
2323+const CURSOR_FILE_PATH = path.join(process.cwd(), 'cursor.txt');
33243434-// Test file writing on startup
3535-async function setupSystem() {
3636- try {
3737- console.log("Testing file logging system...");
3838-3939- // Test if we can write to the log file
4040- const testRecord = {
4141- type: "startup",
4242- timestamp: new Date().toISOString(),
4343- message: "Firehose worker started"
4444- };
4545-4646- await saveRecord(testRecord);
4747- console.log("✅ File logging test successful");
4848-4949- // Create stats counter file if it doesn't exist
5050- const statsFile = path.resolve(__dirname, "flushing-stats.json");
5151- if (!fs.existsSync(statsFile)) {
5252- const initialStats = {
5353- total_records: 0,
5454- start_time: new Date().toISOString(),
5555- last_update: new Date().toISOString()
5656- };
5757- fs.writeFileSync(statsFile, JSON.stringify(initialStats, null, 2));
5858- console.log("Created new stats file");
5959- }
6060-6161- } catch (err) {
6262- console.error("Error setting up file logging:", err);
6363- process.exit(1);
2525+// Read cursor from file if it exists
2626+function loadCursor() {
2727+ try {
2828+ if (fs.existsSync(CURSOR_FILE_PATH)) {
2929+ const cursor = fs.readFileSync(CURSOR_FILE_PATH, 'utf8').trim();
3030+ console.log(`Loaded cursor: ${cursor}`);
3131+ return cursor;
6432 }
3333+ } catch (error) {
3434+ console.error('Error loading cursor:', error);
3535+ }
3636+ return null;
6537}
66386767-let messageCount = 0;
6868-let flushingFoundCount = 0;
3939+// Save cursor to file
4040+function saveCursor(cursor) {
4141+ try {
4242+ fs.writeFileSync(CURSOR_FILE_PATH, cursor.toString());
4343+ } catch (error) {
4444+ console.error('Error saving cursor:', error);
4545+ }
4646+}
69477070-function connect() {
7171- console.log("Connecting to Jetstream");
4848+// Utility function to add response headers to avoid rate limiting
4949+function getRequestOptions(url) {
5050+ const parsedUrl = new URL(url);
5151+ return {
5252+ hostname: parsedUrl.hostname,
5353+ path: parsedUrl.pathname + parsedUrl.search,
5454+ headers: {
5555+ 'User-Agent': 'FlushingRecorder/1.0 (https://example.com/)',
5656+ 'Accept': 'application/json'
5757+ },
5858+ timeout: 10000
5959+ };
6060+}
72617373- const wsUrl = JETSTREAM_URL + "?wantedCollections=" + FLUSHING_STATUS_NSID;
7474- const ws = new WebSocket(wsUrl);
6262+// Resolve a DID to a handle using multiple methods
6363+async function resolveDIDToHandle(did) {
6464+ console.log(`Attempting to resolve DID: ${did}`);
6565+6666+ // Make sure the DID is properly formatted
6767+ if (!did || !did.startsWith('did:')) {
6868+ console.error(`Invalid DID format: ${did}`);
6969+ return null;
7070+ }
7171+7272+ // Method 1: Try the Bluesky API (most reliable)
7373+ try {
7474+ console.log(`Trying Bluesky API method for ${did}`);
7575+ const handle = await resolveDIDWithBskyAPI(did);
7676+ if (handle) {
7777+ console.log(`Bluesky API resolved ${did} to ${handle}`);
7878+ return handle;
7979+ }
8080+ } catch (error) {
8181+ console.error(`Bluesky API method failed for ${did}:`, error);
8282+ }
8383+8484+ // Method 2: Try the PLC directory
8585+ try {
8686+ console.log(`Trying PLC directory method for ${did}`);
8787+ const handle = await resolveDIDWithPLC(did);
8888+ if (handle) {
8989+ console.log(`PLC directory resolved ${did} to ${handle}`);
9090+ return handle;
9191+ }
9292+ } catch (error) {
9393+ console.error(`PLC directory method failed for ${did}:`, error);
9494+ }
9595+9696+ // Method 3: Try handle resolver (unlikely to work for DIDs, but worth a try)
9797+ try {
9898+ console.log(`Trying handle resolver method for ${did}`);
9999+ const handle = await resolveDIDWithHandleResolver(did);
100100+ if (handle) {
101101+ console.log(`Handle resolver resolved ${did} to ${handle}`);
102102+ return handle;
103103+ }
104104+ } catch (error) {
105105+ console.error(`Handle resolver method failed for ${did}:`, error);
106106+ }
107107+108108+ console.log(`All resolution methods failed for ${did}`);
109109+ return null;
110110+}
751117676- ws.on("open", () => {
7777- console.log("Connected to Jetstream");
7878- });
7979-8080- ws.on("message", async (data) => {
8181- messageCount++;
8282- if (messageCount % 1000 === 0) {
8383- console.log("Messages:", messageCount);
112112+// Method 1: Resolve using PLC directory
113113+async function resolveDIDWithPLC(did) {
114114+ return new Promise((resolve, reject) => {
115115+ const url = `https://plc.directory/${encodeURIComponent(did)}`;
116116+ console.log(`Making PLC directory request to: ${url}`);
117117+118118+ const options = getRequestOptions(url);
119119+120120+ const req = https.get(options, (res) => {
121121+ let data = '';
122122+123123+ // Log response status
124124+ console.log(`PLC Directory response status: ${res.statusCode}`);
125125+126126+ res.on('data', (chunk) => {
127127+ data += chunk;
128128+ });
129129+130130+ res.on('end', () => {
131131+ try {
132132+ console.log(`PLC raw response for ${did}: ${data.substring(0, 300)}...`);
133133+134134+ if (res.statusCode !== 200) {
135135+ console.warn(`Failed to resolve DID ${did} with PLC: HTTP ${res.statusCode}`);
136136+ resolve(null);
137137+ return;
138138+ }
139139+140140+ // Try to parse as JSON first
141141+ try {
142142+ const didDoc = JSON.parse(data);
143143+144144+ // Extract handle from alsoKnownAs
145145+ if (didDoc.alsoKnownAs && Array.isArray(didDoc.alsoKnownAs) && didDoc.alsoKnownAs.length > 0) {
146146+ console.log(`Found alsoKnownAs entries: ${JSON.stringify(didDoc.alsoKnownAs)}`);
147147+148148+ // Look for value starting with "at://"
149149+ const atValue = didDoc.alsoKnownAs.find(value => value.startsWith('at://'));
150150+ if (atValue) {
151151+ const handle = atValue.replace('at://', '');
152152+ console.log(`Successfully resolved ${did} to handle: ${handle}`);
153153+ resolve(handle);
154154+ return;
155155+ } else {
156156+ console.warn(`No 'at://' prefix found in alsoKnownAs for ${did}`);
157157+ }
158158+ } else {
159159+ console.warn(`No alsoKnownAs property found in DID document for ${did}`);
160160+ }
161161+ } catch (jsonError) {
162162+ console.log(`JSON parsing failed, trying regex: ${jsonError.message}`);
163163+ }
164164+165165+ // If JSON parsing fails or doesn't find handle, try regex as fallback
166166+ const atMatch = data.match(/at:\/\/([^"'\\s]+)/);
167167+ if (atMatch && atMatch[1]) {
168168+ const handle = atMatch[1];
169169+ console.log(`Regex extracted handle for ${did}: ${handle}`);
170170+ resolve(handle);
171171+ return;
172172+ }
173173+174174+ resolve(null); // No handle found
175175+ } catch (error) {
176176+ console.error(`Error parsing PLC directory response for ${did}:`, error);
177177+ resolve(null);
84178 }
179179+ });
180180+ });
181181+182182+ req.on('error', (error) => {
183183+ console.error(`Error fetching PLC document for ${did}:`, error);
184184+ resolve(null);
185185+ });
186186+187187+ req.on('timeout', () => {
188188+ console.error(`PLC request timeout for ${did}`);
189189+ req.destroy();
190190+ resolve(null);
191191+ });
192192+ });
193193+}
85194195195+// Method 2: Resolve using Bluesky API
196196+async function resolveDIDWithBskyAPI(did) {
197197+ return new Promise((resolve, reject) => {
198198+ // The Bluesky API endpoint for DID-to-handle resolution
199199+ const url = `https://api.bsky.app/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`;
200200+ console.log(`Making Bluesky API request to: ${url}`);
201201+202202+ const options = getRequestOptions(url);
203203+204204+ const req = https.get(options, (res) => {
205205+ let data = '';
206206+207207+ // Log response status
208208+ console.log(`Bluesky API response status: ${res.statusCode}`);
209209+210210+ res.on('data', (chunk) => {
211211+ data += chunk;
212212+ });
213213+214214+ res.on('end', () => {
86215 try {
8787- const message = JSON.parse(data.toString());
8888-8989- if (message.kind === "commit" &&
9090- message.commit &&
9191- message.commit.collection === FLUSHING_STATUS_NSID) {
9292-9393- flushingFoundCount++;
9494- console.log("Found flushing record:", flushingFoundCount);
9595- console.log(JSON.stringify(message, null, 2));
9696-9797- const recordPath = message.commit.collection + "/" + message.commit.rkey;
9898- const authorDid = message.did;
9999- const cid = message.commit.cid || "cid_" + Date.now();
100100-101101- let recordText = "No text found";
102102- let recordEmoji = "🚽";
103103- let recordCreatedAt = new Date().toISOString();
104104- let recordType = FLUSHING_STATUS_NSID;
105105-106106- if (message.commit.record) {
107107- if (message.commit.record.text) {
108108- recordText = message.commit.record.text;
109109- }
110110- if (message.commit.record.emoji) {
111111- recordEmoji = message.commit.record.emoji;
112112- }
113113- if (message.commit.record.createdAt) {
114114- recordCreatedAt = message.commit.record.createdAt;
115115- }
116116- if (message.commit.record.$type) {
117117- recordType = message.commit.record.$type;
118118- }
119119- }
120120-121121- console.log("Author:", authorDid);
122122- console.log("Path:", recordPath);
123123- console.log("Text:", recordText);
124124- console.log("Emoji:", recordEmoji);
125125- console.log("Created at:", recordCreatedAt);
126126-127127- const uri = "at://" + authorDid + "/" + recordPath;
128128- console.log("URI:", uri);
129129-130130- // Save to log file
131131- try {
132132- // Create a record with all the info we need - matching structure
133133- const flushingRecord = {
134134- did: authorDid,
135135- collection: message.commit.collection,
136136- type: recordType,
137137- created_at: recordCreatedAt,
138138- emoji: recordEmoji,
139139- text: recordText,
140140- cid: cid,
141141- uri: uri,
142142- handle: "unknown", // We'll add real handle resolution later
143143- indexed_at: new Date().toISOString()
144144- };
145145-146146- // Save to file
147147- console.log("Saving record to log file...");
148148- const result = await saveRecord(flushingRecord);
149149-150150- if (result.success) {
151151- console.log("Record saved successfully!");
152152-153153- // Update stats counter
154154- try {
155155- const statsFile = path.resolve(__dirname, "flushing-stats.json");
156156- let stats = { total_records: 0 };
157157-158158- if (fs.existsSync(statsFile)) {
159159- stats = JSON.parse(fs.readFileSync(statsFile, 'utf8'));
160160- }
161161-162162- stats.total_records++;
163163- stats.last_update = new Date().toISOString();
164164-165165- fs.writeFileSync(statsFile, JSON.stringify(stats, null, 2));
166166-167167- // Only log every 10 records to reduce noise
168168- if (flushingFoundCount % 10 === 0) {
169169- console.log(`Total records processed: ${stats.total_records}`);
170170- }
171171- } catch (statsErr) {
172172- console.error("Error updating stats:", statsErr.message);
173173- }
174174- } else {
175175- console.error("Failed to save record to file");
176176- }
177177-178178- // Keep track of how many records we've processed
179179- if (flushingFoundCount % 50 === 0) {
180180- console.log(`Processed ${flushingFoundCount} flushing records in this session`);
181181- }
182182-183183- } catch (err) {
184184- console.error("Error processing record:", err.message);
185185- }
186186- }
187187- } catch (err) {
188188- console.error("Error processing message:", err.message);
216216+ if (res.statusCode !== 200) {
217217+ console.warn(`Failed to resolve DID ${did} with Bluesky API: HTTP ${res.statusCode}`);
218218+ resolve(null);
219219+ return;
220220+ }
221221+222222+ const repoInfo = JSON.parse(data);
223223+224224+ if (repoInfo && repoInfo.handle) {
225225+ const handle = repoInfo.handle;
226226+ console.log(`Successfully resolved ${did} to handle: ${handle} using Bluesky API`);
227227+ resolve(handle);
228228+ return;
229229+ }
230230+231231+ resolve(null); // No handle found
232232+ } catch (error) {
233233+ console.error(`Error parsing Bluesky API response for ${did}:`, error);
234234+ resolve(null);
189235 }
236236+ });
190237 });
191191-192192- ws.on("error", (error) => {
193193- console.error("WebSocket error:", error.message);
238238+239239+ req.on('error', (error) => {
240240+ console.error(`Error fetching from Bluesky API for ${did}:`, error);
241241+ resolve(null);
194242 });
243243+244244+ req.on('timeout', () => {
245245+ console.error(`Bluesky API request timeout for ${did}`);
246246+ req.destroy();
247247+ resolve(null);
248248+ });
249249+ });
250250+}
195251196196- ws.on("close", () => {
197197- console.log("Connection closed, reconnecting in 5s");
198198- setTimeout(connect, 5000);
252252+// Method 3: Try Bluesky official handle resolver
253253+async function resolveDIDWithHandleResolver(did) {
254254+ try {
255255+ // First check if this is already a handle format (user.bsky.social)
256256+ if (did.includes('.') && !did.startsWith('did:')) {
257257+ console.log(`Input appears to be a handle already: ${did}`);
258258+ return did;
259259+ }
260260+261261+ return new Promise((resolve, reject) => {
262262+ const url = `https://api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(did)}`;
263263+ console.log(`Making handle resolver request to: ${url}`);
264264+265265+ const options = getRequestOptions(url);
266266+267267+ const req = https.get(options, (res) => {
268268+ let data = '';
269269+270270+ // Log response status
271271+ console.log(`Handle resolver response status: ${res.statusCode}`);
272272+273273+ res.on('data', (chunk) => {
274274+ data += chunk;
275275+ });
276276+277277+ res.on('end', () => {
278278+ try {
279279+ if (res.statusCode !== 200) {
280280+ console.warn(`Failed to resolve ${did} with handle resolver: HTTP ${res.statusCode}`);
281281+ resolve(null);
282282+ return;
283283+ }
284284+285285+ const response = JSON.parse(data);
286286+287287+ if (response && response.did === did) {
288288+ // This means we resolved a handle to a DID, but we want the opposite
289289+ resolve(null);
290290+ return;
291291+ }
292292+293293+ resolve(null); // No handle found
294294+ } catch (error) {
295295+ console.error(`Error parsing handle resolver response for ${did}:`, error);
296296+ resolve(null);
297297+ }
298298+ });
299299+ });
300300+301301+ req.on('error', (error) => {
302302+ console.error(`Error fetching from handle resolver for ${did}:`, error);
303303+ resolve(null);
304304+ });
305305+306306+ req.on('timeout', () => {
307307+ console.error(`Handle resolver request timeout for ${did}`);
308308+ req.destroy();
309309+ resolve(null);
310310+ });
199311 });
312312+ } catch (error) {
313313+ console.error(`Exception in handle resolver for ${did}:`, error);
314314+ return null;
315315+ }
200316}
201317202202-// Start the worker
203203-async function start() {
204204- await setupSystem();
205205- connect();
318318+// Process Jetstream event
319319+async function processEvent(event) {
320320+ try {
321321+ // Save the cursor for each event we process
322322+ saveCursor(event.time_us);
323323+324324+ // Only process commit events
325325+ if (event.kind !== 'commit') {
326326+ // Don't log skipped events to reduce noise
327327+ return;
328328+ }
329329+330330+ // Only process commits for our target collection
331331+ if (event.commit.collection !== WANTED_COLLECTION) {
332332+ // Don't log skipped collections to reduce noise
333333+ return;
334334+ }
335335+336336+ // Now we can log since we know it's relevant
337337+ console.log(`Processing event: ${JSON.stringify(event).substring(0, 500)}...`);
338338+339339+ // Extract record data
340340+ const { did, time_us } = event;
341341+ const { operation, collection, rkey, record, cid } = event.commit;
342342+343343+ console.log(`Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`);
344344+345345+ // Skip delete operations
346346+ if (operation === 'delete') {
347347+ console.log(`Skipping delete operation`);
348348+ return;
349349+ }
350350+351351+ // Try different approaches to get a handle
352352+353353+ // Approach 1: Check if handle is already in the record
354354+ let handle = null;
355355+ if (record && record.handle) {
356356+ console.log(`Found handle in record: ${record.handle}`);
357357+ handle = record.handle;
358358+ }
359359+360360+ // Approach 2: Try to resolve via APIs
361361+ if (!handle) {
362362+ console.log(`Resolving handle for DID: ${did}`);
363363+ handle = await resolveDIDToHandle(did);
364364+365365+ if (handle) {
366366+ console.log(`Successfully resolved handle: ${handle}`);
367367+ } else {
368368+ console.log(`Failed to resolve handle for DID: ${did}`);
369369+370370+ // Check existing records in database for this DID
371371+ try {
372372+ const { data, error } = await supabase
373373+ .from('flushing_records')
374374+ .select('handle')
375375+ .eq('did', did)
376376+ .not('handle', 'is', null)
377377+ .not('handle', 'eq', 'unknown')
378378+ .order('indexed_at', { ascending: false })
379379+ .limit(1);
380380+381381+ if (!error && data && data.length > 0 && data[0].handle) {
382382+ handle = data[0].handle;
383383+ console.log(`Found handle in database for DID ${did}: ${handle}`);
384384+ } else {
385385+ console.log(`No existing handle found in database for DID: ${did}`);
386386+ handle = 'unknown'; // Set explicitly to unknown
387387+ }
388388+ } catch (dbError) {
389389+ console.error(`Error checking database for existing handle: ${dbError.message}`);
390390+ handle = 'unknown'; // Set explicitly if DB query fails
391391+ }
392392+ }
393393+ }
394394+395395+ // Double-check that we have a handle, default to 'unknown' if not
396396+ if (!handle) {
397397+ console.log(`No handle could be resolved for DID ${did}, using 'unknown'`);
398398+ handle = 'unknown';
399399+ }
400400+401401+ // Prepare data for insertion - DO NOT include id field at all
402402+ const recordData = {
403403+ did,
404404+ collection,
405405+ type: record?.$type,
406406+ created_at: record?.createdAt || new Date().toISOString(),
407407+ emoji: record?.emoji,
408408+ text: record?.text,
409409+ cid,
410410+ uri: `at://${did}/${collection}/${rkey}`,
411411+ indexed_at: new Date().toISOString(),
412412+ handle: handle // This will never be null or undefined now
413413+ };
414414+415415+ console.log(`Preparing to insert/update record with handle '${recordData.handle}'`);
416416+417417+ // First check if the record already exists
418418+ const { data: existingData, error: checkError } = await supabase
419419+ .from('flushing_records')
420420+ .select('id, handle')
421421+ .eq('uri', recordData.uri)
422422+ .limit(1);
423423+424424+ let result;
425425+426426+ if (checkError) {
427427+ console.error(`Error checking if record exists: ${checkError.message}`);
428428+ return;
429429+ }
430430+431431+ // If record exists, update it
432432+ if (existingData && existingData.length > 0) {
433433+ console.log(`Record with URI ${recordData.uri} already exists, updating`);
434434+435435+ // If existing record has a valid handle and current handle is 'unknown', use the existing handle
436436+ if (existingData[0].handle && existingData[0].handle !== 'unknown' && recordData.handle === 'unknown') {
437437+ console.log(`Keeping existing handle '${existingData[0].handle}' instead of replacing with 'unknown'`);
438438+ recordData.handle = existingData[0].handle;
439439+ }
440440+441441+ const { data, error } = await supabase
442442+ .from('flushing_records')
443443+ .update(recordData)
444444+ .eq('uri', recordData.uri);
445445+446446+ result = { data, error };
447447+ }
448448+ // Otherwise insert a new record
449449+ else {
450450+ console.log(`Record with URI ${recordData.uri} doesn't exist, inserting with handle: ${recordData.handle}`);
451451+ const { data, error } = await supabase
452452+ .from('flushing_records')
453453+ .insert(recordData);
454454+455455+ result = { data, error };
456456+ }
457457+458458+ // Check the result of the operation
459459+ if (result.error) {
460460+ console.error(`Error saving record to Supabase: ${result.error.message}`);
461461+ console.error(`Failed record data: ${JSON.stringify(recordData)}`);
462462+ } else {
463463+ console.log(`Successfully saved record: ${recordData.uri} (handle: ${recordData.handle})`);
464464+ }
465465+466466+ } catch (error) {
467467+ console.error(`Error processing event: ${error.message}`);
468468+ console.error(error.stack);
469469+ }
206470}
207471208208-// Run the worker
209209-start();
472472+// Process 'identity' events when they come through the firehose
473473+async function processIdentityEvent(event) {
474474+ try {
475475+ if (event.kind !== 'identity' || !event.identity) {
476476+ return;
477477+ }
478478+479479+ const { did, handle } = event.identity;
480480+481481+ if (did && handle) {
482482+ // Check if we have any records with this DID that have 'unknown' handles
483483+ try {
484484+ const { data, error } = await supabase
485485+ .from('flushing_records')
486486+ .select('uri')
487487+ .eq('did', did)
488488+ .eq('handle', 'unknown');
489489+490490+ if (!error && data && data.length > 0) {
491491+ console.log(`Found ${data.length} records with DID ${did} and unknown handle. Updating to ${handle}...`);
492492+493493+ // Update all matching records with the new handle
494494+ const { updateData, updateError } = await supabase
495495+ .from('flushing_records')
496496+ .update({ handle })
497497+ .eq('did', did)
498498+ .eq('handle', 'unknown');
499499+500500+ if (updateError) {
501501+ console.error(`Error updating records with DID ${did}: ${updateError.message}`);
502502+ } else {
503503+ console.log(`Successfully updated handle for records with DID ${did} to ${handle}`);
504504+ }
505505+ }
506506+ } catch (dbError) {
507507+ console.error(`Error updating unknown handles: ${dbError.message}`);
508508+ }
509509+ }
510510+ } catch (error) {
511511+ console.error(`Error processing identity event: ${error.message}`);
512512+ }
513513+}
514514+515515+// Connect to Jetstream and process events
516516+function connectToJetstream() {
517517+ const cursor = loadCursor();
518518+519519+ // Building the URL with query parameters - now include identity events!
520520+ // Including identity events will help us maintain DID-to-handle mapping
521521+ let url = `${JETSTREAM_URL}?wantedCollections=${WANTED_COLLECTION}`;
522522+ if (cursor) {
523523+ // Subtract a few seconds (in microseconds) to ensure no gaps
524524+ const rewindCursor = parseInt(cursor) - 5000000; // 5 seconds in microseconds
525525+ url += `&cursor=${rewindCursor}`;
526526+ }
527527+528528+ console.log(`Connecting to Jetstream: ${url}`);
529529+530530+ const ws = new WebSocket(url);
531531+532532+ ws.on('open', () => {
533533+ console.log('Connected to Jetstream');
534534+ });
535535+536536+ ws.on('message', async (data) => {
537537+ try {
538538+ const event = JSON.parse(data.toString());
539539+540540+ // Process identity events to keep our DID-to-handle mapping up to date
541541+ if (event.kind === 'identity') {
542542+ await processIdentityEvent(event);
543543+ }
544544+545545+ // Process other events normally
546546+ await processEvent(event);
547547+ } catch (error) {
548548+ console.error('Error parsing message:', error);
549549+ // Don't log message data to reduce noise
550550+ }
551551+ });
552552+553553+ ws.on('error', (error) => {
554554+ console.error('WebSocket error:', error);
555555+ setTimeout(connectToJetstream, 5000); // Reconnect after 5 seconds
556556+ });
557557+558558+ ws.on('close', () => {
559559+ console.log('Connection closed. Attempting to reconnect...');
560560+ setTimeout(connectToJetstream, 5000); // Reconnect after 5 seconds
561561+ });
562562+563563+ // Heartbeat to keep the connection alive
564564+ const interval = setInterval(() => {
565565+ if (ws.readyState === WebSocket.OPEN) {
566566+ ws.ping();
567567+ } else {
568568+ clearInterval(interval);
569569+ }
570570+ }, 30000);
571571+}
210572211211-process.on("SIGINT", () => {
212212- console.log("Shutting down");
573573+// Start the application
574574+function start() {
575575+ console.log('Starting Jetstream consumer...');
576576+ connectToJetstream();
577577+578578+ // Handle process termination
579579+ process.on('SIGINT', () => {
580580+ console.log('Process terminated. Exiting...');
213581 process.exit(0);
214214-});582582+ });
583583+}
584584+585585+start();
+61-19
app/src/app/api/bluesky/feed-direct/route.ts
···87878888 entries = data || [];
8989 } else {
9090- // Main query: get the most recent entries
9191- // Use a direct SQL query for maximum reliability
9292- const { data, error } = await supabase.rpc('get_latest_entries', {
9393- max_entries: MAX_ENTRIES
9494- });
9090+ // First try: Direct raw SQL query via executeRaw (most reliable)
9191+ try {
9292+ // Use a direct SQL query to completely bypass any ORM and query builder caching
9393+ const rawQuery = `
9494+ SELECT * FROM flushing_records
9595+ ORDER BY id DESC
9696+ LIMIT ${MAX_ENTRIES}
9797+ `;
9898+9999+ console.log('Executing direct SQL query:', rawQuery);
100100+101101+ const { data: directData, error: directError } = await supabase.rpc(
102102+ 'execute_raw_query',
103103+ { raw_query: rawQuery }
104104+ );
105105+106106+ if (directError) {
107107+ console.error('Raw SQL query error:', directError);
108108+ // Continue to next approach
109109+ } else if (directData && Array.isArray(directData) && directData.length > 0) {
110110+ console.log(`✅ Direct SQL query successful, found ${directData.length} entries`);
111111+ entries = directData;
112112+113113+ // We got data, return early
114114+ return entries;
115115+ }
116116+ } catch (rawError) {
117117+ console.error('Exception executing raw SQL:', rawError);
118118+ // Continue to next approach
119119+ }
951209696- if (error) {
9797- console.error('RPC function error:', error);
121121+ // Second try: Using the RPC function approach
122122+ try {
123123+ console.log('Trying RPC function approach');
981249999- // Fallback to regular query if RPC fails
100100- console.log('Falling back to regular query');
101101- const { data: fallbackData, error: fallbackError } = await supabase
102102- .from('flushing_records')
103103- .select('*')
104104- .order('id', { ascending: false })
105105- .limit(MAX_ENTRIES);
125125+ const { data, error } = await supabase.rpc('get_latest_entries', {
126126+ max_entries: MAX_ENTRIES
127127+ });
128128+129129+ if (error) {
130130+ console.error('RPC function error:', error);
131131+ // Continue to fallback approach
132132+ } else if (data && Array.isArray(data) && data.length > 0) {
133133+ console.log(`✅ RPC function query successful, found ${data.length} entries`);
134134+ entries = data;
106135107107- if (fallbackError) {
108108- throw new Error(`Fallback query error: ${fallbackError.message}`);
136136+ // We got data, return early
137137+ return entries;
109138 }
139139+ } catch (rpcError) {
140140+ console.error('Exception in RPC function:', rpcError);
141141+ // Continue to fallback approach
142142+ }
143143+144144+ // Final fallback: Standard query builder approach
145145+ console.log('Falling back to standard query builder');
146146+ const { data: fallbackData, error: fallbackError } = await supabase
147147+ .from('flushing_records')
148148+ .select('*')
149149+ .order('id', { ascending: false })
150150+ .limit(MAX_ENTRIES);
110151111111- entries = fallbackData || [];
112112- } else {
113113- entries = data || [];
152152+ if (fallbackError) {
153153+ throw new Error(`Fallback query error: ${fallbackError.message}`);
114154 }
155155+156156+ entries = fallbackData || [];
115157 }
116158117159 console.log(`Query returned ${entries.length} entries`);
+24-2
app/src/app/page.tsx
···176176 setText('is ');
177177 setSuccess('Your flushing status has been updated!');
178178179179+ // Create a temporary entry to display immediately
180180+ if (result && result.uri && result.cid) {
181181+ const tempEntry: FlushingEntry = {
182182+ id: `temp-${Date.now()}`, // Create a temporary ID
183183+ uri: result.uri,
184184+ cid: result.cid,
185185+ authorDid: did,
186186+ authorHandle: handle,
187187+ text: formattedText,
188188+ emoji: selectedEmoji,
189189+ createdAt: new Date().toISOString()
190190+ };
191191+192192+ console.log('Adding temporary entry to feed:', tempEntry);
193193+194194+ // Add the temporary entry to the top of the feed
195195+ setEntries(prevEntries => [tempEntry, ...prevEntries]);
196196+197197+ // Also mark it as a new entry for animation
198198+ setNewEntryIds(new Set([tempEntry.id]));
199199+ }
200200+179201 // Close status form after successful submission
180202 setTimeout(() => {
181203 setStatusOpen(false);
182204 }, 2000);
183205184184- // Refresh the feed to show the new status
206206+ // Still refresh the feed after a delay to get the actual database entry
185207 setTimeout(() => {
186208 fetchLatestEntries(true);
187187- }, 1000);
209209+ }, 3000);
188210 } catch (err: any) {
189211 console.error('Failed to update status:', err);
190212 setStatusError(`Failed to update status: ${err.message || 'Unknown error'}`);
+26
execute_raw_query.sql
···11+-- Create a function to execute raw SQL queries safely
22+-- This allows direct SQL execution for better performance and reliability
33+44+CREATE OR REPLACE FUNCTION execute_raw_query(raw_query TEXT)
55+RETURNS JSONB
66+LANGUAGE plpgsql
77+SECURITY DEFINER -- This runs with the privileges of the function creator
88+AS $$
99+DECLARE
1010+ result JSONB;
1111+BEGIN
1212+ -- Only allow SELECT queries for security
1313+ IF position('SELECT' in upper(raw_query)) != 1 THEN
1414+ RAISE EXCEPTION 'Only SELECT queries are allowed';
1515+ END IF;
1616+1717+ -- Execute the query and convert result to JSON
1818+ EXECUTE 'SELECT json_agg(t) FROM (' || raw_query || ') t' INTO result;
1919+2020+ -- Return empty array instead of null if no results
2121+ RETURN COALESCE(result, '[]'::jsonb);
2222+END;
2323+$$;
2424+2525+-- Test the function
2626+SELECT execute_raw_query('SELECT id, did, handle, text FROM flushing_records ORDER BY id DESC LIMIT 3');