forked from valyala/batcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbytesBatcher.go
142 lines (131 loc) · 3.37 KB
/
bytesBatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package batcher
import (
"sync"
"time"
)
// BytesBatcher constructs a byte slice on every Push call and calls BatchFunc
// on every MaxBatchSize Push calls or MaxDelay interval.
//
// BatchFunc must be set before the first Push call. MaxDelay is read once,
// when the first Push call starts the background flusher; later changes to
// it have no effect.
//
// See also Batcher.
type BytesBatcher struct {
	// BatchFunc is called when either MaxBatchSize or MaxDelay is reached.
	//
	//   - b contains a byte slice constructed when Push is called.
	//   - items contains the number of Push calls used for constructing b.
	//
	// BatchFunc runs in its own goroutine without the batcher's lock held.
	// BytesBatcher prevents calling BatchFunc from concurrently running
	// goroutines: the next batch is dispatched only after the previous
	// BatchFunc call has returned.
	//
	// b mustn't be accessed after returning from BatchFunc.
	BatchFunc func(b []byte, items int)

	// HeaderFunc is called before the first item of a new batch is appended.
	//
	// HeaderFunc must append header data to dst and return the resulting
	// byte slice. It is called with the batcher's lock held, so it must not
	// call Push.
	//
	// dst mustn't be accessed after returning from HeaderFunc.
	//
	// HeaderFunc may be nil.
	HeaderFunc func(dst []byte) []byte

	// FooterFunc is called before the batch is passed to BatchFunc.
	//
	// FooterFunc must append footer data to dst and return the resulting
	// byte slice. It is called with the batcher's lock held, so it must not
	// call Push.
	//
	// dst mustn't be accessed after returning from FooterFunc.
	//
	// FooterFunc may be nil.
	FooterFunc func(dst []byte) []byte

	// MaxBatchSize is the maximum number of Push calls per batch.
	// Values below 1 behave like 1: every item is flushed on its own.
	MaxBatchSize int

	// MaxDelay is the maximum duration before BatchFunc is called
	// unless MaxBatchSize is reached.
	//
	// MaxDelay should be positive: the background flusher sleeps for
	// MaxDelay between checks, so a non-positive value makes it poll
	// continuously.
	MaxDelay time.Duration

	once         sync.Once
	lock         sync.Mutex
	b            []byte
	pendingB     []byte
	items        int
	lastExecTime time.Time

	// pending is true while a batch dispatched by execNolock is still being
	// processed by BatchFunc. It is tracked explicitly rather than derived
	// from len(pendingB), because a batch may legitimately contain zero
	// bytes.
	pending bool
}

// Push calls appendFunc on a byte slice.
//
// appendFunc must append data to dst and return the resulting byte slice.
// dst mustn't be accessed after returning from appendFunc. appendFunc is
// called with the batcher's lock held, so it must not call Push.
//
// If appendFunc or HeaderFunc panics, the lock is released before the panic
// propagates, so the batcher stays usable.
//
// The function returns false if the batch reached MaxBatchSize and BatchFunc
// hasn't returned yet for the previous batch. In that case appendFunc is not
// called.
func (b *BytesBatcher) Push(appendFunc func(dst []byte) []byte) bool {
	b.once.Do(b.init)
	b.lock.Lock()
	defer b.lock.Unlock()

	// The batch may still be full because an earlier flush attempt found the
	// previous batch pending in BatchFunc; retry the flush before appending.
	if b.items >= b.MaxBatchSize && !b.execNolock() {
		return false
	}
	if b.items == 0 && b.HeaderFunc != nil {
		b.b = b.HeaderFunc(b.b)
	}
	b.items++
	b.b = appendFunc(b.b)
	if b.items >= b.MaxBatchSize {
		b.execNolockNocheck()
	}
	return true
}

// init starts the background flusher goroutine. It runs exactly once, from
// the first Push call, via b.once.
//
// The flusher wakes up periodically. If no batch has been dispatched during
// roughly the last 90% of MaxDelay, it flushes the accumulated items.
// Otherwise it sleeps until MaxDelay has elapsed since the last dispatch.
// The goroutine never exits.
func (b *BytesBatcher) init() {
	go func() {
		maxDelay := b.MaxDelay
		delay := maxDelay
		for {
			time.Sleep(delay)
			b.lock.Lock()
			d := time.Since(b.lastExecTime)
			if float64(d) > 0.9*float64(maxDelay) {
				// A failed flush (previous batch still in BatchFunc) leaves
				// lastExecTime unchanged, so it is retried on the next tick.
				if b.items > 0 {
					b.execNolockNocheck()
				}
				delay = maxDelay
			} else {
				delay = maxDelay - d
			}
			b.lock.Unlock()
		}
	}()
}

// execNolockNocheck flushes the current batch if possible. b.lock must be held.
func (b *BytesBatcher) execNolockNocheck() {
	// Do not check the returned value, since the previous batch
	// may be still pending in BatchFunc.
	// The error will be discovered on the next Push.
	b.execNolock()
}

// execNolock hands the current batch to BatchFunc in a new goroutine.
// b.lock must be held.
//
// It returns true if the batch was dispatched or there was nothing to
// dispatch. It returns false, leaving the current batch untouched, if the
// previous batch is still being processed by BatchFunc.
func (b *BytesBatcher) execNolock() bool {
	if b.items == 0 {
		// Nothing to flush. Push reaches this only when MaxBatchSize < 1;
		// dispatching here would hand BatchFunc a batch with zero items.
		return true
	}
	if b.pending {
		return false
	}
	if b.FooterFunc != nil {
		b.b = b.FooterFunc(b.b)
	}
	// Copy the batch into pendingB so b.b can keep collecting new items
	// while BatchFunc works on the copy.
	b.pendingB = append(b.pendingB[:0], b.b...)
	b.b = b.b[:0]
	items := b.items
	b.items = 0
	b.pending = true
	b.lastExecTime = time.Now()
	go func(data []byte, items int) {
		b.BatchFunc(data, items)
		b.lock.Lock()
		b.pendingB = b.pendingB[:0]
		if cap(b.pendingB) > 64*1024 {
			// A hack: throw big pendingB slice to GC in order
			// to reduce memory usage between BatchFunc calls.
			//
			// Keep small pendingB slices in order to reduce
			// load on GC.
			b.pendingB = nil
		}
		b.pending = false
		b.lock.Unlock()
	}(b.pendingB, items)
	return true
}