Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions client/milvusclient/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,58 @@ func (c *Client) RestoreSnapshot(ctx context.Context, opt RestoreSnapshotOption,
return jobID, err
}

func (c *Client) RestoreExternalSnapshot(ctx context.Context, opt RestoreExternalSnapshotOption, callOptions ...grpc.CallOption) (int64, error) {
if opt == nil {
return 0, merr.WrapErrParameterInvalid("RestoreExternalSnapshotOption", "nil", "option cannot be nil")
}
req := opt.Request()
if req.DbName == "" {
req.DbName = c.getCurrentDB()
}
if timeout := opt.RequestTimeout(); timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

var jobID int64
err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
resp, err := milvusService.RestoreExternalSnapshot(ctx, req, callOptions...)
if err := merr.CheckRPCCall(resp, err); err != nil {
return err
}
jobID = resp.GetJobId()
return nil
})
return jobID, err
}

func (c *Client) ExportSnapshot(ctx context.Context, opt ExportSnapshotOption, callOptions ...grpc.CallOption) (string, error) {
if opt == nil {
return "", merr.WrapErrParameterInvalid("ExportSnapshotOption", "nil", "option cannot be nil")
}
req := opt.Request()
if req.DbName == "" {
req.DbName = c.getCurrentDB()
}
if timeout := opt.RequestTimeout(); timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

var snapshotMetadataURI string
err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
resp, err := milvusService.ExportSnapshot(ctx, req, callOptions...)
if err := merr.CheckRPCCall(resp, err); err != nil {
return err
}
snapshotMetadataURI = resp.GetSnapshotMetadataUri()
return nil
})
return snapshotMetadataURI, err
}

// GetRestoreSnapshotState gets the state of a restore snapshot job
func (c *Client) GetRestoreSnapshotState(ctx context.Context, opt GetRestoreSnapshotStateOption, callOptions ...grpc.CallOption) (*milvuspb.RestoreSnapshotInfo, error) {
if opt == nil {
Expand Down
109 changes: 109 additions & 0 deletions client/milvusclient/snapshot_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package milvusclient

import (
"time"

"github.com/milvus-io/milvus-proto/go-api/v3/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v3/milvuspb"
)
Expand Down Expand Up @@ -206,6 +208,113 @@ func NewRestoreSnapshotOption(name string, collectionName string, targetCollecti
}
}

type RestoreExternalSnapshotOption interface {
Request() *milvuspb.RestoreExternalSnapshotRequest
RequestTimeout() time.Duration
}

type restoreExternalSnapshotOption struct {
dbName string
targetCollectionName string
snapshotMetadataURI string
externalSpec string
requestTimeout time.Duration
}

func (opt *restoreExternalSnapshotOption) Request() *milvuspb.RestoreExternalSnapshotRequest {
return &milvuspb.RestoreExternalSnapshotRequest{
Base: &commonpb.MsgBase{},
DbName: opt.dbName,
TargetCollectionName: opt.targetCollectionName,
SnapshotMetadataUri: opt.snapshotMetadataURI,
ExternalSpec: opt.externalSpec,
}
}

func (opt *restoreExternalSnapshotOption) WithDbName(dbName string) *restoreExternalSnapshotOption {
opt.dbName = dbName
return opt
}

func (opt *restoreExternalSnapshotOption) WithExternalSpec(externalSpec string) *restoreExternalSnapshotOption {
opt.externalSpec = externalSpec
return opt
}

func (opt *restoreExternalSnapshotOption) WithRequestTimeout(timeout time.Duration) *restoreExternalSnapshotOption {
opt.requestTimeout = timeout
return opt
}

func (opt *restoreExternalSnapshotOption) RequestTimeout() time.Duration {
if opt.requestTimeout <= 0 {
return 120 * time.Second
}
return opt.requestTimeout
}

func NewRestoreExternalSnapshotOption(targetCollectionName string, snapshotMetadataURI string) *restoreExternalSnapshotOption {
return &restoreExternalSnapshotOption{
targetCollectionName: targetCollectionName,
snapshotMetadataURI: snapshotMetadataURI,
}
}

type ExportSnapshotOption interface {
Request() *milvuspb.ExportSnapshotRequest
RequestTimeout() time.Duration
}

type exportSnapshotOption struct {
name string
dbName string
collectionName string
targetS3Path string
externalSpec string
requestTimeout time.Duration
}

func (opt *exportSnapshotOption) Request() *milvuspb.ExportSnapshotRequest {
return &milvuspb.ExportSnapshotRequest{
Base: &commonpb.MsgBase{},
Name: opt.name,
DbName: opt.dbName,
CollectionName: opt.collectionName,
TargetS3Path: opt.targetS3Path,
ExternalSpec: opt.externalSpec,
}
}

func (opt *exportSnapshotOption) WithDbName(dbName string) *exportSnapshotOption {
opt.dbName = dbName
return opt
}

func (opt *exportSnapshotOption) WithExternalSpec(externalSpec string) *exportSnapshotOption {
opt.externalSpec = externalSpec
return opt
}

func (opt *exportSnapshotOption) WithRequestTimeout(timeout time.Duration) *exportSnapshotOption {
opt.requestTimeout = timeout
return opt
}

func (opt *exportSnapshotOption) RequestTimeout() time.Duration {
if opt.requestTimeout <= 0 {
return 120 * time.Second
}
return opt.requestTimeout
}

func NewExportSnapshotOption(name string, collectionName string, targetS3Path string) *exportSnapshotOption {
return &exportSnapshotOption{
name: name,
collectionName: collectionName,
targetS3Path: targetS3Path,
}
}

// GetRestoreSnapshotStateOption interface for getting restore snapshot state options
type GetRestoreSnapshotStateOption interface {
Request() *milvuspb.GetRestoreSnapshotStateRequest
Expand Down
127 changes: 127 additions & 0 deletions client/milvusclient/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,96 @@ func (s *SnapshotSuite) TestRestoreSnapshot() {
})
}

