Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

eventconsumer: unify knot/spindle sources, monotonic cursor

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
date (May 29, 2026, 2:50 PM +0300) commit 3bf37006 parent f8c74ae4 change-id xwypyztn
+422 -153
+74 -48
eventconsumer/consumer.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "log/slog" 7 - "math/rand" 7 + "net/http" 8 8 "net/url" 9 9 "sync" 10 10 "time" 11 11 12 12 "tangled.org/core/eventconsumer/cursor" 13 + "tangled.org/core/eventstream" 13 14 "tangled.org/core/log" 14 15 15 16 "github.com/avast/retry-go/v4" 16 17 "github.com/gorilla/websocket" 17 18 ) 18 19 19 - type ProcessFunc func(ctx context.Context, source Source, message Message) error 20 - 21 - type Message struct { 22 - Rkey string 23 - Nsid string 24 - Created int64 `json:"created"` 25 - EventJson json.RawMessage `json:"event"` 26 - } 20 + type ProcessFunc func(ctx context.Context, source Source, event eventstream.Event) error 27 21 28 22 type ConsumerConfig struct { 29 23 Sources map[Source]struct{} ··· 34 28 WorkerCount int 35 29 QueueSize int 36 30 Logger *slog.Logger 37 - Dev bool 38 31 CursorStore cursor.Store 32 + URLFunc func(Source, int64) (*url.URL, error) 33 + 34 + Dialer *websocket.Dialer 35 + RequestHeader http.Header 36 + MaxRetryAttempts uint 37 + OnConnectExceeded func(Source, error) 39 38 } 40 39 41 40 func NewConsumerConfig() *ConsumerConfig { ··· 44 43 } 45 44 } 46 45 47 - type Source interface { 48 - // url to start streaming events from 49 - Url(cursor int64, dev bool) (*url.URL, error) 50 - // cache key for cursor storage 51 - Key() string 52 - } 53 - 54 46 type Consumer struct { 55 - wg sync.WaitGroup 56 - dialer *websocket.Dialer 57 - jobQueue chan job 58 - logger *slog.Logger 59 - randSource *rand.Rand 47 + sourceWg sync.WaitGroup 48 + workerWg sync.WaitGroup 49 + dialer *websocket.Dialer 50 + jobQueue chan job 51 + logger *slog.Logger 60 52 61 53 // sourcesMu guards sources. It must only be held for short, non-blocking 62 54 // map operations; never across a blocking call (dial, read, close). ··· 69 61 type sourceState struct { 70 62 cancel context.CancelFunc 71 63 conn *websocket.Conn 64 + 65 + cursorMu sync.Mutex 66 + cursorMax int64 72 67 } 73 68 74 69 type job struct { ··· 97 92 } 98 93 if cfg.CursorStore == nil { 99 94 cfg.CursorStore = &cursor.MemoryStore{} 95 + } 96 + if cfg.URLFunc == nil { 97 + cfg.URLFunc = DefaultURL(false) 98 + } 99 + dialer := cfg.Dialer 100 + if dialer == nil { 101 + dialer = websocket.DefaultDialer 100 102 } 101 103 return &Consumer{ 102 - cfg: cfg, 103 - dialer: websocket.DefaultDialer, 104 - jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 105 - logger: cfg.Logger, 106 - randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 107 - sources: make(map[Source]*sourceState), 104 + cfg: cfg, 105 + dialer: dialer, 106 + jobQueue: make(chan job, cfg.QueueSize), 107 + logger: cfg.Logger, 108 + sources: make(map[Source]*sourceState), 108 109 } 109 110 } 110 111 111 112 func (c *Consumer) Start(ctx context.Context) { 112 113 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 113 114 114 - // start workers 115 115 for range c.cfg.WorkerCount { 116 - c.wg.Add(1) 116 + c.workerWg.Add(1) 117 117 go c.worker(ctx) 118 118 } 119 119 120 - // start streaming 121 120 for source := range c.cfg.Sources { 122 121 c.AddSource(ctx, source) 123 122 } 124 123 } 125 124 126 125 func (c *Consumer) Stop() { 127 - // snapshot conns under lock so we don't hold sourcesMu across Close 126 + // snapshot cancels and conns under lock so we don't hold sourcesMu across Close 128 127 c.sourcesMu.Lock() 128 + cancels := make([]context.CancelFunc, 0, len(c.sources)) 129 129 conns := make([]*websocket.Conn, 0, len(c.sources)) 130 130 for _, st := range c.sources { 131 + if st.cancel != nil { 132 + cancels = append(cancels, st.cancel) 133 + } 131 134 if st.conn != nil { 132 135 conns = append(conns, st.conn) 133 136 } 134 137 } 135 138 c.sourcesMu.Unlock() 136 139 140 + for _, cancel := range cancels { 141 + cancel() 142 + } 137 143 for _, conn := range conns { 138 144 conn.Close() 139 145 } 140 146 141 - c.wg.Wait() 147 + c.sourceWg.Wait() 142 148 close(c.jobQueue) 149 + c.workerWg.Wait() 143 150 } 144 151 145 152 func (c *Consumer) AddSource(ctx context.Context, s Source) { ··· 153 160 c.sources[s] = &sourceState{cancel: cancel} 154 161 c.sourcesMu.Unlock() 155 162 156 - c.wg.Add(1) 163 + c.sourceWg.Add(1) 157 164 go c.startConnectionLoop(srcCtx, s) 158 165 } 159 166 ··· 180 187 } 181 188 182 189 func (c *Consumer) worker(ctx context.Context) { 183 - defer c.wg.Done() 190 + defer c.workerWg.Done() 184 191 for { 185 192 select { 186 193 case <-ctx.Done(): ··· 190 197 return 191 198 } 192 199 193 - var msg Message 194 - err := json.Unmarshal(j.message, &msg) 200 + var ev eventstream.Event 201 + err := json.Unmarshal(j.message, &ev) 195 202 if err != nil { 196 203 c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err) 197 - return 204 + continue 198 205 } 199 206 200 - if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 207 + if err := c.cfg.ProcessFunc(ctx, j.source, ev); err != nil { 201 208 c.logger.Error("error processing message", "source", j.source, "err", err) 202 209 } 203 210 204 - cursorVal := msg.Created 205 - if cursorVal == 0 { 206 - cursorVal = time.Now().UnixNano() 207 - } 208 - c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 211 + c.advanceCursor(j.source, ev.Created) 209 212 } 210 213 } 211 214 } 212 215 216 + func (c *Consumer) advanceCursor(s Source, newCursor int64) { 217 + if newCursor == 0 { 218 + return 219 + } 220 + c.sourcesMu.Lock() 221 + st, ok := c.sources[s] 222 + c.sourcesMu.Unlock() 223 + if !ok { 224 + return 225 + } 226 + 227 + st.cursorMu.Lock() 228 + defer st.cursorMu.Unlock() 229 + if newCursor <= st.cursorMax { 230 + return 231 + } 232 + st.cursorMax = newCursor 233 + c.cfg.CursorStore.Set(s.Key(), newCursor) 234 + } 235 + 213 236 func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 214 - defer c.wg.Done() 237 + defer c.sourceWg.Done() 215 238 216 239 // attempt connection initially 217 240 err := c.runConnection(ctx, source) ··· 240 263 func (c *Consumer) runConnection(ctx context.Context, source Source) error { 241 264 cursor := c.cfg.CursorStore.Get(source.Key()) 242 265 243 - u, err := source.Url(cursor, c.cfg.Dev) 266 + u, err := c.cfg.URLFunc(source, cursor) 244 267 if err != nil { 245 268 return err 246 269 } ··· 248 271 c.logger.Info("connecting", "url", u.String()) 249 272 250 273 retryOpts := []retry.Option{ 251 - retry.Attempts(0), // infinite attempts 274 + retry.Attempts(c.cfg.MaxRetryAttempts), 252 275 retry.DelayType(retry.BackOffDelay), 253 276 retry.Delay(c.cfg.RetryInterval), 254 277 retry.MaxDelay(c.cfg.MaxRetryInterval), ··· 269 292 err = retry.Do(func() error { 270 293 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 271 294 defer cancel() 272 - conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) 295 + conn, _, err = c.dialer.DialContext(connCtx, u.String(), c.cfg.RequestHeader) 273 296 return err 274 297 }, retryOpts...) 275 298 if err != nil { 299 + if c.cfg.OnConnectExceeded != nil { 300 + c.cfg.OnConnectExceeded(source, err) 301 + } 276 302 return err 277 303 } 278 304
+282
eventconsumer/consumer_test.go
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "log/slog" 9 + "net/http" 10 + "net/http/httptest" 11 + "strings" 12 + "sync" 13 + "testing" 14 + "time" 15 + 16 + "tangled.org/core/eventconsumer/cursor" 17 + "tangled.org/core/eventstream" 18 + "tangled.org/core/notifier" 19 + ) 20 + 21 + type memSrc struct { 22 + mu sync.Mutex 23 + events []eventstream.Event 24 + } 25 + 26 + func (s *memSrc) add(ev eventstream.Event) { 27 + s.mu.Lock() 28 + defer s.mu.Unlock() 29 + s.events = append(s.events, ev) 30 + } 31 + 32 + func (s *memSrc) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 33 + s.mu.Lock() 34 + defer s.mu.Unlock() 35 + out := []eventstream.Event{} 36 + for _, ev := range s.events { 37 + if ev.Created > cursor { 38 + out = append(out, ev) 39 + if len(out) == limit { 40 + break 41 + } 42 + } 43 + } 44 + return out, nil 45 + } 46 + 47 + func mkEv(i int) eventstream.Event { 48 + return eventstream.Event{ 49 + Rkey: fmt.Sprintf("rk-%04d", i), 50 + Nsid: "sh.tangled.test", 51 + EventJson: json.RawMessage(fmt.Sprintf(`{"i":%d}`, i)), 52 + Created: int64(i + 1), 53 + } 54 + } 55 + 56 + func startEventServer(t *testing.T, src *memSrc) (Source, *notifier.Notifier) { 57 + t.Helper() 58 + n := notifier.New() 59 + mux := http.NewServeMux() 60 + mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 61 + _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 62 + Backend: src, 63 + Notifier: &n, 64 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 65 + BatchSize: 5, 66 + MaxBatchesPerDrain: 100, 67 + }) 68 + }) 69 + srv := httptest.NewServer(mux) 70 + t.Cleanup(srv.Close) 71 + addr := strings.TrimPrefix(srv.URL, "http://") 72 + return Source{Kind: "test", Host: addr}, &n 73 + } 74 + 75 + func TestConsumer_DrainAdvancesCursor(t *testing.T) { 76 + src := &memSrc{} 77 + for i := range 8 { 78 + src.add(mkEv(i)) 79 + } 80 + 81 + source, _ := startEventServer(t, src) 82 + 83 + store := &cursor.MemoryStore{} 84 + seenMu := sync.Mutex{} 85 + seen := []int64{} 86 + 87 + cfg := ConsumerConfig{ 88 + ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 89 + seenMu.Lock() 90 + seen = append(seen, msg.Created) 91 + seenMu.Unlock() 92 + return nil 93 + }, 94 + WorkerCount: 1, 95 + QueueSize: 16, 96 + ConnectionTimeout: 2 * time.Second, 97 + CursorStore: store, 98 + URLFunc: DefaultURL(true), 99 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 100 + } 101 + c := NewConsumer(cfg) 102 + 103 + ctx, cancel := context.WithCancel(context.Background()) 104 + defer cancel() 105 + 106 + c.Start(ctx) 107 + c.AddSource(ctx, source) 108 + 109 + deadline := time.Now().Add(3 * time.Second) 110 + for time.Now().Before(deadline) { 111 + seenMu.Lock() 112 + n := len(seen) 113 + seenMu.Unlock() 114 + if n >= 8 { 115 + break 116 + } 117 + time.Sleep(20 * time.Millisecond) 118 + } 119 + 120 + seenMu.Lock() 121 + defer seenMu.Unlock() 122 + if len(seen) != 8 { 123 + t.Fatalf("processed %d events, want 8: %v", len(seen), seen) 124 + } 125 + for i, got := range seen { 126 + if got != int64(i+1) { 127 + t.Fatalf("event %d: got created=%d want %d", i, got, i+1) 128 + } 129 + } 130 + 131 + if final := store.Get(source.Key()); final != 8 { 132 + t.Fatalf("cursor = %d, want 8", final) 133 + } 134 + } 135 + 136 + func TestConsumer_CursorMonotonic_OutOfOrderWorkers(t *testing.T) { 137 + src := &memSrc{} 138 + for i := range 4 { 139 + src.add(mkEv(i)) 140 + } 141 + 142 + source, _ := startEventServer(t, src) 143 + 144 + store := &cursor.MemoryStore{} 145 + 146 + releaseFirst := make(chan struct{}) 147 + processed := make(chan int64, 4) 148 + 149 + cfg := ConsumerConfig{ 150 + ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 151 + if msg.Created == 1 { 152 + <-releaseFirst 153 + } 154 + processed <- msg.Created 155 + return nil 156 + }, 157 + WorkerCount: 4, 158 + QueueSize: 16, 159 + ConnectionTimeout: 2 * time.Second, 160 + CursorStore: store, 161 + URLFunc: DefaultURL(true), 162 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 163 + } 164 + c := NewConsumer(cfg) 165 + 166 + ctx, cancel := context.WithCancel(context.Background()) 167 + defer cancel() 168 + 169 + c.Start(ctx) 170 + c.AddSource(ctx, source) 171 + 172 + for range 3 { 173 + select { 174 + case <-processed: 175 + case <-time.After(3 * time.Second): 176 + t.Fatal("timed out waiting for events 2-4 to be processed") 177 + } 178 + } 179 + 180 + if cur := store.Get(source.Key()); cur != 4 { 181 + t.Fatalf("cursor before slow worker finished = %d, want 4", cur) 182 + } 183 + 184 + close(releaseFirst) 185 + select { 186 + case <-processed: 187 + case <-time.After(3 * time.Second): 188 + t.Fatal("timed out waiting for slow worker") 189 + } 190 + 191 + if cur := store.Get(source.Key()); cur != 4 { 192 + t.Fatalf("cursor regressed after slow worker: %d, want 4", cur) 193 + } 194 + } 195 + 196 + func TestConsumer_StopTerminatesWithoutCtxCancel(t *testing.T) { 197 + src := &memSrc{} 198 + source, _ := startEventServer(t, src) 199 + 200 + cfg := ConsumerConfig{ 201 + ProcessFunc: func(ctx context.Context, _ Source, _ eventstream.Event) error { return nil }, 202 + WorkerCount: 2, 203 + QueueSize: 8, 204 + ConnectionTimeout: 2 * time.Second, 205 + CursorStore: &cursor.MemoryStore{}, 206 + URLFunc: DefaultURL(true), 207 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 208 + } 209 + c := NewConsumer(cfg) 210 + 211 + c.Start(context.Background()) 212 + c.AddSource(context.Background(), source) 213 + 214 + done := make(chan struct{}) 215 + go func() { 216 + c.Stop() 217 + close(done) 218 + }() 219 + 220 + select { 221 + case <-done: 222 + case <-time.After(5 * time.Second): 223 + t.Fatal("Stop did not return within 5s") 224 + } 225 + } 226 + 227 + func TestConsumer_ResumesFromStoredCursor(t *testing.T) { 228 + src := &memSrc{} 229 + for i := range 5 { 230 + src.add(mkEv(i)) 231 + } 232 + 233 + source, _ := startEventServer(t, src) 234 + 235 + store := &cursor.MemoryStore{} 236 + store.Set(source.Key(), 3) 237 + 238 + seenMu := sync.Mutex{} 239 + seen := []int64{} 240 + 241 + cfg := ConsumerConfig{ 242 + ProcessFunc: func(ctx context.Context, _ Source, msg eventstream.Event) error { 243 + seenMu.Lock() 244 + seen = append(seen, msg.Created) 245 + seenMu.Unlock() 246 + return nil 247 + }, 248 + WorkerCount: 1, 249 + QueueSize: 16, 250 + ConnectionTimeout: 2 * time.Second, 251 + CursorStore: store, 252 + URLFunc: DefaultURL(true), 253 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 254 + } 255 + c := NewConsumer(cfg) 256 + 257 + ctx, cancel := context.WithCancel(context.Background()) 258 + defer cancel() 259 + 260 + c.Start(ctx) 261 + c.AddSource(ctx, source) 262 + 263 + deadline := time.Now().Add(3 * time.Second) 264 + for time.Now().Before(deadline) { 265 + seenMu.Lock() 266 + n := len(seen) 267 + seenMu.Unlock() 268 + if n >= 2 { 269 + break 270 + } 271 + time.Sleep(20 * time.Millisecond) 272 + } 273 + 274 + seenMu.Lock() 275 + defer seenMu.Unlock() 276 + if len(seen) < 2 { 277 + t.Fatalf("processed %d events, want 2: %v", len(seen), seen) 278 + } 279 + if seen[0] != 4 || seen[1] != 5 { 280 + t.Fatalf("resumed events = %v, want [4 5]", seen) 281 + } 282 + }
+4 -5
eventconsumer/cursor/memory.go
··· 8 8 store sync.Map 9 9 } 10 10 11 - func (m *MemoryStore) Set(knot string, cursor int64) { 12 - m.store.Store(knot, cursor) 11 + func (m *MemoryStore) Set(key string, cursor int64) { 12 + m.store.Store(key, cursor) 13 13 } 14 14 15 - func (m *MemoryStore) Get(knot string) (cursor int64) { 16 - if result, ok := m.store.Load(knot); ok { 15 + func (m *MemoryStore) Get(key string) (cursor int64) { 16 + if result, ok := m.store.Load(key); ok { 17 17 if val, ok := result.(int64); ok { 18 18 return val 19 19 } 20 20 } 21 - 22 21 return 0 23 22 }
+8 -10
eventconsumer/cursor/redis.go
··· 22 22 } 23 23 } 24 24 25 - func (r *RedisStore) Set(knot string, cursor int64) { 26 - key := fmt.Sprintf(cursorKey, knot) 27 - r.rdb.Set(context.Background(), key, cursor, 0) 25 + func (r *RedisStore) Set(key string, cursor int64) { 26 + k := fmt.Sprintf(cursorKey, key) 27 + r.rdb.Set(context.Background(), k, cursor, 0) 28 28 } 29 29 30 - func (r *RedisStore) Get(knot string) (cursor int64) { 31 - key := fmt.Sprintf(cursorKey, knot) 32 - val, err := r.rdb.Get(context.Background(), key).Result() 30 + func (r *RedisStore) Get(key string) (cursor int64) { 31 + k := fmt.Sprintf(cursorKey, key) 32 + val, err := r.rdb.Get(context.Background(), k).Result() 33 33 if err != nil { 34 34 return 0 35 35 } 36 - cursor, err = strconv.ParseInt(val, 10, 64) 36 + parsed, err := strconv.ParseInt(val, 10, 64) 37 37 if err != nil { 38 - // TODO: log here 39 38 return 0 40 39 } 41 - 42 - return cursor 40 + return parsed 43 41 }
+10 -10
eventconsumer/cursor/sqlite.go
··· 2 2 3 3 import ( 4 4 "database/sql" 5 + "errors" 5 6 "fmt" 7 + "log/slog" 6 8 7 9 _ "github.com/mattn/go-sqlite3" 8 10 ) ··· 46 48 createTable := fmt.Sprintf(` 47 49 create table if not exists %s ( 48 50 knot text primary key, 49 - cursor text 51 + cursor integer 50 52 );`, s.tableName) 51 53 _, err := s.db.Exec(createTable) 52 54 return err 53 55 } 54 56 55 - func (s *SqliteStore) Set(knot string, cursor int64) { 57 + func (s *SqliteStore) Set(key string, cursor int64) { 56 58 query := fmt.Sprintf(` 57 59 insert into %s (knot, cursor) 58 60 values (?, ?) 59 61 on conflict(knot) do update set cursor=excluded.cursor; 60 62 `, s.tableName) 61 63 62 - _, err := s.db.Exec(query, knot, cursor) 63 - 64 - if err != nil { 65 - // TODO: log here 64 + if _, err := s.db.Exec(query, key, cursor); err != nil { 65 + slog.Default().Error("cursor sqlite set failed", "key", key, "cursor", cursor, "err", err) 66 66 } 67 67 } 68 68 69 - func (s *SqliteStore) Get(knot string) (cursor int64) { 69 + func (s *SqliteStore) Get(key string) (cursor int64) { 70 70 query := fmt.Sprintf(` 71 71 select cursor from %s where knot = ?; 72 72 `, s.tableName) 73 - err := s.db.QueryRow(query, knot).Scan(&cursor) 73 + err := s.db.QueryRow(query, key).Scan(&cursor) 74 74 75 75 if err != nil { 76 - if err != sql.ErrNoRows { 77 - // TODO: log here 76 + if !errors.Is(err, sql.ErrNoRows) { 77 + slog.Default().Error("cursor sqlite get failed", "key", key, "err", err) 78 78 } 79 79 return 0 80 80 }
+2 -2
eventconsumer/cursor/store.go
··· 1 1 package cursor 2 2 3 3 type Store interface { 4 - Set(knot string, cursor int64) 5 - Get(knot string) (cursor int64) 4 + Set(key string, cursor int64) 5 + Get(key string) (cursor int64) 6 6 }
-39
eventconsumer/knot.go
··· 1 - package eventconsumer 2 - 3 - import ( 4 - "fmt" 5 - "net/url" 6 - ) 7 - 8 - type KnotSource struct { 9 - Knot string 10 - } 11 - 12 - func (k KnotSource) Key() string { 13 - return k.Knot 14 - } 15 - 16 - func (k KnotSource) Url(cursor int64, dev bool) (*url.URL, error) { 17 - scheme := "wss" 18 - if dev { 19 - scheme = "ws" 20 - } 21 - 22 - u, err := url.Parse(scheme + "://" + k.Knot + "/events") 23 - if err != nil { 24 - return nil, err 25 - } 26 - 27 - if cursor != 0 { 28 - query := url.Values{} 29 - query.Add("cursor", fmt.Sprintf("%d", cursor)) 30 - u.RawQuery = query.Encode() 31 - } 32 - return u, nil 33 - } 34 - 35 - func NewKnotSource(knot string) KnotSource { 36 - return KnotSource{ 37 - Knot: knot, 38 - } 39 - }
+42
eventconsumer/source.go
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "net/url" 5 + "strconv" 6 + ) 7 + 8 + type Kind string 9 + 10 + const ( 11 + KindKnot Kind = "knot" 12 + KindSpindle Kind = "spindle" 13 + ) 14 + 15 + type Source struct { 16 + Kind Kind 17 + Host string 18 + } 19 + 20 + func NewKnotSource(host string) Source { return Source{Kind: KindKnot, Host: host} } 21 + func NewSpindleSource(host string) Source { return Source{Kind: KindSpindle, Host: host} } 22 + 23 + func (s Source) Key() string { return string(s.Kind) + ":" + s.Host } 24 + 25 + func DefaultURL(dev bool) func(Source, int64) (*url.URL, error) { 26 + scheme := "wss" 27 + if dev { 28 + scheme = "ws" 29 + } 30 + return func(s Source, cursor int64) (*url.URL, error) { 31 + u, err := url.Parse(scheme + "://" + s.Host + "/events") 32 + if err != nil { 33 + return nil, err 34 + } 35 + if cursor != 0 { 36 + q := url.Values{} 37 + q.Add("cursor", strconv.FormatInt(cursor, 10)) 38 + u.RawQuery = q.Encode() 39 + } 40 + return u, nil 41 + } 42 + }
-39
eventconsumer/spindle.go
··· 1 - package eventconsumer 2 - 3 - import ( 4 - "fmt" 5 - "net/url" 6 - ) 7 - 8 - type SpindleSource struct { 9 - Spindle string 10 - } 11 - 12 - func (s SpindleSource) Key() string { 13 - return s.Spindle 14 - } 15 - 16 - func (s SpindleSource) Url(cursor int64, dev bool) (*url.URL, error) { 17 - scheme := "wss" 18 - if dev { 19 - scheme = "ws" 20 - } 21 - 22 - u, err := url.Parse(scheme + "://" + s.Spindle + "/events") 23 - if err != nil { 24 - return nil, err 25 - } 26 - 27 - if cursor != 0 { 28 - query := url.Values{} 29 - query.Add("cursor", fmt.Sprintf("%d", cursor)) 30 - u.RawQuery = query.Encode() 31 - } 32 - return u, nil 33 - } 34 - 35 - func NewSpindleSource(spindle string) SpindleSource { 36 - return SpindleSource{ 37 - Spindle: spindle, 38 - } 39 - }