File tree Expand file tree Collapse file tree 1 file changed +28
-5
lines changed
Expand file tree Collapse file tree 1 file changed +28
-5
lines changed Original file line number Diff line number Diff line change @@ -244,7 +244,23 @@ func (w *Writer) worker(ctx context.Context) {
244244 // open first file
245245 rotateCheck ()
246246
247- ticker := time .NewTicker (100 * time .Millisecond )
247+ tickerC := make (chan struct {}, 1 )
248+
249+ go func () {
250+ for {
251+ select {
252+ case <- ctx .Done ():
253+ return
254+ case <- time .After (100 * time .Millisecond ):
255+ select {
256+ case tickerC <- struct {}{}:
257+ // pass
258+ case <- ctx .Done ():
259+ return
260+ }
261+ }
262+ }
263+ }()
248264
249265 write := func (b * RowBinary.WriteBuffer ) {
250266 _ , err := outBuf .Write (b .Body [:b .Used ])
@@ -270,10 +286,8 @@ func (w *Writer) worker(ctx context.Context) {
270286 select {
271287 case b := <- w .inputChan :
272288 write (b )
273- case <- ticker .C :
274- if size > 0 {
275- rotateCheck ()
276- }
289+ case <- tickerC :
290+ rotateCheck ()
277291 case <- ctx .Done ():
278292 return
279293 default : // outBuf flush if nothing received
@@ -284,6 +298,15 @@ func (w *Writer) worker(ctx context.Context) {
284298 w .logger .Error ("CompWriter Flush() failed" , zap .Error (err ))
285299 }
286300 }
301+
302+ select {
303+ case b := <- w .inputChan :
304+ write (b )
305+ case <- tickerC :
306+ rotateCheck ()
307+ case <- ctx .Done ():
308+ return
309+ }
287310 }
288311 }
289312}
You can’t perform that action at this time.
0 commit comments