Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions internal/topo/node/node_test/window_inc_agg_event_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/testx"
"github.com/lf-edge/ekuiper/v2/internal/topo/node"
"github.com/lf-edge/ekuiper/v2/internal/topo/planner"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
Expand All @@ -33,9 +32,6 @@ import (
)

func TestIncEventHoppingWindowState(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
o := &def.RuleOption{
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
Expand Down
46 changes: 8 additions & 38 deletions internal/topo/node/node_test/window_inc_agg_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ func init() {
}

func TestWindowState(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
node.EnableAlignWindow = false
o := &def.RuleOption{
Expand Down Expand Up @@ -91,7 +88,7 @@ func TestWindowState(t *testing.T) {
time.Sleep(10 * time.Millisecond)
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}}
time.Sleep(10 * time.Millisecond)
op.WindowExec.PutState(ctx)
require.NoError(t, op.PutState4Test(ctx))

op2, err := node.NewWindowIncAggOp("1", &node.WindowConfig{
Type: incPlan.WType,
Expand All @@ -101,17 +98,14 @@ func TestWindowState(t *testing.T) {
require.NotNil(t, op2)
op2.Exec(ctx, errCh)
time.Sleep(10 * time.Millisecond)
require.NoError(t, op2.WindowExec.RestoreFromState(ctx))
require.NoError(t, op2.RestoreFromState4Test(ctx))
cancel()
op.Close()
op2.Close()
}
}

func TestIncAggCountWindowState(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
o := &def.RuleOption{
BufferLength: 10,
}
Expand Down Expand Up @@ -177,9 +171,6 @@ func TestIncAggCountWindowState(t *testing.T) {
}

func TestIncAggWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
o := &def.RuleOption{
BufferLength: 10,
}
Expand Down Expand Up @@ -231,9 +222,6 @@ func TestIncAggWindow(t *testing.T) {
}

func TestIncAggAlignTumblingWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
node.EnableAlignWindow = true
o := &def.RuleOption{
Expand Down Expand Up @@ -271,15 +259,12 @@ func TestIncAggAlignTumblingWindow(t *testing.T) {
}()
op.Exec(ctx, errCh)
time.Sleep(10 * time.Millisecond)
to, ok := op.WindowExec.(*node.TumblingWindowIncAggOp)
require.True(t, ok)
require.NotNil(t, to.FirstTimer)
require.Eventually(t, func() bool {
return op.FirstTimerCreated4Test()
}, time.Second, 10*time.Millisecond)
}

func TestIncAggTumblingWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
node.EnableAlignWindow = false
o := &def.RuleOption{
Expand Down Expand Up @@ -334,9 +319,6 @@ func TestIncAggTumblingWindow(t *testing.T) {
}

func TestIncAggSlidingWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
o := &def.RuleOption{
BufferLength: 10,
Expand Down Expand Up @@ -400,9 +382,6 @@ func TestIncAggSlidingWindow(t *testing.T) {
}

func TestIncAggSlidingWindowOver(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
o := &def.RuleOption{
BufferLength: 10,
Expand Down Expand Up @@ -456,9 +435,6 @@ func TestIncAggSlidingWindowOver(t *testing.T) {
}

func TestIncAggSlidingWindowDelay(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
o := &def.RuleOption{
BufferLength: 10,
Expand Down Expand Up @@ -548,9 +524,6 @@ func waitExecute() {
}

func TestIncHoppingWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
node.EnableAlignWindow = false
o := &def.RuleOption{
Expand Down Expand Up @@ -608,9 +581,6 @@ func TestIncHoppingWindow(t *testing.T) {
}

func TestIncAggAlignHoppingWindow(t *testing.T) {
if testx.Race {
t.Skip("skip race test")
}
conf.IsTesting = true
node.EnableAlignWindow = true
o := &def.RuleOption{
Expand Down Expand Up @@ -649,9 +619,9 @@ func TestIncAggAlignHoppingWindow(t *testing.T) {
}()
op.Exec(ctx, errCh)
time.Sleep(10 * time.Millisecond)
ho, ok := op.WindowExec.(*node.HoppingWindowIncAggOp)
require.True(t, ok)
require.NotNil(t, ho.FirstTimer)
require.Eventually(t, func() bool {
return op.FirstTimerCreated4Test()
}, time.Second, 10*time.Millisecond)
}

func extractIncWindowPlan(cur planner.LogicalPlan) *planner.IncWindowPlan {
Expand Down
15 changes: 15 additions & 0 deletions internal/topo/node/window_inc_agg_event_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (ho *HoppingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e
select {
case <-ctx.Done():
return
case done := <-ho.op.putStateReqCh:
ho.PutState(ctx)
done <- nil
case done := <-ho.op.restoreReqCh:
done <- ho.RestoreFromState(ctx)
case input := <-ho.op.input:
data, processed := ho.op.ingest(ctx, input)
if processed {
Expand Down Expand Up @@ -199,6 +204,11 @@ func (so *SlidingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e
select {
case <-ctx.Done():
return
case done := <-so.op.putStateReqCh:
so.PutState(ctx)
done <- nil
case done := <-so.op.restoreReqCh:
done <- so.RestoreFromState(ctx)
case input := <-so.op.input:
data, processed := so.op.ingest(ctx, input)
if processed {
Expand Down Expand Up @@ -357,6 +367,11 @@ func (co *CountWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- err
select {
case <-ctx.Done():
return
case done := <-co.op.putStateReqCh:
co.PutState(ctx)
done <- nil
case done := <-co.op.restoreReqCh:
done <- co.RestoreFromState(ctx)
case input := <-co.op.input:
data, processed := co.op.ingest(ctx, input)
if processed {
Expand Down
84 changes: 84 additions & 0 deletions internal/topo/node/window_inc_agg_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package node

import (
"context"
"encoding/gob"
"fmt"
"sync"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -54,6 +56,12 @@ type WindowIncAggOperator struct {
Dimensions ast.Dimensions
aggFields []*ast.Field
WindowExec windowIncAggExec

putStateReqCh chan chan error
restoreReqCh chan chan error

firstTimerMu sync.Mutex
firstTimerCreated bool
}

func NewWindowIncAggOp(name string, w *WindowConfig, dimensions ast.Dimensions, aggFields []*ast.Field, options *def.RuleOption) (*WindowIncAggOperator, error) {
Expand All @@ -62,6 +70,8 @@ func NewWindowIncAggOp(name string, w *WindowConfig, dimensions ast.Dimensions,
o.windowConfig = w
o.Dimensions = dimensions
o.aggFields = aggFields
o.putStateReqCh = make(chan chan error, 2)
o.restoreReqCh = make(chan chan error, 2)
switch w.Type {
case ast.COUNT_WINDOW:
if options.IsEventTime {
Expand Down Expand Up @@ -121,6 +131,49 @@ func (o *WindowIncAggOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
}()
}

func (o *WindowIncAggOperator) PutState4Test(ctx context.Context) error {
return o.execStateCall4Test(ctx, o.putStateReqCh)
}

func (o *WindowIncAggOperator) RestoreFromState4Test(ctx context.Context) error {
return o.execStateCall4Test(ctx, o.restoreReqCh)
}

func (o *WindowIncAggOperator) FirstTimerCreated4Test() bool {
o.firstTimerMu.Lock()
defer o.firstTimerMu.Unlock()
return o.firstTimerCreated
}

func (o *WindowIncAggOperator) markFirstTimerCreated() {
o.firstTimerMu.Lock()
o.firstTimerCreated = true
o.firstTimerMu.Unlock()
}

func (o *WindowIncAggOperator) execStateCall4Test(ctx context.Context, reqCh chan chan error) error {
if ctx == nil {
ctx = context.Background()
}
done := make(chan error, 1)
const timeout = 5 * time.Second
select {
case reqCh <- done:
case <-ctx.Done():
return ctx.Err()
case <-time.After(timeout):
return context.DeadlineExceeded
}
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
case <-time.After(timeout):
return context.DeadlineExceeded
}
}

type windowIncAggExec interface {
exec(ctx api.StreamContext, errCh chan<- error)
PutState(ctx api.StreamContext)
Expand Down Expand Up @@ -246,6 +299,11 @@ func (co *CountWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error) {
select {
case <-ctx.Done():
return
case done := <-co.putStateReqCh:
co.PutState(ctx)
done <- nil
case done := <-co.restoreReqCh:
done <- co.RestoreFromState(ctx)
case input := <-co.input:
now := timex.GetNow()
data, processed := co.commonIngest(ctx, input)
Expand Down Expand Up @@ -370,6 +428,9 @@ func (to *TumblingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error
to.ticker = timex.GetTicker(to.Interval)
} else {
_, to.FirstTimer = getFirstTimer(ctx, to.windowConfig.RawInterval, to.windowConfig.TimeUnit)
if to.FirstTimer != nil {
to.markFirstTimerCreated()
}
if to.CurrWindow == nil {
to.CurrWindow = newIncAggWindow(ctx, now)
}
Expand All @@ -380,6 +441,11 @@ func (to *TumblingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error
select {
case <-ctx.Done():
return
case done := <-to.putStateReqCh:
to.PutState(ctx)
done <- nil
case done := <-to.restoreReqCh:
done <- to.RestoreFromState(ctx)
case now := <-to.FirstTimer.C:
to.FirstTimer.Stop()
to.FirstTimer = nil
Expand Down Expand Up @@ -414,6 +480,11 @@ outer:
select {
case <-ctx.Done():
return
case done := <-to.putStateReqCh:
to.PutState(ctx)
done <- nil
case done := <-to.restoreReqCh:
done <- to.RestoreFromState(ctx)
case input := <-to.input:
now := timex.GetNow()
data, processed := to.commonIngest(ctx, input)
Expand Down Expand Up @@ -525,6 +596,11 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error)
select {
case <-ctx.Done():
return
case done := <-so.putStateReqCh:
so.PutState(ctx)
done <- nil
case done := <-so.restoreReqCh:
done <- so.RestoreFromState(ctx)
case input := <-so.input:
now := timex.GetNow()
data, processed := so.commonIngest(ctx, input)
Expand Down Expand Up @@ -695,13 +771,21 @@ func (ho *HoppingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error)
ho.newIncWindow(ctx, now)
} else {
_, ho.FirstTimer = getFirstTimer(ctx, ho.windowConfig.RawInterval, ho.windowConfig.TimeUnit)
if ho.FirstTimer != nil {
ho.markFirstTimerCreated()
}
ho.CurrWindowList = append(ho.CurrWindowList, newIncAggWindow(ctx, now))
}
fv, _ := xsql.NewFunctionValuersForOp(ctx)
for {
select {
case <-ctx.Done():
return
case done := <-ho.putStateReqCh:
ho.PutState(ctx)
done <- nil
case done := <-ho.restoreReqCh:
done <- ho.RestoreFromState(ctx)
case task := <-ho.taskCh:
now := timex.GetNow()
ho.emit(ctx, errCh, task.window, now)
Expand Down
Loading