This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

add out of order banner

+104 -74
Screenshot 2025-03-08 at 10.12.06 PM.png

This is a binary file and will not be displayed.

+89 -74
app/scripts/firehose-worker.js
··· 1 1 const WebSocket = require("ws"); 2 - const { createClient } = require("@supabase/supabase-js"); 3 2 const path = require("path"); 3 + const fs = require("fs"); 4 4 require("dotenv").config({ path: path.resolve(__dirname, "../../.env") }); // Load environment variables from .env file in app root 5 5 6 6 const JETSTREAM_URL = "wss://jetstream2.us-west.bsky.network/subscribe"; 7 7 const FLUSHING_STATUS_NSID = "im.flushing.right.now"; 8 + const LOG_FILE = path.resolve(__dirname, "flushing-logs.jsonl"); 8 9 9 - // Supabase setup from .env file 10 - const supabaseUrl = process.env.SUPABASE_URL; 11 - const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY; 10 + console.log("Starting firehose worker with file storage"); 11 + console.log(`Will save records to: ${LOG_FILE}`); 12 12 13 - if (!supabaseUrl || !supabaseKey) { 14 - console.error("Missing Supabase credentials. Add SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY to your .env file"); 15 - process.exit(1); 13 + // Create log directory if needed 14 + const logDir = path.dirname(LOG_FILE); 15 + if (!fs.existsSync(logDir)) { 16 + fs.mkdirSync(logDir, { recursive: true }); 16 17 } 17 18 18 - const supabase = createClient(supabaseUrl, supabaseKey); 19 + // Function to append records to a log file 20 + function saveRecord(record) { 21 + return new Promise((resolve, reject) => { 22 + const logEntry = JSON.stringify(record) + "\n"; 23 + fs.appendFile(LOG_FILE, logEntry, (err) => { 24 + if (err) { 25 + console.error("Error writing to log file:", err); 26 + reject(err); 27 + } else { 28 + resolve({ success: true }); 29 + } 30 + }); 31 + }); 32 + } 19 33 20 - // Ensure the table exists 21 - async function setupDatabase() { 34 + // Test file writing on startup 35 + async function setupSystem() { 22 36 try { 23 - console.log("Setting up database..."); 37 + console.log("Testing file logging system..."); 24 38 25 - // Check if the table already exists 26 - const { error: queryError } = await supabase 27 - .from('flushing_records') 28 - .select('id', { count: 'exact', head: true }); 39 + // Test if we can write to the log file 40 + const testRecord = { 41 + type: "startup", 42 + timestamp: new Date().toISOString(), 43 + message: "Firehose worker started" 44 + }; 29 45 30 - // If no error, table exists 31 - if (!queryError) { 32 - console.log("Table 'flushing_records' already exists"); 33 - return; 46 + await saveRecord(testRecord); 47 + console.log("✅ File logging test successful"); 48 + 49 + // Create stats counter file if it doesn't exist 50 + const statsFile = path.resolve(__dirname, "flushing-stats.json"); 51 + if (!fs.existsSync(statsFile)) { 52 + const initialStats = { 53 + total_records: 0, 54 + start_time: new Date().toISOString(), 55 + last_update: new Date().toISOString() 56 + }; 57 + fs.writeFileSync(statsFile, JSON.stringify(initialStats, null, 2)); 58 + console.log("Created new stats file"); 34 59 } 35 60 36 - // Create the table using SQL 37 - const { error: sqlError } = await supabase.sql` 38 - CREATE TABLE IF NOT EXISTS flushing_records ( 39 - id SERIAL PRIMARY KEY, 40 - did TEXT NOT NULL, 41 - collection TEXT NOT NULL, 42 - type TEXT NOT NULL, 43 - created_at TIMESTAMP WITH TIME ZONE NOT NULL, 44 - emoji TEXT, 45 - text TEXT, 46 - cid TEXT NOT NULL, 47 - uri TEXT UNIQUE NOT NULL, 48 - indexed_at TIMESTAMP WITH TIME ZONE DEFAULT now() 49 - ); 50 - 51 - CREATE INDEX IF NOT EXISTS flushing_records_did_idx ON flushing_records(did); 52 - `; 53 - 54 - if (sqlError) { 55 - console.error("Error creating table:", sqlError); 56 - process.exit(1); 57 - } 58 - console.log("Table created successfully"); 59 61 } catch (err) { 60 - console.error("Error setting up database:", err); 62 + console.error("Error setting up file logging:", err); 61 63 process.exit(1); 62 64 } 63 65 } ··· 125 127 const uri = "at://" + authorDid + "/" + recordPath; 126 128 console.log("URI:", uri); 127 129 128 - // Save to Supabase 130 + // Save to log file 129 131 try { 130 - // Check if record already exists 131 - const { data: existingData, error: checkError } = await supabase 132 - .from("flushing_records") 133 - .select("id") 134 - .eq("uri", uri) 135 - .limit(1); 136 - 137 - if (checkError) { 138 - console.error("Error checking for existing record:", checkError.message); 139 - return; 140 - } 141 - 142 - if (existingData && existingData.length > 0) { 143 - console.log("Record already exists, skipping"); 144 - return; 145 - } 146 - 147 - // Insert new record 148 - const newRecord = { 132 + // Create a record with all the info we need - matching structure 133 + const flushingRecord = { 149 134 did: authorDid, 150 135 collection: message.commit.collection, 151 136 type: recordType, ··· 153 138 emoji: recordEmoji, 154 139 text: recordText, 155 140 cid: cid, 156 - uri: uri 141 + uri: uri, 142 + handle: "unknown", // We'll add real handle resolution later 143 + indexed_at: new Date().toISOString() 157 144 }; 158 - 159 - const { error: insertError } = await supabase 160 - .from("flushing_records") 161 - .insert(newRecord); 162 - 163 - if (insertError) { 164 - console.error("Error saving record:", insertError.message); 145 + 146 + // Save to file 147 + console.log("Saving record to log file..."); 148 + const result = await saveRecord(flushingRecord); 149 + 150 + if (result.success) { 151 + console.log("Record saved successfully!"); 152 + 153 + // Update stats counter 154 + try { 155 + const statsFile = path.resolve(__dirname, "flushing-stats.json"); 156 + let stats = { total_records: 0 }; 157 + 158 + if (fs.existsSync(statsFile)) { 159 + stats = JSON.parse(fs.readFileSync(statsFile, 'utf8')); 160 + } 161 + 162 + stats.total_records++; 163 + stats.last_update = new Date().toISOString(); 164 + 165 + fs.writeFileSync(statsFile, JSON.stringify(stats, null, 2)); 166 + 167 + // Only log every 10 records to reduce noise 168 + if (flushingFoundCount % 10 === 0) { 169 + console.log(`Total records processed: ${stats.total_records}`); 170 + } 171 + } catch (statsErr) { 172 + console.error("Error updating stats:", statsErr.message); 173 + } 165 174 } else { 166 - console.log("Record saved successfully"); 175 + console.error("Failed to save record to file"); 167 176 } 177 + 178 + // Keep track of how many records we've processed 179 + if (flushingFoundCount % 50 === 0) { 180 + console.log(`Processed ${flushingFoundCount} flushing records in this session`); 181 + } 182 + 168 183 } catch (err) { 169 - console.error("Error interacting with database:", err.message); 184 + console.error("Error processing record:", err.message); 170 185 } 171 186 } 172 187 } catch (err) { ··· 186 201 187 202 // Start the worker 188 203 async function start() { 189 - await setupDatabase(); 204 + await setupSystem(); 190 205 connect(); 191 206 } 192 207
+11
app/src/app/feed/feed.module.css
··· 64 64 margin-bottom: 1rem; 65 65 } 66 66 67 + .notice { 68 + background-color: #fff3e0; 69 + color: #e65100; 70 + padding: 1rem; 71 + border-radius: 4px; 72 + margin-bottom: 1.5rem; 73 + border-left: 4px solid #ff9800; 74 + font-size: 1.1rem; 75 + line-height: 1.4; 76 + } 77 + 67 78 .loadingContainer { 68 79 display: flex; 69 80 flex-direction: column;
+4
app/src/app/feed/page.tsx
··· 73 73 </p> 74 74 </header> 75 75 76 + <div className={styles.notice}> 77 + ⚠️ The flush feed is currently out of order. You can still make flushes that save to your PDS, but the feed here won't update until we fix the leak. Sorry for the inconvenience! 78 + </div> 79 + 76 80 <div className={styles.controls}> 77 81 <button 78 82 onClick={() => fetchLatestEntries(true)}