Monorepo for Tangled
tangled.org
1package engine
2
import (
	"context"
	"errors"
	"sync"

	"tangled.org/core/spindle/models"
)
9
// ErrNoWorkflowSlots is the sentinel error for the "no workflow slots
// available" condition. Callers should match it with errors.Is.
//
// NOTE(review): nothing visible in this file returns it; presumably it is
// meant for WorkflowSlotter implementations that reject a request instead of
// waiting for capacity — confirm against the other implementations.
var ErrNoWorkflowSlots = errors.New("no workflow slots available")
11
// WorkflowSlot is a handle on an acquired workflow slot. It is the value a
// WorkflowSlotter returns from AcquireWorkflowSlot.
type WorkflowSlot interface {
	// Release gives the slot back to whatever issued it.
	Release()
}
15
16type WorkflowSlotter interface {
17 AcquireWorkflowSlot(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) (WorkflowSlot, error)
18}
19
// releaseFunc adapts a plain func() to a type with a Release method, so a
// closure can be returned wherever a WorkflowSlot is expected.
type releaseFunc func()

// Release runs the wrapped function. A nil releaseFunc is a no-op.
func (fn releaseFunc) Release() {
	if fn == nil {
		return
	}
	fn()
}
27
// NoopSlot is a slot that holds no resources. SemaphoreSlotter returns it
// when no concurrency limit is being enforced.
type NoopSlot struct{}

// Release does nothing; there is nothing to give back.
func (NoopSlot) Release() {}
31
// SemaphoreSlotter limits how many workflow slots may be held at once.
type SemaphoreSlotter struct {
	// slots is a buffered channel used as a counting semaphore: every held
	// slot occupies one buffer entry. A nil channel means no limit.
	slots chan struct{}
}

// NewSemaphoreSlotter returns a slotter that allows at most maxConcurrent
// slots to be held at the same time. A maxConcurrent of zero or less yields a
// slotter with no limit (its slots channel stays nil).
func NewSemaphoreSlotter(maxConcurrent int) *SemaphoreSlotter {
	slotter := &SemaphoreSlotter{}
	if maxConcurrent > 0 {
		slotter.slots = make(chan struct{}, maxConcurrent)
	}
	return slotter
}
43
44func (a *SemaphoreSlotter) AcquireWorkflowSlot(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) (WorkflowSlot, error) {
45 if a == nil || a.slots == nil {
46 return NoopSlot{}, nil
47 }
48 select {
49 case a.slots <- struct{}{}:
50 return releaseFunc(func() { <-a.slots }), nil
51 case <-ctx.Done():
52 return nil, ctx.Err()
53 }
54}