Skip to content

Commit 7202f83

Browse files
authored
[BUG]: v0 in version file for forked collection missing segment file paths from source collection (#4550)
## Description of changes Prior to this fix, `v0` in the version file for forked collections was missing the segment file paths from the source collection. There was a similar bug where if segment file paths were provided during the creation of a new collection, those file paths would also be absent from `v0` in the version file (we currently never provide segment file paths upon creation, but seemed good to fix as well). ## Test plan _How are these changes tested?_ - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust Updated existing Go integration tests to check the created version file. ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_ n/a
1 parent 953bd11 commit 7202f83

File tree

7 files changed

+125
-41
lines changed

7 files changed

+125
-41
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (s *Coordinator) GetTenant(ctx context.Context, getTenant *model.GetTenant)
9797
return tenant, nil
9898
}
9999

100-
func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment) (*model.Collection, bool, error) {
100+
func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.Segment) (*model.Collection, bool, error) {
101101
collection, created, err := s.catalog.CreateCollectionAndSegments(ctx, createCollection, createSegments, createCollection.Ts)
102102
if err != nil {
103103
return nil, false, err
@@ -165,7 +165,7 @@ func (s *Coordinator) CountForks(ctx context.Context, sourceCollectionID types.U
165165
return s.catalog.CountForks(ctx, sourceCollectionID)
166166
}
167167

168-
func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.CreateSegment) error {
168+
func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.Segment) error {
169169
if err := verifyCreateSegment(segment); err != nil {
170170
return err
171171
}
@@ -212,7 +212,7 @@ func verifyCollectionMetadata(metadata *model.CollectionMetadata[model.Collectio
212212
return nil
213213
}
214214

215-
func verifyCreateSegment(segment *model.CreateSegment) error {
215+
func verifyCreateSegment(segment *model.Segment) error {
216216
if err := verifySegmentMetadata(segment.Metadata); err != nil {
217217
return err
218218
}

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
1213
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao"
1314
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1415
"github.com/pingcap/log"
@@ -179,8 +180,8 @@ func testSegment(t *rapid.T) {
179180

180181
t.Repeat(map[string]func(*rapid.T){
181182
"create_segment": func(t *rapid.T) {
182-
segment := rapid.Custom[*model.CreateSegment](func(t *rapid.T) *model.CreateSegment {
183-
return &model.CreateSegment{
183+
segment := rapid.Custom[*model.Segment](func(t *rapid.T) *model.Segment {
184+
return &model.Segment{
184185
ID: types.MustParse(rapid.StringMatching(`[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}`).Draw(t, "segment_id")),
185186
Type: "test-segment-type",
186187
Scope: "test-segment-scope",
@@ -294,7 +295,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
294295
DatabaseName: suite.databaseName,
295296
}
296297

297-
segments := []*model.CreateSegment{
298+
segments := []*model.Segment{
298299
{
299300
ID: types.NewUniqueID(),
300301
Type: "test_type",
@@ -306,6 +307,9 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
306307
Type: "test_type",
307308
Scope: "VECTOR",
308309
CollectionID: newCollection.ID,
310+
FilePaths: map[string][]string{
311+
"test_path": {"test_file1"},
312+
},
309313
},
310314
}
311315

@@ -347,6 +351,22 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
347351
}
348352
suite.ElementsMatch(expected_ids, actual_ids)
349353

354+
// Validate version file
355+
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, newCollection.ID.String(), "0")
356+
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
357+
suite.NoError(err)
358+
suite.NotNil(versionFile)
359+
v0 := versionFile.VersionHistory.Versions[0]
360+
suite.NotNil(v0)
361+
362+
// Validate file paths of segments
363+
suite.NotNil(v0.SegmentInfo)
364+
suite.NotNil(v0.SegmentInfo.SegmentCompactionInfo)
365+
suite.Equal(len(v0.SegmentInfo.SegmentCompactionInfo), 2)
366+
for _, segment := range segments {
367+
assertExpectedSegmentInfoExist(suite, segment, v0.SegmentInfo.SegmentCompactionInfo)
368+
}
369+
350370
// Attempt to create a duplicate collection (should fail)
351371
_, _, err = suite.coordinator.CreateCollectionAndSegments(ctx, newCollection, segments)
352372
suite.Error(err)
@@ -486,7 +506,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
486506
suite.Equal(createCollection.Metadata, results[0].Metadata)
487507

488508
// Create segments associated with collection
489-
segment := &model.CreateSegment{
509+
segment := &model.Segment{
490510
ID: types.MustParse("00000000-0000-0000-0000-000000000001"),
491511
CollectionID: createCollection.ID,
492512
Type: "test_segment",
@@ -993,7 +1013,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
9931013

9941014
sampleSegments := SampleSegments(suite.sampleCollections)
9951015
for _, segment := range sampleSegments {
996-
errSegmentCreation := c.CreateSegment(ctx, &model.CreateSegment{
1016+
errSegmentCreation := c.CreateSegment(ctx, &model.Segment{
9971017
ID: segment.ID,
9981018
Type: segment.Type,
9991019
Scope: segment.Scope,
@@ -1003,7 +1023,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
10031023
suite.NoError(errSegmentCreation)
10041024

10051025
// Create segment with empty collection id fails
1006-
errSegmentCreation = c.CreateSegment(ctx, &model.CreateSegment{
1026+
errSegmentCreation = c.CreateSegment(ctx, &model.Segment{
10071027
ID: segment.ID,
10081028
Type: segment.Type,
10091029
Scope: segment.Scope,
@@ -1014,7 +1034,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
10141034

10151035
// Create segment to test unique constraint violation on segment.id.
10161036
// This should fail because the id is already taken.
1017-
errSegmentCreation = c.CreateSegment(ctx, &model.CreateSegment{
1037+
errSegmentCreation = c.CreateSegment(ctx, &model.Segment{
10181038
ID: segment.ID,
10191039
Type: segment.Type,
10201040
Scope: segment.Scope,
@@ -1037,7 +1057,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteSegments() {
10371057
suite.Equal(sampleSegments, results)
10381058

10391059
// Duplicate create fails
1040-
err := c.CreateSegment(ctx, &model.CreateSegment{
1060+
err := c.CreateSegment(ctx, &model.Segment{
10411061
ID: sampleSegments[0].ID,
10421062
Type: sampleSegments[0].Type,
10431063
Scope: sampleSegments[0].Scope,
@@ -1108,7 +1128,7 @@ func (suite *APIsTestSuite) TestUpdateSegment() {
11081128
}
11091129

11101130
ctx := context.Background()
1111-
errSegmentCreation := suite.coordinator.CreateSegment(ctx, &model.CreateSegment{
1131+
errSegmentCreation := suite.coordinator.CreateSegment(ctx, &model.Segment{
11121132
ID: segment.ID,
11131133
Type: segment.Type,
11141134
Scope: segment.Scope,
@@ -1313,7 +1333,7 @@ func (suite *APIsTestSuite) TestCollectionVersioningWithMinio() {
13131333
DatabaseName: suite.databaseName,
13141334
}
13151335

1316-
segments := []*model.CreateSegment{
1336+
segments := []*model.Segment{
13171337
{
13181338
ID: types.NewUniqueID(),
13191339
Type: "test_type_a",
@@ -1353,6 +1373,33 @@ func (suite *APIsTestSuite) TestCollectionVersioningWithMinio() {
13531373
// suite.True(exists, "Version file should exist in S3")
13541374
}
13551375

1376+
func findSegmentInfo(segmentID types.UniqueID, segmentInfos []*coordinatorpb.FlushSegmentCompactionInfo) *coordinatorpb.FlushSegmentCompactionInfo {
1377+
for _, segmentInfo := range segmentInfos {
1378+
if segmentInfo.SegmentId == segmentID.String() {
1379+
return segmentInfo
1380+
}
1381+
}
1382+
return nil
1383+
}
1384+
1385+
func assertExpectedSegmentInfoExist(suite *APIsTestSuite, expectedSegment *model.Segment, segmentInfos []*coordinatorpb.FlushSegmentCompactionInfo) {
1386+
segmentInfo := findSegmentInfo(expectedSegment.ID, segmentInfos)
1387+
suite.NotNil(segmentInfo)
1388+
1389+
if expectedSegment.FilePaths == nil {
1390+
suite.Nil(segmentInfo.FilePaths)
1391+
return
1392+
}
1393+
1394+
suite.NotNil(segmentInfo.FilePaths)
1395+
1396+
filePaths := map[string][]string{}
1397+
for key, filePath := range segmentInfo.FilePaths {
1398+
filePaths[key] = filePath.Paths
1399+
}
1400+
suite.Equal(filePaths, expectedSegment.FilePaths)
1401+
}
1402+
13561403
func (suite *APIsTestSuite) TestForkCollection() {
13571404
ctx := context.Background()
13581405

@@ -1363,28 +1410,28 @@ func (suite *APIsTestSuite) TestForkCollection() {
13631410
DatabaseName: suite.databaseName,
13641411
}
13651412

1366-
sourceCreateMetadataSegment := &model.CreateSegment{
1413+
sourceCreateMetadataSegment := &model.Segment{
13671414
ID: types.NewUniqueID(),
13681415
Type: "test_blockfile",
13691416
Scope: "METADATA",
13701417
CollectionID: sourceCreateCollection.ID,
13711418
}
13721419

1373-
sourceCreateRecordSegment := &model.CreateSegment{
1420+
sourceCreateRecordSegment := &model.Segment{
13741421
ID: types.NewUniqueID(),
13751422
Type: "test_blockfile",
13761423
Scope: "RECORD",
13771424
CollectionID: sourceCreateCollection.ID,
13781425
}
13791426

1380-
sourceCreateVectorSegment := &model.CreateSegment{
1427+
sourceCreateVectorSegment := &model.Segment{
13811428
ID: types.NewUniqueID(),
13821429
Type: "test_hnsw",
13831430
Scope: "VECTOR",
13841431
CollectionID: sourceCreateCollection.ID,
13851432
}
13861433

1387-
segments := []*model.CreateSegment{
1434+
segments := []*model.Segment{
13881435
sourceCreateMetadataSegment,
13891436
sourceCreateRecordSegment,
13901437
sourceCreateVectorSegment,
@@ -1467,6 +1514,22 @@ func (suite *APIsTestSuite) TestForkCollection() {
14671514
}
14681515
}
14691516

1517+
// Check version file of forked collection
1518+
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, forkCollection.TargetCollectionID.String(), "0")
1519+
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
1520+
suite.NoError(err)
1521+
suite.NotNil(versionFile)
1522+
v0 := versionFile.VersionHistory.Versions[0]
1523+
suite.NotNil(v0)
1524+
// Validate file paths of segments
1525+
suite.NotNil(v0.SegmentInfo)
1526+
suite.NotNil(v0.SegmentInfo.SegmentCompactionInfo)
1527+
suite.Equal(len(v0.SegmentInfo.SegmentCompactionInfo), 3)
1528+
1529+
for _, segment := range collection_segments {
1530+
assertExpectedSegmentInfoExist(suite, segment, v0.SegmentInfo.SegmentCompactionInfo)
1531+
}
1532+
14701533
// Attempt to fork a collcetion with same name (should fail)
14711534
forkCollectionWithSameName := &model.ForkCollection{
14721535
SourceCollectionID: sourceCreateCollection.ID,
@@ -1505,28 +1568,28 @@ func (suite *APIsTestSuite) TestCountForks() {
15051568
DatabaseName: suite.databaseName,
15061569
}
15071570

1508-
sourceCreateMetadataSegment := &model.CreateSegment{
1571+
sourceCreateMetadataSegment := &model.Segment{
15091572
ID: types.NewUniqueID(),
15101573
Type: "test_blockfile",
15111574
Scope: "METADATA",
15121575
CollectionID: sourceCreateCollection.ID,
15131576
}
15141577

1515-
sourceCreateRecordSegment := &model.CreateSegment{
1578+
sourceCreateRecordSegment := &model.Segment{
15161579
ID: types.NewUniqueID(),
15171580
Type: "test_blockfile",
15181581
Scope: "RECORD",
15191582
CollectionID: sourceCreateCollection.ID,
15201583
}
15211584

1522-
sourceCreateVectorSegment := &model.CreateSegment{
1585+
sourceCreateVectorSegment := &model.Segment{
15231586
ID: types.NewUniqueID(),
15241587
Type: "test_hnsw",
15251588
Scope: "VECTOR",
15261589
CollectionID: sourceCreateCollection.ID,
15271590
}
15281591

1529-
segments := []*model.CreateSegment{
1592+
segments := []*model.Segment{
15301593
sourceCreateMetadataSegment,
15311594
sourceCreateRecordSegment,
15321595
sourceCreateVectorSegment,

go/pkg/sysdb/coordinator/model/segment.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,6 @@ type Segment struct {
1414
FilePaths map[string][]string
1515
}
1616

17-
type CreateSegment struct {
18-
ID types.UniqueID
19-
Type string
20-
Scope string
21-
CollectionID types.UniqueID
22-
Metadata *SegmentMetadata[SegmentMetadataValueType]
23-
Ts types.Timestamp
24-
}
25-
2617
type UpdateSegment struct {
2718
ID types.UniqueID
2819
ResetTopic bool

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -932,17 +932,18 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
932932
LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs,
933933
}
934934

935-
createSegments := []*model.CreateSegment{}
935+
createSegments := []*model.Segment{}
936936
flushFilePaths := []*model.FlushSegmentCompaction{}
937937
for _, segment := range sourceSegments {
938938
newSegmentID := types.NewUniqueID()
939-
createSegment := &model.CreateSegment{
939+
createSegment := &model.Segment{
940940
ID: newSegmentID,
941941
Type: segment.Type,
942942
Scope: segment.Scope,
943943
CollectionID: forkCollection.TargetCollectionID,
944944
Metadata: segment.Metadata,
945945
Ts: ts.Unix(),
946+
FilePaths: segment.FilePaths,
946947
}
947948
createSegments = append(createSegments, createSegment)
948949
flushFilePath := &model.FlushSegmentCompaction{
@@ -1034,7 +1035,7 @@ func (tc *Catalog) CountForks(ctx context.Context, sourceCollectionID types.Uniq
10341035
return uint64(len(lineageFile.Dependencies)), nil
10351036
}
10361037

1037-
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
1038+
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.Segment, ts types.Timestamp) (*model.Segment, error) {
10381039
var result *model.Segment
10391040

10401041
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
@@ -1050,7 +1051,7 @@ func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.Creat
10501051
return result, nil
10511052
}
10521053

1053-
func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
1054+
func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model.Segment, ts types.Timestamp) (*model.Segment, error) {
10541055
var result *model.Segment
10551056

10561057
// insert segment
@@ -1090,7 +1091,21 @@ func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model
10901091
return result, nil
10911092
}
10921093

1093-
func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (string, error) {
1094+
func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.Segment, ts types.Timestamp) (string, error) {
1095+
segmentCompactionInfos := make([]*coordinatorpb.FlushSegmentCompactionInfo, 0, len(createSegments))
1096+
for _, segment := range createSegments {
1097+
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
1098+
for k, v := range segment.FilePaths {
1099+
convertedPaths[k] = &coordinatorpb.FilePaths{Paths: v}
1100+
}
1101+
1102+
info := &coordinatorpb.FlushSegmentCompactionInfo{
1103+
SegmentId: segment.ID.String(),
1104+
FilePaths: convertedPaths,
1105+
}
1106+
segmentCompactionInfos = append(segmentCompactionInfos, info)
1107+
}
1108+
10941109
collectionVersionFilePb := &coordinatorpb.CollectionVersionFile{
10951110
CollectionInfoImmutable: &coordinatorpb.CollectionInfoImmutable{
10961111
TenantId: createCollection.TenantID,
@@ -1104,6 +1119,9 @@ func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string
11041119
{
11051120
Version: 0,
11061121
CreatedAtSecs: int64(ts),
1122+
SegmentInfo: &coordinatorpb.CollectionSegmentInfo{
1123+
SegmentCompactionInfo: segmentCompactionInfos,
1124+
},
11071125
},
11081126
},
11091127
},
@@ -1117,7 +1135,7 @@ func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string
11171135
return fullFilePath, nil
11181136
}
11191137

1120-
func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (*model.Collection, bool, error) {
1138+
func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.Segment, ts types.Timestamp) (*model.Collection, bool, error) {
11211139
var resultCollection *model.Collection
11221140
created := false
11231141

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ func (s *Server) CreateCollection(ctx context.Context, req *coordinatorpb.Create
7777
}
7878

7979
// Convert the request segments to create segment models
80-
createSegments := []*model.CreateSegment{}
80+
createSegments := []*model.Segment{}
8181
for _, segment := range req.Segments {
82-
createSegment, err := convertSegmentToModel(segment)
82+
createSegment, err := convertProtoSegment(segment)
8383
if err != nil {
8484
log.Error("Error in creating segments for the collection", zap.Error(err))
8585
res.Collection = nil // We don't need to set the collection in case of error
@@ -90,6 +90,12 @@ func (s *Server) CreateCollection(ctx context.Context, req *coordinatorpb.Create
9090
}
9191
return res, grpcutils.BuildInternalGrpcError(err.Error())
9292
}
93+
filePaths := make(map[string][]string)
94+
for key, filePath := range segment.FilePaths {
95+
filePaths[key] = filePath.Paths
96+
}
97+
createSegment.FilePaths = filePaths
98+
9399
createSegments = append(createSegments, createSegment)
94100
}
95101

0 commit comments

Comments
 (0)