Skip to content

Commit 3baddd3

Browse files
congqixiaThreadDao
andauthored
enhance: [2.5][GoSDK] Cherry pick commits for v2.5.6 and bump version (#43792)
Related pr: #43612 #43732 #43774 #43771 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: ThreadDao <yufen.zong@zilliz.com> Co-authored-by: ThreadDao <yufen.zong@zilliz.com>
1 parent fa51bbe commit 3baddd3

File tree

11 files changed

+1341
-2
lines changed

11 files changed

+1341
-2
lines changed

client/common/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ package common
1818

1919
const (
2020
// SDKVersion const value for current version
21-
SDKVersion = `2.5.5`
21+
SDKVersion = `2.5.6`
2222
)

client/milvusclient/iterator.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package milvusclient
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
24+
"github.com/cockroachdb/errors"
25+
"google.golang.org/grpc"
26+
27+
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
28+
"github.com/milvus-io/milvus/client/v2/entity"
29+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
30+
)
31+
32+
const (
33+
// IteratorKey is the const search param key in indicating enabling iterator.
34+
IteratorKey = "iterator"
35+
IteratorSessionTsKey = "iterator_session_ts"
36+
IteratorSearchV2Key = "search_iter_v2"
37+
IteratorSearchBatchSizeKey = "search_iter_batch_size"
38+
IteratorSearchLastBoundKey = "search_iter_last_bound"
39+
IteratorSearchIDKey = "search_iter_id"
40+
CollectionIDKey = `collection_id`
41+
42+
// Unlimited
43+
Unlimited int64 = -1
44+
)
45+
46+
var ErrServerVersionIncompatible = errors.New("server version incompatible")
47+
48+
// SearchIterator is the interface for search iterator.
49+
type SearchIterator interface {
50+
// Next returns next batch of iterator
51+
// when iterator reaches the end, return `io.EOF`.
52+
Next(ctx context.Context) (ResultSet, error)
53+
}
54+
55+
type searchIteratorV2 struct {
56+
client *Client
57+
option SearchIteratorOption
58+
schema *entity.Schema
59+
limit int64
60+
}
61+
62+
func (it *searchIteratorV2) Next(ctx context.Context) (ResultSet, error) {
63+
// limit reached, return EOF
64+
if it.limit == 0 {
65+
return ResultSet{}, io.EOF
66+
}
67+
68+
rs, err := it.next(ctx)
69+
if err != nil {
70+
return rs, err
71+
}
72+
73+
if it.limit == Unlimited {
74+
return rs, err
75+
}
76+
77+
if int64(rs.Len()) > it.limit {
78+
rs = rs.Slice(0, int(it.limit))
79+
}
80+
it.limit -= int64(rs.Len())
81+
return rs, nil
82+
}
83+
84+
func (it *searchIteratorV2) next(ctx context.Context) (ResultSet, error) {
85+
opt := it.option.SearchOption()
86+
req, err := opt.Request()
87+
if err != nil {
88+
return ResultSet{}, err
89+
}
90+
91+
var rs ResultSet
92+
93+
err = it.client.callService(func(milvusService milvuspb.MilvusServiceClient) error {
94+
resp, err := milvusService.Search(ctx, req)
95+
err = merr.CheckRPCCall(resp, err)
96+
if err != nil {
97+
return err
98+
}
99+
100+
iteratorInfo := resp.GetResults().GetSearchIteratorV2Results()
101+
opt.annRequest.WithSearchParam(IteratorSearchIDKey, iteratorInfo.GetToken())
102+
opt.annRequest.WithSearchParam(IteratorSearchLastBoundKey, fmt.Sprintf("%v", iteratorInfo.GetLastBound()))
103+
104+
resultSets, err := it.client.handleSearchResult(it.schema, req.GetOutputFields(), int(resp.GetResults().GetNumQueries()), resp)
105+
if err != nil {
106+
return err
107+
}
108+
rs = resultSets[0]
109+
110+
if rs.IDs.Len() == 0 {
111+
return io.EOF
112+
}
113+
114+
return nil
115+
})
116+
return rs, err
117+
}
118+
119+
func (it *searchIteratorV2) setupCollectionID(ctx context.Context) error {
120+
opt := it.option.SearchOption()
121+
122+
return it.client.callService(func(milvusService milvuspb.MilvusServiceClient) error {
123+
resp, err := milvusService.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
124+
CollectionName: opt.collectionName,
125+
})
126+
if merr.CheckRPCCall(resp, err) != nil {
127+
return err
128+
}
129+
130+
opt.WithSearchParam(CollectionIDKey, fmt.Sprintf("%d", resp.GetCollectionID()))
131+
schema := &entity.Schema{}
132+
it.schema = schema.ReadProto(resp.GetSchema())
133+
return nil
134+
})
135+
}
136+
137+
// probeCompatiblity checks if the server support SearchIteratorV2.
138+
// It checks if the search result contains search iterator v2 results info and token.
139+
func (it *searchIteratorV2) probeCompatiblity(ctx context.Context) error {
140+
opt := it.option.SearchOption()
141+
opt.annRequest.topK = 1 // ok to leave it here, will be overwritten in next iteration
142+
opt.annRequest.WithSearchParam(IteratorSearchBatchSizeKey, "1")
143+
req, err := opt.Request()
144+
if err != nil {
145+
return err
146+
}
147+
return it.client.callService(func(milvusService milvuspb.MilvusServiceClient) error {
148+
resp, err := milvusService.Search(ctx, req)
149+
err = merr.CheckRPCCall(resp, err)
150+
if err != nil {
151+
return err
152+
}
153+
154+
if resp.GetResults().GetSearchIteratorV2Results() == nil || resp.GetResults().GetSearchIteratorV2Results().GetToken() == "" {
155+
return ErrServerVersionIncompatible
156+
}
157+
return nil
158+
})
159+
}
160+
161+
// newSearchIteratorV2 creates a new search iterator V2.
162+
//
163+
// It sets up the collection ID and checks if the server supports search iterator V2.
164+
// If the server does not support search iterator V2, it returns an error.
165+
func newSearchIteratorV2(ctx context.Context, client *Client, option SearchIteratorOption) (*searchIteratorV2, error) {
166+
iter := &searchIteratorV2{
167+
client: client,
168+
option: option,
169+
limit: option.Limit(),
170+
}
171+
if err := iter.setupCollectionID(ctx); err != nil {
172+
return nil, err
173+
}
174+
175+
if err := iter.probeCompatiblity(ctx); err != nil {
176+
return nil, err
177+
}
178+
179+
return iter, nil
180+
}
181+
182+
type searchIteratorV1 struct {
183+
client *Client
184+
}
185+
186+
func (s *searchIteratorV1) Next(_ context.Context) (ResultSet, error) {
187+
return ResultSet{}, errors.New("not implemented")
188+
}
189+
190+
func newSearchIteratorV1(_ *Client) (*searchIteratorV1, error) {
191+
// search iterator v1 is not supported
192+
return nil, ErrServerVersionIncompatible
193+
}
194+
195+
// SearchIterator creates a search iterator from a collection.
196+
//
197+
// If the server supports search iterator V2, it creates a search iterator V2.
198+
func (c *Client) SearchIterator(ctx context.Context, option SearchIteratorOption, callOptions ...grpc.CallOption) (SearchIterator, error) {
199+
if err := option.ValidateParams(); err != nil {
200+
return nil, err
201+
}
202+
203+
iter, err := newSearchIteratorV2(ctx, c, option)
204+
if err == nil {
205+
return iter, nil
206+
}
207+
208+
if !errors.Is(err, ErrServerVersionIncompatible) {
209+
return nil, err
210+
}
211+
212+
return newSearchIteratorV1(c)
213+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package milvusclient
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/milvus-io/milvus/client/v2/entity"
23+
"github.com/milvus-io/milvus/client/v2/index"
24+
)
25+
26+
type SearchIteratorOption interface {
27+
// SearchOption returns the search option when iterate search
28+
SearchOption() *searchOption
29+
// Limit returns the overall limit of entries to iterate
30+
Limit() int64
31+
// ValidateParams performs the static params validation
32+
ValidateParams() error
33+
}
34+
35+
type searchIteratorOption struct {
36+
*searchOption
37+
batchSize int
38+
iteratorLimit int64
39+
}
40+
41+
func (opt *searchIteratorOption) SearchOption() *searchOption {
42+
opt.annRequest.topK = opt.batchSize
43+
opt.WithSearchParam(IteratorSearchBatchSizeKey, fmt.Sprintf("%d", opt.batchSize))
44+
return opt.searchOption
45+
}
46+
47+
func (opt *searchIteratorOption) Limit() int64 {
48+
return opt.iteratorLimit
49+
}
50+
51+
// ValidateParams performs the static params validation
52+
func (opt *searchIteratorOption) ValidateParams() error {
53+
if opt.batchSize <= 0 {
54+
return fmt.Errorf("batch size must be greater than 0")
55+
}
56+
return nil
57+
}
58+
59+
func (opt *searchIteratorOption) WithBatchSize(batchSize int) *searchIteratorOption {
60+
opt.batchSize = batchSize
61+
return opt
62+
}
63+
64+
func (opt *searchIteratorOption) WithPartitions(partitionNames ...string) *searchIteratorOption {
65+
opt.partitionNames = partitionNames
66+
return opt
67+
}
68+
69+
func (opt *searchIteratorOption) WithFilter(expr string) *searchIteratorOption {
70+
opt.annRequest.WithFilter(expr)
71+
return opt
72+
}
73+
74+
func (opt *searchIteratorOption) WithTemplateParam(key string, val any) *searchIteratorOption {
75+
opt.annRequest.WithTemplateParam(key, val)
76+
return opt
77+
}
78+
79+
func (opt *searchIteratorOption) WithOffset(offset int) *searchIteratorOption {
80+
opt.annRequest.WithOffset(offset)
81+
return opt
82+
}
83+
84+
func (opt *searchIteratorOption) WithOutputFields(fieldNames ...string) *searchIteratorOption {
85+
opt.outputFields = fieldNames
86+
return opt
87+
}
88+
89+
func (opt *searchIteratorOption) WithConsistencyLevel(consistencyLevel entity.ConsistencyLevel) *searchIteratorOption {
90+
opt.consistencyLevel = consistencyLevel
91+
opt.useDefaultConsistencyLevel = false
92+
return opt
93+
}
94+
95+
func (opt *searchIteratorOption) WithANNSField(annsField string) *searchIteratorOption {
96+
opt.annRequest.WithANNSField(annsField)
97+
return opt
98+
}
99+
100+
func (opt *searchIteratorOption) WithGroupByField(groupByField string) *searchIteratorOption {
101+
opt.annRequest.WithGroupByField(groupByField)
102+
return opt
103+
}
104+
105+
func (opt *searchIteratorOption) WithGroupSize(groupSize int) *searchIteratorOption {
106+
opt.annRequest.WithGroupSize(groupSize)
107+
return opt
108+
}
109+
110+
func (opt *searchIteratorOption) WithStrictGroupSize(strictGroupSize bool) *searchIteratorOption {
111+
opt.annRequest.WithStrictGroupSize(strictGroupSize)
112+
return opt
113+
}
114+
115+
func (opt *searchIteratorOption) WithIgnoreGrowing(ignoreGrowing bool) *searchIteratorOption {
116+
opt.annRequest.WithIgnoreGrowing(ignoreGrowing)
117+
return opt
118+
}
119+
120+
func (opt *searchIteratorOption) WithAnnParam(ap index.AnnParam) *searchIteratorOption {
121+
opt.annRequest.WithAnnParam(ap)
122+
return opt
123+
}
124+
125+
func (opt *searchIteratorOption) WithSearchParam(key, value string) *searchIteratorOption {
126+
opt.annRequest.WithSearchParam(key, value)
127+
return opt
128+
}
129+
130+
// WithIteratorLimit sets the limit of entries to iterate
131+
// if limit < 0, then it will be set to Unlimited
132+
func (opt *searchIteratorOption) WithIteratorLimit(limit int64) *searchIteratorOption {
133+
if limit < 0 {
134+
limit = Unlimited
135+
}
136+
opt.iteratorLimit = limit
137+
return opt
138+
}
139+
140+
func NewSearchIteratorOption(collectionName string, vector entity.Vector) *searchIteratorOption {
141+
return &searchIteratorOption{
142+
searchOption: NewSearchOption(collectionName, 1000, []entity.Vector{vector}).
143+
WithSearchParam(IteratorKey, "true").
144+
WithSearchParam(IteratorSearchV2Key, "true"),
145+
batchSize: 1000,
146+
iteratorLimit: Unlimited,
147+
}
148+
}

0 commit comments

Comments
 (0)