Skip to content

Commit 4b44211

Browse files
committed
Add force acquire implementation
1 parent ffeb2ba commit 4b44211

File tree

5 files changed

+156
-74
lines changed

5 files changed

+156
-74
lines changed

README.md

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The implementation does not interfere with Go's runtime semaphore. It is opt-in
1515
- **Priority-based scheduling**: Tasks are executed based on their priority. High priority tasks are started before low priority tasks once the maximum concurrency limit is reached.
1616
- **Configurable maximum concurrency limit**: The number of concurrent tasks is configurable, defaulting to [`GOMAXPROCS`](https://pkg.go.dev/runtime#GOMAXPROCS).
1717
- **Context cancellation**: Waiting tasks can optionally be cancelled using a context.
18+
- **Force acquire**: Tasks can bypass the maximum concurrency limit using force acquire. These tasks will execute immediately but still count towards the concurrency limit for regular tasks. This ensures critical tasks are never blocked while maintaining backpressure on non-critical tasks.
1819

1920
## Installation
2021

@@ -64,68 +65,64 @@ Then, for each task to be prioritised:
6465

6566
Note the importance of calling the `Release` method to signal the completion of the task. This is necessary to allow other tasks to be executed by the semaphore.
6667

67-
If the context needs to be taken into account in order to support cancellation, the `AcquireContext` method can be used instead.
68+
If the context needs to be taken into account in order to support cancellation, the `AcquireContext` method can be used instead. If a highly critical task needs to be executed, the `ForceAcquire` method can be used to bypass the maximum concurrency limit.
6869

69-
## Example use case: Prioritizing `/alive` endpoint
70+
## Example use case: Prioritizing critical endpoints
7071

71-
In this example, we will create a semaphore that prioritises an `/alive` endpoint over other endpoints. This is useful in scenarios where the `/alive` endpoint is critical and needs to be executed before other endpoints.
72+
This example demonstrates the key features of the semaphore:
7273

73-
It also demonstrates use of the `AcquireContext` method to support context cancellation. This is useful in scenarios where the client cancels the request, and the server should dispose of the task.
74+
- Priority-based task execution
75+
- Force acquire for critical tasks
76+
- Context cancellation support
7477

7578
```go
7679
package main
7780

7881
import (
7982
"context"
80-
"errors"
83+
"log"
8184
"net/http"
8285
"time"
8386

8487
"github.com/aertje/semaphore/semaphore"
8588
)
8689

8790
func main() {
88-
// Create a new semaphore with a maximum concurrency limit of 10.
89-
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(10))
90-
91-
http.HandleFunc("/alive", func(w http.ResponseWriter, r *http.Request) {
92-
// Register a task with the semaphore with a higher priority of 1.
93-
err := s.AcquireContext(r.Context(), 1)
94-
if err != nil {
95-
if errors.Is(err, context.Canceled) {
96-
http.Error(w, context.Cause(r.Context()).Error(), 499)
97-
return
98-
}
99-
100-
http.Error(w, err.Error(), http.StatusInternalServerError)
91+
// Create a semaphore with max 2 concurrent tasks
92+
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(2))
93+
94+
// Critical health check - uses force acquire to bypass limits
95+
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
96+
s.ForceAcquire()
97+
defer s.Release()
98+
w.Write([]byte("OK"))
99+
})
100+
101+
// High priority endpoint - uses priority 1
102+
http.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
103+
if err := s.AcquireContext(r.Context(), 1); err != nil {
104+
http.Error(w, "Request cancelled", 499)
101105
return
102106
}
103107
defer s.Release()
104108

105-
w.Write([]byte("I'm alive!"))
109+
time.Sleep(100 * time.Millisecond) // Simulate work
110+
w.Write([]byte("User data"))
106111
})
107112

113+
// Low priority endpoint - uses priority 2
108114
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
109-
// Register a task with the semaphore with a lower priority of 2.
110-
err := s.AcquireContext(r.Context(), 2)
111-
if err != nil {
112-
if errors.Is(err, context.Canceled) {
113-
http.Error(w, context.Cause(r.Context()).Error(), 499)
114-
return
115-
}
116-
117-
http.Error(w, err.Error(), http.StatusInternalServerError)
115+
if err := s.AcquireContext(r.Context(), 2); err != nil {
116+
http.Error(w, "Request cancelled", 499)
118117
return
119118
}
120-
121119
defer s.Release()
122120

123-
time.Sleep(1 * time.Second)
124-
125-
w.Write([]byte("Metrics are here!"))
121+
time.Sleep(500 * time.Millisecond) // Simulate heavy work
122+
w.Write([]byte("Metrics data"))
126123
})
127124

128-
http.ListenAndServe(":8080", nil)
125+
log.Fatal(http.ListenAndServe(":8080", nil))
129126
}
130127
```
131128

queue/queue.go

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,60 @@
11
package queue
22

3-
type Item[T any] struct {
3+
import "container/heap"
4+
5+
type item[T any] struct {
46
value T
57
priority int
68
index int
79
}
810

9-
func NewItem[T any](priority int, value T) *Item[T] {
10-
return &Item[T]{value: value, priority: priority}
11-
}
12-
13-
func (item *Item[T]) Value() T {
14-
return item.value
15-
}
16-
17-
type Q[T any] []*Item[T]
11+
type Q[T any] []*item[T]
1812

19-
func (pq Q[T]) Len() int {
20-
return len(pq)
13+
// Len implements heap.Interface.
14+
func (q Q[T]) Len() int {
15+
return len(q)
2116
}
2217

23-
func (pq Q[T]) Less(i, j int) bool {
24-
return pq[i].priority < pq[j].priority
18+
// Less implements heap.Interface.
19+
func (q Q[T]) Less(i, j int) bool {
20+
return q[i].priority < q[j].priority
2521
}
2622

27-
func (pq Q[T]) Swap(i, j int) {
28-
pq[i], pq[j] = pq[j], pq[i]
29-
pq[i].index = i
30-
pq[j].index = j
23+
// Swap implements heap.Interface, do not use this method directly.
24+
func (q Q[T]) Swap(i, j int) {
25+
q[i], q[j] = q[j], q[i]
26+
q[i].index = i
27+
q[j].index = j
3128
}
3229

33-
func (pq *Q[T]) Push(x any) {
34-
n := len(*pq)
35-
item := x.(*Item[T])
30+
// Push implements heap.Interface, do not use this method directly.
31+
func (q *Q[T]) Push(x any) {
32+
n := len(*q)
33+
item := x.(*item[T])
3634
item.index = n
37-
*pq = append(*pq, item)
35+
*q = append(*q, item)
3836
}
3937

40-
func (pq *Q[T]) Pop() any {
41-
old := *pq
38+
// Pop implements heap.Interface, do not use this method directly.
39+
func (q *Q[T]) Pop() any {
40+
old := *q
4241
n := len(old)
4342
item := old[n-1]
4443
old[n-1] = nil // don't stop the GC from reclaiming the item eventually
4544
item.index = -1 // for safety
46-
*pq = old[0 : n-1]
45+
*q = old[0 : n-1]
4746
return item
4847
}
48+
49+
func (q *Q[T]) PushItem(priority int, value T) {
50+
item := &item[T]{value: value, priority: priority}
51+
heap.Push(q, item)
52+
}
53+
54+
func (q *Q[T]) PopItem() T {
55+
if q.Len() == 0 {
56+
var zero T
57+
return zero
58+
}
59+
return heap.Pop(q).(*item[T]).value
60+
}

queue/queue_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ func TestOrder(t *testing.T) {
1212
q := new(queue.Q[string])
1313

1414
heap.Init(q)
15-
heap.Push(q, queue.NewItem(2, "b"))
16-
heap.Push(q, queue.NewItem(3, "c"))
17-
heap.Push(q, queue.NewItem(1, "a"))
18-
heap.Push(q, queue.NewItem(2, "b"))
19-
heap.Push(q, queue.NewItem(1, "a"))
20-
heap.Push(q, queue.NewItem(3, "c"))
15+
16+
q.PushItem(2, "b")
17+
q.PushItem(3, "c")
18+
q.PushItem(1, "a")
19+
q.PushItem(2, "b")
20+
q.PushItem(1, "a")
21+
q.PushItem(3, "c")
2122

2223
assert.Equal(t, 6, q.Len())
2324

24-
for _, want := range []string{"a", "a", "b", "b", "c", "c"} {
25-
item := heap.Pop(q).(*queue.Item[string])
26-
assert.Equal(t, want, item.Value())
25+
for _, want := range []string{"a", "a", "b", "b", "c", "c", ""} {
26+
assert.Equal(t, want, q.PopItem())
2727
}
2828

2929
assert.Equal(t, 0, q.Len())

semaphore/semaphore.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (s *Prioritized) assessEntries() {
5858
if s.entries.Len() == 0 {
5959
return
6060
}
61-
entry := heap.Pop(s.entries).(*queue.Item[entry]).Value()
61+
entry := s.entries.PopItem()
6262

6363
select {
6464
case <-entry.cancelChan:
@@ -71,7 +71,7 @@ func (s *Prioritized) assessEntries() {
7171
}
7272
}
7373

74-
func (s *Prioritized) AcquireContext(ctx context.Context, priority int) error {
74+
func (s *Prioritized) acquireInternal(ctx context.Context, priority int, force bool) error {
7575
waitChan := make(chan struct{})
7676
cancelChan := make(chan struct{})
7777

@@ -81,7 +81,7 @@ func (s *Prioritized) AcquireContext(ctx context.Context, priority int) error {
8181
}
8282

8383
s.lock.Lock()
84-
heap.Push(s.entries, queue.NewItem(priority, entry))
84+
s.entries.PushItem(priority, entry)
8585
s.lock.Unlock()
8686

8787
go func() {
@@ -97,8 +97,18 @@ func (s *Prioritized) AcquireContext(ctx context.Context, priority int) error {
9797
}
9898
}
9999

100+
func (s *Prioritized) AcquireContext(ctx context.Context, priority int) error {
101+
return s.acquireInternal(ctx, priority, false)
102+
}
103+
100104
func (s *Prioritized) Acquire(priority int) {
101-
s.AcquireContext(context.Background(), priority)
105+
_ = s.acquireInternal(context.Background(), priority, false)
106+
}
107+
108+
func (s *Prioritized) ForceAcquire() {
109+
s.lock.Lock()
110+
defer s.lock.Unlock()
111+
s.concurrency++
102112
}
103113

104114
func (s *Prioritized) Release() {

semaphore/semaphore_test.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ func testOrderForConcurrency(maxConcurrency int, totalTasks int) []int {
5656

5757
// Saturate the scheduler otherwise subsequent tasks will be executed
5858
// immediately in undefined order.
59-
for i := 0; i < maxConcurrency; i++ {
59+
for range maxConcurrency {
6060
go func() {
6161
s.Acquire(0)
6262
defer s.Release()
63+
6364
time.Sleep(10 * time.Millisecond)
6465
}()
6566
}
@@ -72,7 +73,7 @@ func testOrderForConcurrency(maxConcurrency int, totalTasks int) []int {
7273
var wg sync.WaitGroup
7374

7475
for i := totalTasks / maxConcurrency; i > 0; i-- {
75-
for j := 0; j < maxConcurrency; j++ {
76+
for range maxConcurrency {
7677
priority := i
7778
wg.Add(1)
7879
go func() {
@@ -102,8 +103,9 @@ func TestCancel(t *testing.T) {
102103
// immediately without waiting.
103104
go func() {
104105
s.Acquire(0)
106+
defer s.Release()
107+
105108
time.Sleep(10 * time.Millisecond)
106-
s.Release()
107109
}()
108110

109111
// Give the scheduler some time to start the goroutine.
@@ -118,3 +120,64 @@ func TestCancel(t *testing.T) {
118120
require.Error(t, err)
119121
assert.Equal(t, context.DeadlineExceeded, err)
120122
}
123+
124+
func TestForceAcquire(t *testing.T) {
125+
s := semaphore.NewPrioritized(semaphore.WithMaxConcurrency(1))
126+
127+
// Saturate the scheduler otherwise subsequent tasks will be executed
128+
// immediately in undefined order.
129+
go func() {
130+
s.Acquire(0)
131+
defer s.Release()
132+
133+
time.Sleep(10 * time.Millisecond)
134+
}()
135+
136+
// Give the scheduler some time to start the goroutine.
137+
time.Sleep(1 * time.Millisecond)
138+
139+
results := make([]bool, 0)
140+
var lock sync.Mutex
141+
var wg sync.WaitGroup
142+
143+
for range 5 {
144+
wg.Add(1)
145+
go func() {
146+
defer wg.Done()
147+
148+
s.Acquire(0)
149+
defer s.Release()
150+
151+
time.Sleep(10 * time.Millisecond)
152+
lock.Lock()
153+
defer lock.Unlock()
154+
results = append(results, false)
155+
}()
156+
}
157+
// Give the scheduler some time to start the goroutine.
158+
time.Sleep(1 * time.Millisecond)
159+
160+
// These tasks start later, but they should finish earlier as they're
161+
// prioritized by the force acquire - while the normal prioritized tasks are
162+
// waiting for the concurrency to be available.
163+
for range 5 {
164+
wg.Add(1)
165+
go func() {
166+
defer wg.Done()
167+
168+
s.ForceAcquire()
169+
defer s.Release()
170+
171+
lock.Lock()
172+
time.Sleep(10 * time.Millisecond)
173+
defer lock.Unlock()
174+
results = append(results, true)
175+
}()
176+
}
177+
178+
wg.Wait()
179+
180+
expected := []bool{true, true, true, true, true, false, false, false, false, false}
181+
require.Len(t, results, len(expected))
182+
assert.Equal(t, expected, results)
183+
}

0 commit comments

Comments
 (0)