Skip to content

Commit 8bb7d04

Browse files
authored
fix(sink): fix sink buffer length metrics (#3508)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 59495ba commit 8bb7d04

File tree

3 files changed

+10
-8
lines changed

3 files changed

+10
-8
lines changed

extensions/sources/sql/ext/sql.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
148148
consumer <- api.NewDefaultSourceTupleWithTime(data, nil, rcvTime)
149149
rcvTime = conf.GetNow()
150150
}
151+
rows.Close()
151152
case <-ctx.Done():
152153
return
153154
}

extensions/sources/sql/ext/sqlLookup.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func (s *sqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []
8383
ctx.GetLogger().Errorf("sql look table failed, err:%v, query: %v", err, query)
8484
return nil, err
8585
}
86+
defer rows.Close()
8687
cols, _ := rows.Columns()
8788

8889
types, err := rows.ColumnTypes()

internal/topo/node/sink_node.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
194194
return
195195
}
196196
m.statManager.IncTotalRecordsIn()
197-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq))
197+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq))
198198
outs := itemToMap(data)
199199
if sconf.Omitempty && (data == nil || len(outs) == 0) {
200200
ctx.GetLogger().Debugf("receive empty in sink")
@@ -215,7 +215,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
215215
}
216216
normalQ := func(data []map[string]interface{}) {
217217
m.statManager.ProcessTimeStart()
218-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq))
218+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq))
219219
ctx.GetLogger().Debugf("sending data: %v", data)
220220
err := doCollectMaps(ctx, sink, sconf, data, m.statManager, false)
221221
if sconf.EnableCache {
@@ -231,14 +231,14 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
231231
// Always ack for the normal queue as fail items are handled by the resend queue
232232
select {
233233
case c.Ack <- true:
234-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq) - 1)
234+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq) - 1)
235235
case <-ctx.Done():
236236
}
237237
} else {
238238
select {
239239
case c.Ack <- ack:
240240
if ack { // -1 because the signal length is changed async, just calculate it here
241-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq) - 1)
241+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq) - 1)
242242
}
243243
case <-ctx.Done():
244244
}
@@ -249,7 +249,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
249249

250250
resendQ := func(data []map[string]interface{}) {
251251
ctx.GetLogger().Debugf("resend data: %v", data)
252-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq))
252+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq))
253253
if sconf.ResendIndicatorField != "" {
254254
for _, item := range data {
255255
item[sconf.ResendIndicatorField] = true
@@ -260,7 +260,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
260260
select {
261261
case rq.Ack <- ack:
262262
if ack {
263-
m.statManager.SetBufferLength(bufferLen(dataCh, dataOutCh, c, rq) - 1)
263+
m.statManager.SetBufferLength(bufferLen(m.input, dataCh, dataOutCh, c, rq) - 1)
264264
}
265265
case <-ctx.Done():
266266
}
@@ -358,8 +358,8 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
358358
}()
359359
}
360360

361-
func bufferLen(dataCh chan []map[string]interface{}, dataOutCh <-chan []map[string]interface{}, c *cache.SyncCache, rq *cache.SyncCache) int64 {
362-
l := len(dataCh)
361+
func bufferLen(input chan any, dataCh chan []map[string]interface{}, dataOutCh <-chan []map[string]interface{}, c *cache.SyncCache, rq *cache.SyncCache) int64 {
362+
l := len(dataCh) + len(input)
363363
if dataCh != dataOutCh {
364364
l += len(dataOutCh)
365365
}

0 commit comments

Comments
 (0)