@@ -21,13 +21,11 @@ import (
2121 "github.com/lf-edge/ekuiper/contract/v2/api"
2222
2323 "github.com/lf-edge/ekuiper/v2/internal/xsql"
24- "github.com/lf-edge/ekuiper/v2/pkg/syncx"
2524)
2625
2726type HoppingWindowIncAggEventOp struct {
2827 op * HoppingWindowIncAggOp
2928 HoppingWindowIncAggEventOpState
30- mu syncx.Mutex
3129}
3230
3331type HoppingWindowIncAggEventOpState struct {
@@ -43,8 +41,6 @@ func NewHoppingWindowIncAggEventOp(o *WindowIncAggOperator) *HoppingWindowIncAgg
4341}
4442
4543func (ho * HoppingWindowIncAggEventOp ) PutState (ctx api.StreamContext ) {
46- ho .mu .Lock ()
47- defer ho .mu .Unlock ()
4844 for index , window := range ho .CurrWindowList {
4945 window .GenerateAllFunctionState ()
5046 ho .CurrWindowList [index ] = window
@@ -64,8 +60,6 @@ func (ho *HoppingWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) er
6460 if ! ok {
6561 return fmt .Errorf ("not HoppingWindowIncAggEventOpState" )
6662 }
67- ho .mu .Lock ()
68- defer ho .mu .Unlock ()
6963 ho .HoppingWindowIncAggEventOpState = coState
7064 for index , window := range ho .CurrWindowList {
7165 window .restoreState (ctx )
@@ -92,18 +86,14 @@ func (ho *HoppingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e
9286 switch tuple := data .(type ) {
9387 case * xsql.WatermarkTuple :
9488 now := tuple .GetTimestamp ()
95- ho .mu .Lock ()
9689 ho .emitWindow (ctx , errCh , now )
9790 ho .CurrWindowList = gcIncAggWindow (ho .CurrWindowList , ho .op .Length , now )
98- ho .mu .Unlock ()
9991 ho .PutState (ctx )
10092 case * xsql.Tuple :
10193 ho .op .onProcessStart (ctx , data )
10294 now := tuple .GetTimestamp ()
103- ho .mu .Lock ()
10495 ho .triggerWindow (ctx , now )
10596 ho .calIncAggWindow (ctx , fv , tuple , tuple .GetTimestamp ())
106- ho .mu .Unlock ()
10797 ho .PutState (ctx )
10898 ho .op .onProcessEnd (ctx )
10999 }
@@ -148,7 +138,6 @@ func (ho *HoppingWindowIncAggEventOp) triggerWindow(ctx api.StreamContext, now t
148138type SlidingWindowIncAggEventOp struct {
149139 op * SlidingWindowIncAggOp
150140 SlidingWindowIncAggEventOpState
151- mu syncx.Mutex
152141}
153142
154143type SlidingWindowIncAggEventOpState struct {
@@ -165,8 +154,6 @@ func NewSlidingWindowIncAggEventOp(o *WindowIncAggOperator) *SlidingWindowIncAgg
165154}
166155
167156func (so * SlidingWindowIncAggEventOp ) PutState (ctx api.StreamContext ) {
168- so .mu .Lock ()
169- defer so .mu .Unlock ()
170157 for index , window := range so .CurrWindowList {
171158 window .GenerateAllFunctionState ()
172159 so .CurrWindowList [index ] = window
@@ -190,8 +177,6 @@ func (so *SlidingWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) er
190177 if ! ok {
191178 return fmt .Errorf ("not SlidingWindowIncAggEventOpState" )
192179 }
193- so .mu .Lock ()
194- defer so .mu .Unlock ()
195180 so .SlidingWindowIncAggEventOpState = soState
196181 for index , window := range so .CurrWindowList {
197182 window .GenerateAllFunctionState ()
@@ -222,24 +207,18 @@ func (so *SlidingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e
222207 switch tuple := data .(type ) {
223208 case * xsql.WatermarkTuple :
224209 now := tuple .GetTimestamp ()
225- so .mu .Lock ()
226210 so .emitList (ctx , errCh , now )
227211 so .CurrWindowList = gcIncAggWindow (so .CurrWindowList , so .op .Length , now )
228- so .mu .Unlock ()
229212 so .PutState (ctx )
230213 case * xsql.Tuple :
231214 so .op .onProcessStart (ctx , tuple )
232215 if so .op .Delay > 0 {
233- so .mu .Lock ()
234216 so .appendDelayIncAggWindowInEvent (ctx , errCh , fv , tuple )
235- so .mu .Unlock ()
236217 so .PutState (ctx )
237218 so .op .onProcessEnd (ctx )
238219 continue
239220 }
240- so .mu .Lock ()
241221 so .appendIncAggWindowInEvent (ctx , errCh , fv , tuple )
242- so .mu .Unlock ()
243222 so .PutState (ctx )
244223 so .op .onProcessEnd (ctx )
245224 }
@@ -349,7 +328,6 @@ func (o *WindowIncAggOperator) ingest(ctx api.StreamContext, item any) (any, boo
349328type CountWindowIncAggEventOp struct {
350329 op * CountWindowIncAggOp
351330 CountWindowIncAggEventOpState
352- mu syncx.Mutex
353331}
354332
355333type CountWindowIncAggEventOpState struct {
@@ -388,7 +366,6 @@ func (co *CountWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- err
388366 case * xsql.WatermarkTuple :
389367 now := tuple .GetTimestamp ()
390368 var index int
391- co .mu .Lock ()
392369 for i , window := range co .EmitList {
393370 if window .StartTime .Compare (now ) <= 0 {
394371 co .emitWindow (ctx , errCh , window , now )
@@ -402,12 +379,10 @@ func (co *CountWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- err
402379 } else {
403380 co .EmitList = co .EmitList [index + 1 :]
404381 }
405- co .mu .Unlock ()
406382 co .PutState (ctx )
407383 case * xsql.Tuple :
408384 co .op .onProcessStart (ctx , tuple )
409385 now := tuple .GetTimestamp ()
410- co .mu .Lock ()
411386 if co .CurrWindow == nil {
412387 co .CurrWindow = newIncAggWindow (ctx , now )
413388 }
@@ -418,7 +393,6 @@ func (co *CountWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- err
418393 co .EmitList = append (co .EmitList , co .CurrWindow )
419394 co .CurrWindow = nil
420395 }
421- co .mu .Unlock ()
422396 co .PutState (ctx )
423397 co .op .onProcessEnd (ctx )
424398 }
@@ -442,8 +416,6 @@ func (co *CountWindowIncAggEventOp) emitWindow(ctx api.StreamContext, errCh chan
442416}
443417
444418func (co * CountWindowIncAggEventOp ) PutState (ctx api.StreamContext ) {
445- co .mu .Lock ()
446- defer co .mu .Unlock ()
447419 co .CurrWindow .GenerateAllFunctionState ()
448420 ctx .PutState (buildStateKey (ctx ), co .CountWindowIncAggEventOpState )
449421}
@@ -460,8 +432,6 @@ func (co *CountWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) erro
460432 if ! ok {
461433 return fmt .Errorf ("not CountWindowIncAggEventOpState" )
462434 }
463- co .mu .Lock ()
464- defer co .mu .Unlock ()
465435 co .CountWindowIncAggEventOpState = coState
466436 co .CurrWindow .restoreState (ctx )
467437 return nil
0 commit comments