Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "testing" 8 "time" 9) 10 11// resources for testing 12type ru struct{ a, b int64 } 13 14func (r ru) Fits(limit ru) bool { 15 if limit.a > 0 && r.a > limit.a { 16 return false 17 } 18 if limit.b > 0 && r.b > limit.b { 19 return false 20 } 21 return true 22} 23func (r ru) Add(o ru) ru { return ru{r.a + o.a, r.b + o.b} } 24func (r ru) Sub(o ru) ru { return ru{max(0, r.a-o.a), max(0, r.b-o.b)} } 25func (r ru) String() string { 26 return fmt.Sprintf("a=%d b=%d", r.a, r.b) 27} 28 29type acquireResult struct { 30 slot WorkflowSlot 31 err error 32} 33 34func TestResourceSchedulerZeroLimitsDoNotApply(t *testing.T) { 35 t.Parallel() 36 37 scheduler := NewResourceScheduler(ru{}, ru{}, 0) 38 39 slot, err := scheduler.Acquire(context.Background(), ru{a: 1 << 20, b: 1 << 20}) 40 if err != nil { 41 t.Fatalf("Acquire() error = %v", err) 42 } 43 slot.Release() 44} 45 46func TestResourceSchedulerRejectsRequestsThatCanNeverFit(t *testing.T) { 47 t.Parallel() 48 49 scheduler := NewResourceScheduler(ru{a: 1024, b: 10_000}, ru{a: 512, b: 5_000}, 0) 50 51 _, err := scheduler.Acquire(context.Background(), ru{a: 768, b: 100}) 52 if !errors.Is(err, ErrNoWorkflowSlots) { 53 t.Fatalf("Acquire() error = %v, want ErrNoWorkflowSlots", err) 54 } 55 56 _, err = scheduler.Acquire(context.Background(), ru{a: 128, b: 12_000}) 57 if !errors.Is(err, ErrNoWorkflowSlots) { 58 t.Fatalf("Acquire() error = %v, want ErrNoWorkflowSlots", err) 59 } 60} 61 62func TestResourceSchedulerWaitsUntilResourcesAreReleased(t *testing.T) { 63 t.Parallel() 64 65 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, 0) 66 67 first, err := scheduler.Acquire(context.Background(), ru{a: 1024}) 68 if err != nil { 69 t.Fatalf("first Acquire() error = %v", err) 70 } 71 defer first.Release() 72 73 ch := acquireAsync(context.Background(), scheduler, ru{a: 1}) 74 assertAcquireBlocked(t, ch) 75 76 first.Release() 77 first = NoopSlot{} 78 79 second := waitAcquireOK(t, ch) 80 second.Release() 81} 82 83func TestResourceSchedulerReleaseIsIdempotent(t *testing.T) { 84 t.Parallel() 85 86 scheduler := NewResourceScheduler(ru{a: 1}, ru{}, 0) 87 88 slot, err := scheduler.Acquire(context.Background(), ru{a: 1}) 89 if err != nil { 90 t.Fatalf("Acquire() error = %v", err) 91 } 92 93 slot.Release() 94 slot.Release() 95 96 second, err := scheduler.Acquire(context.Background(), ru{a: 1}) 97 if err != nil { 98 t.Fatalf("Acquire() after double release error = %v", err) 99 } 100 second.Release() 101} 102 103func TestResourceSchedulerBackfillsPastBlockedHead(t *testing.T) { 104 t.Parallel() 105 106 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, time.Hour) // disable aging so we test pure backfill 107 108 hold, err := scheduler.Acquire(context.Background(), ru{a: 512}) 109 if err != nil { 110 t.Fatalf("hold Acquire() error = %v", err) 111 } 112 defer hold.Release() 113 114 bigCh := acquireAsync(context.Background(), scheduler, ru{a: 768}) 115 assertAcquireBlocked(t, bigCh) 116 117 smallCh := acquireAsync(context.Background(), scheduler, ru{a: 256}) 118 small := waitAcquireOK(t, smallCh) 119 small.Release() 120 121 assertAcquireBlocked(t, bigCh) 122} 123 124func TestResourceSchedulerAgingReservesCapacityForBlockedHead(t *testing.T) { 125 t.Parallel() 126 127 scheduler := NewResourceScheduler(ru{a: 1024}, ru{}, 10*time.Millisecond) 128 fakeNow := time.Now() 129 scheduler.now = func() time.Time { return fakeNow } 130 131 hold, err := scheduler.Acquire(context.Background(), ru{a: 512}) 132 if err != nil { 133 t.Fatalf("hold Acquire() error = %v", err) 134 } 135 136 bigCh := acquireAsync(context.Background(), scheduler, ru{a: 768}) 137 assertAcquireBlocked(t, bigCh) 138 139 fakeNow = fakeNow.Add(time.Second) 140 141 // big is now aged and reserves its 768. a 256 request would fit 142 // alongside the held 512, but the reservation blocks it. 143 smallCh := acquireAsync(context.Background(), scheduler, ru{a: 256}) 144 assertAcquireBlocked(t, smallCh) 145 146 hold.Release() 147 148 big := waitAcquireOK(t, bigCh) 149 small := waitAcquireOK(t, smallCh) 150 small.Release() 151 big.Release() 152} 153 154func acquireAsync(ctx context.Context, scheduler *ResourceScheduler[ru], req ru) <-chan acquireResult { 155 ch := make(chan acquireResult, 1) 156 go func() { 157 slot, err := scheduler.Acquire(ctx, req) 158 ch <- acquireResult{slot: slot, err: err} 159 }() 160 return ch 161} 162 163func assertAcquireBlocked(t *testing.T, ch <-chan acquireResult) { 164 t.Helper() 165 166 select { 167 case res := <-ch: 168 if res.slot != nil { 169 res.slot.Release() 170 } 171 t.Fatalf("Acquire() returned before resources were available: err=%v", res.err) 172 case <-time.After(25 * time.Millisecond): 173 } 174} 175 176func waitAcquireOK(t *testing.T, ch <-chan acquireResult) WorkflowSlot { 177 t.Helper() 178 179 res := waitAcquireResult(t, ch) 180 if res.err != nil { 181 t.Fatalf("Acquire() error = %v", res.err) 182 } 183 if res.slot == nil { 184 t.Fatal("Acquire() returned nil slot") 185 } 186 return res.slot 187} 188 189func waitAcquireResult(t *testing.T, ch <-chan acquireResult) acquireResult { 190 t.Helper() 191 192 select { 193 case res := <-ch: 194 return res 195 case <-time.After(time.Second): 196 t.Fatal("timed out waiting for Acquire() result") 197 } 198 199 return acquireResult{} 200}