Skip to content

Commit 331be0c

Browse files
committed
fix analyzer memroy leak because function runner not close
Signed-off-by: aoiasd <[email protected]>
1 parent 4edb1bc commit 331be0c

File tree

9 files changed

+68
-4
lines changed

9 files changed

+68
-4
lines changed

internal/datanode/importv2/util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ func RunBm25Function(task *ImportTask, data *storage.InsertData) error {
233233
continue
234234
}
235235

236+
defer runner.Close()
237+
236238
inputFieldIDs := lo.Map(runner.GetInputFields(), func(field *schemapb.FieldSchema, _ int) int64 { return field.GetFieldID() })
237239
inputDatas := make([]any, 0, len(inputFieldIDs))
238240
for _, inputFieldID := range inputFieldIDs {

internal/flushcommon/pipeline/flow_graph_embedding_node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
168168
return []Msg{fgMsg}
169169
}
170170

171+
func (eNode *embeddingNode) Close() {
172+
for _, runner := range eNode.functionRunners {
173+
runner.Close()
174+
}
175+
}
176+
171177
func BuildSparseFieldData(array *schemapb.SparseFloatArray) storage.FieldData {
172178
return &storage.SparseFloatVectorFieldData{
173179
SparseFloatArray: schemapb.SparseFloatArray{

internal/querynodev2/pipeline/embedding_node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ func (eNode *embeddingNode) Operate(in Msg) Msg {
206206
return nodeMsg
207207
}
208208

209+
func (eNode *embeddingNode) Close() {
210+
for _, functionRunner := range eNode.functionRunners {
211+
functionRunner.Close()
212+
}
213+
}
214+
209215
func getEmbeddingFieldDatas(datas []*schemapb.FieldData, fieldIDs ...int64) ([]any, error) {
210216
result := []any{}
211217
for _, fieldID := range fieldIDs {

internal/util/function/bm25_function.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ func (v *BM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
171171
return []*schemapb.FieldSchema{v.inputField}
172172
}
173173

174+
func (v *BM25FunctionRunner) Close() {
175+
v.tokenizer.Destroy()
176+
}
177+
174178
func buildSparseFloatArray(mapdata []map[uint32]float32) *schemapb.SparseFloatArray {
175179
dim := int64(0)
176180
bytes := lo.Map(mapdata, func(sparseMap map[uint32]float32, _ int) []byte {

internal/util/function/function.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type FunctionRunner interface {
3030
GetSchema() *schemapb.FunctionSchema
3131
GetOutputFields() []*schemapb.FieldSchema
3232
GetInputFields() []*schemapb.FieldSchema
33+
34+
Close()
3335
}
3436

3537
func NewFunctionRunner(coll *schemapb.CollectionSchema, schema *schemapb.FunctionSchema) (FunctionRunner, error) {

internal/util/function/mock_function.go

Lines changed: 36 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/util/function/multi_analyzer_bm25_function.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,9 @@ func (v *MultiAnalyzerBM25FunctionRunner) GetOutputFields() []*schemapb.FieldSch
235235
func (v *MultiAnalyzerBM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
236236
return v.inputFields
237237
}
238+
239+
func (v *MultiAnalyzerBM25FunctionRunner) Close() {
240+
for _, analyzer := range v.analyzers {
241+
analyzer.Destroy()
242+
}
243+
}

internal/util/pipeline/node.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type Node interface {
2424
Name() string
2525
MaxQueueLength() int32
2626
Operate(in Msg) Msg
27+
28+
Close()
2729
}
2830

2931
type nodeCtx struct {
@@ -56,6 +58,9 @@ func (node *BaseNode) MaxQueueLength() int32 {
5658
return node.maxQueueLength
5759
}
5860

61+
func (node *BaseNode) Close() {
62+
}
63+
5964
func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
6065
return &BaseNode{
6166
name: name,

internal/util/pipeline/pipeline.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (p *pipeline) Start() error {
7070

7171
func (p *pipeline) Close() {
7272
for _, node := range p.nodes {
73+
node.node.Close()
7374
if node.Checker != nil {
7475
node.Checker.Close()
7576
}

0 commit comments

Comments
 (0)