Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

wip: remove pipelineNotifier

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit 5dc27981 parent b935f893 change-id rswtrruw
+23 -101
+1 -12
appview/pipelines/logs.go
··· 4 4 "fmt" 5 5 "html/template" 6 6 "net/url" 7 - "path" 8 7 "regexp" 9 8 "strings" 10 9 ··· 94 93 query := url.Values{} 95 94 query.Set("pipeline", pipeline.String()) 96 95 query.Set("workflow", workflow) 97 - return fmt.Sprintf("%s/xrpc/%s?%s", u, tangled.CiWorkflowSubscribeLogsNSID, query.Encode()) 98 - } 99 - 100 - // TODO(boltless): deprecate this 101 - func SpindleURL(spindle, knot, rkey, workflow string) string { 102 - url, err := hostutil.EnsureWsScheme(spindle) 103 - if err != nil { 104 - return "" 105 - } 106 - 107 - return url + path.Join("/logs", knot, rkey, workflow) 96 + return fmt.Sprintf("%s/xrpc/%s?%s", u, tangled.CiPipelineSubscribeLogsNSID, query.Encode()) 108 97 }
-52
appview/pipelines/notifier.go
··· 1 - package pipelines 2 - 3 - import ( 4 - "sync" 5 - 6 - "github.com/bluesky-social/indigo/atproto/syntax" 7 - "tangled.org/core/notifier" 8 - ) 9 - 10 - // StatusNotifier is a keyed broadcast notifier for pipeline status changes, keyed by the pipeline's AT URI. 11 - // 12 - // subscribers are notified whenever a status update arrives for that pipeline 13 - type StatusNotifier struct { 14 - mu sync.Mutex 15 - keys map[syntax.ATURI]*notifier.Notifier 16 - } 17 - 18 - func NewStatusNotifier() *StatusNotifier { 19 - return &StatusNotifier{ 20 - keys: make(map[syntax.ATURI]*notifier.Notifier), 21 - } 22 - } 23 - 24 - func (n *StatusNotifier) Publish(uri syntax.ATURI) { 25 - n.mu.Lock() 26 - p, ok := n.keys[uri] 27 - n.mu.Unlock() 28 - if ok { 29 - p.NotifyAll() 30 - } 31 - } 32 - 33 - func (n *StatusNotifier) Subscribe(uri syntax.ATURI) chan struct{} { 34 - n.mu.Lock() 35 - p, ok := n.keys[uri] 36 - if !ok { 37 - nb := notifier.New() 38 - p = &nb 39 - n.keys[uri] = p 40 - } 41 - n.mu.Unlock() 42 - return p.Subscribe() 43 - } 44 - 45 - func (n *StatusNotifier) Unsubscribe(uri syntax.ATURI, ch chan struct{}) { 46 - n.mu.Lock() 47 - p, ok := n.keys[uri] 48 - n.mu.Unlock() 49 - if ok { 50 - p.Unsubscribe(ch) 51 - } 52 - }
+16 -19
appview/pipelines/pipelines.go
··· 28 28 ) 29 29 30 30 type Pipelines struct { 31 - repoResolver *reporesolver.RepoResolver 32 - idResolver *idresolver.Resolver 33 - config *config.Config 34 - oauth *oauth.OAuth 35 - pages *pages.Pages 36 - pipelineNotifier *StatusNotifier 37 - db *db.DB 38 - enforcer *rbac.Enforcer 39 - logger *slog.Logger 31 + repoResolver *reporesolver.RepoResolver 32 + idResolver *idresolver.Resolver 33 + config *config.Config 34 + oauth *oauth.OAuth 35 + pages *pages.Pages 36 + db *db.DB 37 + enforcer *rbac.Enforcer 38 + logger *slog.Logger 40 39 } 41 40 42 41 func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { ··· 55 54 oauth *oauth.OAuth, 56 55 repoResolver *reporesolver.RepoResolver, 57 56 pages *pages.Pages, 58 - pipelineNotifier *StatusNotifier, 59 57 idResolver *idresolver.Resolver, 60 58 db *db.DB, 61 59 config *config.Config, ··· 63 61 logger *slog.Logger, 64 62 ) *Pipelines { 65 63 return &Pipelines{ 66 - oauth: oauth, 67 - repoResolver: repoResolver, 68 - pages: pages, 69 - idResolver: idResolver, 70 - config: config, 71 - pipelineNotifier: pipelineNotifier, 72 - db: db, 73 - enforcer: enforcer, 74 - logger: logger, 64 + oauth: oauth, 65 + repoResolver: repoResolver, 66 + pages: pages, 67 + idResolver: idResolver, 68 + config: config, 69 + db: db, 70 + enforcer: enforcer, 71 + logger: logger, 75 72 } 76 73 } 77 74
-2
appview/pipelines/ssh/logstream.go
··· 6 6 tea "github.com/charmbracelet/bubbletea" 7 7 "github.com/gorilla/websocket" 8 8 "tangled.org/core/appview/pipelines" 9 - spindlemodel "tangled.org/core/spindle/models" 10 9 ) 11 10 12 11 type step struct { 13 12 id int 14 13 name string 15 14 command string 16 - kind spindlemodel.StepKind 17 15 lines []string 18 16 startTime time.Time 19 17 endTime time.Time
+5 -7
appview/pipelines/ssh/server.go
··· 10 10 tea "github.com/charmbracelet/wish/bubbletea" 11 11 "tangled.org/core/appview/config" 12 12 "tangled.org/core/appview/db" 13 - "tangled.org/core/appview/pipelines" 14 13 ) 15 14 16 15 type Server struct { 17 - db *db.DB 18 - config *config.Config 19 - pipelineNotifier *pipelines.StatusNotifier 20 - logger *slog.Logger 16 + db *db.DB 17 + config *config.Config 18 + logger *slog.Logger 21 19 } 22 20 23 - func New(db *db.DB, cfg *config.Config, pn *pipelines.StatusNotifier, logger *slog.Logger) *Server { 24 - return &Server{db: db, config: cfg, pipelineNotifier: pn, logger: logger} 21 + func New(db *db.DB, cfg *config.Config, logger *slog.Logger) *Server { 22 + return &Server{db: db, config: cfg, logger: logger} 25 23 } 26 24 27 25 func (s *Server) ListenAndServe(ctx context.Context) error {
-1
appview/state/router.go
··· 406 406 s.oauth, 407 407 s.repoResolver, 408 408 s.pages, 409 - s.pipelineNotifier, 410 409 s.idResolver, 411 410 s.db, 412 411 s.config,
+1 -8
appview/state/state.go
··· 30 30 whnotify "tangled.org/core/appview/notify/webhook" 31 31 "tangled.org/core/appview/oauth" 32 32 "tangled.org/core/appview/pages" 33 - "tangled.org/core/appview/pipelines" 34 33 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 34 "tangled.org/core/appview/reporesolver" 36 35 "tangled.org/core/appview/repoverify" ··· 71 70 repoResolver *reporesolver.RepoResolver 72 71 aclService *knotacl.Service 73 72 knotstream *eventconsumer.Consumer 74 - pipelineNotifier *pipelines.StatusNotifier 75 73 logger *slog.Logger 76 74 validator *validator.Validator 77 75 cfClient *cloudflare.Client ··· 220 218 } 221 219 knotstream.Start(ctx) 222 220 223 - // TODO: pipeline notifier serves as pipeline store with cache to prevent spamming spindles 224 - // maybe, not super sure... 225 - pipelineNotifier := pipelines.NewStatusNotifier() 226 - 227 221 state := &State{ 228 222 db: d, 229 223 notifier: notifier, ··· 240 234 repoResolver: repoResolver, 241 235 aclService: aclService, 242 236 knotstream: knotstream, 243 - pipelineNotifier: pipelineNotifier, 244 237 logger: logger, 245 238 validator: validator, 246 239 cfClient: cfClient, ··· 258 251 } 259 252 260 253 func (s *State) NewSSHServer() *pipelinessh.Server { 261 - return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 254 + return pipelinessh.New(s.db, s.config, log.SubLogger(s.logger, "pipelinessh")) 262 255 } 263 256 264 257 func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {