···11+# Supabase configuration
22+NEXT_PUBLIC_SUPABASE_URL=your-supabase-url
33+SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
44+55+# Optional: Bluesky API configuration
66+# Only needed if you want to authenticate with the Bluesky API
77+# BLUESKY_API_USERNAME=your-bluesky-username
88+# BLUESKY_API_PASSWORD=your-bluesky-password
+16-3
app/README.md
···6677- Bluesky OAuth authentication
88- Custom lexicon schema for status updates
99-- Emoji selection
99+- Emoji selection
1010- Responsive design
1111+- Feed of all users' flushing status updates
11121213## Tech Stack
1314···1516- React
1617- TypeScript
1718- Bluesky AT Protocol
1919+- Supabase (for feed storage)
2020+- WebSockets (for firehose connection)
18211922## Local Development
2023···2528npm install
2629```
27302828-3. Start the development server:
3131+3. Create a `.env.local` file based on `.env.example` and add your Supabase credentials
3232+3333+4. Start the development server:
29343035```bash
3136npm run dev
3237```
33383434-4. Open [http://localhost:3000](http://localhost:3000) in your browser
3939+5. Open [http://localhost:3000](http://localhost:3000) in your browser
4040+4141+6. For the firehose connection (optional, for feed functionality):
4242+ - Set up a Supabase project with the SQL in the `sql/setup.sql` file
4343+ - Run the firehose worker script on a server:
4444+4545+```bash
4646+node scripts/firehose-worker.js
4747+```
35483649## Deployment
3750
···11+const WebSocket = require('ws');
22+const cbor = require('cbor-web');
33+const { createClient } = require('@supabase/supabase-js');
44+require('dotenv').config();
55+66+// Constants
77+const FIREHOSE_URL = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos';
88+const FLUSHING_STATUS_NSID = 'im.flushing.right.now';
99+1010+// Supabase setup - ensure you have these set in your .env file
1111+const supabaseUrl = process.env.SUPABASE_URL;
1212+const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY;
1313+const supabase = createClient(supabaseUrl, supabaseKey);
1414+1515+// Reconnection parameters
1616+const MAX_RECONNECT_DELAY = 30000; // 30 seconds
1717+let reconnectAttempts = 0;
1818+let ws = null;
1919+2020+// Connect to the firehose
2121+function connectToFirehose() {
2222+ console.log('Connecting to Bluesky firehose...');
2323+2424+ ws = new WebSocket(FIREHOSE_URL);
2525+2626+ ws.on('open', () => {
2727+ console.log('Connected to firehose.');
2828+ // Reset reconnect counter on successful connection
2929+ reconnectAttempts = 0;
3030+ });
3131+3232+ ws.on('message', async (data) => {
3333+ try {
3434+ // In a real implementation, parse CBOR data to extract repo operations
3535+ // For now, log the message to track activity
3636+ console.log('Received message from firehose');
3737+3838+ // Decode the CBOR message (this is a simplified version)
3939+ // The actual implementation would need to handle the header and payload separately
4040+ const decoded = cbor.decode(data);
4141+4242+ // Process the message if it's a commit
4343+ if (decoded.op === 1 && decoded.t === '#commit') {
4444+ // Process repo commit
4545+ const commit = decoded.payload;
4646+4747+ // Check if this commit contains a flushing record
4848+ const flushingOps = commit.ops.filter(op => {
4949+ return op.path.startsWith(FLUSHING_STATUS_NSID) && op.action === 'create';
5050+ });
5151+5252+ if (flushingOps.length > 0) {
5353+ console.log(`Found ${flushingOps.length} flushing records in commit from ${commit.repo}`);
5454+5555+ // Process each flushing record
5656+ for (const op of flushingOps) {
5757+ await processFlushingRecord(commit.repo, op.path, op.cid, commit.blocks);
5858+ }
5959+ }
6060+ }
6161+ } catch (error) {
6262+ console.error('Error processing message:', error);
6363+ }
6464+ });
6565+6666+ ws.on('error', (error) => {
6767+ console.error('WebSocket error:', error);
6868+ });
6969+7070+ ws.on('close', (code, reason) => {
7171+ console.log(`Connection closed: ${code} - ${reason}`);
7272+7373+ // Implement exponential backoff for reconnection
7474+ reconnectAttempts++;
7575+ const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), MAX_RECONNECT_DELAY);
7676+7777+ console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts})...`);
7878+ setTimeout(connectToFirehose, delay);
7979+ });
8080+}
8181+8282+// Process a flushing record and store it in Supabase
8383+async function processFlushingRecord(authorDid, recordPath, cid, blocks) {
8484+ try {
8585+ // Extract the record data from the blocks (simplified)
8686+ // In a real implementation, you would need to properly decode the IPLD blocks
8787+ const recordData = blocks[cid];
8888+8989+ if (!recordData) {
9090+ console.error('Record data not found in blocks');
9191+ return;
9292+ }
9393+9494+ // Extract the record URI
9595+ const uri = `at://${authorDid}/${recordPath}`;
9696+9797+ // Check if we already have this record
9898+ const { data: existingRecord } = await supabase
9999+ .from('flushing_entries')
100100+ .select('id')
101101+ .eq('uri', uri)
102102+ .single();
103103+104104+ if (existingRecord) {
105105+ console.log('Record already exists, skipping');
106106+ return;
107107+ }
108108+109109+ // Create a new entry in Supabase
110110+ const newEntry = {
111111+ uri,
112112+ cid,
113113+ author_did: authorDid,
114114+ text: recordData.text,
115115+ emoji: recordData.emoji,
116116+ created_at: recordData.createdAt
117117+ };
118118+119119+ const { error } = await supabase
120120+ .from('flushing_entries')
121121+ .insert(newEntry);
122122+123123+ if (error) {
124124+ console.error('Error inserting record:', error);
125125+ } else {
126126+ console.log('Successfully stored new flushing record');
127127+ }
128128+129129+ // Also try to resolve the author's handle if we don't have it
130130+ const { data: authorData } = await supabase
131131+ .from('users')
132132+ .select('handle')
133133+ .eq('did', authorDid)
134134+ .single();
135135+136136+ if (!authorData || !authorData.handle) {
137137+ // TODO: Use the Bluesky API to resolve the handle from the DID
138138+ // This would be done with the BskyAgent or direct API call
139139+ console.log('Need to resolve handle for DID:', authorDid);
140140+ }
141141+ } catch (error) {
142142+ console.error('Error processing flushing record:', error);
143143+ }
144144+}
145145+146146+// Create the necessary tables if they don't exist
147147+async function setupDatabase() {
148148+ try {
149149+ // Create flushing_entries table
150150+ const { error: entriesError } = await supabase.rpc('create_flushing_entries_table_if_not_exists');
151151+ if (entriesError) {
152152+ console.error('Error creating flushing_entries table:', entriesError);
153153+ }
154154+155155+ // Create users table
156156+ const { error: usersError } = await supabase.rpc('create_users_table_if_not_exists');
157157+ if (usersError) {
158158+ console.error('Error creating users table:', usersError);
159159+ }
160160+ } catch (error) {
161161+ console.error('Error setting up database:', error);
162162+ }
163163+}
164164+165165+// Start the worker
166166+async function start() {
167167+ console.log('Starting firehose worker...');
168168+169169+ // Check if we have the required environment variables
170170+ if (!supabaseUrl || !supabaseKey) {
171171+ console.error('Missing Supabase credentials. Please set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY environment variables.');
172172+ process.exit(1);
173173+ }
174174+175175+ // Setup the database
176176+ await setupDatabase();
177177+178178+ // Connect to the firehose
179179+ connectToFirehose();
180180+}
181181+182182+// Handle process termination
183183+process.on('SIGINT', () => {
184184+ console.log('Shutting down...');
185185+ if (ws) {
186186+ ws.close();
187187+ }
188188+ process.exit(0);
189189+});
190190+191191+// Start the worker
192192+start();
+83
app/sql/setup.sql
···11+-- Setup script for Supabase database
22+33+-- Users table to store user profile information
44+CREATE TABLE IF NOT EXISTS users (
55+ did TEXT PRIMARY KEY,
66+ handle TEXT NOT NULL,
77+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
88+);
99+1010+-- Function to create users table if it doesn't exist
1111+CREATE OR REPLACE FUNCTION create_users_table_if_not_exists()
1212+RETURNS void AS $$
1313+BEGIN
1414+ IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'users') THEN
1515+ CREATE TABLE users (
1616+ did TEXT PRIMARY KEY,
1717+ handle TEXT NOT NULL,
1818+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
1919+ );
2020+ END IF;
2121+END;
2222+$$ LANGUAGE plpgsql;
2323+2424+-- Flushing entries table to store flushing status records
2525+CREATE TABLE IF NOT EXISTS flushing_entries (
2626+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
2727+ uri TEXT UNIQUE NOT NULL,
2828+ cid TEXT NOT NULL,
2929+ author_did TEXT NOT NULL,
3030+ author_handle TEXT,
3131+ text TEXT NOT NULL,
3232+ emoji TEXT NOT NULL,
3333+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
3434+ indexed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
3535+);
3636+3737+-- Function to create flushing_entries table if it doesn't exist
3838+CREATE OR REPLACE FUNCTION create_flushing_entries_table_if_not_exists()
3939+RETURNS void AS $$
4040+BEGIN
4141+ IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'flushing_entries') THEN
4242+ CREATE TABLE flushing_entries (
4343+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
4444+ uri TEXT UNIQUE NOT NULL,
4545+ cid TEXT NOT NULL,
4646+ author_did TEXT NOT NULL,
4747+ author_handle TEXT,
4848+ text TEXT NOT NULL,
4949+ emoji TEXT NOT NULL,
5050+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
5151+ indexed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
5252+ );
5353+ END IF;
5454+END;
5555+$$ LANGUAGE plpgsql;
5656+5757+-- Create indexes for performance
5858+CREATE INDEX IF NOT EXISTS idx_flushing_entries_created_at ON flushing_entries(created_at DESC);
5959+CREATE INDEX IF NOT EXISTS idx_flushing_entries_author_did ON flushing_entries(author_did);
6060+CREATE INDEX IF NOT EXISTS idx_users_handle ON users(handle);
6161+6262+-- Create a trigger to update the users table when a flushing entry is inserted with an author_handle
6363+CREATE OR REPLACE FUNCTION update_user_from_entry()
6464+RETURNS TRIGGER AS $$
6565+BEGIN
6666+ IF NEW.author_handle IS NOT NULL THEN
6767+ INSERT INTO users (did, handle, updated_at)
6868+ VALUES (NEW.author_did, NEW.author_handle, NOW())
6969+ ON CONFLICT (did)
7070+ DO UPDATE SET
7171+ handle = EXCLUDED.handle,
7272+ updated_at = NOW();
7373+ END IF;
7474+ RETURN NEW;
7575+END;
7676+$$ LANGUAGE plpgsql;
7777+7878+-- Create the trigger
7979+DROP TRIGGER IF EXISTS trigger_update_user_from_entry ON flushing_entries;
8080+CREATE TRIGGER trigger_update_user_from_entry
8181+AFTER INSERT OR UPDATE ON flushing_entries
8282+FOR EACH ROW
8383+EXECUTE FUNCTION update_user_from_entry();
+147
app/src/app/api/bluesky/feed/route.ts
···11+import { NextRequest, NextResponse } from 'next/server';
22+import { createClient } from '@supabase/supabase-js';
33+import { BskyAgent } from '@atproto/api';
44+55+// Constants
66+const FLUSHING_STATUS_NSID = 'im.flushing.right.now';
77+const MAX_ENTRIES = 20;
88+99+// Cache settings to avoid hitting the database too frequently
1010+const CACHE_TTL = 5 * 60 * 1000; // 5 minutes in milliseconds
1111+let cachedEntries: any[] = [];
1212+let lastFetchTime = 0;
1313+1414+// Supabase client - using environment variables
1515+const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL || '';
1616+const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY || '';
1717+1818+// Bluesky agent for public interactions (used to resolve DIDs to handles if needed)
1919+const agent = new BskyAgent({
2020+ service: 'https://bsky.social'
2121+});
2222+2323+export async function GET(request: NextRequest) {
2424+ try {
2525+ const now = Date.now();
2626+2727+ // Check if cache is still valid
2828+ if (now - lastFetchTime < CACHE_TTL && cachedEntries.length > 0) {
2929+ console.log('Returning cached entries');
3030+ return NextResponse.json({ entries: cachedEntries });
3131+ }
3232+3333+ console.log('Fetching fresh entries');
3434+3535+ // If we have Supabase credentials, fetch from there
3636+ if (supabaseUrl && supabaseKey) {
3737+ const supabase = createClient(supabaseUrl, supabaseKey);
3838+3939+ // Fetch the latest entries from Supabase
4040+ const { data: entries, error } = await supabase
4141+ .from('flushing_entries')
4242+ .select(`
4343+ id,
4444+ uri,
4545+ cid,
4646+ author_did,
4747+ author_handle,
4848+ text,
4949+ emoji,
5050+ created_at
5151+ `)
5252+ .order('created_at', { ascending: false })
5353+ .limit(MAX_ENTRIES);
5454+5555+ if (error) {
5656+ throw new Error(`Supabase error: ${error.message}`);
5757+ }
5858+5959+ // Transform the data to match our client-side model
6060+ const processedEntries = (entries || []).map(entry => ({
6161+ id: entry.id,
6262+ uri: entry.uri,
6363+ cid: entry.cid,
6464+ authorDid: entry.author_did,
6565+ authorHandle: entry.author_handle || 'unknown',
6666+ text: entry.text,
6767+ emoji: entry.emoji,
6868+ createdAt: entry.created_at
6969+ }));
7070+7171+ // Update the cache
7272+ cachedEntries = processedEntries;
7373+ lastFetchTime = now;
7474+7575+ return NextResponse.json({ entries: processedEntries });
7676+ } else {
7777+ // If no Supabase credentials, fall back to mock data
7878+ console.log('No Supabase credentials, using mock data');
7979+ const mockEntries = await getMockEntries();
8080+8181+ // Update cache
8282+ cachedEntries = mockEntries;
8383+ lastFetchTime = now;
8484+8585+ return NextResponse.json({ entries: mockEntries });
8686+ }
8787+ } catch (error: any) {
8888+ console.error('Error fetching feed:', error);
8989+ return NextResponse.json(
9090+ { error: 'Failed to fetch feed', message: error.message },
9191+ { status: 500 }
9292+ );
9393+ }
9494+}
9595+9696+// Function to generate mock entries for testing
9797+// This is used when Supabase is not configured
9898+async function getMockEntries() {
9999+ // Create some mock entries for testing
100100+ const mockEntries = [
101101+ {
102102+ id: '1',
103103+ uri: 'at://did:plc:12345/im.flushing.right.now/1',
104104+ cid: 'bafyreiabc123',
105105+ authorDid: 'did:plc:12345',
106106+ authorHandle: 'alice.bsky.social',
107107+ text: 'Taking a quick break at work',
108108+ emoji: '🚽',
109109+ createdAt: new Date(Date.now() - 15 * 60000).toISOString() // 15 minutes ago
110110+ },
111111+ {
112112+ id: '2',
113113+ uri: 'at://did:plc:67890/im.flushing.right.now/2',
114114+ cid: 'bafyreiabc456',
115115+ authorDid: 'did:plc:67890',
116116+ authorHandle: 'bob.bsky.social',
117117+ text: 'Reading the news on my phone',
118118+ emoji: '📱',
119119+ createdAt: new Date(Date.now() - 45 * 60000).toISOString() // 45 minutes ago
120120+ },
121121+ {
122122+ id: '3',
123123+ uri: 'at://did:plc:abcdef/im.flushing.right.now/3',
124124+ cid: 'bafyreiabc789',
125125+ authorDid: 'did:plc:abcdef',
126126+ authorHandle: 'charlie.bsky.social',
127127+ text: 'Just finished a great book chapter',
128128+ emoji: '📚',
129129+ createdAt: new Date(Date.now() - 120 * 60000).toISOString() // 2 hours ago
130130+ }
131131+ ];
132132+133133+ return mockEntries;
134134+}
135135+136136+// Function to attempt to resolve a DID to a handle using the Bluesky API
137137+// This is used when we have a record with an author_did but no author_handle
138138+async function resolveDidToHandle(did: string): Promise<string | null> {
139139+ try {
140140+ await agent.login({ identifier: 'user.bsky.social', password: 'none' });
141141+ const response = await agent.getProfile({ actor: did });
142142+ return response.data.handle;
143143+ } catch (error) {
144144+ console.error(`Failed to resolve handle for DID ${did}:`, error);
145145+ return null;
146146+ }
147147+}
···11+1. Do you want to implement a server-side or client-side solution for the
22+33+firehose?
44+55+- I don't really know the difference, can you recommend? Maybe we start small and upgrade later?
66+77+1. Do you have a preferred WebSocket library for Node.js or the browser?
88+99+- my answer: I'm not sure, I don't know enough about this. Could you make a recommendation and we can try it and then change course later if needed?
1010+1111+1. How do you want to handle CBOR encoding/decoding?
1212+1313+- my answer: I'm not sure, recommend?
1414+1515+1. How many entries do you want to display in the feed?
1616+1717+- my answer: maybe like the last 20 to start?
1818+1919+1. Do you need historical data or just new activity?
2020+2121+- my answer: I'd like historically the last 20 ideally, would that be hard?
2222+2323+1. Any UI preferences for displaying the feed?
2424+2525+- my answer: let's do a simple clean list that shows the user handle, timestamp, emoji + text, and then if you click on thier username it takes you to their profile at https://bsky.app/profile/username.example
2626+2727+1. Authentication requirements for firehose access?
2828+2929+- I don't think it needs auth, can you check the documentation? Let me know what you need to proceed.
3030+3131+1. Do you want to filter the firehose for im.flushing.right.now records on
3232+3333+the server or client?
3434+3535+- I'm not sure, maybe client? what's easier?
3636+3737+1. Do you want to integrate this with a database for persistence?
3838+3939+- yeah I have a supabase account that i'd like to connect
4040+4141+1. Environment considerations
4242+4343+- Is this app deployed on a platform that supports WebSockets? I don't know, does Vercel support that?