Skip to content

Commit e8681d6

Browse files
authored
feat(window): support single condition state window (#3836)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 5cea33f commit e8681d6

File tree

7 files changed

+163
-33
lines changed

7 files changed

+163
-33
lines changed

internal/topo/node/window_op.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ import (
3636
)
3737

3838
type WindowConfig struct {
39-
BeginCondition ast.Expr
40-
EmitCondition ast.Expr
4139
TriggerCondition ast.Expr
4240
StateFuncs []*ast.Call
4341
Type ast.WindowType
@@ -53,6 +51,11 @@ type WindowConfig struct {
5351

5452
// For SlidingWindow
5553
enableSlidingWindowSendTwice bool
54+
55+
// For state window
56+
SingleCondition ast.Expr
57+
BeginCondition ast.Expr
58+
EmitCondition ast.Expr
5659
}
5760

5861
type WindowOperator struct {

internal/topo/node/window_v2_op.go

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,38 @@ type WindowV2Exec interface {
9393

9494
type StateWindowOp struct {
9595
*WindowV2Operator
96-
BeginCondition ast.Expr
97-
EmitCondition ast.Expr
98-
onBegin bool
99-
stateFuncs []*ast.Call
96+
StartTime time.Time
97+
EndTime time.Time
98+
SingleCondition ast.Expr
99+
BeginCondition ast.Expr
100+
EmitCondition ast.Expr
101+
onBegin bool
102+
stateFuncs []*ast.Call
100103
}
101104

102105
func NewStateWindowOp(o *WindowV2Operator) *StateWindowOp {
103106
return &StateWindowOp{
104107
WindowV2Operator: o,
105108
BeginCondition: o.windowConfig.BeginCondition,
106109
EmitCondition: o.windowConfig.EmitCondition,
110+
SingleCondition: o.windowConfig.SingleCondition,
107111
stateFuncs: o.windowConfig.StateFuncs,
108112
}
109113
}
110114

115+
func (s *StateWindowOp) emit(ctx api.StreamContext, startTime, endTime time.Time) {
116+
tuples := s.scanner.scanWindow(time.Time{}, InfTime)
117+
results := &xsql.WindowTuples{
118+
Content: make([]xsql.Row, 0),
119+
}
120+
for _, tuple := range tuples {
121+
results.Content = append(results.Content, tuple)
122+
}
123+
results.WindowRange = xsql.NewWindowRange(startTime.UnixMilli(), endTime.UnixMilli(), endTime.UnixMilli())
124+
s.Broadcast(results)
125+
s.onSend(ctx, results)
126+
}
127+
111128
func (s *StateWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
112129
fv, _ := xsql.NewFunctionValuersForOp(ctx)
113130
for {
@@ -122,27 +139,60 @@ func (s *StateWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
122139
s.onProcessStart(ctx, input)
123140
switch row := data.(type) {
124141
case *xsql.Tuple:
125-
if !s.onBegin {
126-
canBegin := isMatchCondition(ctx, s.BeginCondition, fv, row, s.stateFuncs)
127-
if canBegin {
128-
s.onBegin = true
129-
s.scanner.addTuple(row)
130-
}
131-
} else {
132-
s.scanner.addTuple(row)
133-
canEmit := isMatchCondition(ctx, s.EmitCondition, fv, row, s.stateFuncs)
134-
if canEmit {
135-
s.emitWindow(ctx, time.Time{}, InfTime)
136-
s.scanner.gc(InfTime)
137-
s.onBegin = false
138-
}
142+
if s.BeginCondition != nil && s.EmitCondition != nil {
143+
s.handleTupleWithBeginEmitCondition(ctx, fv, row)
144+
} else if s.SingleCondition != nil {
145+
s.handleTupleWithSingleCondition(ctx, fv, row)
139146
}
140147
}
141148
s.onProcessEnd(ctx)
142149
}
143150
}
144151
}
145152

153+
func (s *StateWindowOp) handleTupleWithBeginEmitCondition(ctx api.StreamContext, fv *xsql.FunctionValuer, row *xsql.Tuple) {
154+
if !s.onBegin {
155+
canBegin := isMatchCondition(ctx, s.BeginCondition, fv, row, s.stateFuncs)
156+
if canBegin {
157+
s.StartTime = row.Timestamp
158+
s.onBegin = true
159+
s.scanner.addTuple(row)
160+
}
161+
} else {
162+
s.scanner.addTuple(row)
163+
canEmit := isMatchCondition(ctx, s.EmitCondition, fv, row, s.stateFuncs)
164+
if canEmit {
165+
s.EndTime = row.Timestamp
166+
s.emit(ctx, s.StartTime, s.EndTime)
167+
s.scanner.gc(InfTime)
168+
s.onBegin = false
169+
}
170+
}
171+
}
172+
173+
func (s *StateWindowOp) handleTupleWithSingleCondition(ctx api.StreamContext, fv *xsql.FunctionValuer, row *xsql.Tuple) {
174+
if !s.onBegin {
175+
canBegin := isMatchCondition(ctx, s.SingleCondition, fv, row, s.stateFuncs)
176+
if canBegin {
177+
s.StartTime = row.Timestamp
178+
s.onBegin = true
179+
s.scanner.addTuple(row)
180+
}
181+
} else {
182+
canEmit := isMatchCondition(ctx, s.SingleCondition, fv, row, s.stateFuncs)
183+
if canEmit {
184+
s.EndTime = row.Timestamp
185+
s.emit(ctx, s.StartTime, s.EndTime)
186+
s.scanner.gc(InfTime)
187+
s.onBegin = true
188+
s.scanner.addTuple(row)
189+
s.StartTime = row.Timestamp
190+
} else {
191+
s.scanner.addTuple(row)
192+
}
193+
}
194+
}
195+
146196
type SlidingWindowOp struct {
147197
*WindowV2Operator
148198
Delay time.Duration

internal/topo/node/window_v2_op_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,62 @@ func init() {
3737
testx.InitEnv("node_test")
3838
}
3939

40+
func TestSingleConditionStWindow(t *testing.T) {
41+
conf.IsTesting = true
42+
now := time.Now()
43+
o := &def.RuleOption{
44+
BufferLength: 10,
45+
}
46+
kv, err := store.GetKV("stream")
47+
require.NoError(t, err)
48+
require.NoError(t, prepareStream())
49+
sql := "select count(*) from stream group by statewindow(a)"
50+
stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse()
51+
require.NoError(t, err)
52+
p, err := planner.CreateLogicalPlan(stmt, o, kv)
53+
require.NoError(t, err)
54+
require.NotNil(t, p)
55+
windowPlan := extractWindowPlan(p)
56+
require.NotNil(t, windowPlan)
57+
op, err := node.NewWindowV2Op("window", node.WindowConfig{
58+
Type: windowPlan.WindowType(),
59+
SingleCondition: windowPlan.GetSingleCondition(),
60+
}, o)
61+
require.NoError(t, err)
62+
require.NotNil(t, op)
63+
input, _ := op.GetInput()
64+
output := make(chan any, 10)
65+
op.AddOutput(output, "output")
66+
errCh := make(chan error, 10)
67+
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
68+
op.Exec(ctx, errCh)
69+
waitExecute()
70+
input <- &xsql.Tuple{Message: map[string]any{"a": true}, Timestamp: now}
71+
input <- &xsql.Tuple{Message: map[string]any{"a": false}, Timestamp: now.Add(500 * time.Millisecond)}
72+
input <- &xsql.Tuple{Message: map[string]any{"a": false}, Timestamp: now.Add(500 * time.Millisecond)}
73+
input <- &xsql.Tuple{Message: map[string]any{"a": true}, Timestamp: now.Add(500 * time.Millisecond)}
74+
waitExecute()
75+
got := <-output
76+
wt, ok := got.(*xsql.WindowTuples)
77+
require.True(t, ok)
78+
require.NotNil(t, wt)
79+
d := wt.ToMaps()
80+
require.Equal(t, []map[string]any{
81+
{
82+
"a": true,
83+
},
84+
{
85+
"a": false,
86+
},
87+
{
88+
"a": false,
89+
},
90+
}, d)
91+
cancel()
92+
waitExecute()
93+
op.Close()
94+
}
95+
4096
func TestStateWindow(t *testing.T) {
4197
conf.IsTesting = true
4298
now := time.Now()

internal/topo/planner/planner.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -412,17 +412,26 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources ma
412412
TriggerCondition: t.triggerCondition,
413413
BeginCondition: t.beginCondition,
414414
EmitCondition: t.emitCondition,
415+
SingleCondition: t.singleCondition,
415416
StateFuncs: t.stateFuncs,
416417
}
417-
if options.PlanOptimizeStrategy.GetWindowVersion() == "v2" {
418+
// state window only support v2 window
419+
if wc.Type == ast.STATE_WINDOW {
418420
op, err = node.NewWindowV2Op(fmt.Sprintf("%d_window", newIndex), wc, options)
419421
if err != nil {
420422
return nil, 0, err
421423
}
422424
} else {
423-
op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), wc, options)
424-
if err != nil {
425-
return nil, 0, err
425+
if options.PlanOptimizeStrategy.GetWindowVersion() == "v2" {
426+
op, err = node.NewWindowV2Op(fmt.Sprintf("%d_window", newIndex), wc, options)
427+
if err != nil {
428+
return nil, 0, err
429+
}
430+
} else {
431+
op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), wc, options)
432+
if err != nil {
433+
return nil, 0, err
434+
}
426435
}
427436
}
428437
case *DedupTriggerPlan:
@@ -619,10 +628,11 @@ func createLogicalPlanFull(stmt *ast.SelectStatement, opt *def.RuleOption, store
619628
p = incWp
620629
} else {
621630
wp := WindowPlan{
622-
wtype: w.WindowType,
623-
isEventTime: opt.IsEventTime,
624-
beginCondition: w.BeginCondition,
625-
emitCondition: w.EmitCondition,
631+
wtype: w.WindowType,
632+
isEventTime: opt.IsEventTime,
633+
beginCondition: w.BeginCondition,
634+
emitCondition: w.EmitCondition,
635+
singleCondition: w.SingleCondition,
626636
}.Init()
627637
if w.Length != nil {
628638
wp.length = int(w.Length.Val)

internal/topo/planner/windowPlan.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
type WindowPlan struct {
2626
baseLogicalPlan
27+
singleCondition ast.Expr
2728
beginCondition ast.Expr
2829
emitCondition ast.Expr
2930
triggerCondition ast.Expr
@@ -57,6 +58,10 @@ func (p *WindowPlan) GetBeginCondition() ast.Expr {
5758
return p.beginCondition
5859
}
5960

61+
func (p *WindowPlan) GetSingleCondition() ast.Expr {
62+
return p.singleCondition
63+
}
64+
6065
func (p *WindowPlan) GetEmitCondition() ast.Expr {
6166
return p.emitCondition
6267
}

internal/xsql/parser.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,8 +1047,8 @@ loop:
10471047
func validateWindows(fname string, args []ast.Expr) (ast.WindowType, error) {
10481048
switch fname {
10491049
case "statewindow":
1050-
if len(args) != 2 {
1051-
return ast.STATE_WINDOW, fmt.Errorf("The arguments for %s should be %d.\n", fname, 2)
1050+
if len(args) != 2 && len(args) != 1 {
1051+
return ast.STATE_WINDOW, fmt.Errorf("The arguments for %s should be 1 or 2.\n", fname)
10521052
}
10531053
return ast.STATE_WINDOW, nil
10541054
case "tumblingwindow":
@@ -1119,9 +1119,14 @@ func validateWindow(funcName string, expectLen int, args []ast.Expr) error {
11191119
func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error) {
11201120
win := &ast.Window{WindowType: wtype}
11211121
if wtype == ast.STATE_WINDOW {
1122-
win.BeginCondition = args[0]
1123-
win.EmitCondition = args[1]
1124-
return win, nil
1122+
if len(args) == 2 {
1123+
win.BeginCondition = args[0]
1124+
win.EmitCondition = args[1]
1125+
return win, nil
1126+
} else if len(args) == 1 {
1127+
win.SingleCondition = args[0]
1128+
return win, nil
1129+
}
11251130
}
11261131
if wtype == ast.COUNT_WINDOW {
11271132
win.Length = &ast.IntegerLiteral{Val: args[0].(*ast.IntegerLiteral).Val}

pkg/ast/statement.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ func (w WindowType) String() string {
212212

213213
type Window struct {
214214
TriggerCondition Expr
215+
SingleCondition Expr
215216
BeginCondition Expr
216217
EmitCondition Expr
217218
WindowType WindowType

0 commit comments

Comments
 (0)