func (s *SnapshotSuite) TestRestoreExternalSnapshot() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("success", func() {
targetCollectionName := fmt.Sprintf("restored_%s", s.randString(6))
metadataURI := "s3://bucket/files/snapshots/meta.json"
expectedJobID := int64(2001)

s.mock.EXPECT().RestoreExternalSnapshot(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.RestoreExternalSnapshotRequest) (*milvuspb.RestoreExternalSnapshotResponse, error) {
s.Equal(targetCollectionName, req.GetTargetCollectionName())
s.Equal(metadataURI, req.GetSnapshotMetadataUri())
return &milvuspb.RestoreExternalSnapshotResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
JobId: expectedJobID,
}, nil
}).Once()

jobID, err := s.client.RestoreExternalSnapshot(ctx,
NewRestoreExternalSnapshotOption(targetCollectionName, metadataURI))
s.NoError(err)
s.Equal(expectedJobID, jobID)
})

s.Run("failure", func() {
targetCollectionName := fmt.Sprintf("restored_%s", s.randString(6))
metadataURI := "s3://bucket/files/snapshots/meta.json"

s.mock.EXPECT().RestoreExternalSnapshot(mock.Anything, mock.Anything).Return((*milvuspb.RestoreExternalSnapshotResponse)(nil), errors.New("mocked error")).Once()

jobID, err := s.client.RestoreExternalSnapshot(ctx,
NewRestoreExternalSnapshotOption(targetCollectionName, metadataURI))
s.Error(err)
s.Equal(int64(0), jobID)
})

s.Run("nil option", func() {
jobID, err := s.client.RestoreExternalSnapshot(ctx, nil)
s.Error(err)
s.Equal(int64(0), jobID)
})
}

