Monorepo for Tangled
tangled.org
1package pipelines
2
3import (
4 "sync"
5
6 "github.com/bluesky-social/indigo/atproto/syntax"
7 "tangled.org/core/notifier"
8)
9
10// StatusNotifier is a keyed broadcast notifier for pipeline status changes, keyed by the pipeline's AT URI.
11//
12// subscribers are notified whenever a status update arrives for that pipeline
13type StatusNotifier struct {
14 mu sync.Mutex
15 keys map[syntax.ATURI]*notifier.Notifier
16}
17
18func NewStatusNotifier() *StatusNotifier {
19 return &StatusNotifier{
20 keys: make(map[syntax.ATURI]*notifier.Notifier),
21 }
22}
23
24func (n *StatusNotifier) Publish(uri syntax.ATURI) {
25 n.mu.Lock()
26 p, ok := n.keys[uri]
27 n.mu.Unlock()
28 if ok {
29 p.NotifyAll()
30 }
31}
32
33func (n *StatusNotifier) Subscribe(uri syntax.ATURI) chan struct{} {
34 n.mu.Lock()
35 p, ok := n.keys[uri]
36 if !ok {
37 nb := notifier.New()
38 p = &nb
39 n.keys[uri] = p
40 }
41 n.mu.Unlock()
42 return p.Subscribe()
43}
44
45func (n *StatusNotifier) Unsubscribe(uri syntax.ATURI, ch chan struct{}) {
46 n.mu.Lock()
47 p, ok := n.keys[uri]
48 n.mu.Unlock()
49 if ok {
50 p.Unsubscribe(ch)
51 }
52}