Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package queue 2 3import ( 4 "slices" 5 "sync" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8) 9 10type Job struct { 11 Run func() error 12 OnFail func(error) 13} 14 15type ownedJob struct { 16 owner syntax.DID 17 job Job 18} 19 20// prefers users with fewer running jobs, otherwise it's FIFO 21type Queue struct { 22 mu sync.Mutex 23 cond *sync.Cond 24 queue []ownedJob 25 running map[syntax.DID]int 26 maxSize int 27 workers int 28 stopped bool 29 wg sync.WaitGroup 30} 31 32func NewQueue(queueSize, numWorkers int) *Queue { 33 q := &Queue{ 34 maxSize: queueSize, 35 workers: numWorkers, 36 running: make(map[syntax.DID]int), 37 } 38 q.cond = sync.NewCond(&q.mu) 39 return q 40} 41 42// todo(dawn): add a per-user cap so a single user can't fill the queue 43func (q *Queue) Enqueue(owner syntax.DID, job Job) bool { 44 q.mu.Lock() 45 defer q.mu.Unlock() 46 if q.stopped || len(q.queue) >= q.maxSize { 47 return false 48 } 49 q.queue = append(q.queue, ownedJob{owner: owner, job: job}) 50 q.cond.Signal() 51 return true 52} 53 54func (q *Queue) Start() { 55 for range q.workers { 56 q.wg.Add(1) 57 go q.worker() 58 } 59} 60 61func (q *Queue) worker() { 62 defer q.wg.Done() 63 for { 64 picked, ok := q.takeNext() 65 if !ok { 66 return 67 } 68 69 err := picked.job.Run() 70 if err != nil && picked.job.OnFail != nil { 71 picked.job.OnFail(err) 72 } 73 74 q.finish(picked.owner) 75 } 76} 77 78// get or wait for the next job 79func (q *Queue) takeNext() (ownedJob, bool) { 80 q.mu.Lock() 81 defer q.mu.Unlock() 82 83 for len(q.queue) == 0 && !q.stopped { 84 q.cond.Wait() // waiting for jobs 85 } 86 if q.stopped && len(q.queue) == 0 { 87 return ownedJob{}, false // no jobs are left and the queue is stopped 88 } 89 90 idx := q.pickBest() 91 picked := q.queue[idx] 92 q.queue = slices.Delete(q.queue, idx, idx+1) 93 q.running[picked.owner]++ 94 95 return picked, true 96} 97 98// index of the queued job whose owner has the fewest currently-running jobs, 99// tiebreaking by arrival order. 100func (q *Queue) pickBest() int { 101 best := 0 102 for idx, job := range q.queue { 103 if q.running[job.owner] < q.running[q.queue[best].owner] { 104 best = idx 105 } 106 } 107 return best 108} 109 110// called when finishing a job 111func (q *Queue) finish(owner syntax.DID) { 112 q.mu.Lock() 113 defer q.mu.Unlock() 114 115 q.running[owner]-- 116 if q.running[owner] <= 0 { 117 delete(q.running, owner) 118 } 119} 120 121func (q *Queue) Stop() { 122 q.mu.Lock() 123 q.stopped = true 124 q.cond.Broadcast() 125 q.mu.Unlock() 126 q.wg.Wait() 127}