Monorepo for Tangled
tangled.org
1package engine
2
3import (
4 "context"
5 "errors"
6 "testing"
7 "time"
8
9 "tangled.org/core/spindle/models"
10)
11
12func TestSemaphoreSlotterDisabledDoesNotBlock(t *testing.T) {
13 t.Parallel()
14
15 slotter := NewSemaphoreSlotter(0)
16
17 for range 10 {
18 slot, err := slotter.AcquireWorkflowSlot(context.Background(), zeroWorkflowID(), nil)
19 if err != nil {
20 t.Fatalf("AcquireWorkflowSlot() error = %v", err)
21 }
22 slot.Release()
23 }
24}
25
26func TestSemaphoreSlotterBlocksUntilRelease(t *testing.T) {
27 t.Parallel()
28
29 slotter := NewSemaphoreSlotter(1)
30
31 first, err := slotter.AcquireWorkflowSlot(context.Background(), zeroWorkflowID(), nil)
32 if err != nil {
33 t.Fatalf("first AcquireWorkflowSlot() error = %v", err)
34 }
35 releasedFirst := false
36 defer func() {
37 if !releasedFirst {
38 first.Release()
39 }
40 }()
41
42 acquired := make(chan WorkflowSlot, 1)
43 errs := make(chan error, 1)
44 go func() {
45 slot, err := slotter.AcquireWorkflowSlot(context.Background(), zeroWorkflowID(), nil)
46 if err != nil {
47 errs <- err
48 return
49 }
50 acquired <- slot
51 }()
52
53 assertNotAcquired(t, acquired, errs)
54
55 first.Release()
56 releasedFirst = true
57
58 second := waitForSlot(t, acquired, errs)
59 second.Release()
60}
61
62func TestSemaphoreSlotterHonorsContextCancellation(t *testing.T) {
63 t.Parallel()
64
65 slotter := NewSemaphoreSlotter(1)
66
67 first, err := slotter.AcquireWorkflowSlot(context.Background(), zeroWorkflowID(), nil)
68 if err != nil {
69 t.Fatalf("first AcquireWorkflowSlot() error = %v", err)
70 }
71 defer first.Release()
72
73 ctx, cancel := context.WithCancel(context.Background())
74 cancel()
75
76 _, err = slotter.AcquireWorkflowSlot(ctx, zeroWorkflowID(), nil)
77 if !errors.Is(err, context.Canceled) {
78 t.Fatalf("AcquireWorkflowSlot() error = %v, want context.Canceled", err)
79 }
80}
81
82func assertNotAcquired(t *testing.T, acquired <-chan WorkflowSlot, errs <-chan error) {
83 t.Helper()
84
85 select {
86 case slot := <-acquired:
87 slot.Release()
88 t.Fatal("AcquireWorkflowSlot() acquired a slot before one was released")
89 case err := <-errs:
90 t.Fatalf("AcquireWorkflowSlot() returned unexpected error: %v", err)
91 case <-time.After(25 * time.Millisecond):
92 }
93}
94
95func waitForSlot(t *testing.T, acquired <-chan WorkflowSlot, errs <-chan error) WorkflowSlot {
96 t.Helper()
97
98 select {
99 case slot := <-acquired:
100 return slot
101 case err := <-errs:
102 t.Fatalf("AcquireWorkflowSlot() returned error: %v", err)
103 case <-time.After(time.Second):
104 t.Fatal("timed out waiting for slot acquisition")
105 }
106
107 return nil
108}
109
110func zeroWorkflowID() models.WorkflowId {
111 return models.WorkflowId{}
112}