Skip to content

Commit 211b9cd

Browse files
authored
feat: support state window (#3757)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent a0c9a88 commit 211b9cd

File tree

9 files changed

+174
-6
lines changed

9 files changed

+174
-6
lines changed

docs/en_US/sqls/windows.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ A session window begins when the first event occurs. If another event occurs wit
7272

7373
If events keep occurring within the specified timeout, the session window will keep extending until the maximum duration is reached. The interval between maximum-duration checks is the same as the specified maximum duration. For example, if the maximum duration is 10, the checks on whether the window has exceeded the maximum duration happen at t = 0, 10, 20, 30, etc.
7474

75+
## Conditional state window
76+
77+
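The conditional state window is not driven by time; it is driven only by how each incoming piece of data affects the window state. It takes two parameters, in this order: the start window trigger condition (the begin condition) and the send window trigger condition (the emit condition). In the example below, `a > 1` is the begin condition and `a > 5` is the emit condition.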
78+
79+
```sql
80+
SELECT * from demo group by statewindow(a > 1, a > 5)
81+
```
82+
83+
The conditional state window is initially in the untriggered state, in which all data entering the window is discarded. When a piece of data entering the window meets the start window trigger condition, the window changes from the untriggered state to the triggered state; from then on, all data entering the window, including the piece of data that triggered the change, is stored.
84+
While the conditional state window is in the triggered state, a piece of data that meets the send window trigger condition changes it from the triggered state back to the untriggered state, and all data stored in the window, including that piece of data, is sent as one window.
85+
7586
## Count window
7687

7788
Please note that the count window is not concerned with time; it is only concerned with the number of events.

docs/zh_CN/sqls/windows.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ SELECT count(*) FROM demo GROUP BY ID, SESSIONWINDOW(mi, 2, 1);
7272

7373
如果事件在指定的超时时间内持续发生,则会话窗口将继续扩展直到达到最大持续时间。 最大持续时间检查间隔设置为与指定的最大持续时间相同的大小。 例如,如果最大持续时间为10,则检查窗口是否超过最大持续时间将在 t = 0、10、20、30等处进行。
7474

75+
## 条件状态窗口
76+
77+
条件状态窗口不关注时间,只关注每条数据对窗口状态的影响。它有两个主要参数:开始窗口触发条件与发送窗口触发条件。
78+
79+
```sql
80+
SELECT * from demo group by statewindow(a > 1, a > 5)
81+
```
82+
83+
条件状态窗口最初是未触发状态,此时所有进入窗口的数据都会被直接丢弃。当进入窗口的数据满足开始窗口触发条件时,则从未触发状态转变为触发状态,此时所有进入窗口的数据都会被存储。
84+
条件状态窗口处于触发状态时,当进入窗口的数据满足发送窗口触发条件时,则从触发状态转变为未触发状态,且此前所有被存储在窗口的数据都会被作为一个窗口发送。
85+
7586
## 计数窗口
7687

7788
请注意计数窗口不关注时间,只关注事件发生的次数。

internal/topo/node/window_op.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
)
3737

3838
type WindowConfig struct {
39+
BeginCondition ast.Expr
40+
EmitCondition ast.Expr
3941
TriggerCondition ast.Expr
4042
StateFuncs []*ast.Call
4143
Type ast.WindowType

internal/topo/node/window_v2_op.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/lf-edge/ekuiper/v2/pkg/timex"
2828
)
2929

30+
// InfTime is a far-future timestamp. StateWindowOp.exec passes it as the end
// bound to emitWindow and to scanner.gc.
// NOTE(review): 62135596801 looks like the year-1-to-1970 offset in seconds
// plus one, which would make this the largest instant time.Time can hold — confirm.
var InfTime = time.Unix(1<<63-62135596801, 999999999)
31+
3032
type WindowV2Operator struct {
3133
*defaultSinkNode
3234
windowConfig WindowConfig
@@ -46,6 +48,8 @@ func NewWindowV2Op(name string, w WindowConfig, options *def.RuleOption) (*Windo
4648
} else {
4749
o.wExec = NewSlidingWindowOp(o)
4850
}
51+
case ast.STATE_WINDOW:
52+
o.wExec = NewStateWindowOp(o)
4953
default:
5054
return nil, fmt.Errorf("unsupported window type:%v", w.Type.String())
5155
}
@@ -87,6 +91,63 @@ type WindowV2Exec interface {
8791
exec(ctx api.StreamContext, errCh chan<- error)
8892
}
8993

94+
// StateWindowOp is the window executor that NewWindowV2Op installs for
// ast.STATE_WINDOW. It collects tuples between a begin condition and an emit
// condition instead of using time or count boundaries.
type StateWindowOp struct {
95+
*WindowV2Operator
96+
// BeginCondition is evaluated only while the window is not collecting; a
// match switches the window into the collecting state.
BeginCondition ast.Expr
97+
// EmitCondition is evaluated for every tuple added while collecting; a
// match emits the collected tuples as one window.
EmitCondition ast.Expr
98+
// onBegin is true while the window is collecting tuples.
onBegin bool
99+
// stateFuncs is forwarded to isMatchCondition on every condition check.
stateFuncs []*ast.Call
100+
}
101+
102+
// NewStateWindowOp builds a StateWindowOp on top of o, copying the begin
// condition, emit condition and state functions from o.windowConfig. The
// returned operator starts in the non-collecting state (onBegin is false).
func NewStateWindowOp(o *WindowV2Operator) *StateWindowOp {
103+
return &StateWindowOp{
104+
WindowV2Operator: o,
105+
BeginCondition: o.windowConfig.BeginCondition,
106+
EmitCondition: o.windowConfig.EmitCondition,
107+
stateFuncs: o.windowConfig.StateFuncs,
108+
}
109+
}
110+
111+
// exec runs the state-window loop until ctx is done. For every *xsql.Tuple
// read from s.input it:
//  1. checks BeginCondition if the window is not collecting, and starts
//     collecting on a match;
//  2. while collecting, stores the tuple and checks EmitCondition;
//  3. on an emit match, emits everything stored as one window, clears the
//     scanner and stops collecting.
//
// Inputs that are not *xsql.Tuple fall through the type switch untouched.
// errCh is not used by this implementation.
func (s *StateWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
112+
// NOTE(review): the error returned here is discarded — confirm that is safe.
fv, _ := xsql.NewFunctionValuersForOp(ctx)
113+
for {
114+
select {
115+
case <-ctx.Done():
116+
return
117+
case input := <-s.input:
118+
// commonIngest reports via processed that nothing is left to do for
// this input, so the loop moves on to the next one.
data, processed := s.commonIngest(ctx, input)
119+
if processed {
120+
continue
121+
}
122+
s.onProcessStart(ctx, input)
123+
switch row := data.(type) {
124+
case *xsql.Tuple:
125+
// canBegin is only ever set when this tuple is the one that starts
// the window; it stays false for tuples arriving mid-window.
var canBegin bool
126+
var canEmit bool
127+
if !s.onBegin {
128+
canBegin = isMatchCondition(ctx, s.BeginCondition, fv, row, s.stateFuncs)
129+
if canBegin {
130+
s.onBegin = true
131+
}
132+
}
133+
// The tuple that starts the window is stored too, and the same tuple
// is also tested against the emit condition.
if s.onBegin {
134+
s.scanner.addTuple(row)
135+
canEmit = isMatchCondition(ctx, s.EmitCondition, fv, row, s.stateFuncs)
136+
}
137+
// Emit with the widest possible range (zero time to InfTime), then gc
// up to InfTime and leave the collecting state.
if s.onBegin && canEmit {
138+
s.emitWindow(ctx, time.Time{}, InfTime)
139+
s.scanner.gc(InfTime)
140+
s.onBegin = false
141+
}
142+
// This branch is reached only when this single tuple both started and
// emitted the window. It re-arms the collecting state, so following
// tuples are stored without BeginCondition being checked again.
// NOTE(review): confirm this is intended; it differs from a plain
// "return to untriggered after emit" reading.
if canBegin && !s.onBegin {
143+
s.onBegin = true
144+
}
145+
}
146+
s.onProcessEnd(ctx)
147+
}
148+
}
149+
}
150+
90151
type SlidingWindowOp struct {
91152
*WindowV2Operator
92153
Delay time.Duration
@@ -154,13 +215,13 @@ func (s *SlidingWindowOp) exec(ctx api.StreamContext, errCh chan<- error) {
154215
}
155216
}
156217

157-
func isMatchCondition(ctx api.StreamContext, triggerCondition ast.Expr, fv *xsql.FunctionValuer, d *xsql.Tuple, stateFuncs []*ast.Call) bool {
158-
if triggerCondition == nil {
218+
func isMatchCondition(ctx api.StreamContext, condition ast.Expr, fv *xsql.FunctionValuer, d *xsql.Tuple, stateFuncs []*ast.Call) bool {
219+
if condition == nil {
159220
return true
160221
}
161222
log := ctx.GetLogger()
162223
ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
163-
result := ve.Eval(triggerCondition)
224+
result := ve.Eval(condition)
164225
// not match trigger condition
165226
if result == nil {
166227
return false

internal/topo/node/window_v2_op_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,59 @@ func init() {
3737
testx.InitEnv("node_test")
3838
}
3939

40+
// TestStateWindow builds a state window from
// "statewindow(a>1,a >5)", feeds it tuples with a = 1, 2, 6 and expects one
// emitted window containing the tuples with a = 2 and a = 6.
func TestStateWindow(t *testing.T) {
41+
conf.IsTesting = true
42+
now := time.Now()
43+
o := &def.RuleOption{
44+
BufferLength: 10,
45+
}
46+
kv, err := store.GetKV("stream")
47+
require.NoError(t, err)
48+
require.NoError(t, prepareStream())
49+
sql := "select count(*) from stream group by statewindow(a>1,a >5)"
50+
stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse()
51+
require.NoError(t, err)
52+
p, err := planner.CreateLogicalPlan(stmt, o, kv)
53+
require.NoError(t, err)
54+
require.NotNil(t, p)
55+
windowPlan := extractWindowPlan(p)
56+
require.NotNil(t, windowPlan)
57+
// Only the window type and the two conditions are taken from the plan.
op, err := node.NewWindowV2Op("window", node.WindowConfig{
58+
Type: windowPlan.WindowType(),
59+
BeginCondition: windowPlan.GetBeginCondition(),
60+
EmitCondition: windowPlan.GetEmitCondition(),
61+
}, o)
62+
require.NoError(t, err)
63+
require.NotNil(t, op)
64+
input, _ := op.GetInput()
65+
output := make(chan any, 10)
66+
op.AddOutput(output, "output")
67+
errCh := make(chan error, 10)
68+
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
69+
op.Exec(ctx, errCh)
70+
waitExecute()
71+
// a=1 fails a>1, so it is absent from the expected window below.
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}, Timestamp: now}
72+
// a=2 satisfies the begin condition a>1 but not the emit condition a>5.
input <- &xsql.Tuple{Message: map[string]any{"a": int64(2)}, Timestamp: now.Add(500 * time.Millisecond)}
73+
// a=6 satisfies the emit condition a>5.
input <- &xsql.Tuple{Message: map[string]any{"a": int64(6)}, Timestamp: now.Add(500 * time.Millisecond)}
74+
waitExecute()
75+
got := <-output
76+
wt, ok := got.(*xsql.WindowTuples)
77+
require.True(t, ok)
78+
require.NotNil(t, wt)
79+
d := wt.ToMaps()
80+
require.Equal(t, []map[string]any{
81+
{
82+
"a": int64(2),
83+
},
84+
{
85+
"a": int64(6),
86+
},
87+
}, d)
88+
cancel()
89+
waitExecute()
90+
op.Close()
91+
}
92+
4093
func TestWindowV2SlidingWindowDelay(t *testing.T) {
4194
conf.IsTesting = true
4295
now := time.Now()

internal/topo/planner/planner.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources ma
409409
RawInterval: rawInterval,
410410
TimeUnit: t.timeUnit,
411411
TriggerCondition: t.triggerCondition,
412+
BeginCondition: t.beginCondition,
413+
EmitCondition: t.emitCondition,
412414
StateFuncs: t.stateFuncs,
413415
}
414416
if options.PlanOptimizeStrategy.GetWindowVersion() == "v2" {
@@ -614,10 +616,14 @@ func createLogicalPlanFull(stmt *ast.SelectStatement, opt *def.RuleOption, store
614616
p = incWp
615617
} else {
616618
wp := WindowPlan{
617-
wtype: w.WindowType,
618-
length: int(w.Length.Val),
619-
isEventTime: opt.IsEventTime,
619+
wtype: w.WindowType,
620+
isEventTime: opt.IsEventTime,
621+
beginCondition: w.BeginCondition,
622+
emitCondition: w.EmitCondition,
620623
}.Init()
624+
if w.Length != nil {
625+
wp.length = int(w.Length.Val)
626+
}
621627
if w.Delay != nil {
622628
wp.delay = w.Delay.Val
623629
}

internal/topo/planner/windowPlan.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
type WindowPlan struct {
2626
baseLogicalPlan
27+
beginCondition ast.Expr
28+
emitCondition ast.Expr
2729
triggerCondition ast.Expr
2830
condition ast.Expr
2931
wtype ast.WindowType
@@ -51,6 +53,14 @@ func (p *WindowPlan) GetTriggerCondition() ast.Expr {
5153
return p.triggerCondition
5254
}
5355

56+
// GetBeginCondition returns the plan's beginCondition expression.
func (p *WindowPlan) GetBeginCondition() ast.Expr {
57+
return p.beginCondition
58+
}
59+
60+
// GetEmitCondition returns the plan's emitCondition expression.
func (p *WindowPlan) GetEmitCondition() ast.Expr {
61+
return p.emitCondition
62+
}
63+
5464
func (p *WindowPlan) BuildExplainInfo() {
5565
t := p.wtype.String()
5666
info := "{ length:" + strconv.Itoa(p.length) + ", "

internal/xsql/parser.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,7 @@ var WindowFuncs = map[string]struct{}{
873873
"sessionwindow": {},
874874
"slidingwindow": {},
875875
"countwindow": {},
876+
"statewindow": {},
876877
"dedup_trigger": {},
877878
}
878879

@@ -1045,6 +1046,11 @@ loop:
10451046

10461047
func validateWindows(fname string, args []ast.Expr) (ast.WindowType, error) {
10471048
switch fname {
1049+
case "statewindow":
1050+
if len(args) != 2 {
1051+
return ast.STATE_WINDOW, fmt.Errorf("The arguments for %s should be %d.\n", fname, 2)
1052+
}
1053+
return ast.STATE_WINDOW, nil
10481054
case "tumblingwindow":
10491055
if err := validateWindow(fname, 2, args); err != nil {
10501056
return ast.TUMBLING_WINDOW, err
@@ -1112,6 +1118,11 @@ func validateWindow(funcName string, expectLen int, args []ast.Expr) error {
11121118

11131119
func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error) {
11141120
win := &ast.Window{WindowType: wtype}
1121+
if wtype == ast.STATE_WINDOW {
1122+
win.BeginCondition = args[0]
1123+
win.EmitCondition = args[1]
1124+
return win, nil
1125+
}
11151126
if wtype == ast.COUNT_WINDOW {
11161127
win.Length = &ast.IntegerLiteral{Val: args[0].(*ast.IntegerLiteral).Val}
11171128
if len(args) == 2 {

pkg/ast/statement.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ const (
189189
SLIDING_WINDOW
190190
SESSION_WINDOW
191191
COUNT_WINDOW
192+
STATE_WINDOW
192193
)
193194

194195
func (w WindowType) String() string {
@@ -211,6 +212,8 @@ func (w WindowType) String() string {
211212

212213
type Window struct {
213214
TriggerCondition Expr
215+
BeginCondition Expr
216+
EmitCondition Expr
214217
WindowType WindowType
215218
Delay *IntegerLiteral
216219
Length *IntegerLiteral

0 commit comments

Comments
 (0)