forked from valyala/batcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatcher.go
146 lines (130 loc) · 2.96 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Package batcher groups items in batches
// and calls the user-specified function on these batches.
package batcher
import (
"time"
)
// Batcher groups items in batches and calls Func on them.
//
// See also BytesBatcher.
type Batcher struct {
	// Func is called by Batcher when batch is ready to be processed.
	Func BatcherFunc

	// Maximum batch size that will be passed to BatcherFunc.
	//
	// Start replaces a non-positive value with 64*1024.
	MaxBatchSize int

	// Maximum delay between Push() and BatcherFunc call.
	//
	// Start replaces a non-positive value with time.Millisecond.
	MaxDelay time.Duration

	// Maximum unprocessed items' queue size.
	//
	// Start replaces a non-positive value with 8*1024.
	QueueSize int

	// ch carries pushed items to the background processing goroutine.
	// Non-nil only between Start and Stop; also serves as the
	// "is started" flag checked by Push, Stop and Start.
	ch chan interface{}

	// doneCh is closed when the background goroutine exits,
	// letting Stop wait for the final flush.
	doneCh chan struct{}
}
// BatcherFunc is called by Batcher when batch is ready to be processed.
//
// BatcherFunc must process the given batch before returning.
// It must not hold references to the batch after returning,
// since the batch's backing array is reused for subsequent batches.
type BatcherFunc func(batch []interface{})
// Start starts batch processing.
//
// It panics if the batcher is already running or if Func is unset.
// Non-positive MaxDelay, MaxBatchSize and QueueSize are replaced
// with default values before the worker goroutine is launched.
func (b *Batcher) Start() {
	switch {
	case b.ch != nil:
		panic("batcher already started")
	case b.Func == nil:
		panic("Batcher.Func must be set")
	}

	// Fill in defaults for any unset tuning knobs.
	if b.MaxDelay <= 0 {
		b.MaxDelay = time.Millisecond
	}
	if b.MaxBatchSize <= 0 {
		b.MaxBatchSize = 64 * 1024
	}
	if b.QueueSize <= 0 {
		b.QueueSize = 8 * 1024
	}

	b.ch = make(chan interface{}, b.QueueSize)
	b.doneCh = make(chan struct{})

	// The worker drains b.ch until it is closed by Stop, then
	// signals completion by closing doneCh.
	go func() {
		processBatches(b.Func, b.ch, b.MaxBatchSize, b.MaxDelay)
		close(b.doneCh)
	}()
}
// Stop stops batch processing.
//
// It blocks until all the items pushed so far have been handed to
// Func, then resets the batcher so Start may be called again.
// Stop panics if the batcher isn't running.
func (b *Batcher) Stop() {
	if b.ch == nil {
		panic("BUG: forgot calling Batcher.Start()?")
	}
	// Closing ch tells the worker goroutine to flush the remaining
	// items and exit; it closes doneCh once it has.
	close(b.ch)
	<-b.doneCh
	b.ch, b.doneCh = nil, nil
}
// Push pushes new item into the batcher.
//
// It returns false if the internal queue is full; the item is
// dropped in that case. Push never blocks.
//
// Don't forget calling Start() before pushing items into the batcher.
func (b *Batcher) Push(x interface{}) bool {
	ch := b.ch
	if ch == nil {
		panic("BUG: forgot calling Batcher.Start()?")
	}
	// Non-blocking send: fail fast instead of stalling the caller
	// when the queue is at capacity.
	select {
	case ch <- x:
	default:
		return false
	}
	return true
}
// QueueLen returns the number of pending items, which weren't passed into
// BatcherFunc yet.
//
// Maximum number of pending items is Batcher.QueueSize.
func (b *Batcher) QueueLen() int {
	// len on a buffered channel reports how many items are queued.
	pending := len(b.ch)
	return pending
}
// processBatches is the worker loop: it accumulates items from ch into
// a batch and flushes the batch to f whenever the batch reaches
// maxBatchSize items or maxDelay has elapsed since the previous flush.
// It returns after ch is closed, flushing any remaining items first.
//
// The batch slice is reused across flushes (truncated, not reallocated),
// which is why BatcherFunc must not retain references to it.
func processBatches(f BatcherFunc, ch <-chan interface{}, maxBatchSize int, maxDelay time.Duration) {
	var batch []interface{}
	var x interface{}
	var ok bool
	// Despite the name, lastPushTime records the time of the last
	// flush (or loop start), not the last Push; it anchors the
	// maxDelay deadline for the current batch.
	lastPushTime := time.Now()
	for {
		select {
		case x, ok = <-ch:
			if !ok {
				// ch closed: flush the tail and exit.
				call(f, batch)
				return
			}
			batch = append(batch, x)
		default:
			if len(batch) == 0 {
				// Nothing buffered, so there is no deadline to
				// honor — block until an item (or close) arrives.
				x, ok = <-ch
				if !ok {
					call(f, batch)
					return
				}
				batch = append(batch, x)
			} else {
				// Items are buffered: wait for more input, but only
				// until the batch's maxDelay deadline expires.
				if delay := maxDelay - time.Since(lastPushTime); delay > 0 {
					// acquireTimer/releaseTimer are defined elsewhere
					// in this package — presumably a sync.Pool-backed
					// timer cache; verify before relying on reuse
					// semantics.
					t := acquireTimer(delay)
					select {
					case x, ok = <-ch:
						if !ok {
							call(f, batch)
							return
						}
						batch = append(batch, x)
					case <-t.C:
						// Deadline hit; fall through to the flush check.
					}
					releaseTimer(t)
				}
			}
		}
		// Flush on size or age. lastPushTime is reset before calling f
		// so time spent inside f counts against the next batch's delay.
		if len(batch) >= maxBatchSize || time.Since(lastPushTime) > maxDelay {
			lastPushTime = time.Now()
			call(f, batch)
			batch = batch[:0]
		}
	}
}
// call invokes f on batch, skipping the call entirely for empty batches
// so BatcherFunc never sees a zero-length slice.
func call(f BatcherFunc, batch []interface{}) {
	if len(batch) == 0 {
		return
	}
	f(batch)
}