Monorepo for Tangled
tangled.org
1package engine
2
3import (
4 "context"
5 "fmt"
6 "slices"
7 "sync"
8 "time"
9)
10
11const defaultAgingThreshold = 30 * time.Second
12
13type Resources[Self any] interface {
14 Fits(Self) bool
15 Add(Self) Self
16 Sub(Self) Self
17}
18
19type ResourceScheduler[R Resources[R]] struct {
20 mu sync.Mutex
21 budget R
22 max R
23 used R
24 queue []*resourceWaiter[R]
25 now func() time.Time // get time now, is a field for mocking
26 agingThreshold time.Duration
27}
28
29type resourceWaiter[R Resources[R]] struct {
30 req R
31 ready chan struct{}
32 enqueuedAt time.Time
33}
34
35type resourceLease[R Resources[R]] struct {
36 scheduler *ResourceScheduler[R]
37 req R
38 once sync.Once
39}
40
41func NewResourceScheduler[R Resources[R]](budget, max R, agingThreshold time.Duration) *ResourceScheduler[R] {
42 if agingThreshold <= 0 {
43 agingThreshold = defaultAgingThreshold
44 }
45 return &ResourceScheduler[R]{
46 budget: budget,
47 max: max,
48 now: time.Now,
49 agingThreshold: agingThreshold,
50 }
51}
52
53func (s *ResourceScheduler[R]) Acquire(ctx context.Context, req R) (WorkflowSlot, error) {
54 if s == nil {
55 return NoopSlot{}, nil
56 }
57
58 s.mu.Lock()
59 if !req.Fits(s.budget) || !req.Fits(s.max) {
60 s.mu.Unlock()
61 return nil, fmt.Errorf("%w: request=%v budget=%v max=%v", ErrNoWorkflowSlots, req, s.budget, s.max)
62 }
63 if len(s.queue) == 0 && s.used.Add(req).Fits(s.budget) {
64 s.used = s.used.Add(req)
65 s.mu.Unlock()
66 return &resourceLease[R]{scheduler: s, req: req}, nil
67 }
68
69 waiter := &resourceWaiter[R]{req: req, ready: make(chan struct{}), enqueuedAt: s.now()}
70 s.queue = append(s.queue, waiter)
71 s.schedule()
72 s.mu.Unlock()
73
74 select {
75 case <-waiter.ready:
76 return &resourceLease[R]{scheduler: s, req: req}, nil
77 case <-ctx.Done():
78 s.mu.Lock()
79 select {
80 case <-waiter.ready:
81 // undo committed resources, schedule already did that
82 s.used = s.used.Sub(req)
83 default:
84 // still in queue, just remove
85 s.remove(waiter)
86 }
87 s.schedule()
88 s.mu.Unlock()
89 return nil, ctx.Err()
90 }
91}
92
93func (l *resourceLease[R]) Release() {
94 if l == nil || l.scheduler == nil {
95 return
96 }
97 l.once.Do(func() {
98 l.scheduler.release(l.req)
99 })
100}
101
102func (s *ResourceScheduler[R]) release(req R) {
103 s.mu.Lock()
104 defer s.mu.Unlock()
105 s.used = s.used.Sub(req)
106 s.schedule()
107}
108
109// start every waiter whose request fits. once a waiter is older than
110// agingThreshold, count its request as already used so younger waiters
111// stop being scheduled ahead of it.
112func (s *ResourceScheduler[R]) schedule() {
113 var reserved R
114 now := s.now()
115 i := 0
116 for i < len(s.queue) {
117 w := s.queue[i]
118 if s.used.Add(reserved).Add(w.req).Fits(s.budget) {
119 s.queue = slices.Delete(s.queue, i, i+1)
120 s.used = s.used.Add(w.req)
121 close(w.ready)
122 continue
123 }
124 if now.Sub(w.enqueuedAt) >= s.agingThreshold {
125 reserved = reserved.Add(w.req)
126 }
127 i++
128 }
129}
130
131func (s *ResourceScheduler[R]) remove(waiter *resourceWaiter[R]) {
132 for i, candidate := range s.queue {
133 if candidate != waiter {
134 continue
135 }
136 s.queue = slices.Delete(s.queue, i, i+1)
137 return
138 }
139}