func (s *SnapshotSuite) TestExportSnapshot() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("success", func() {
dbName := "source_db"
collectionName := fmt.Sprintf("collection_%s", s.randString(6))
snapshotName := fmt.Sprintf("snapshot_%s", s.randString(6))
targetS3Path := "s3://bucket/export-root"
expectedURI := "s3://bucket/export-root/snapshots/100/metadata/1.json"

s.mock.EXPECT().ExportSnapshot(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.ExportSnapshotRequest) (*milvuspb.ExportSnapshotResponse, error) {
s.Equal(snapshotName, req.GetName())
s.Equal(dbName, req.GetDbName())
s.Equal(collectionName, req.GetCollectionName())
s.Equal(targetS3Path, req.GetTargetS3Path())
return &milvuspb.ExportSnapshotResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
SnapshotMetadataUri: expectedURI,
}, nil
}).Once()

metadataURI, err := s.client.ExportSnapshot(ctx,
NewExportSnapshotOption(snapshotName, collectionName, targetS3Path).WithDbName(dbName))
s.NoError(err)
s.Equal(expectedURI, metadataURI)
})

s.Run("failure", func() {
collectionName := fmt.Sprintf("collection_%s", s.randString(6))
snapshotName := fmt.Sprintf("snapshot_%s", s.randString(6))

s.mock.EXPECT().ExportSnapshot(mock.Anything, mock.Anything).Return((*milvuspb.ExportSnapshotResponse)(nil), errors.New("mocked error")).Once()

metadataURI, err := s.client.ExportSnapshot(ctx,
NewExportSnapshotOption(snapshotName, collectionName, "s3://bucket/export-root"))
s.Error(err)
s.Empty(metadataURI)
})

s.Run("nil option", func() {
metadataURI, err := s.client.ExportSnapshot(ctx, nil)
s.Error(err)
s.Empty(metadataURI)
})
}

func (s *SnapshotSuite) TestSnapshotOptions() {
s.Run("CreateSnapshotOption", func() {
collectionName := "test_collection"
Expand Down Expand Up @@ -369,6 +459,43 @@ func (s *SnapshotSuite) TestSnapshotOptions() {
s.Equal(targetCollection, req.GetTargetCollectionName())
s.Equal(targetDb, req.GetTargetDbName())
})

s.Run("RestoreExternalSnapshotOption", func() {
dbName := "target_db"
targetCollection := "restored_collection"
metadataURI := "s3://bucket/files/snapshots/meta.json"
externalSpec := `{"extfs":{"cloud_provider":"aws","region":"us-west-2","use_iam":"true"}}`

opt := NewRestoreExternalSnapshotOption(targetCollection, metadataURI).
WithDbName(dbName).
WithExternalSpec(externalSpec)

req := opt.Request()
s.NotNil(req.GetBase())
s.Equal(dbName, req.GetDbName())
s.Equal(targetCollection, req.GetTargetCollectionName())
s.Equal(metadataURI, req.GetSnapshotMetadataUri())
s.Equal(externalSpec, req.GetExternalSpec())
})

s.Run("ExportSnapshotOption", func() {
dbName := "source_db"
collectionName := "source_collection"
snapshotName := "test_snapshot"
targetS3Path := "s3://bucket/export-root"
externalSpec := `{"extfs":{"cloud_provider":"aws","region":"us-west-2","use_iam":"true"}}`

opt := NewExportSnapshotOption(snapshotName, collectionName, targetS3Path).
WithDbName(dbName).
WithExternalSpec(externalSpec)

req := opt.Request()
s.Equal(snapshotName, req.GetName())
s.Equal(dbName, req.GetDbName())
s.Equal(collectionName, req.GetCollectionName())
s.Equal(targetS3Path, req.GetTargetS3Path())
s.Equal(externalSpec, req.GetExternalSpec())
})
}

func TestSnapshot(t *testing.T) {
Expand Down
Loading