Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package engine 2 3import ( 4 "context" 5 "fmt" 6 "slices" 7 "sync" 8 "time" 9) 10 11const defaultAgingThreshold = 30 * time.Second 12 13type Resources[Self any] interface { 14 Fits(Self) bool 15 Add(Self) Self 16 Sub(Self) Self 17} 18 19type ResourceScheduler[R Resources[R]] struct { 20 mu sync.Mutex 21 budget R 22 max R 23 used R 24 queue []*resourceWaiter[R] 25 now func() time.Time // get time now, is a field for mocking 26 agingThreshold time.Duration 27} 28 29type resourceWaiter[R Resources[R]] struct { 30 req R 31 ready chan struct{} 32 enqueuedAt time.Time 33} 34 35type resourceLease[R Resources[R]] struct { 36 scheduler *ResourceScheduler[R] 37 req R 38 once sync.Once 39} 40 41func NewResourceScheduler[R Resources[R]](budget, max R, agingThreshold time.Duration) *ResourceScheduler[R] { 42 if agingThreshold <= 0 { 43 agingThreshold = defaultAgingThreshold 44 } 45 return &ResourceScheduler[R]{ 46 budget: budget, 47 max: max, 48 now: time.Now, 49 agingThreshold: agingThreshold, 50 } 51} 52 53func (s *ResourceScheduler[R]) Acquire(ctx context.Context, req R) (WorkflowSlot, error) { 54 if s == nil { 55 return NoopSlot{}, nil 56 } 57 58 s.mu.Lock() 59 if !req.Fits(s.budget) || !req.Fits(s.max) { 60 s.mu.Unlock() 61 return nil, fmt.Errorf("%w: request=%v budget=%v max=%v", ErrNoWorkflowSlots, req, s.budget, s.max) 62 } 63 if len(s.queue) == 0 && s.used.Add(req).Fits(s.budget) { 64 s.used = s.used.Add(req) 65 s.mu.Unlock() 66 return &resourceLease[R]{scheduler: s, req: req}, nil 67 } 68 69 waiter := &resourceWaiter[R]{req: req, ready: make(chan struct{}), enqueuedAt: s.now()} 70 s.queue = append(s.queue, waiter) 71 s.schedule() 72 s.mu.Unlock() 73 74 select { 75 case <-waiter.ready: 76 return &resourceLease[R]{scheduler: s, req: req}, nil 77 case <-ctx.Done(): 78 s.mu.Lock() 79 select { 80 case <-waiter.ready: 81 // undo committed resources, schedule already did that 82 s.used = s.used.Sub(req) 83 default: 84 // still in queue, just remove 85 s.remove(waiter) 86 } 87 s.schedule() 88 s.mu.Unlock() 89 return nil, ctx.Err() 90 } 91} 92 93func (l *resourceLease[R]) Release() { 94 if l == nil || l.scheduler == nil { 95 return 96 } 97 l.once.Do(func() { 98 l.scheduler.release(l.req) 99 }) 100} 101 102func (s *ResourceScheduler[R]) release(req R) { 103 s.mu.Lock() 104 defer s.mu.Unlock() 105 s.used = s.used.Sub(req) 106 s.schedule() 107} 108 109// start every waiter whose request fits. once a waiter is older than 110// agingThreshold, count its request as already used so younger waiters 111// stop being scheduled ahead of it. 112func (s *ResourceScheduler[R]) schedule() { 113 var reserved R 114 now := s.now() 115 i := 0 116 for i < len(s.queue) { 117 w := s.queue[i] 118 if s.used.Add(reserved).Add(w.req).Fits(s.budget) { 119 s.queue = slices.Delete(s.queue, i, i+1) 120 s.used = s.used.Add(w.req) 121 close(w.ready) 122 continue 123 } 124 if now.Sub(w.enqueuedAt) >= s.agingThreshold { 125 reserved = reserved.Add(w.req) 126 } 127 i++ 128 } 129} 130 131func (s *ResourceScheduler[R]) remove(waiter *resourceWaiter[R]) { 132 for i, candidate := range s.queue { 133 if candidate != waiter { 134 continue 135 } 136 s.queue = slices.Delete(s.queue, i, i+1) 137 return 138 } 139}