
Commit 3ac186d

events/loop: Factory & reduce lock contention (#140)
Update the loop implementation to use an optimistic RLock for enqueueing, falling back to a full Lock only when the current queue segment is full. This reduces lock contention.

Introduce a generic typed Factory so that consumers can sync.Pool-cache loops on demand. The loop does not cache itself, since a loop may continue to be used after being closed.

Signed-off-by: joshvanl <[email protected]>
1 parent: 2fd5d0c

File tree: 4 files changed, +105 −73 lines changed


events/loop/factory.go

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+/*
+Copyright 2025 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package loop
+
+import "sync"
+
+type Factory[T any] struct {
+	size uint64
+
+	segPool  sync.Pool
+	loopPool sync.Pool
+}
+
+func New[T any](size uint64) *Factory[T] {
+	f := &Factory[T]{
+		size: size,
+		segPool: sync.Pool{
+			New: func() any {
+				return new(queueSegment[T])
+			},
+		},
+	}
+
+	f.loopPool = sync.Pool{
+		New: func() any {
+			return &loop[T]{
+				factory: f,
+			}
+		},
+	}
+
+	return f
+}
+
+func (f *Factory[T]) NewLoop(h Handler[T]) Interface[T] {
+	l := f.loopPool.Get().(*loop[T])
+
+	seg := l.getSegment()
+	l.head = seg
+	l.tail = seg
+	l.closeCh = make(chan struct{})
+	l.handler = h
+	l.closed.Store(false)
+
+	return l
+}
+
+func (f *Factory[T]) CacheLoop(l Interface[T]) {
+	f.loopPool.Put(l)
+}
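
For orientation, here is a minimal usage sketch of the new Factory API. The module import path and the Handler method set do not appear in this diff, so both are assumptions (a single Handle(ctx, t) error method is assumed):

package main

import (
	"context"

	// Assumed import path; the module path is not shown in this diff.
	"github.com/dapr/kit/events/loop"
)

// printHandler is a hypothetical Handler implementation. The Handler
// interface's method set is not shown in this diff; a single
// Handle(ctx, t) error method is assumed here.
type printHandler struct{}

func (printHandler) Handle(_ context.Context, i int) error {
	println(i)
	return nil
}

func main() {
	// One Factory per element type; it owns the segment and loop pools.
	f := loop.New[int](64)

	l := f.NewLoop(printHandler{})
	go l.Run(context.Background())

	for i := 0; i < 10; i++ {
		l.Enqueue(i)
	}

	// Close enqueues a final item and blocks until Run has drained the queue.
	l.Close(10)

	// The loop never pools itself; the consumer hands it back explicitly once
	// it is certain the loop is no longer in use.
	f.CacheLoop(l)
}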

events/loop/loop.go

Lines changed: 31 additions & 59 deletions
@@ -16,6 +16,7 @@ package loop
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 )
 
 type Handler[T any] interface {
@@ -26,32 +27,21 @@
 	Run(ctx context.Context) error
 	Enqueue(t T)
 	Close(t T)
-	Reset(h Handler[T], size uint64) Interface[T]
 }
 
 type loop[T any] struct {
+	factory *Factory[T]
+
 	head *queueSegment[T]
 	tail *queueSegment[T]
 
-	segSize uint64
-
 	handler Handler[T]
 
-	closed bool
-	closeCh chan struct{}
-
-	lock sync.Mutex
-
-	segPool sync.Pool
-}
+	closed atomic.Bool
 
-func New[T any](h Handler[T], size uint64) Interface[T] {
-	l := new(loop[T])
-	return l.Reset(h, size)
-}
+	closeCh chan struct{}
 
-func Empty[T any]() Interface[T] {
-	return new(loop[T])
+	lock sync.RWMutex
 }
 
 func (l *loop[T]) Run(ctx context.Context) error {
@@ -78,23 +68,37 @@
 }
 
 func (l *loop[T]) Enqueue(req T) {
-	l.lock.Lock()
-	defer l.lock.Unlock()
+	l.lock.RLock()
 
-	if l.closed {
+	if l.closed.Load() {
+		l.lock.RUnlock()
 		return
 	}
 
-	if l.tail == nil {
-		seg := l.getSegment()
-		l.head = seg
-		l.tail = seg
-	}
-
 	// First try to send to the current tail segment without blocking.
 	select {
 	case l.tail.ch <- req:
+		l.lock.RUnlock()
 		return
+	default:
+		l.lock.RUnlock()
+	}
+
+	// Tail is full; need to acquire write lock to roll over. If no longer full
+	// (lost race, another goroutine rolled over first), don't expand.
+
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	if l.closed.Load() {
+		// Closed while we were waiting for the lock.
+		return
+	}
+
+	// Try again to send to the tail; if successful, another goroutine must
+	// have rolled over for us.
+	select {
+	case l.tail.ch <- req:
 	default:
 		// Tail is full: create a new segment, link it, close the old tail, and
 		// send into the new tail.
@@ -108,20 +112,13 @@
 
 func (l *loop[T]) Close(req T) {
 	l.lock.Lock()
-	if l.closed {
+	if l.closed.Load() {
 		// Already closed; just unlock and wait for Run to finish.
 		l.lock.Unlock()
 		<-l.closeCh
 		return
 	}
-	l.closed = true
-
-	// Ensure at least one segment exists.
-	if l.tail == nil {
-		seg := l.getSegment()
-		l.head = seg
-		l.tail = seg
-	}
+	l.closed.Store(true)
 
 	// Enqueue the final request; if the tail is full, roll over as in Enqueue.
 	select {
@@ -141,28 +138,3 @@
 	// Wait for Run to finish draining everything.
 	<-l.closeCh
 }
-
-func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] {
-	if l == nil {
-		return New[T](h, size)
-	}
-
-	l.lock.Lock()
-	defer l.lock.Unlock()
-
-	l.closed = false
-	l.closeCh = make(chan struct{})
-	l.handler = h
-	l.segSize = size
-
-	// Initialize pool for this instantiation of T.
-	l.segPool.New = func() any {
-		return new(queueSegment[T])
-	}
-
-	seg := l.getSegment()
-	l.head = seg
-	l.tail = seg
-
-	return l
-}
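
The Enqueue rewrite above is a double-checked, optimistic-read locking pattern: the common case (tail has room) runs under the shared RLock, since concurrent sends to the same buffered channel are safe, and only the rollover to a new segment takes the exclusive Lock, retrying the send after acquisition because another goroutine may have already rolled over. A toy reduction of the same pattern, with hypothetical names, independent of this package's API:

package main

import (
	"fmt"
	"sync"
)

// segment and segQueue are hypothetical names: a toy linked list of
// fixed-size buffered channels, mirroring the shape of the loop's queue.
type segment struct {
	ch   chan int
	next *segment
}

type segQueue struct {
	mu   sync.RWMutex
	tail *segment
	size int
}

func (q *segQueue) enqueue(v int) {
	// Fast path: shared lock plus a non-blocking send. Concurrent sends to
	// the same buffered channel are safe, so enqueuers need not exclude each
	// other while there is room in the tail.
	q.mu.RLock()
	select {
	case q.tail.ch <- v:
		q.mu.RUnlock()
		return
	default:
		q.mu.RUnlock()
	}

	// Slow path: the tail looked full. Take the exclusive lock and retry the
	// send first, because another goroutine may have rolled over the tail
	// while we were waiting for the lock.
	q.mu.Lock()
	defer q.mu.Unlock()
	select {
	case q.tail.ch <- v:
	default:
		// Still full under the exclusive lock: link a fresh segment and
		// send into it.
		seg := &segment{ch: make(chan int, q.size)}
		q.tail.next = seg
		q.tail = seg
		seg.ch <- v
	}
}

func main() {
	q := &segQueue{size: 2, tail: &segment{ch: make(chan int, 2)}}
	for i := 0; i < 5; i++ {
		q.enqueue(i)
	}
	fmt.Println("5 items enqueued across segments")
}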

events/loop/loop_test.go

Lines changed: 10 additions & 5 deletions
@@ -61,7 +61,8 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) {
 	h := &testHandler[int]{}
 	const segmentSize = 4
 
-	l := New[int](h, segmentSize)
+	f := New[int](segmentSize)
+	l := f.NewLoop(h)
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -103,7 +104,8 @@ func TestLoop_CloseTwiceIsSafe(t *testing.T) {
 	defer cancel()
 
 	h := &testHandler[int]{}
-	l := New[int](h, 2)
+	f := New[int](2)
+	l := f.NewLoop(h)
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -146,7 +148,8 @@ func TestLoop_Reset(t *testing.T) {
 	defer cancel()
 
 	h1 := &testHandler[int]{}
-	l := New[int](h1, 2)
+	f := New[int](2)
+	l := f.NewLoop(h1)
 
 	var wg1 sync.WaitGroup
 	wg1.Add(1)
@@ -167,7 +170,8 @@
 
 	// Reset to a new handler and buffer size.
 	h2 := &testHandler[int]{}
-	l = l.Reset(h2, 8)
+	f = New[int](8)
+	l = f.NewLoop(h2)
 
 	require.NotNil(t, l)
 
@@ -195,7 +199,8 @@ func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) {
 	defer cancel()
 
 	h := &testHandler[int]{}
-	l := New[int](h, 2)
+	f := New[int](2)
+	l := f.NewLoop(h)
 
 	var wg sync.WaitGroup
 	wg.Add(1)

events/loop/segment.go

Lines changed: 3 additions & 9 deletions
@@ -25,16 +25,10 @@ type queueSegment[T any] struct {
 // getSegment gets a queueSegment from the pool or allocates a new one.
 // It always initializes a fresh channel of the configured size.
 func (l *loop[T]) getSegment() *queueSegment[T] {
-	if l.segPool.New == nil {
-		l.segPool.New = func() any {
-			return new(queueSegment[T])
-		}
-	}
-
-	seg := l.segPool.Get().(*queueSegment[T])
+	seg := l.factory.segPool.Get().(*queueSegment[T])
 	seg.next = nil
 
-	segSize := l.segSize
+	segSize := l.factory.size
 	if segSize == 0 {
 		segSize = 1
 	}
@@ -50,5 +44,5 @@ func (l *loop[T]) putSegment(seg *queueSegment[T]) {
 	}
 	seg.ch = nil
 	seg.next = nil
-	l.segPool.Put(seg)
+	l.factory.segPool.Put(seg)
 }
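
Note that putSegment nils the channel and next pointer before pooling: standard sync.Pool hygiene, since a pooled object keeps everything it references alive until the pool is drained. A generic sketch of the same idiom, with hypothetical names unrelated to this package's API:

package main

import "sync"

// node is a hypothetical pooled type illustrating the same hygiene as
// putSegment: clear references before Put, reinitialize after Get.
type node struct {
	buf  []byte
	next *node
}

var nodePool = sync.Pool{New: func() any { return new(node) }}

func getNode() *node {
	n := nodePool.Get().(*node)
	// Reinitialize on Get rather than trusting pooled state; the pool may
	// also hand back a brand-new zero value from New.
	n.buf = make([]byte, 0, 512)
	return n
}

func putNode(n *node) {
	// Nil out references before pooling so a cached node does not pin its
	// old buffer, or the rest of a list, in memory.
	n.buf = nil
	n.next = nil
	nodePool.Put(n)
}

func main() {
	n := getNode()
	putNode(n)
}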
