Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions events/loop/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loop

import "sync"

// Factory produces and recycles event loops that share a common pool of
// queue segments, avoiding per-loop allocations of segments and loops.
type Factory[T any] struct {
	// size is the channel buffer size for each queue segment; a value
	// of 0 is treated as 1 when a segment is created (see getSegment).
	size uint64

	// segPool recycles queueSegment values across all loops created by
	// this factory.
	segPool sync.Pool
	// loopPool recycles loop instances handed out by NewLoop and
	// returned via CacheLoop.
	loopPool sync.Pool
}

// New creates a Factory whose loops use queue segments with the given
// channel buffer size. Segments and loops are both drawn from pools
// shared across every loop the factory produces.
func New[T any](size uint64) *Factory[T] {
	f := &Factory[T]{size: size}

	// Pool constructors are set after f exists so the loop pool's
	// closure can capture the factory itself.
	f.segPool.New = func() any {
		return new(queueSegment[T])
	}
	f.loopPool.New = func() any {
		return &loop[T]{factory: f}
	}

	return f
}

// NewLoop takes a loop from the pool (or allocates one) and resets it
// to run with the given handler, starting from a single fresh queue
// segment.
func (f *Factory[T]) NewLoop(h Handler[T]) Interface[T] {
	l := f.loopPool.Get().(*loop[T])

	first := l.getSegment()
	l.handler = h
	l.head, l.tail = first, first
	l.closed.Store(false)
	l.closeCh = make(chan struct{})

	return l
}

// CacheLoop returns a loop previously obtained from NewLoop to the pool
// for reuse. Internal references are cleared first so a pooled loop does
// not pin its segment chain, channels, or handler in memory while idle.
func (f *Factory[T]) CacheLoop(l Interface[T]) {
	lp, ok := l.(*loop[T])
	if !ok {
		// Only loops created by this factory can be recycled; pooling
		// a foreign Interface implementation would make NewLoop's
		// Get().(*loop[T]) type assertion panic on a later reuse.
		return
	}

	// Drop references so the GC can reclaim segments and the handler
	// while the loop sits in the pool.
	lp.head = nil
	lp.tail = nil
	lp.handler = nil

	f.loopPool.Put(lp)
}
90 changes: 31 additions & 59 deletions events/loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package loop
import (
"context"
"sync"
"sync/atomic"
)

type Handler[T any] interface {
Expand All @@ -26,32 +27,21 @@ type Interface[T any] interface {
Run(ctx context.Context) error
Enqueue(t T)
Close(t T)
Reset(h Handler[T], size uint64) Interface[T]
}

type loop[T any] struct {
factory *Factory[T]

head *queueSegment[T]
tail *queueSegment[T]

segSize uint64

handler Handler[T]

closed bool
closeCh chan struct{}

lock sync.Mutex

segPool sync.Pool
}
closed atomic.Bool

func New[T any](h Handler[T], size uint64) Interface[T] {
l := new(loop[T])
return l.Reset(h, size)
}
closeCh chan struct{}

func Empty[T any]() Interface[T] {
return new(loop[T])
lock sync.RWMutex
}

func (l *loop[T]) Run(ctx context.Context) error {
Expand All @@ -78,23 +68,37 @@ func (l *loop[T]) Run(ctx context.Context) error {
}

func (l *loop[T]) Enqueue(req T) {
l.lock.Lock()
defer l.lock.Unlock()
l.lock.RLock()

if l.closed {
if l.closed.Load() {
l.lock.RUnlock()
return
}

if l.tail == nil {
seg := l.getSegment()
l.head = seg
l.tail = seg
}

// First try to send to the current tail segment without blocking.
select {
case l.tail.ch <- req:
l.lock.RUnlock()
return
default:
l.lock.RUnlock()
}

// Tail is full; need to acquire write lock to roll over. If no longer full
// (lost race, another goroutine rolled over first), don't expand.

l.lock.Lock()
defer l.lock.Unlock()

if l.closed.Load() {
// Closed while we were waiting for the lock.
return
}

// Try again to send to the tail; if successful, another goroutine must
// have rolled over for us.
select {
case l.tail.ch <- req:
default:
// Tail is full: create a new segment, link it, close the old tail, and
// send into the new tail.
Expand All @@ -108,20 +112,13 @@ func (l *loop[T]) Enqueue(req T) {

func (l *loop[T]) Close(req T) {
l.lock.Lock()
if l.closed {
if l.closed.Load() {
// Already closed; just unlock and wait for Run to finish.
l.lock.Unlock()
<-l.closeCh
return
}
l.closed = true

// Ensure at least one segment exists.
if l.tail == nil {
seg := l.getSegment()
l.head = seg
l.tail = seg
}
l.closed.Store(true)

// Enqueue the final request; if the tail is full, roll over as in Enqueue.
select {
Expand All @@ -141,28 +138,3 @@ func (l *loop[T]) Close(req T) {
// Wait for Run to finish draining everything.
<-l.closeCh
}

func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] {
if l == nil {
return New[T](h, size)
}

l.lock.Lock()
defer l.lock.Unlock()

l.closed = false
l.closeCh = make(chan struct{})
l.handler = h
l.segSize = size

// Initialize pool for this instantiation of T.
l.segPool.New = func() any {
return new(queueSegment[T])
}

seg := l.getSegment()
l.head = seg
l.tail = seg

return l
}
15 changes: 10 additions & 5 deletions events/loop/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) {
h := &testHandler[int]{}
const segmentSize = 4

l := New[int](h, segmentSize)
f := New[int](segmentSize)
l := f.NewLoop(h)

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -103,7 +104,8 @@ func TestLoop_CloseTwiceIsSafe(t *testing.T) {
defer cancel()

h := &testHandler[int]{}
l := New[int](h, 2)
f := New[int](2)
l := f.NewLoop(h)

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -146,7 +148,8 @@ func TestLoop_Reset(t *testing.T) {
defer cancel()

h1 := &testHandler[int]{}
l := New[int](h1, 2)
f := New[int](2)
l := f.NewLoop(h1)

var wg1 sync.WaitGroup
wg1.Add(1)
Expand All @@ -167,7 +170,8 @@ func TestLoop_Reset(t *testing.T) {

// Reset to a new handler and buffer size.
h2 := &testHandler[int]{}
l = l.Reset(h2, 8)
f = New[int](8)
l = f.NewLoop(h2)

require.NotNil(t, l)

Expand Down Expand Up @@ -195,7 +199,8 @@ func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) {
defer cancel()

h := &testHandler[int]{}
l := New[int](h, 2)
f := New[int](2)
l := f.NewLoop(h)

var wg sync.WaitGroup
wg.Add(1)
Expand Down
12 changes: 3 additions & 9 deletions events/loop/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,10 @@ type queueSegment[T any] struct {
// getSegment gets a queueSegment from the pool or allocates a new one.
// It always initializes a fresh channel of the configured size.
func (l *loop[T]) getSegment() *queueSegment[T] {
if l.segPool.New == nil {
l.segPool.New = func() any {
return new(queueSegment[T])
}
}

seg := l.segPool.Get().(*queueSegment[T])
seg := l.factory.segPool.Get().(*queueSegment[T])
seg.next = nil

segSize := l.segSize
segSize := l.factory.size
if segSize == 0 {
segSize = 1
}
Expand All @@ -50,5 +44,5 @@ func (l *loop[T]) putSegment(seg *queueSegment[T]) {
}
seg.ch = nil
seg.next = nil
l.segPool.Put(seg)
l.factory.segPool.Put(seg)
}