Skip to content

Commit 47e28fb

Browse files
authored
Add FlushAll (#23088)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
1 parent 3e95df7 commit 47e28fb

File tree

18 files changed

+828
-292
lines changed

18 files changed

+828
-292
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ require (
2929
github.com/klauspost/compress v1.14.2
3030
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
3131
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
32-
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd
32+
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230324025554-5bbe6698c2b0
3333
github.com/minio/minio-go/v7 v7.0.17
3434
github.com/opentracing/opentracing-go v1.2.0
3535
github.com/panjf2000/ants/v2 v2.4.8

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,8 +493,14 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
493493
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
494494
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd h1:9ilgTEqZSdEPbJKSrRGB1TIHTaF7DqVDIwn8/azcaBk=
495495
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
496+
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230322065753-aa8a66130217 h1:58lCM3+oh3ZuCemnOE3V2VdaPnIL+LS7eoEyrFfrxOM=
497+
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230322065753-aa8a66130217/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
498+
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230324025554-5bbe6698c2b0 h1:k1VzMLw+FO6G4JUDIliN0+8UBnOGwSeiJUlO6/iJrXg=
499+
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230324025554-5bbe6698c2b0/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
496500
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
497501
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
502+
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
503+
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
498504
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
499505
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
500506
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=

internal/datacoord/server_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3507,6 +3507,87 @@ func TestGetFlushState(t *testing.T) {
35073507
})
35083508
}
35093509

