Skip to content

Commit 2fd5d0c

Browse files
authored
events/loop: dynamically increase queue size (#137)
* events/loop: dynamically increase queue size Update the event control loop implementation to dynamically grow the event queue when it reaches capacity. This change allows the loop to handle a larger number of events without blocking Enqueue, removing issues of circular dependencies in certain scenarios. Queues are now split into segments, each with the given fixed size. When the current segments are full, a new segment is allocated and linked to the end of the queue. This allows the queue to grow as needed. Uses `sync.Pool` to reduce allocations for segments. Signed-off-by: joshvanl <[email protected]> * lint Signed-off-by: joshvanl <[email protected]> * Use `sync.Mutex` instead of `sync.RWMutex` for loop mutexes Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 6c6f92a commit 2fd5d0c

File tree

3 files changed

+356
-24
lines changed

3 files changed

+356
-24
lines changed

events/loop/loop.go

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,24 @@ type Interface[T any] interface {
3030
}
3131

3232
type loop[T any] struct {
33-
queue chan T
33+
head *queueSegment[T]
34+
tail *queueSegment[T]
35+
36+
segSize uint64
37+
3438
handler Handler[T]
3539

3640
closed bool
3741
closeCh chan struct{}
38-
lock sync.RWMutex
42+
43+
lock sync.Mutex
44+
45+
segPool sync.Pool
3946
}
4047

4148
func New[T any](h Handler[T], size uint64) Interface[T] {
42-
return &loop[T]{
43-
queue: make(chan T, size),
44-
handler: h,
45-
closeCh: make(chan struct{}),
46-
}
49+
l := new(loop[T])
50+
return l.Reset(h, size)
4751
}
4852

4953
func Empty[T any]() Interface[T] {
@@ -53,41 +57,88 @@ func Empty[T any]() Interface[T] {
5357
func (l *loop[T]) Run(ctx context.Context) error {
5458
defer close(l.closeCh)
5559

56-
for {
57-
req, ok := <-l.queue
58-
if !ok {
59-
return nil
60+
current := l.head
61+
for current != nil {
62+
// Drain this segment in order. The channel will be closed either:
63+
// - when we "roll over" to a new segment, or
64+
// - when Close() is called for the final segment.
65+
for req := range current.ch {
66+
if err := l.handler.Handle(ctx, req); err != nil {
67+
return err
68+
}
6069
}
6170

62-
if err := l.handler.Handle(ctx, req); err != nil {
63-
return err
64-
}
71+
// Move to the next segment, and return this one to the pool.
72+
next := current.next
73+
l.putSegment(current)
74+
current = next
6575
}
76+
77+
return nil
6678
}
6779

6880
func (l *loop[T]) Enqueue(req T) {
69-
l.lock.RLock()
70-
defer l.lock.RUnlock()
81+
l.lock.Lock()
82+
defer l.lock.Unlock()
7183

7284
if l.closed {
7385
return
7486
}
7587

88+
if l.tail == nil {
89+
seg := l.getSegment()
90+
l.head = seg
91+
l.tail = seg
92+
}
93+
94+
// First try to send to the current tail segment without blocking.
7695
select {
77-
case l.queue <- req:
78-
case <-l.closeCh:
96+
case l.tail.ch <- req:
97+
return
98+
default:
99+
// Tail is full: create a new segment, link it, close the old tail, and
100+
// send into the new tail.
101+
newSeg := l.getSegment()
102+
l.tail.next = newSeg
103+
close(l.tail.ch)
104+
l.tail = newSeg
105+
l.tail.ch <- req
79106
}
80107
}
81108

82109
func (l *loop[T]) Close(req T) {
83110
l.lock.Lock()
111+
if l.closed {
112+
// Already closed; just unlock and wait for Run to finish.
113+
l.lock.Unlock()
114+
<-l.closeCh
115+
return
116+
}
84117
l.closed = true
118+
119+
// Ensure at least one segment exists.
120+
if l.tail == nil {
121+
seg := l.getSegment()
122+
l.head = seg
123+
l.tail = seg
124+
}
125+
126+
// Enqueue the final request; if the tail is full, roll over as in Enqueue.
85127
select {
86-
case l.queue <- req:
87-
case <-l.closeCh:
128+
case l.tail.ch <- req:
129+
default:
130+
newSeg := l.getSegment()
131+
l.tail.next = newSeg
132+
close(l.tail.ch)
133+
l.tail = newSeg
134+
l.tail.ch <- req
88135
}
89-
close(l.queue)
136+
137+
// No more items will be enqueued; close the tail to signal completion.
138+
close(l.tail.ch)
90139
l.lock.Unlock()
140+
141+
// Wait for Run to finish draining everything.
91142
<-l.closeCh
92143
}
93144

@@ -102,10 +153,16 @@ func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] {
102153
l.closed = false
103154
l.closeCh = make(chan struct{})
104155
l.handler = h
156+
l.segSize = size
157+
158+
// Initialize pool for this instantiation of T.
159+
l.segPool.New = func() any {
160+
return new(queueSegment[T])
161+
}
105162

106-
// TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and
107-
// improve performance.
108-
l.queue = make(chan T, size)
163+
seg := l.getSegment()
164+
l.head = seg
165+
l.tail = seg
109166

110167
return l
111168
}

events/loop/loop_test.go

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package loop
15+
16+
import (
17+
"context"
18+
"sync"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
// testHandler is a simple handler that records all values it sees.
27+
type testHandler[T any] struct {
28+
mu sync.Mutex
29+
seen []T
30+
err error
31+
delay time.Duration
32+
}
33+
34+
func (h *testHandler[T]) Handle(ctx context.Context, v T) error {
35+
if h.delay > 0 {
36+
select {
37+
case <-time.After(h.delay):
38+
case <-ctx.Done():
39+
return ctx.Err()
40+
}
41+
}
42+
43+
h.mu.Lock()
44+
defer h.mu.Unlock()
45+
h.seen = append(h.seen, v)
46+
return h.err
47+
}
48+
49+
func (h *testHandler[T]) Values() []T {
50+
h.mu.Lock()
51+
defer h.mu.Unlock()
52+
out := make([]T, len(h.seen))
53+
copy(out, h.seen)
54+
return out
55+
}
56+
57+
func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) {
58+
ctx, cancel := context.WithCancel(t.Context())
59+
defer cancel()
60+
61+
h := &testHandler[int]{}
62+
const segmentSize = 4
63+
64+
l := New[int](h, segmentSize)
65+
66+
var wg sync.WaitGroup
67+
wg.Add(1)
68+
errCh := make(chan error, 1)
69+
t.Cleanup(func() {
70+
require.NoError(t, <-errCh)
71+
})
72+
go func() {
73+
defer wg.Done()
74+
errCh <- l.Run(ctx)
75+
}()
76+
77+
// Enqueue more items than a single segment to force multiple channels.
78+
const n = 25
79+
for i := range n {
80+
l.Enqueue(i)
81+
}
82+
83+
// Close with a sentinel value so we can verify it is the last element.
84+
const final = 999
85+
l.Close(final)
86+
87+
wg.Wait()
88+
89+
got := h.Values()
90+
require.Len(t, got, n+1, "handler should see all enqueued items plus final close item")
91+
92+
// First n values should be 0..n-1 in order.
93+
for i := range n {
94+
assert.Equal(t, i, got[i], "item at index %d out of order", i)
95+
}
96+
97+
// Last one is the final close value.
98+
assert.Equal(t, final, got[len(got)-1], "last item should be the Close() value")
99+
}
100+
101+
func TestLoop_CloseTwiceIsSafe(t *testing.T) {
102+
ctx, cancel := context.WithCancel(t.Context())
103+
defer cancel()
104+
105+
h := &testHandler[int]{}
106+
l := New[int](h, 2)
107+
108+
var wg sync.WaitGroup
109+
wg.Add(1)
110+
errCh := make(chan error, 1)
111+
t.Cleanup(func() {
112+
require.NoError(t, <-errCh)
113+
})
114+
go func() {
115+
defer wg.Done()
116+
errCh <- l.Run(ctx)
117+
}()
118+
119+
l.Enqueue(1)
120+
l.Close(2)
121+
122+
// Second close should not panic or deadlock.
123+
done := make(chan struct{})
124+
go func() {
125+
l.Close(3)
126+
close(done)
127+
}()
128+
129+
select {
130+
case <-done:
131+
case <-time.After(time.Second):
132+
require.Fail(t, "second Close should not block")
133+
}
134+
135+
wg.Wait()
136+
137+
got := h.Values()
138+
// First close enqueues 2, second close should be ignored.
139+
assert.Contains(t, got, 1)
140+
assert.Contains(t, got, 2)
141+
assert.NotContains(t, got, 3)
142+
}
143+
144+
func TestLoop_Reset(t *testing.T) {
145+
ctx, cancel := context.WithCancel(t.Context())
146+
defer cancel()
147+
148+
h1 := &testHandler[int]{}
149+
l := New[int](h1, 2)
150+
151+
var wg1 sync.WaitGroup
152+
wg1.Add(1)
153+
errCh := make(chan error, 1)
154+
t.Cleanup(func() {
155+
require.NoError(t, <-errCh)
156+
})
157+
go func() {
158+
defer wg1.Done()
159+
errCh <- l.Run(ctx)
160+
}()
161+
162+
l.Enqueue(1)
163+
l.Close(2)
164+
wg1.Wait()
165+
166+
assert.ElementsMatch(t, []int{1, 2}, h1.Values())
167+
168+
// Reset to a new handler and buffer size.
169+
h2 := &testHandler[int]{}
170+
l = l.Reset(h2, 8)
171+
172+
require.NotNil(t, l)
173+
174+
var wg2 sync.WaitGroup
175+
wg2.Add(1)
176+
errCh2 := make(chan error, 1)
177+
t.Cleanup(func() {
178+
require.NoError(t, <-errCh2)
179+
})
180+
go func() {
181+
defer wg2.Done()
182+
errCh2 <- l.Run(ctx)
183+
}()
184+
185+
l.Enqueue(10)
186+
l.Enqueue(11)
187+
l.Close(12)
188+
wg2.Wait()
189+
190+
assert.ElementsMatch(t, []int{10, 11, 12}, h2.Values())
191+
}
192+
193+
func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) {
194+
ctx, cancel := context.WithCancel(t.Context())
195+
defer cancel()
196+
197+
h := &testHandler[int]{}
198+
l := New[int](h, 2)
199+
200+
var wg sync.WaitGroup
201+
wg.Add(1)
202+
errCh := make(chan error, 1)
203+
t.Cleanup(func() {
204+
require.NoError(t, <-errCh)
205+
})
206+
go func() {
207+
defer wg.Done()
208+
errCh <- l.Run(ctx)
209+
}()
210+
211+
l.Enqueue(1)
212+
l.Close(2)
213+
214+
// This enqueue should be ignored.
215+
l.Enqueue(3)
216+
217+
wg.Wait()
218+
219+
got := h.Values()
220+
assert.Equal(t, []int{1, 2}, got)
221+
}

0 commit comments

Comments
 (0)