// Source: Monorepo for Tangled (tangled.org)

1package engine
2
import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"testing"
	"time"
)
10
// ru is a two-component resource vector used as the scheduler's resource
// type in these tests.
type ru struct{ a, b int64 }

// Fits reports whether r stays within limit. A component of limit that is
// zero or negative is treated as "no limit" for that component.
func (r ru) Fits(limit ru) bool {
	exceedsA := limit.a > 0 && r.a > limit.a
	exceedsB := limit.b > 0 && r.b > limit.b
	return !exceedsA && !exceedsB
}

// Add returns the component-wise sum of r and o.
func (r ru) Add(o ru) ru {
	return ru{a: r.a + o.a, b: r.b + o.b}
}

// Sub returns the component-wise difference r - o, with each component
// clamped so it never drops below zero.
func (r ru) Sub(o ru) ru {
	return ru{
		a: max(r.a-o.a, 0),
		b: max(r.b-o.b, 0),
	}
}

// String renders both components for log and failure messages.
func (r ru) String() string {
	return fmt.Sprintf("a=%d b=%d", r.a, r.b)
}
28
29type acquireResult struct {
30 slot WorkflowSlot
31 err error
32}
33
34func TestResourceSchedulerZeroLimitsDoNotApply(t *testing.T) {
35 t.Parallel()
36
37 scheduler := NewResourceScheduler(ru{}, ru{}, 0)
38
39 slot, err := scheduler.Acquire(context.Background(), ru{a: 1 << 20, b: 1 << 20})
40 if err != nil {
41 t.Fatalf("Acquire() error = %v", err)
42 }
43 slot.Release()
44}
45
46func TestResourceSchedulerRejectsRequestsThatCanNeverFit(t *testing.T) {
47 t.Parallel()
48
49 scheduler := NewResourceScheduler(ru{a: 1024, b: 10_000}, ru{a: 512, b: 5_000}, 0)
50
51 _, err := scheduler.Acquire(context.Background(), ru{a: 768, b: 100})
52 if !errors.Is(err, ErrNoWorkflowSlots) {
53 t.Fatalf("Acquire() error = %v, want ErrNoWorkflowSlots", err)
54 }
55
56 _, err = scheduler.Acquire(context.Background(), ru{a: 128, b: 12_000})
57 if !errors.Is(err, ErrNoWorkflowSlots) {
58 t.Fatalf("Acquire() error = %v, want ErrNoWorkflowSlots", err)
59 }
60}
61
62func TestResourceSchedulerWaitsUntilResourcesAreReleased(t *testing.T) {
63 t.Parallel()
64
65 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, 0)
66
67 first, err := scheduler.Acquire(context.Background(), ru{a: 1024})
68 if err != nil {
69 t.Fatalf("first Acquire() error = %v", err)
70 }
71 defer first.Release()
72
73 ch := acquireAsync(context.Background(), scheduler, ru{a: 1})
74 assertAcquireBlocked(t, ch)
75
76 first.Release()
77 first = NoopSlot{}
78
79 second := waitAcquireOK(t, ch)
80 second.Release()
81}
82
83func TestResourceSchedulerReleaseIsIdempotent(t *testing.T) {
84 t.Parallel()
85
86 scheduler := NewResourceScheduler(ru{a: 1}, ru{}, 0)
87
88 slot, err := scheduler.Acquire(context.Background(), ru{a: 1})
89 if err != nil {
90 t.Fatalf("Acquire() error = %v", err)
91 }
92
93 slot.Release()
94 slot.Release()
95
96 second, err := scheduler.Acquire(context.Background(), ru{a: 1})
97 if err != nil {
98 t.Fatalf("Acquire() after double release error = %v", err)
99 }
100 second.Release()
101}
102
103func TestResourceSchedulerBackfillsPastBlockedHead(t *testing.T) {
104 t.Parallel()
105
106 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, time.Hour) // disable aging so we test pure backfill
107
108 hold, err := scheduler.Acquire(context.Background(), ru{a: 512})
109 if err != nil {
110 t.Fatalf("hold Acquire() error = %v", err)
111 }
112 defer hold.Release()
113
114 bigCh := acquireAsync(context.Background(), scheduler, ru{a: 768})
115 assertAcquireBlocked(t, bigCh)
116
117 smallCh := acquireAsync(context.Background(), scheduler, ru{a: 256})
118 small := waitAcquireOK(t, smallCh)
119 small.Release()
120
121 assertAcquireBlocked(t, bigCh)
122}
123
124func TestResourceSchedulerAgingReservesCapacityForBlockedHead(t *testing.T) {
125 t.Parallel()
126
127 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, 10*time.Millisecond)
128 fakeNow := time.Now()
129 scheduler.now = func() time.Time { return fakeNow }
130
131 hold, err := scheduler.Acquire(context.Background(), ru{a: 512})
132 if err != nil {
133 t.Fatalf("hold Acquire() error = %v", err)
134 }
135
136 bigCh := acquireAsync(context.Background(), scheduler, ru{a: 768})
137 assertAcquireBlocked(t, bigCh)
138
139 fakeNow = fakeNow.Add(time.Second)
140
141 // big is now aged and reserves its 768. a 256 request would fit
142 // alongside the held 512, but the reservation blocks it.
143 smallCh := acquireAsync(context.Background(), scheduler, ru{a: 256})
144 assertAcquireBlocked(t, smallCh)
145
146 hold.Release()
147
148 big := waitAcquireOK(t, bigCh)
149 small := waitAcquireOK(t, smallCh)
150 small.Release()
151 big.Release()
152}
153
154func acquireAsync(ctx context.Context, scheduler *ResourceScheduler[ru], req ru) <-chan acquireResult {
155 ch := make(chan acquireResult, 1)
156 go func() {
157 slot, err := scheduler.Acquire(ctx, req)
158 ch <- acquireResult{slot: slot, err: err}
159 }()
160 return ch
161}
162
163func assertAcquireBlocked(t *testing.T, ch <-chan acquireResult) {
164 t.Helper()
165
166 select {
167 case res := <-ch:
168 if res.slot != nil {
169 res.slot.Release()
170 }
171 t.Fatalf("Acquire() returned before resources were available: err=%v", res.err)
172 case <-time.After(25 * time.Millisecond):
173 }
174}
175
176func waitAcquireOK(t *testing.T, ch <-chan acquireResult) WorkflowSlot {
177 t.Helper()
178
179 res := waitAcquireResult(t, ch)
180 if res.err != nil {
181 t.Fatalf("Acquire() error = %v", res.err)
182 }
183 if res.slot == nil {
184 t.Fatal("Acquire() returned nil slot")
185 }
186 return res.slot
187}
188
189func waitAcquireResult(t *testing.T, ch <-chan acquireResult) acquireResult {
190 t.Helper()
191
192 select {
193 case res := <-ch:
194 return res
195 case <-time.After(time.Second):
196 t.Fatal("timed out waiting for Acquire() result")
197 }
198
199 return acquireResult{}
200}