3510+
func TestGetFlushAllState(t *testing.T) {
3511+
tests := []struct {
3512+
testName string
3513+
ChannelCPs []Timestamp
3514+
FlushAllTs Timestamp
3515+
ServerIsHealthy bool
3516+
ShowCollectionFailed bool
3517+
DescribeCollectionFailed bool
3518+
ExpectedSuccess bool
3519+
ExpectedFlushed bool
3520+
}{
3521+
{"test FlushAll flushed", []Timestamp{100, 200}, 99,
3522+
true, false, false, true, true},
3523+
{"test FlushAll not flushed", []Timestamp{100, 200}, 150,
3524+
true, false, false, true, false},
3525+
{"test Sever is not healthy", nil, 0,
3526+
false, false, false, false, false},
3527+
{"test ShowCollections failed", nil, 0,
3528+
true, true, false, false, false},
3529+
{"test DescribeCollection failed", nil, 0,
3530+
true, false, true, false, false},
3531+
}
3532+
for _, test := range tests {
3533+
t.Run(test.testName, func(t *testing.T) {
3534+
collection := UniqueID(0)
3535+
vchannels := []string{"mock-vchannel-0", "mock-vchannel-1"}
3536+
3537+
svr := &Server{}
3538+
if test.ServerIsHealthy {
3539+
svr.stateCode.Store(commonpb.StateCode_Healthy)
3540+
}
3541+
var err error
3542+
svr.meta = &meta{}
3543+
svr.rootCoordClient = mocks.NewRootCoord(t)
3544+
if test.ShowCollectionFailed {
3545+
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything).
3546+
Return(&milvuspb.ShowCollectionsResponse{
3547+
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
3548+
}, nil).Maybe()
3549+
} else {
3550+
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything).
3551+
Return(&milvuspb.ShowCollectionsResponse{
3552+
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
3553+
CollectionIds: []int64{collection},
3554+
}, nil).Maybe()
3555+
}
3556+
3557+
if test.DescribeCollectionFailed {
3558+
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
3559+
Return(&milvuspb.DescribeCollectionResponse{
3560+
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
3561+
}, nil).Maybe()
3562+
} else {
3563+
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
3564+
Return(&milvuspb.DescribeCollectionResponse{
3565+
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
3566+
VirtualChannelNames: vchannels,
3567+
}, nil).Maybe()
3568+
}
3569+
3570+
svr.meta.channelCPs = make(map[string]*internalpb.MsgPosition)
3571+
for i, ts := range test.ChannelCPs {
3572+
channel := vchannels[i]
3573+
svr.meta.channelCPs[channel] = &internalpb.MsgPosition{
3574+
ChannelName: channel,
3575+
Timestamp: ts,
3576+
}
3577+
}
3578+
3579+
resp, err := svr.GetFlushAllState(context.TODO(), &milvuspb.GetFlushAllStateRequest{FlushAllTs: test.FlushAllTs})
3580+
assert.Nil(t, err)
3581+
if test.ExpectedSuccess {
3582+
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
3583+
} else {
3584+
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
3585+
}
3586+
assert.Equal(t, test.ExpectedFlushed, resp.GetFlushed())
3587+
})
3588+
}
3589+
}
3590+
35103591
func TestDataCoordServer_SetSegmentState(t *testing.T) {
35113592
t.Run("normal case", func(t *testing.T) {
35123593
svr := newTestServer(t, nil)

internal/datacoord/services.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,51 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
11801180
return resp, nil
11811181
}
11821182

1183+
// GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed.
1184+
func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
1185+
resp := &milvuspb.GetFlushAllStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
1186+
if s.isClosed() {
1187+
log.Warn("DataCoord receive GetFlushAllState request, server closed")
1188+
setNotServingStatus(resp.Status, s.GetStateCode())
1189+
return resp, nil
1190+
}
1191+
1192+
showColRsp, err := s.rootCoordClient.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
1193+
Base: commonpbutil.NewMsgBase(
1194+
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
1195+
)})
1196+
if err = VerifyResponse(showColRsp, err); err != nil {
1197+
log.Warn("failed to ShowCollections", zap.Error(err))
1198+
resp.Status.Reason = err.Error()
1199+
return resp, nil
1200+
}
1201+
1202+
for _, collection := range showColRsp.GetCollectionIds() {
1203+
describeColRsp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
1204+
Base: commonpbutil.NewMsgBase(
1205+
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
1206+
),
1207+
CollectionID: collection,
1208+
})
1209+
if err = VerifyResponse(describeColRsp, err); err != nil {
1210+
log.Warn("failed to DescribeCollectionInternal", zap.Error(err))
1211+
resp.Status.Reason = err.Error()
1212+
return resp, nil
1213+
}
1214+
for _, channel := range describeColRsp.GetVirtualChannelNames() {
1215+
channelCP := s.meta.GetChannelCheckpoint(channel)
1216+
if channelCP == nil || channelCP.GetTimestamp() < req.GetFlushAllTs() {
1217+
resp.Flushed = false
1218+
resp.Status.ErrorCode = commonpb.ErrorCode_Success
1219+
return resp, nil
1220+
}
1221+
}
1222+
}
1223+
resp.Flushed = true
1224+
resp.Status.ErrorCode = commonpb.ErrorCode_Success
1225+
return resp, nil
1226+
}
1227+
11831228
// Import distributes the import tasks to dataNodes.
11841229
// It returns a failed status if no dataNode is available or if any error occurs.
11851230
func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {

internal/distributed/datacoord/client/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,20 @@ func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
581581
return ret.(*milvuspb.GetFlushStateResponse), err
582582
}
583583

584+
// GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed.
585+
func (c *Client) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
586+
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
587+
if !funcutil.CheckCtxValid(ctx) {
588+
return nil, ctx.Err()
589+
}
590+
return client.GetFlushAllState(ctx, req)
591+
})
592+
if err != nil || ret == nil {
593+
return nil, err
594+
}
595+
return ret.(*milvuspb.GetFlushAllStateResponse), err
596+
}
597+
584598
// DropVirtualChannel drops virtual channel in datacoord.
585599
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
586600
req = typeutil.Clone(req)

internal/distributed/datacoord/client/client_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ func Test_NewClient(t *testing.T) {
156156
r32, err := client.ListSegmentsInfo(ctx, nil)
157157
retCheck(retNotNil, r32, err)
158158

159+
r, err := client.GetFlushAllState(ctx, nil)
160+
retCheck(retNotNil, r, err)
161+
159162
{
160163
ret, err := client.BroadcastAlteredCollection(ctx, nil)
161164
retCheck(retNotNil, ret, err)

internal/distributed/datacoord/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
360360
return s.dataCoord.GetFlushState(ctx, req)
361361
}
362362

363+
// GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed.
364+
func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
365+
return s.dataCoord.GetFlushAllState(ctx, req)
366+
}
367+
363368
// DropVirtualChannel drop virtual channel in datacoord
364369
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
365370
return s.dataCoord.DropVirtualChannel(ctx, req)

