Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver,spindle: switch streaming to eventstream

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
date (May 29, 2026, 2:50 PM +0300) commit 3bee528b parent 3bf37006 change-id szvponsv
+59 -356
+7 -59
knotserver/db/events.go
··· 3 3 import ( 4 4 "encoding/json" 5 5 "fmt" 6 - "time" 7 6 7 + "tangled.org/core/eventstream" 8 8 "tangled.org/core/notifier" 9 9 "tangled.org/core/tid" 10 10 ) 11 11 12 - type Event struct { 13 - Rkey string `json:"rkey"` 14 - Nsid string `json:"nsid"` 15 - EventJson string `json:"event"` 16 - Created int64 `json:"created"` 17 - } 18 - 19 - func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 20 - 21 - _, err := d.db.Exec( 22 - `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 23 - event.Rkey, 24 - event.Nsid, 25 - event.EventJson, 26 - time.Now().UnixNano(), 27 - ) 28 - 29 - notifier.NotifyAll() 30 - 31 - return err 12 + func (d *DB) InsertEvent(event eventstream.Event, n *notifier.Notifier) error { 13 + return eventstream.Insert(d.db, event, n) 32 14 } 33 15 34 16 func (d *DB) EmitDIDAssign(n *notifier.Notifier, ownerDid, repoName, repoDid string) error { ··· 43 25 return fmt.Errorf("marshal didAssign event: %w", err) 44 26 } 45 27 46 - return d.InsertEvent(Event{ 28 + return d.InsertEvent(eventstream.Event{ 47 29 Rkey: tid.TID(), 48 30 Nsid: RepoDIDAssignNSID, 49 - EventJson: string(eventJson), 31 + EventJson: eventJson, 50 32 }, n) 51 33 } 52 34 53 - func (d *DB) GetEvents(cursor int64) ([]Event, error) { 54 - whereClause := "" 55 - args := []any{} 56 - if cursor > 0 { 57 - whereClause = "where created > ?" 58 - args = append(args, cursor) 59 - } 60 - 61 - query := fmt.Sprintf(` 62 - select rkey, nsid, event, created 63 - from events 64 - %s 65 - order by created asc 66 - limit 100 67 - `, whereClause) 68 - 69 - rows, err := d.db.Query(query, args...) 70 - if err != nil { 71 - return nil, err 72 - } 73 - defer rows.Close() 74 - 75 - var evts []Event 76 - for rows.Next() { 77 - var ev Event 78 - if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 79 - return nil, err 80 - } 81 - evts = append(evts, ev) 82 - } 83 - 84 - if err := rows.Err(); err != nil { 85 - return nil, err 86 - } 87 - 88 - return evts, nil 35 + func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 36 + return eventstream.List(d.db, cursor, limit) 89 37 }
+13 -123
knotserver/events.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 5 + "errors" 6 6 "net/http" 7 - "strconv" 8 7 "time" 9 8 10 9 "github.com/bluesky-social/indigo/xrpc" 11 - "github.com/gorilla/websocket" 12 10 "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventstream" 13 12 "tangled.org/core/log" 14 13 ) 15 14 16 - var upgrader = websocket.Upgrader{ 17 - ReadBufferSize: 1024, 18 - WriteBufferSize: 1024, 19 - } 20 - 21 15 func (h *Knot) Events(w http.ResponseWriter, r *http.Request) { 22 16 l := log.SubLogger(h.l, "eventstream") 23 17 l.Debug("received new connection") 24 18 25 - conn, err := upgrader.Upgrade(w, r, nil) 26 - if err != nil { 27 - l.Error("websocket upgrade failed", "err", err) 28 - w.WriteHeader(http.StatusInternalServerError) 29 - return 19 + err := eventstream.Stream(w, r, eventstream.StreamConfig{ 20 + Backend: h.db, 21 + Notifier: h.n, 22 + Logger: l, 23 + }) 24 + if err != nil && !errors.Is(err, eventstream.ErrDrainCap) { 25 + l.Error("event stream ended with error", "err", err) 30 26 } 31 - defer conn.Close() 32 - l.Debug("upgraded http to wss") 33 27 34 - ch := h.n.Subscribe() 35 - defer h.n.Unsubscribe(ch) 36 - 37 - ctx, cancel := context.WithCancel(r.Context()) 38 - defer cancel() 39 28 go func() { 40 - for { 41 - if _, _, err := conn.NextReader(); err != nil { 42 - l.Error("failed to read", "err", err) 43 - cancel() 44 - return 45 - } 29 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 30 + defer retryCancel() 31 + if err := h.requestCrawl(retryCtx); err != nil { 32 + l.Error("error requesting crawls", "err", err) 46 33 } 47 34 }() 48 - 49 - var cursor int64 50 - cursorStr := r.URL.Query().Get("cursor") 51 - if cursorStr != "" { 52 - cursor, err = strconv.ParseInt(cursorStr, 10, 64) 53 - if err != nil { 54 - l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 55 - cursor = 0 56 - } 57 - } 58 - 59 - l.Debug("going through backfill", "cursor", cursor) 60 - if err := h.drainBackfill(conn, &cursor, 10_000); err != nil { 61 - l.Error("failed to backfill", "err", err) 62 - return 63 - } 64 - 65 - // try request crawl when connection closed 66 - defer func() { 67 - go func() { 68 - retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 69 - defer retryCancel() 70 - if err := h.requestCrawl(retryCtx); err != nil { 71 - l.Error("error requesting crawls", "err", err) 72 - } 73 - }() 74 - }() 75 - 76 - for { 77 - // wait for new data or timeout 78 - select { 79 - case <-ctx.Done(): 80 - l.Debug("stopping stream: client closed connection") 81 - return 82 - case <-ch: 83 - l.Debug("going through live data", "cursor", cursor) 84 - if _, err := h.streamOps(conn, &cursor); err != nil { 85 - l.Error("failed to stream", "err", err) 86 - return 87 - } 88 - case <-time.After(30 * time.Second): 89 - // send a keep-alive 90 - if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 91 - l.Error("failed to write control", "err", err) 92 - } 93 - } 94 - } 95 - } 96 - 97 - func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error { 98 - for range maxBatches { 99 - n, err := h.streamOps(conn, cursor) 100 - if err != nil { 101 - return err 102 - } 103 - if n < 100 { 104 - return nil 105 - } 106 - } 107 - h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor) 108 - return nil 109 - } 110 - 111 - func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) { 112 - events, err := h.db.GetEvents(*cursor) 113 - if err != nil { 114 - h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 115 - return 0, err 116 - } 117 - 118 - for _, event := range events { 119 - var eventJson map[string]any 120 - err := json.Unmarshal([]byte(event.EventJson), &eventJson) 121 - if err != nil { 122 - h.l.Error("failed to unmarshal event", "err", err) 123 - return 0, err 124 - } 125 - 126 - jsonMsg, err := json.Marshal(map[string]any{ 127 - "rkey": event.Rkey, 128 - "nsid": event.Nsid, 129 - "event": eventJson, 130 - "created": event.Created, 131 - }) 132 - if err != nil { 133 - h.l.Error("failed to marshal record", "err", err) 134 - return 0, err 135 - } 136 - 137 - if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 138 - h.l.Debug("err", "err", err) 139 - return 0, err 140 - } 141 - *cursor = event.Created 142 - } 143 - 144 - return len(events), nil 145 35 } 146 36 147 37 func (h *Knot) requestCrawl(ctx context.Context) error {
+3 -2
knotserver/ingester.go
··· 19 19 jmodels "github.com/bluesky-social/jetstream/pkg/models" 20 20 "tangled.org/core/api/tangled" 21 21 "tangled.org/core/appview/models" 22 + "tangled.org/core/eventstream" 22 23 "tangled.org/core/knotserver/db" 23 24 "tangled.org/core/knotserver/git" 24 25 knotxrpc "tangled.org/core/knotserver/xrpc" ··· 502 503 return fmt.Errorf("failed to marshal pipeline event: %w", err) 503 504 } 504 505 505 - ev := db.Event{ 506 + ev := eventstream.Event{ 506 507 Rkey: tid.TID(), 507 508 Nsid: tangled.PipelineNSID, 508 - EventJson: string(eventJson), 509 + EventJson: eventJson, 509 510 } 510 511 511 512 l.Info("inserting pipeline event")
+5 -4
knotserver/internal.go
··· 18 18 "github.com/go-chi/chi/v5/middleware" 19 19 "github.com/go-git/go-git/v5/plumbing" 20 20 "tangled.org/core/api/tangled" 21 + "tangled.org/core/eventstream" 21 22 "tangled.org/core/hook" 22 23 "tangled.org/core/idresolver" 23 24 "tangled.org/core/knotserver/config" ··· 313 314 return err 314 315 } 315 316 316 - event := db.Event{ 317 + event := eventstream.Event{ 317 318 Rkey: tid.TID(), 318 319 Nsid: tangled.GitRefUpdateNSID, 319 - EventJson: string(eventJson), 320 + EventJson: eventJson, 320 321 } 321 322 322 323 return h.db.InsertEvent(event, h.n) ··· 417 418 return nil 418 419 } 419 420 420 - event := db.Event{ 421 + event := eventstream.Event{ 421 422 Rkey: tid.TID(), 422 423 Nsid: tangled.PipelineNSID, 423 - EventJson: string(eventJson), 424 + EventJson: eventJson, 424 425 } 425 426 426 427 if h.c.LogsAddr != "" {
+9 -4
knotserver/xrpc/set_default_branch.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "github.com/bluesky-social/indigo/xrpc" 11 11 "tangled.org/core/api/tangled" 12 - "tangled.org/core/knotserver/db" 12 + "tangled.org/core/eventstream" 13 13 "tangled.org/core/knotserver/git" 14 14 "tangled.org/core/rbac" 15 15 "tangled.org/core/tid" ··· 101 101 } 102 102 eventJson, err := json.Marshal(refUpdate) 103 103 if err != nil { 104 + fail(xrpcerr.GenericError(err)) 104 105 return 105 106 } 106 107 107 - x.Db.InsertEvent(db.Event{ 108 + if err := x.Db.InsertEvent(eventstream.Event{ 108 109 Rkey: tid.TID(), 109 110 Nsid: tangled.GitRefUpdateNSID, 110 - EventJson: string(eventJson), 111 - }, x.Notifier) 111 + EventJson: eventJson, 112 + }, x.Notifier); err != nil { 113 + l.Error("failed to insert event", "error", err) 114 + writeError(w, xrpcerr.GenericError(err), http.StatusInternalServerError) 115 + return 116 + } 112 117 113 118 w.WriteHeader(http.StatusOK) 114 119 }
+7 -60
spindle/db/events.go
··· 2 2 3 3 import ( 4 4 "encoding/json" 5 - "fmt" 6 5 "time" 7 6 8 7 "tangled.org/core/api/tangled" 8 + "tangled.org/core/eventstream" 9 9 "tangled.org/core/notifier" 10 10 "tangled.org/core/spindle/models" 11 11 "tangled.org/core/tid" 12 12 ) 13 13 14 - type Event struct { 15 - Rkey string `json:"rkey"` 16 - Nsid string `json:"nsid"` 17 - Created int64 `json:"created"` 18 - EventJson string `json:"event"` 14 + func (d *DB) insertEvent(event eventstream.Event, n *notifier.Notifier) error { 15 + return eventstream.Insert(d, event, n) 19 16 } 20 17 21 - func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error { 22 - _, err := d.Exec( 23 - `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 - event.Rkey, 25 - event.Nsid, 26 - event.EventJson, 27 - time.Now().UnixNano(), 28 - ) 29 - 30 - notifier.NotifyAll() 31 - 32 - return err 33 - } 34 - 35 - func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 - whereClause := "" 37 - args := []any{} 38 - if cursor > 0 { 39 - whereClause = "where created > ?" 40 - args = append(args, cursor) 41 - } 42 - 43 - query := fmt.Sprintf(` 44 - select rkey, nsid, event, created 45 - from events 46 - %s 47 - order by created asc 48 - limit 100 49 - `, whereClause) 50 - 51 - rows, err := d.Query(query, args...) 52 - if err != nil { 53 - return nil, err 54 - } 55 - defer rows.Close() 56 - 57 - var evts []Event 58 - for rows.Next() { 59 - var ev Event 60 - if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 - return nil, err 62 - } 63 - evts = append(evts, ev) 64 - } 65 - 66 - if err := rows.Err(); err != nil { 67 - return nil, err 68 - } 69 - 70 - return evts, nil 18 + func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 19 + return eventstream.List(d, cursor, limit) 71 20 } 72 21 73 22 func (d *DB) createStatusEvent( ··· 93 42 return err 94 43 } 95 44 96 - event := Event{ 45 + event := eventstream.Event{ 97 46 Rkey: tid.TID(), 98 47 Nsid: tangled.PipelineStatusNSID, 99 - Created: now.UnixNano(), 100 - EventJson: string(eventJson), 48 + EventJson: eventJson, 101 49 } 102 50 103 51 return d.insertEvent(event, n) 104 - 105 52 } 106 53 107 54 func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
+6 -5
spindle/server.go
··· 15 15 "tangled.org/core/api/tangled" 16 16 "tangled.org/core/eventconsumer" 17 17 "tangled.org/core/eventconsumer/cursor" 18 + "tangled.org/core/eventstream" 18 19 "tangled.org/core/idresolver" 19 20 "tangled.org/core/jetstream" 20 21 "tangled.org/core/log" ··· 183 184 // job in the above registered queue. 184 185 ccfg := eventconsumer.NewConsumerConfig() 185 186 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 186 - ccfg.Dev = cfg.Server.Dev 187 + ccfg.URLFunc = eventconsumer.DefaultURL(cfg.Server.Dev) 187 188 ccfg.ProcessFunc = spindle.processPipeline 188 189 ccfg.CursorStore = cursorStore 189 190 knownKnots, err := d.Knots() ··· 374 375 return x.Router() 375 376 } 376 377 377 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 378 + func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 378 379 if msg.Nsid == tangled.PipelineNSID { 379 380 tpl := tangled.Pipeline{} 380 381 err := json.Unmarshal(msg.EventJson, &tpl) ··· 391 392 return fmt.Errorf("no repo data found") 392 393 } 393 394 394 - if src.Key() != tpl.TriggerMetadata.Repo.Knot { 395 - return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 395 + if src.Host != tpl.TriggerMetadata.Repo.Knot { 396 + return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 396 397 } 397 398 398 399 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) ··· 401 402 } 402 403 403 404 pipelineId := models.PipelineId{ 404 - Knot: src.Key(), 405 + Knot: src.Host, 405 406 Rkey: msg.Rkey, 406 407 } 407 408
+9 -99
spindle/stream.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "errors" 6 7 "fmt" 7 8 "io" 8 9 "net/http" 9 10 "os" 10 - "strconv" 11 11 "time" 12 12 13 + "tangled.org/core/eventstream" 13 14 "tangled.org/core/log" 14 15 "tangled.org/core/spindle/models" 15 16 ··· 25 26 26 27 func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 27 28 l := log.SubLogger(s.l, "eventstream") 28 - 29 29 l.Debug("received new connection") 30 30 31 - conn, err := upgrader.Upgrade(w, r, nil) 32 - if err != nil { 33 - l.Error("websocket upgrade failed", "err", err) 34 - w.WriteHeader(http.StatusInternalServerError) 35 - return 36 - } 37 - defer conn.Close() 38 - l.Debug("upgraded http to wss") 39 - 40 - ch := s.n.Subscribe() 41 - defer s.n.Unsubscribe(ch) 42 - 43 - ctx, cancel := context.WithCancel(r.Context()) 44 - defer cancel() 45 - go func() { 46 - for { 47 - if _, _, err := conn.NextReader(); err != nil { 48 - l.Error("failed to read", "err", err) 49 - cancel() 50 - return 51 - } 52 - } 53 - }() 54 - 55 - var cursor int64 56 - cursorStr := r.URL.Query().Get("cursor") 57 - if cursorStr != "" { 58 - cursor, err = strconv.ParseInt(cursorStr, 10, 64) 59 - if err != nil { 60 - l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 61 - cursor = 0 62 - } 63 - } 64 - 65 - // complete backfill first before going to live data 66 - l.Debug("going through backfill", "cursor", cursor) 67 - if err := s.streamPipelines(conn, &cursor); err != nil { 68 - l.Error("failed to backfill", "err", err) 69 - return 70 - } 71 - 72 - for { 73 - // wait for new data or timeout 74 - select { 75 - case <-ctx.Done(): 76 - l.Debug("stopping stream: client closed connection") 77 - return 78 - case <-ch: 79 - // we have been notified of new data 80 - l.Debug("going through live data", "cursor", cursor) 81 - if err := s.streamPipelines(conn, &cursor); err != nil { 82 - l.Error("failed to stream", "err", err) 83 - return 84 - } 85 - case <-time.After(30 * time.Second): 86 - // send a keep-alive 87 - if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 88 - l.Error("failed to write control", "err", err) 89 - } 90 - } 31 + err := eventstream.Stream(w, r, eventstream.StreamConfig{ 32 + Backend: s.db, 33 + Notifier: s.n, 34 + Logger: l, 35 + }) 36 + if err != nil && !errors.Is(err, eventstream.ErrDrainCap) { 37 + l.Error("event stream ended with error", "err", err) 91 38 } 92 39 } 93 40 ··· 220 167 } 221 168 } 222 169 } 223 - } 224 - 225 - func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 226 - events, err := s.db.GetEvents(*cursor) 227 - if err != nil { 228 - s.l.Debug("err", "err", err) 229 - return err 230 - } 231 - 232 - for _, event := range events { 233 - // first extract the inner json into a map 234 - var eventJson map[string]any 235 - err := json.Unmarshal([]byte(event.EventJson), &eventJson) 236 - if err != nil { 237 - s.l.Error("failed to unmarshal event", "err", err) 238 - return err 239 - } 240 - 241 - jsonMsg, err := json.Marshal(map[string]any{ 242 - "rkey": event.Rkey, 243 - "nsid": event.Nsid, 244 - "event": eventJson, 245 - "created": event.Created, 246 - }) 247 - if err != nil { 248 - s.l.Error("failed to marshal record", "err", err) 249 - return err 250 - } 251 - 252 - if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 253 - s.l.Debug("err", "err", err) 254 - return err 255 - } 256 - *cursor = event.Created 257 - } 258 - 259 - return nil 260 170 } 261 171 262 172 func getWorkflowID(r *http.Request) (models.WorkflowId, error) {