Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package queue 2 3import ( 4 "sync" 5 "testing" 6 "time" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9) 10 11const ( 12 alice = syntax.DID("did:plc:alice") 13 eve = syntax.DID("did:plc:eve") 14 dawn = syntax.DID("did:plc:dawn") 15) 16 17func TestQueueDrainsAllJobs(t *testing.T) { 18 t.Parallel() 19 20 q := NewQueue(10, 2) 21 q.Start() 22 23 var mu sync.Mutex 24 var ran []string 25 done := make(chan struct{}, 5) 26 27 for _, name := range []string{"a", "b", "c", "d", "e"} { 28 q.Enqueue(dawn, Job{Run: func() error { 29 mu.Lock() 30 ran = append(ran, name) 31 mu.Unlock() 32 done <- struct{}{} 33 return nil 34 }}) 35 } 36 37 for range 5 { 38 select { 39 case <-done: 40 case <-time.After(time.Second): 41 t.Fatal("timed out waiting for jobs to finish") 42 } 43 } 44 45 q.Stop() 46 47 if len(ran) != 5 { 48 t.Fatalf("expected 5 jobs, ran %d", len(ran)) 49 } 50} 51 52func TestQueueRejectsWhenFull(t *testing.T) { 53 t.Parallel() 54 55 // no workers, so the queue never drains 56 q := NewQueue(2, 0) 57 58 if !q.Enqueue(dawn, Job{Run: func() error { return nil }}) { 59 t.Fatal("first Enqueue() returned false, want true") 60 } 61 if !q.Enqueue(dawn, Job{Run: func() error { return nil }}) { 62 t.Fatal("second Enqueue() returned false, want true") 63 } 64 if q.Enqueue(dawn, Job{Run: func() error { return nil }}) { 65 t.Fatal("third Enqueue() returned true on full queue, want false") 66 } 67} 68 69func TestQueuePrefersOwnerWithFewestRunning(t *testing.T) { 70 t.Parallel() 71 72 // 2 workers. alice gets the first slot; while she's holding it, eve's 73 // job should win the second slot over alice's own queued waiters. 74 q := NewQueue(20, 2) 75 76 releaseAlice1 := make(chan struct{}) 77 gotEve := make(chan struct{}, 1) 78 79 q.Enqueue(alice, Job{Run: func() error { 80 <-releaseAlice1 81 return nil 82 }}) 83 // alice queues two more 84 q.Enqueue(alice, Job{Run: func() error { return nil }}) 85 q.Enqueue(alice, Job{Run: func() error { return nil }}) 86 // eve queues one 87 q.Enqueue(eve, Job{Run: func() error { 88 gotEve <- struct{}{} 89 return nil 90 }}) 91 92 q.Start() 93 94 // eve should run while alice's first is still held 95 select { 96 case <-gotEve: 97 case <-time.After(time.Second): 98 t.Fatal("eve's job did not run while alice was blocked") 99 } 100 101 close(releaseAlice1) 102 q.Stop() 103}