Skip to content

Commit 1d46cb6

Browse files
authored
Merge pull request #47 from eapache/fix-batcher-race
Refactor batcher and fix race condition
2 parents 2a81223 + 8652ab4 commit 1d46cb6

File tree

2 files changed

+27
-28
lines changed

2 files changed

+27
-28
lines changed

.github/workflows/golang-ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ jobs:
2121
run: go build -v ./...
2222

2323
- name: Test
24-
run: go test -v ./...
24+
run: go test -race -v ./...

batcher/batcher.go

+26-27
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@ type Batcher struct {
1616
timeout time.Duration
1717
prefilter func(interface{}) error
1818

19-
lock sync.Mutex
20-
submit chan *work
21-
doWork func([]interface{}) error
22-
done chan bool
19+
lock sync.Mutex
20+
submit chan *work
21+
doWork func([]interface{}) error
22+
batchCounter sync.WaitGroup
23+
flushTimer *time.Timer
2324
}
2425

2526
// New constructs a new batcher that will batch all calls to Run that occur within
2627
// `timeout` time before calling doWork just once for the entire batch. The doWork
2728
// function must be safe to run concurrently with itself as this may occur, especially
28-
// when the timeout is small.
29+
// when the doWork function is slow, or the timeout is small.
2930
func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
3031
return &Batcher{
3132
timeout: timeout,
@@ -70,21 +71,23 @@ func (b *Batcher) submitWork(w *work) {
7071
b.lock.Lock()
7172
defer b.lock.Unlock()
7273

74+
// kick off a new batch if needed
7375
if b.submit == nil {
74-
b.done = make(chan bool)
76+
b.batchCounter.Add(1)
7577
b.submit = make(chan *work, 4)
76-
go b.batch()
78+
go b.batch(b.submit)
79+
b.flushTimer = time.AfterFunc(b.timeout, b.flushCurrentBatch)
7780
}
7881

82+
// then add this work to the current batch
7983
b.submit <- w
8084
}
8185

82-
func (b *Batcher) batch() {
86+
func (b *Batcher) batch(input <-chan *work) {
87+
defer b.batchCounter.Done()
88+
8389
var params []interface{}
8490
var futures []chan error
85-
input := b.submit
86-
87-
go b.timer()
8891

8992
for work := range input {
9093
params = append(params, work.param)
@@ -97,37 +100,33 @@ func (b *Batcher) batch() {
97100
future <- ret
98101
close(future)
99102
}
100-
close(b.done)
101103
}
102104

103-
func (b *Batcher) timer() {
104-
time.Sleep(b.timeout)
105-
106-
b.flush()
107-
}
108-
109-
// Shutdown flush the changes and wait to be saved
105+
// Shutdown flushes and executes any pending batches. If wait is true, it also waits for the pending batches
106+
// to finish executing before it returns. This can be used to avoid waiting for the timeout to expire when
107+
// gracefully shutting down your application. Calling Run at any point after calling Shutdown will lead to
108+
// undefined behaviour.
110109
func (b *Batcher) Shutdown(wait bool) {
111-
b.flush()
110+
b.flushCurrentBatch()
112111

113112
if wait {
114-
if b.done != nil {
115-
// wait done channel
116-
<-b.done
117-
}
113+
b.batchCounter.Wait()
118114
}
119115
}
120116

121-
// Flush saves the changes before the timer expires.
122-
// It is useful to flush the changes when you shutdown your application
123-
func (b *Batcher) flush() {
117+
func (b *Batcher) flushCurrentBatch() {
124118
b.lock.Lock()
125119
defer b.lock.Unlock()
126120

127121
if b.submit == nil {
128122
return
129123
}
130124

125+
// stop the timer to avoid spurious flushes and trigger immediate cleanup in case this flush was
126+
// triggered manually by a call to Shutdown (it has to happen inside the lock, so it can't be done
127+
// in the Shutdown method directly)
128+
b.flushTimer.Stop()
129+
131130
close(b.submit)
132131
b.submit = nil
133132
}

0 commit comments

Comments
 (0)