Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

appview/pipelines: introduce status notifier

allows web/ssh handlers to subscribe to status updates for a particular
pipeline by ATURI. we can then live update UI based on data arriving on this
channel.

spindlestream can now notify interested parties about new pipeline
statuses.

Signed-off-by: oppiliappan <me@oppi.li>

author
oppiliappan
committer
Tangled
date (May 26, 2026, 9:08 AM +0300) commit 2c52d785 parent eb6bafbf change-id rkkpuzxq
+67 -6
+52
appview/pipelines/notifier.go
··· 1 + package pipelines 2 + 3 + import ( 4 + "sync" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/notifier" 8 + ) 9 + 10 + // StatusNotifier is a keyed broadcast notifier for pipeline status changes, keyed by the pipeline's AT URI. 11 + // 12 + // subscribers are notified whenever a status update arrives for that pipeline 13 + type StatusNotifier struct { 14 + mu sync.Mutex 15 + keys map[syntax.ATURI]*notifier.Notifier 16 + } 17 + 18 + func NewStatusNotifier() *StatusNotifier { 19 + return &StatusNotifier{ 20 + keys: make(map[syntax.ATURI]*notifier.Notifier), 21 + } 22 + } 23 + 24 + func (n *StatusNotifier) Publish(uri syntax.ATURI) { 25 + n.mu.Lock() 26 + p, ok := n.keys[uri] 27 + n.mu.Unlock() 28 + if ok { 29 + p.NotifyAll() 30 + } 31 + } 32 + 33 + func (n *StatusNotifier) Subscribe(uri syntax.ATURI) chan struct{} { 34 + n.mu.Lock() 35 + p, ok := n.keys[uri] 36 + if !ok { 37 + nb := notifier.New() 38 + p = &nb 39 + n.keys[uri] = p 40 + } 41 + n.mu.Unlock() 42 + return p.Subscribe() 43 + } 44 + 45 + func (n *StatusNotifier) Unsubscribe(uri syntax.ATURI, ch chan struct{}) { 46 + n.mu.Lock() 47 + p, ok := n.keys[uri] 48 + n.mu.Unlock() 49 + if ok { 50 + p.Unsubscribe(ch) 51 + } 52 + }
+8 -5
appview/state/spindlestream.go
··· 14 14 "tangled.org/core/appview/config" 15 15 "tangled.org/core/appview/db" 16 16 "tangled.org/core/appview/models" 17 + "tangled.org/core/appview/pipelines" 17 18 ec "tangled.org/core/eventconsumer" 18 19 "tangled.org/core/eventconsumer/cursor" 19 20 "tangled.org/core/log" ··· 22 23 spindle "tangled.org/core/spindle/models" 23 24 ) 24 25 25 - func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 26 + func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { 26 27 logger := log.FromContext(ctx) 27 28 logger = log.SubLogger(logger, "spindlestream") 28 29 ··· 46 47 47 48 cfg := ec.ConsumerConfig{ 48 49 Sources: srcs, 49 - ProcessFunc: spindleIngester(ctx, logger, d), 50 + ProcessFunc: spindleIngester(ctx, logger, d, pn), 50 51 RetryInterval: c.Spindlestream.RetryInterval, 51 52 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 52 53 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, ··· 60 61 return ec.NewConsumer(cfg), nil 61 62 } 62 63 63 - func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 64 + func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 64 65 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 65 66 switch msg.Nsid { 66 67 case tangled.PipelineStatusNSID: 67 - return ingestPipelineStatus(ctx, logger, d, source, msg) 68 + return ingestPipelineStatus(ctx, logger, d, pn, source, msg) 68 69 } 69 70 70 71 return nil 71 72 } 72 73 } 73 74 74 - func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 75 + func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg ec.Message) error { 75 76 var record tangled.PipelineStatus 76 77 err := json.Unmarshal(msg.EventJson, &record) 77 78 if err != nil { ··· 110 111 if err != nil { 111 112 return fmt.Errorf("failed to add pipeline status: %w", err) 112 113 } 114 + 115 + pn.Publish(pipelineUri) 113 116 114 117 return nil 115 118 }
+7 -1
appview/state/state.go
··· 28 28 whnotify "tangled.org/core/appview/notify/webhook" 29 29 "tangled.org/core/appview/oauth" 30 30 "tangled.org/core/appview/pages" 31 + "tangled.org/core/appview/pipelines" 32 + pipelinessh "tangled.org/core/appview/pipelines/ssh" 31 33 "tangled.org/core/appview/reporesolver" 32 34 "tangled.org/core/appview/repoverify" 33 35 "tangled.org/core/appview/validator" ··· 67 69 repoResolver *reporesolver.RepoResolver 68 70 knotstream *eventconsumer.Consumer 69 71 spindlestream *eventconsumer.Consumer 72 + pipelineNotifier *pipelines.StatusNotifier 70 73 logger *slog.Logger 71 74 validator *validator.Validator 72 75 cfClient *cloudflare.Client ··· 210 213 } 211 214 knotstream.Start(ctx) 212 215 213 - spindlestream, err := Spindlestream(ctx, config, d, enforcer) 216 + pipelineNotifier := pipelines.NewStatusNotifier() 217 + 218 + spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 214 219 if err != nil { 215 220 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 216 221 } ··· 232 237 repoResolver: repoResolver, 233 238 knotstream: knotstream, 234 239 spindlestream: spindlestream, 240 + pipelineNotifier: pipelineNotifier, 235 241 logger: logger, 236 242 validator: validator, 237 243 cfClient: cfClient,