Skip to content

Commit 6f89352

Browse files
authored
fix(sink): sink bufferLength is not applied (#3608)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 77d23f9 commit 6f89352

File tree

3 files changed

+29
-19
lines changed

3 files changed

+29
-19
lines changed

internal/topo/node/sink_node.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
"github.com/lf-edge/ekuiper/contract/v2/api"
2424

25-
"github.com/lf-edge/ekuiper/v2/internal/conf"
2625
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
2726
kctx "github.com/lf-edge/ekuiper/v2/internal/topo/context"
2827
"github.com/lf-edge/ekuiper/v2/internal/xsql"
@@ -50,15 +49,19 @@ type SinkNode struct {
5049
// 1. Set cache settings to enable diskCache
5150
// 2. Set resendInterval and bufferLength will use bufferLength as the memory cache
5251
// 3. By default, drop if it cannot sends out.
53-
func newSinkNode(ctx api.StreamContext, name string, rOpt def.RuleOption, eoflimit int, sc *conf.SinkConf, isRetry bool) *SinkNode {
52+
func newSinkNode(ctx api.StreamContext, name string, rOpt def.RuleOption, eoflimit int, sc *SinkConf, isRetry bool) *SinkNode {
5453
// set collect retry according to cache setting
5554
retry := time.Duration(sc.ResendInterval)
5655
if (sc.EnableCache || isRetry) && retry <= 0 {
5756
// default retry interval to 100ms
5857
retry = 100 * time.Millisecond
5958
}
6059
// Sink input channel as buffer
61-
rOpt.BufferLength = sc.MemoryCacheThreshold
60+
if isRetry || (sc.EnableCache && !sc.ResendAlterQueue) {
61+
rOpt.BufferLength = sc.MemoryCacheThreshold
62+
} else {
63+
rOpt.BufferLength = sc.BufferLength
64+
}
6265
ctx.GetLogger().Infof("create sink node %s with isRetry %v, resendInterval %d, bufferLength %d", name, isRetry, retry, rOpt.BufferLength)
6366
return &SinkNode{
6467
defaultSinkNode: newDefaultSinkNode(name, &rOpt),
@@ -191,7 +194,7 @@ func (s *SinkNode) ingest(ctx api.StreamContext, item any) (any, bool) {
191194
}
192195

193196
// NewBytesSinkNode creates a sink node that collects data from the stream. Do some static validation
194-
func NewBytesSinkNode(ctx api.StreamContext, name string, sink api.BytesCollector, rOpt def.RuleOption, eoflimit int, sc *conf.SinkConf, isRetry bool) (*SinkNode, error) {
197+
func NewBytesSinkNode(ctx api.StreamContext, name string, sink api.BytesCollector, rOpt def.RuleOption, eoflimit int, sc *SinkConf, isRetry bool) (*SinkNode, error) {
195198
ctx.GetLogger().Infof("create bytes sink node %s", name)
196199
n := newSinkNode(ctx, name, rOpt, eoflimit, sc, isRetry)
197200
n.sink = sink
@@ -215,7 +218,7 @@ func bytesCollect(ctx api.StreamContext, sink api.Sink, data any) (err error) {
215218
}
216219

217220
// NewTupleSinkNode creates a sink node that collects data from the stream. Do some static validation
218-
func NewTupleSinkNode(ctx api.StreamContext, name string, sink api.TupleCollector, rOpt def.RuleOption, eoflimit int, sc *conf.SinkConf, isRetry bool) (*SinkNode, error) {
221+
func NewTupleSinkNode(ctx api.StreamContext, name string, sink api.TupleCollector, rOpt def.RuleOption, eoflimit int, sc *SinkConf, isRetry bool) (*SinkNode, error) {
219222
ctx.GetLogger().Infof("create message sink node %s", name)
220223
n := newSinkNode(ctx, name, rOpt, eoflimit, sc, isRetry)
221224
n.sink = sink

internal/topo/node/sink_node_test.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ func TestNewSinkNode(t *testing.T) {
9191
t.Run(tt.name, func(t *testing.T) {
9292
n := newSinkNode(ctx, "test", def.RuleOption{
9393
BufferLength: 1024,
94-
}, 1, tt.sc, tt.isRetry)
94+
}, 1, &SinkConf{
95+
SinkConf: *tt.sc,
96+
BufferLength: 1024,
97+
}, tt.isRetry)
9598
assert.Equal(t, tt.resendInterval, n.resendInterval, "resend interval")
9699
assert.Equal(t, tt.bufferLength, cap(n.input))
97100
})
@@ -103,10 +106,12 @@ func TestRetry(t *testing.T) {
103106
s := &mockResendSink{failTimes: 2}
104107
n, err := NewBytesSinkNode(ctx, "resendout_sink", s, def.RuleOption{
105108
BufferLength: 1024,
106-
}, 1, &conf.SinkConf{
107-
ResendInterval: cast.DurationConf(100 * time.Millisecond),
108-
EnableCache: true,
109-
MemoryCacheThreshold: 10,
109+
}, 1, &SinkConf{
110+
SinkConf: conf.SinkConf{
111+
ResendInterval: cast.DurationConf(100 * time.Millisecond),
112+
EnableCache: true,
113+
MemoryCacheThreshold: 10,
114+
},
110115
}, true)
111116
assert.NoError(t, err)
112117
data := &xsql.RawTuple{
@@ -133,11 +138,13 @@ func TestResendOut(t *testing.T) {
133138
s := &mockResendSink{failTimes: 10}
134139
n, err := NewBytesSinkNode(ctx, "resendout_sink", s, def.RuleOption{
135140
BufferLength: 1024,
136-
}, 1, &conf.SinkConf{
137-
ResendInterval: cast.DurationConf(100 * time.Millisecond),
138-
EnableCache: true,
139-
MemoryCacheThreshold: 10,
140-
ResendAlterQueue: true,
141+
}, 1, &SinkConf{
142+
SinkConf: conf.SinkConf{
143+
ResendInterval: cast.DurationConf(100 * time.Millisecond),
144+
EnableCache: true,
145+
MemoryCacheThreshold: 10,
146+
ResendAlterQueue: true,
147+
},
141148
}, true)
142149
assert.NoError(t, err)
143150
alertCh := make(chan any, 10)

internal/topo/planner/planner_sink.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[strin
102102
var snk node.DataSinkNode
103103
switch ss := s.(type) {
104104
case api.BytesCollector:
105-
snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false)
105+
snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, commonConf, false)
106106
case api.TupleCollector:
107-
snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false)
107+
snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, commonConf, false)
108108
default:
109109
err = fmt.Errorf("sink type %s does not implement any collector", sinkType)
110110
}
@@ -135,9 +135,9 @@ func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[strin
135135
var snk node.DataSinkNode
136136
switch ss := s.(type) {
137137
case api.BytesCollector:
138-
snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true)
138+
snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, commonConf, true)
139139
case api.TupleCollector:
140-
snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true)
140+
snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, commonConf, true)
141141
default:
142142
err = fmt.Errorf("sink type %s does not implement any collector", sinkType)
143143
}

0 commit comments

Comments
 (0)