internal/distributed/datacoord/service_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type MockDataCoord struct {
6060
compactionPlansResp *milvuspb.GetCompactionPlansResponse
6161
watchChannelsResp *datapb.WatchChannelsResponse
6262
getFlushStateResp *milvuspb.GetFlushStateResponse
63+
getFlushAllStateResp *milvuspb.GetFlushAllStateResponse
6364
dropVChanResp *datapb.DropVirtualChannelResponse
6465
setSegmentStateResp *datapb.SetSegmentStateResponse
6566
importResp *datapb.ImportTaskResponse
@@ -188,6 +189,10 @@ func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlus
188189
return m.getFlushStateResp, m.err
189190
}
190191

192+
func (m *MockDataCoord) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
193+
return m.getFlushAllStateResp, m.err
194+
}
195+
191196
func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
192197
return m.dropVChanResp, m.err
193198
}
@@ -420,6 +425,15 @@ func Test_NewServer(t *testing.T) {
420425
assert.NotNil(t, resp)
421426
})
422427

428+
t.Run("GetFlushAllState", func(t *testing.T) {
429+
server.dataCoord = &MockDataCoord{
430+
getFlushAllStateResp: &milvuspb.GetFlushAllStateResponse{},
431+
}
432+
resp, err := server.GetFlushAllState(ctx, nil)
433+
assert.Nil(t, err)
434+
assert.NotNil(t, resp)
435+
})
436+
423437
t.Run("DropVirtualChannel", func(t *testing.T) {
424438
server.dataCoord = &MockDataCoord{
425439
dropVChanResp: &datapb.DropVirtualChannelResponse{},

internal/distributed/proxy/service.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,10 @@ func (s *Server) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanc
692692
return s.proxy.CalcDistance(ctx, request)
693693
}
694694

695+
func (s *Server) FlushAll(ctx context.Context, request *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) {
696+
return s.proxy.FlushAll(ctx, request)
697+
}
698+
695699
func (s *Server) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
696700
return s.proxy.GetDdChannel(ctx, request)
697701
}
@@ -758,6 +762,11 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
758762
return s.proxy.GetFlushState(ctx, req)
759763
}
760764

765+
// GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed.
766+
func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
767+
return s.proxy.GetFlushAllState(ctx, req)
768+
}
769+
761770
func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
762771
return s.proxy.Import(ctx, req)
763772
}

internal/distributed/proxy/service_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,10 @@ func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlus
612612
return nil, nil
613613
}
614614

615+
func (m *MockDataCoord) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
616+
return nil, nil
617+
}
618+
615619
func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
616620
return &datapb.DropVirtualChannelResponse{}, nil
617621
}
@@ -803,6 +807,10 @@ func (m *MockProxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDist
803807
return nil, nil
804808
}
805809

810+
func (m *MockProxy) FlushAll(ctx context.Context, request *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) {
811+
return nil, nil
812+
}
813+
806814
func (m *MockProxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
807815
return nil, nil
808816
}
@@ -894,6 +902,10 @@ func (m *MockProxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushSta
894902
return nil, nil
895903
}
896904

905+
func (m *MockProxy) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
906+
return nil, nil
907+
}
908+
897909
func (m *MockProxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
898910
return nil, nil
899911
}
@@ -1458,6 +1470,16 @@ func Test_NewServer(t *testing.T) {
14581470
assert.Nil(t, err)
14591471
})
14601472

1473+
t.Run("FlushAll", func(t *testing.T) {
1474+
_, err := server.FlushAll(ctx, nil)
1475+
assert.Nil(t, err)
1476+
})
1477+
1478+
t.Run("GetFlushAllState", func(t *testing.T) {
1479+
_, err := server.GetFlushAllState(ctx, nil)
1480+
assert.Nil(t, err)
1481+
})
1482+
14611483
err = server.Stop()
14621484
assert.Nil(t, err)
14631485

0 commit comments

Comments
 (0)