Skip to content

Commit 2eb24fb

Browse files
authored
fix: analyzer memory leak caused by function runners not being closed (#41839)
relate: #41213 --------- Signed-off-by: aoiasd <[email protected]>
1 parent 373deba commit 2eb24fb

File tree

11 files changed

+81
-5
lines changed

11 files changed

+81
-5
lines changed

internal/datanode/importv2/util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ func RunBm25Function(task *ImportTask, data *storage.InsertData) error {
378378
continue
379379
}
380380

381+
defer runner.Close()
382+
381383
inputFieldIDs := lo.Map(runner.GetInputFields(), func(field *schemapb.FieldSchema, _ int) int64 { return field.GetFieldID() })
382384
inputDatas := make([]any, 0, len(inputFieldIDs))
383385
for _, inputFieldID := range inputFieldIDs {

internal/flushcommon/pipeline/flow_graph_embedding_node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
171171
return []Msg{fgMsg}
172172
}
173173

174+
// Close iterates over eNode.functionRunners and calls Close on each runner.
// It returns nothing; any failure inside a runner's Close is not surfaced here.
func (eNode *embeddingNode) Close() {
175+
for _, runner := range eNode.functionRunners {
176+
runner.Close()
177+
}
178+
}
179+
174180
func BuildSparseFieldData(array *schemapb.SparseFloatArray) storage.FieldData {
175181
return &storage.SparseFloatVectorFieldData{
176182
SparseFloatArray: schemapb.SparseFloatArray{

internal/flushcommon/pipeline/flow_graph_embedding_node_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
7373
t.Run("normal case", func(t *testing.T) {
7474
node, err := newEmbeddingNode("test-channel", metaCache)
7575
assert.NoError(t, err)
76+
defer node.Close()
7677

7778
var output []Msg
7879
assert.NotPanics(t, func() {
@@ -114,6 +115,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
114115
t.Run("with close msg", func(t *testing.T) {
115116
node, err := newEmbeddingNode("test-channel", metaCache)
116117
assert.NoError(t, err)
118+
defer node.Close()
117119

118120
var output []Msg
119121

@@ -131,6 +133,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
131133
t.Run("prepare insert failed", func(t *testing.T) {
132134
node, err := newEmbeddingNode("test-channel", metaCache)
133135
assert.NoError(t, err)
136+
defer node.Close()
134137

135138
assert.Panics(t, func() {
136139
node.Operate([]Msg{
@@ -152,6 +155,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
152155
t.Run("embedding failed", func(t *testing.T) {
153156
node, err := newEmbeddingNode("test-channel", metaCache)
154157
assert.NoError(t, err)
158+
defer node.Close()
155159

156160
node.functionRunners[0].GetSchema().Type = 0
157161
assert.Panics(t, func() {

internal/querynodev2/pipeline/embedding_node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ func (eNode *embeddingNode) Operate(in Msg) Msg {
206206
return nodeMsg
207207
}
208208

209+
// Close iterates over eNode.functionRunners and calls Close on each one.
// It returns nothing; any failure inside a runner's Close is not surfaced here.
func (eNode *embeddingNode) Close() {
210+
for _, functionRunner := range eNode.functionRunners {
211+
functionRunner.Close()
212+
}
213+
}
214+
209215
func getEmbeddingFieldDatas(datas []*schemapb.FieldData, fieldIDs ...int64) ([]any, error) {
210216
result := []any{}
211217
for _, fieldID := range fieldIDs {

internal/querynodev2/pipeline/embedding_node_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,9 @@ func (suite *EmbeddingNodeSuite) TestCreateEmbeddingNode() {
140140
collSchema := proto.Clone(suite.collectionSchema).(*schemapb.CollectionSchema)
141141
collection := segments.NewCollectionWithoutSegcoreForTest(suite.collectionID, collSchema)
142142
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
143-
_, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
143+
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
144144
suite.NoError(err)
145+
defer node.Close()
145146
})
146147
}
147148

@@ -151,6 +152,7 @@ func (suite *EmbeddingNodeSuite) TestOperator() {
151152
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
152153
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
153154
suite.NoError(err)
155+
defer node.Close()
154156

155157
suite.colManager.EXPECT().Get(suite.collectionID).Return(nil).Once()
156158
suite.Panics(func() {
@@ -163,6 +165,7 @@ func (suite *EmbeddingNodeSuite) TestOperator() {
163165
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Times(2)
164166
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
165167
suite.NoError(err)
168+
defer node.Close()
166169

167170
suite.Panics(func() {
168171
node.Operate(&insertNodeMsg{
@@ -192,6 +195,7 @@ func (suite *EmbeddingNodeSuite) TestOperator() {
192195
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Times(2)
193196
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
194197
suite.NoError(err)
198+
defer node.Close()
195199

196200
suite.NotPanics(func() {
197201
output := node.Operate(&insertNodeMsg{
@@ -213,6 +217,7 @@ func (suite *EmbeddingNodeSuite) TestAddInsertData() {
213217
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
214218
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
215219
suite.NoError(err)
220+
defer node.Close()
216221

217222
// transfer insert msg failed because rowbase data not support sparse vector
218223
insertDatas := make(map[int64]*delegator.InsertData)
@@ -237,6 +242,7 @@ func (suite *EmbeddingNodeSuite) TestAddInsertData() {
237242
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
238243
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
239244
suite.NoError(err)
245+
defer node.Close()
240246

241247
insertDatas := make(map[int64]*delegator.InsertData)
242248
err = node.addInsertData(insertDatas, suite.msgs[0], collection)
@@ -250,6 +256,7 @@ func (suite *EmbeddingNodeSuite) TestBM25Embedding() {
250256
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
251257
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
252258
suite.NoError(err)
259+
defer node.Close()
253260

254261
runner := function.NewMockFunctionRunner(suite.T())
255262
runner.EXPECT().BatchRun(mock.Anything).Return(nil, errors.New("mock error"))
@@ -265,6 +272,7 @@ func (suite *EmbeddingNodeSuite) TestBM25Embedding() {
265272
suite.colManager.EXPECT().Get(suite.collectionID).Return(collection).Once()
266273
node, err := newEmbeddingNode(suite.collectionID, suite.channel, suite.manager, 128)
267274
suite.NoError(err)
275+
defer node.Close()
268276

269277
runner := function.NewMockFunctionRunner(suite.T())
270278
runner.EXPECT().BatchRun(mock.Anything).Return([]interface{}{1}, nil)

internal/util/function/bm25_function.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ func (v *BM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
251251
return []*schemapb.FieldSchema{v.inputField}
252252
}
253253

254+
// Close calls Destroy on the runner's tokenizer.
// NOTE(review): presumably the runner must not be used after Close, and a
// second Close may not be safe — confirm against the tokenizer implementation.
func (v *BM25FunctionRunner) Close() {
255+
v.tokenizer.Destroy()
256+
}
257+
254258
func buildSparseFloatArray(mapdata []map[uint32]float32) *schemapb.SparseFloatArray {
255259
dim := int64(0)
256260
bytes := lo.Map(mapdata, func(sparseMap map[uint32]float32, _ int) []byte {

internal/util/function/function.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type FunctionRunner interface {
3030
GetSchema() *schemapb.FunctionSchema
3131
GetOutputFields() []*schemapb.FieldSchema
3232
GetInputFields() []*schemapb.FieldSchema
33+
34+
Close()
3335
}
3436

3537
func NewFunctionRunner(coll *schemapb.CollectionSchema, schema *schemapb.FunctionSchema) (FunctionRunner, error) {

internal/util/function/mock_function.go

Lines changed: 36 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/util/function/multi_analyzer_bm25_function.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,9 @@ func (v *MultiAnalyzerBM25FunctionRunner) GetOutputFields() []*schemapb.FieldSch
325325
func (v *MultiAnalyzerBM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
326326
return v.inputFields
327327
}
328+
329+
// Close calls Destroy on every analyzer held in v.analyzers.
// NOTE(review): presumably the runner must not be used after Close — confirm
// against the analyzer implementation.
func (v *MultiAnalyzerBM25FunctionRunner) Close() {
330+
for _, analyzer := range v.analyzers {
331+
analyzer.Destroy()
332+
}
333+
}

internal/util/pipeline/node.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type Node interface {
2424
Name() string
2525
MaxQueueLength() int32
2626
Operate(in Msg) Msg
27+
28+
Close()
2729
}
2830

2931
type nodeCtx struct {
@@ -56,6 +58,9 @@ func (node *BaseNode) MaxQueueLength() int32 {
5658
return node.maxQueueLength
5759
}
5860

61+
// Close is a no-op on BaseNode: the method body is empty.
// NOTE(review): presumably this serves as the default Close for node types
// that embed BaseNode and have nothing to release — confirm against callers.
func (node *BaseNode) Close() {
62+
}
63+
5964
func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
6065
return &BaseNode{
6166
name: name,

internal/util/pipeline/pipeline.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (p *pipeline) Start() error {
7070

7171
func (p *pipeline) Close() {
7272
for _, node := range p.nodes {
73+
node.node.Close()
7374
if node.Checker != nil {
7475
node.Checker.Close()
7576
}

0 commit comments

Comments
 (0)