Skip to content

Commit 42f58be

Browse files
Converted flush to use single access model
1 parent 4e21f04 commit 42f58be

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

triton/writer_batch.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ func NewBatchWriterSize(w Writer, size int, intr time.Duration) *BatchWriter {
2929
bufferSize := 10000
3030

3131
bw := &BatchWriter{
32-
w: w,
33-
buf: make(chan Record, bufferSize),
34-
signal: make(chan struct{}, bufferSize),
35-
errors: make(chan error, 1),
32+
w: w,
33+
buf: make(chan Record, bufferSize),
34+
signal: make(chan struct{}, bufferSize),
35+
flushSignal: make(chan struct{}, 0), // must be blocking write/read
36+
errors: make(chan error, 1),
3637

3738
size: size,
3839
intr: intr,
@@ -60,8 +61,9 @@ type BatchWriter struct {
6061
size int
6162
intr time.Duration
6263

63-
ticker *time.Ticker
64-
signal chan struct{}
64+
ticker *time.Ticker
65+
signal chan struct{}
66+
flushSignal chan struct{}
6567
}
6668

6769
func (bw *BatchWriter) writeLoop() {
@@ -77,11 +79,17 @@ func (bw *BatchWriter) writeLoop() {
7779
}
7880
case <-bw.ticker.C:
7981
bw.flush()
82+
case _, ok := <-bw.flushSignal:
83+
if ok {
84+
bw.flush()
85+
bw.flushSignal <- struct{}{}
86+
}
8087
}
8188
}
8289
}
8390

84-
// flush writes everything in the current buffer
91+
// flush writes everything in the current buffer. To prevent concurrent flushes,
92+
// only write_loop() is allowed to call this.
8593
func (bw *BatchWriter) flush() {
8694
numBuffered := len(bw.buf)
8795
if numBuffered == 0 {
@@ -109,7 +117,8 @@ func (bw *BatchWriter) flush() {
109117
// Flush forces all buffered records to be sent.
110118
// If there is an error it will have been written to the Errors chan.
111119
func (bw *BatchWriter) Flush() {
112-
bw.flush()
120+
bw.flushSignal <- struct{}{} // ask to flush
121+
<-bw.flushSignal // block until finish
113122
}
114123

115124
// Close prevents future writes and flushes all currently buffered records.
@@ -118,6 +127,7 @@ func (bw *BatchWriter) Close() {
118127
bw.ticker.Stop()
119128
close(bw.signal)
120129
bw.Flush()
130+
close(bw.flushSignal)
121131
close(bw.errors)
122132
}
123133

@@ -143,6 +153,6 @@ func (bw *BatchWriter) WriteRecords(rs ...Record) error {
143153
for _, r := range rs {
144154
bw.buf <- r
145155
}
146-
bw.signal <- struct{}{} // sigal that buffer changed
156+
bw.signal <- struct{}{} // signal that buffer changed
147157
return nil
148158
}

0 commit comments

Comments
 (0)