Monorepo for Tangled
tangled.org
1package queue
2
3import (
4 "sync"
5 "testing"
6 "time"
7
8 "github.com/bluesky-social/indigo/atproto/syntax"
9)
10
11const (
12 alice = syntax.DID("did:plc:alice")
13 eve = syntax.DID("did:plc:eve")
14 dawn = syntax.DID("did:plc:dawn")
15)
16
17func TestQueueDrainsAllJobs(t *testing.T) {
18 t.Parallel()
19
20 q := NewQueue(10, 2)
21 q.Start()
22
23 var mu sync.Mutex
24 var ran []string
25 done := make(chan struct{}, 5)
26
27 for _, name := range []string{"a", "b", "c", "d", "e"} {
28 q.Enqueue(dawn, Job{Run: func() error {
29 mu.Lock()
30 ran = append(ran, name)
31 mu.Unlock()
32 done <- struct{}{}
33 return nil
34 }})
35 }
36
37 for range 5 {
38 select {
39 case <-done:
40 case <-time.After(time.Second):
41 t.Fatal("timed out waiting for jobs to finish")
42 }
43 }
44
45 q.Stop()
46
47 if len(ran) != 5 {
48 t.Fatalf("expected 5 jobs, ran %d", len(ran))
49 }
50}
51
52func TestQueueRejectsWhenFull(t *testing.T) {
53 t.Parallel()
54
55 // no workers, so the queue never drains
56 q := NewQueue(2, 0)
57
58 if !q.Enqueue(dawn, Job{Run: func() error { return nil }}) {
59 t.Fatal("first Enqueue() returned false, want true")
60 }
61 if !q.Enqueue(dawn, Job{Run: func() error { return nil }}) {
62 t.Fatal("second Enqueue() returned false, want true")
63 }
64 if q.Enqueue(dawn, Job{Run: func() error { return nil }}) {
65 t.Fatal("third Enqueue() returned true on full queue, want false")
66 }
67}
68
69func TestQueuePrefersOwnerWithFewestRunning(t *testing.T) {
70 t.Parallel()
71
72 // 2 workers. alice gets the first slot; while she's holding it, eve's
73 // job should win the second slot over alice's own queued waiters.
74 q := NewQueue(20, 2)
75
76 releaseAlice1 := make(chan struct{})
77 gotEve := make(chan struct{}, 1)
78
79 q.Enqueue(alice, Job{Run: func() error {
80 <-releaseAlice1
81 return nil
82 }})
83 // alice queues two more
84 q.Enqueue(alice, Job{Run: func() error { return nil }})
85 q.Enqueue(alice, Job{Run: func() error { return nil }})
86 // eve queues one
87 q.Enqueue(eve, Job{Run: func() error {
88 gotEve <- struct{}{}
89 return nil
90 }})
91
92 q.Start()
93
94 // eve should run while alice's first is still held
95 select {
96 case <-gotEve:
97 case <-time.After(time.Second):
98 t.Fatal("eve's job did not run while alice was blocked")
99 }
100
101 close(releaseAlice1)
102 q.Stop()
103}