Skip to content

Commit 3d9d059

Browse files
committed
fix: race in TumblingWindowIncAggOp FirstTimer
Protected FirstTimer access with mutex and added thread-safe getter for testing. Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent c033bff commit 3d9d059

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

internal/topo/node/node_test/window_inc_agg_op_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func TestIncAggAlignTumblingWindow(t *testing.T) {
261261
time.Sleep(10 * time.Millisecond)
262262
to, ok := op.WindowExec.(*node.TumblingWindowIncAggOp)
263263
require.True(t, ok)
264-
require.NotNil(t, to.FirstTimer)
264+
require.NotNil(t, to.GetFirstTimer())
265265
}
266266

267267
func TestIncAggTumblingWindow(t *testing.T) {

internal/topo/node/window_inc_agg_op.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,9 @@ func (to *TumblingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error
386386
if !EnableAlignWindow {
387387
to.ticker = timex.GetTicker(to.Interval)
388388
} else {
389+
to.mu.Lock()
389390
_, to.FirstTimer = getFirstTimer(ctx, to.windowConfig.RawInterval, to.windowConfig.TimeUnit)
391+
to.mu.Unlock()
390392
if to.CurrWindow == nil {
391393
to.CurrWindow = newIncAggWindow(ctx, now)
392394
}
@@ -398,9 +400,9 @@ func (to *TumblingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error
398400
case <-ctx.Done():
399401
return
400402
case now := <-to.FirstTimer.C:
403+
to.mu.Lock()
401404
to.FirstTimer.Stop()
402405
to.FirstTimer = nil
403-
to.mu.Lock()
404406
if to.CurrWindow != nil {
405407
to.emit(ctx, errCh, now)
406408
}
@@ -465,6 +467,12 @@ outer:
465467
}
466468
}
467469

470+
func (to *TumblingWindowIncAggOp) GetFirstTimer() *clock.Timer {
471+
to.mu.Lock()
472+
defer to.mu.Unlock()
473+
return to.FirstTimer
474+
}
475+
468476
func (to *TumblingWindowIncAggOp) emit(ctx api.StreamContext, errCh chan<- error, now time.Time) {
469477
results := &xsql.WindowTuples{
470478
Content: make([]xsql.Row, 0),
@@ -733,7 +741,9 @@ func (ho *HoppingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error)
733741
ho.ticker = timex.GetTicker(ho.Interval)
734742
ho.newIncWindow(ctx, now)
735743
} else {
744+
ho.mu.Lock()
736745
_, ho.FirstTimer = getFirstTimer(ctx, ho.windowConfig.RawInterval, ho.windowConfig.TimeUnit)
746+
ho.mu.Unlock()
737747
ho.CurrWindowList = append(ho.CurrWindowList, newIncAggWindow(ctx, now))
738748
}
739749
fv, _ := xsql.NewFunctionValuersForOp(ctx)
@@ -770,9 +780,9 @@ func (ho *HoppingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error)
770780
case <-ctx.Done():
771781
return
772782
case now := <-ho.FirstTimer.C:
783+
ho.mu.Lock()
773784
ho.FirstTimer.Stop()
774785
ho.FirstTimer = nil
775-
ho.mu.Lock()
776786
ho.CurrWindowList = gcIncAggWindow(ho.CurrWindowList, ho.Length, now)
777787
ho.newIncWindow(ctx, now)
778788
ho.CurrWindowList = gcIncAggWindow(ho.CurrWindowList, ho.Length, now)

0 commit comments

Comments
 (0)