Monorepo for Tangled
tangled.org
1package queue
2
3import (
4 "slices"
5 "sync"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8)
9
10type Job struct {
11 Run func() error
12 OnFail func(error)
13}
14
15type ownedJob struct {
16 owner syntax.DID
17 job Job
18}
19
20// prefers users with fewer running jobs, otherwise it's FIFO
21type Queue struct {
22 mu sync.Mutex
23 cond *sync.Cond
24 queue []ownedJob
25 running map[syntax.DID]int
26 maxSize int
27 workers int
28 stopped bool
29 wg sync.WaitGroup
30}
31
32func NewQueue(queueSize, numWorkers int) *Queue {
33 q := &Queue{
34 maxSize: queueSize,
35 workers: numWorkers,
36 running: make(map[syntax.DID]int),
37 }
38 q.cond = sync.NewCond(&q.mu)
39 return q
40}
41
42// todo(dawn): add a per-user cap so a single user can't fill the queue
43func (q *Queue) Enqueue(owner syntax.DID, job Job) bool {
44 q.mu.Lock()
45 defer q.mu.Unlock()
46 if q.stopped || len(q.queue) >= q.maxSize {
47 return false
48 }
49 q.queue = append(q.queue, ownedJob{owner: owner, job: job})
50 q.cond.Signal()
51 return true
52}
53
54func (q *Queue) Start() {
55 for range q.workers {
56 q.wg.Add(1)
57 go q.worker()
58 }
59}
60
61func (q *Queue) worker() {
62 defer q.wg.Done()
63 for {
64 picked, ok := q.takeNext()
65 if !ok {
66 return
67 }
68
69 err := picked.job.Run()
70 if err != nil && picked.job.OnFail != nil {
71 picked.job.OnFail(err)
72 }
73
74 q.finish(picked.owner)
75 }
76}
77
78// get or wait for the next job
79func (q *Queue) takeNext() (ownedJob, bool) {
80 q.mu.Lock()
81 defer q.mu.Unlock()
82
83 for len(q.queue) == 0 && !q.stopped {
84 q.cond.Wait() // waiting for jobs
85 }
86 if q.stopped && len(q.queue) == 0 {
87 return ownedJob{}, false // no jobs are left and the queue is stopped
88 }
89
90 idx := q.pickBest()
91 picked := q.queue[idx]
92 q.queue = slices.Delete(q.queue, idx, idx+1)
93 q.running[picked.owner]++
94
95 return picked, true
96}
97
98// index of the queued job whose owner has the fewest currently-running jobs,
99// tiebreaking by arrival order.
100func (q *Queue) pickBest() int {
101 best := 0
102 for idx, job := range q.queue {
103 if q.running[job.owner] < q.running[q.queue[best].owner] {
104 best = idx
105 }
106 }
107 return best
108}
109
110// called when finishing a job
111func (q *Queue) finish(owner syntax.DID) {
112 q.mu.Lock()
113 defer q.mu.Unlock()
114
115 q.running[owner]--
116 if q.running[owner] <= 0 {
117 delete(q.running, owner)
118 }
119}
120
121func (q *Queue) Stop() {
122 q.mu.Lock()
123 q.stopped = true
124 q.cond.Broadcast()
125 q.mu.Unlock()
126 q.wg.Wait()
127}