Skip to content

[CLN]: add CreateSegment type #4555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *Coordinator) GetTenant(ctx context.Context, getTenant *model.GetTenant)
return tenant, nil
}

func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.Segment) (*model.Collection, bool, error) {
func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment) (*model.Collection, bool, error) {
collection, created, err := s.catalog.CreateCollectionAndSegments(ctx, createCollection, createSegments, createCollection.Ts)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *Coordinator) CountForks(ctx context.Context, sourceCollectionID types.U
return s.catalog.CountForks(ctx, sourceCollectionID)
}

func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.Segment) error {
func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.CreateSegment) error {
if err := verifyCreateSegment(segment); err != nil {
return err
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func verifyCollectionMetadata(metadata *model.CollectionMetadata[model.Collectio
return nil
}

func verifyCreateSegment(segment *model.Segment) error {
func verifyCreateSegment(segment *model.CreateSegment) error {
if err := verifySegmentMetadata(segment.Metadata); err != nil {
return err
}
Expand Down
38 changes: 19 additions & 19 deletions go/pkg/sysdb/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ func testSegment(t *rapid.T) {

t.Repeat(map[string]func(*rapid.T){
"create_segment": func(t *rapid.T) {
segment := rapid.Custom[*model.Segment](func(t *rapid.T) *model.Segment {
return &model.Segment{
segment := rapid.Custom[*model.CreateSegment](func(t *rapid.T) *model.CreateSegment {
return &model.CreateSegment{
ID: types.MustParse(rapid.StringMatching(`[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}`).Draw(t, "segment_id")),
Type: "test-segment-type",
Scope: "test-segment-scope",
Expand Down Expand Up @@ -295,7 +295,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
DatabaseName: suite.databaseName,
}

segments := []*model.Segment{
segments := []*model.CreateSegment{
{
ID: types.NewUniqueID(),
Type: "test_type",
Expand Down Expand Up @@ -364,7 +364,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
suite.NotNil(v0.SegmentInfo.SegmentCompactionInfo)
suite.Equal(len(v0.SegmentInfo.SegmentCompactionInfo), 2)
for _, segment := range segments {
assertExpectedSegmentInfoExist(suite, segment, v0.SegmentInfo.SegmentCompactionInfo)
assertExpectedSegmentInfoExist(suite, model.CreateSegmentToSegment(segment), v0.SegmentInfo.SegmentCompactionInfo)
}

// Attempt to create a duplicate collection (should fail)
Expand Down Expand Up @@ -506,7 +506,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
suite.Equal(createCollection.Metadata, results[0].Metadata)

// Create segments associated with collection
segment := &model.Segment{
segment := &model.CreateSegment{
ID: types.MustParse("00000000-0000-0000-0000-000000000001"),
CollectionID: createCollection.ID,
Type: "test_segment",
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {

sampleSegments := SampleSegments(suite.sampleCollections)
for _, segment := range sampleSegments {
errSegmentCreation := c.CreateSegment(ctx, &model.Segment{
errSegmentCreation := c.CreateSegment(ctx, &model.CreateSegment{
ID: segment.ID,
Type: segment.Type,
Scope: segment.Scope,
Expand All @@ -1023,7 +1023,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
suite.NoError(errSegmentCreation)

// Create segment with empty collection id fails
errSegmentCreation = c.CreateSegment(ctx, &model.Segment{
errSegmentCreation = c.CreateSegment(ctx, &model.CreateSegment{
ID: segment.ID,
Type: segment.Type,
Scope: segment.Scope,
Expand All @@ -1034,7 +1034,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {

// Create segment to test unique constraint violation on segment.id.
// This should fail because the id is already taken.
errSegmentCreation = c.CreateSegment(ctx, &model.Segment{
errSegmentCreation = c.CreateSegment(ctx, &model.CreateSegment{
ID: segment.ID,
Type: segment.Type,
Scope: segment.Scope,
Expand All @@ -1057,7 +1057,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
suite.Equal(sampleSegments, results)

// Duplicate create fails
err := c.CreateSegment(ctx, &model.Segment{
err := c.CreateSegment(ctx, &model.CreateSegment{
ID: sampleSegments[0].ID,
Type: sampleSegments[0].Type,
Scope: sampleSegments[0].Scope,
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func (suite *APIsTestSuite) TestUpdateSegment() {
}

ctx := context.Background()
errSegmentCreation := suite.coordinator.CreateSegment(ctx, &model.Segment{
errSegmentCreation := suite.coordinator.CreateSegment(ctx, &model.CreateSegment{
ID: segment.ID,
Type: segment.Type,
Scope: segment.Scope,
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func (suite *APIsTestSuite) TestCollectionVersioningWithMinio() {
DatabaseName: suite.databaseName,
}

segments := []*model.Segment{
segments := []*model.CreateSegment{
{
ID: types.NewUniqueID(),
Type: "test_type_a",
Expand Down Expand Up @@ -1410,28 +1410,28 @@ func (suite *APIsTestSuite) TestForkCollection() {
DatabaseName: suite.databaseName,
}

sourceCreateMetadataSegment := &model.Segment{
sourceCreateMetadataSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_blockfile",
Scope: "METADATA",
CollectionID: sourceCreateCollection.ID,
}

sourceCreateRecordSegment := &model.Segment{
sourceCreateRecordSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_blockfile",
Scope: "RECORD",
CollectionID: sourceCreateCollection.ID,
}

sourceCreateVectorSegment := &model.Segment{
sourceCreateVectorSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_hnsw",
Scope: "VECTOR",
CollectionID: sourceCreateCollection.ID,
}

segments := []*model.Segment{
segments := []*model.CreateSegment{
sourceCreateMetadataSegment,
sourceCreateRecordSegment,
sourceCreateVectorSegment,
Expand Down Expand Up @@ -1568,28 +1568,28 @@ func (suite *APIsTestSuite) TestCountForks() {
DatabaseName: suite.databaseName,
}

sourceCreateMetadataSegment := &model.Segment{
sourceCreateMetadataSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_blockfile",
Scope: "METADATA",
CollectionID: sourceCreateCollection.ID,
}

sourceCreateRecordSegment := &model.Segment{
sourceCreateRecordSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_blockfile",
Scope: "RECORD",
CollectionID: sourceCreateCollection.ID,
}

sourceCreateVectorSegment := &model.Segment{
sourceCreateVectorSegment := &model.CreateSegment{
ID: types.NewUniqueID(),
Type: "test_hnsw",
Scope: "VECTOR",
CollectionID: sourceCreateCollection.ID,
}

segments := []*model.Segment{
segments := []*model.CreateSegment{
sourceCreateMetadataSegment,
sourceCreateRecordSegment,
sourceCreateVectorSegment,
Expand Down
22 changes: 22 additions & 0 deletions go/pkg/sysdb/coordinator/model/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ type Segment struct {
FilePaths map[string][]string
}

type CreateSegment struct {
ID types.UniqueID
Type string
Scope string
CollectionID types.UniqueID
Metadata *SegmentMetadata[SegmentMetadataValueType]
Ts types.Timestamp
FilePaths map[string][]string
}

func CreateSegmentToSegment(createSegment *CreateSegment) *Segment {
return &Segment{
ID: createSegment.ID,
Type: createSegment.Type,
Scope: createSegment.Scope,
CollectionID: createSegment.CollectionID,
Metadata: createSegment.Metadata,
Ts: createSegment.Ts,
FilePaths: createSegment.FilePaths,
}
}

type UpdateSegment struct {
ID types.UniqueID
ResetTopic bool
Expand Down
12 changes: 6 additions & 6 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,11 +932,11 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs,
}

createSegments := []*model.Segment{}
createSegments := []*model.CreateSegment{}
flushFilePaths := []*model.FlushSegmentCompaction{}
for _, segment := range sourceSegments {
newSegmentID := types.NewUniqueID()
createSegment := &model.Segment{
createSegment := &model.CreateSegment{
ID: newSegmentID,
Type: segment.Type,
Scope: segment.Scope,
Expand Down Expand Up @@ -1035,7 +1035,7 @@ func (tc *Catalog) CountForks(ctx context.Context, sourceCollectionID types.Uniq
return uint64(len(lineageFile.Dependencies)), nil
}

func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.Segment, ts types.Timestamp) (*model.Segment, error) {
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
var result *model.Segment

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
Expand All @@ -1051,7 +1051,7 @@ func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.Segme
return result, nil
}

func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model.Segment, ts types.Timestamp) (*model.Segment, error) {
func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
var result *model.Segment

// insert segment
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model
return result, nil
}

func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.Segment, ts types.Timestamp) (string, error) {
func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (string, error) {
segmentCompactionInfos := make([]*coordinatorpb.FlushSegmentCompactionInfo, 0, len(createSegments))
for _, segment := range createSegments {
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
Expand Down Expand Up @@ -1135,7 +1135,7 @@ func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string
return fullFilePath, nil
}

func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.Segment, ts types.Timestamp) (*model.Collection, bool, error) {
func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (*model.Collection, bool, error) {
var resultCollection *model.Collection
created := false

Expand Down
4 changes: 2 additions & 2 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (s *Server) CreateCollection(ctx context.Context, req *coordinatorpb.Create
}

// Convert the request segments to create segment models
createSegments := []*model.Segment{}
createSegments := []*model.CreateSegment{}
for _, segment := range req.Segments {
createSegment, err := convertProtoSegment(segment)
createSegment, err := convertSegmentToModel(segment)
if err != nil {
log.Error("Error in creating segments for the collection", zap.Error(err))
res.Collection = nil // We don't need to set the collection in case of error
Expand Down
4 changes: 2 additions & 2 deletions go/pkg/sysdb/grpc/proto_model_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func convertSegmentMetadataToProto(segmentMetadata *model.SegmentMetadata[model.
return metadatapb
}

func convertProtoSegment(segmentpb *coordinatorpb.Segment) (*model.Segment, error) {
func convertSegmentToModel(segmentpb *coordinatorpb.Segment) (*model.CreateSegment, error) {
segmentID, err := types.ToUniqueID(&segmentpb.Id)
if err != nil {
log.Error("segment id format error", zap.String("segment.id", segmentpb.Id))
Expand All @@ -241,7 +241,7 @@ func convertProtoSegment(segmentpb *coordinatorpb.Segment) (*model.Segment, erro
filePaths[t] = paths.Paths
}

return &model.Segment{
return &model.CreateSegment{
ID: segmentID,
Type: segmentpb.Type,
Scope: segmentpb.Scope.String(),
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/sysdb/grpc/segment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (s *Server) CreateSegment(ctx context.Context, req *coordinatorpb.CreateSeg

res := &coordinatorpb.CreateSegmentResponse{}

segment, err := convertProtoSegment(segmentpb)
segment, err := convertSegmentToModel(segmentpb)
if err != nil {
log.Error("CreateSegment failed. convert segment to model error", zap.Error(err), zap.String("request", segmentpb.String()))
return res, grpcutils.BuildInternalGrpcError(err.Error())
Expand Down
Loading