Skip to content

Commit 6430af1

Browse files
committed
fix: race in TumblingWindowIncAggOp FirstTimer
Protected FirstTimer access with mutex and added thread-safe getter for testing. Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent c033bff commit 6430af1

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

internal/topo/node/node_test/window_inc_agg_op_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func TestIncAggAlignTumblingWindow(t *testing.T) {
261261
time.Sleep(10 * time.Millisecond)
262262
to, ok := op.WindowExec.(*node.TumblingWindowIncAggOp)
263263
require.True(t, ok)
264-
require.NotNil(t, to.FirstTimer)
264+
require.NotNil(t, to.GetFirstTimer())
265265
}
266266

267267
func TestIncAggTumblingWindow(t *testing.T) {

internal/topo/node/window_inc_agg_op.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,9 @@ func (to *TumblingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error
386386
if !EnableAlignWindow {
387387
to.ticker = timex.GetTicker(to.Interval)
388388
} else {
389+
to.mu.Lock()
389390
_, to.FirstTimer = getFirstTimer(ctx, to.windowConfig.RawInterval, to.windowConfig.TimeUnit)
391+
to.mu.Unlock()
390392
if to.CurrWindow == nil {
391393
to.CurrWindow = newIncAggWindow(ctx, now)
392394
}
@@ -465,6 +467,12 @@ outer:
465467
}
466468
}
467469

470+
func (to *TumblingWindowIncAggOp) GetFirstTimer() *clock.Timer {
471+
to.mu.Lock()
472+
defer to.mu.Unlock()
473+
return to.FirstTimer
474+
}
475+
468476
func (to *TumblingWindowIncAggOp) emit(ctx api.StreamContext, errCh chan<- error, now time.Time) {
469477
results := &xsql.WindowTuples{
470478
Content: make([]xsql.Row, 0),

0 commit comments

Comments
 (0)