Skip to content

Commit f3cb876

Browse files
authored
fix: fix BatchAbleSink configure (#3515)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 85a31af commit f3cb876

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

internal/topo/node/sink_node.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package node
1717
import (
1818
"fmt"
1919
"sync"
20+
"time"
2021

2122
"github.com/lf-edge/ekuiper/internal/binder/io"
2223
"github.com/lf-edge/ekuiper/internal/conf"
@@ -554,6 +555,11 @@ func resendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{},
554555
}
555556
}
556557

558+
type batchConf struct {
559+
BatchSize int `json:"batchSize"`
560+
LingerInterval time.Duration `json:"lingerInterval"`
561+
}
562+
557563
func getSink(name string, action map[string]interface{}) (api.Sink, error) {
558564
var (
559565
s api.Sink
@@ -566,6 +572,11 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
566572
if err != nil {
567573
return nil, err
568574
}
575+
if bas, ok := s.(api.BatchAbleSink); ok {
576+
bc := batchConf{}
577+
cast.MapToStruct(newAction, &bc)
578+
bas.ConfigureBatch(bc.BatchSize, bc.LingerInterval)
579+
}
569580
return s, nil
570581
} else {
571582
if err != nil {

internal/topo/planner/sink_planner.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package planner
1616

1717
import (
1818
"fmt"
19-
"time"
2019

2120
"github.com/lf-edge/ekuiper/internal/binder/io"
2221
"github.com/lf-edge/ekuiper/internal/conf"
@@ -67,8 +66,7 @@ func fulfillProps(rule *api.Rule, props map[string]any) map[string]any {
6766

6867
// Split sink node according to the sink configuration. Return the new input emitters.
6968
func splitSink(sink api.Sink, tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) {
70-
if bas, ok := sink.(api.BatchAbleSink); ok {
71-
bas.ConfigureBatch(sc.BatchSize, time.Duration(sc.LingerInterval))
69+
if _, ok := sink.(api.BatchAbleSink); ok {
7270
return inputs, nil
7371
}
7472
index := 0

0 commit comments

Comments
 (0)