Skip to content

Commit ec35f46

Browse files
committed
[BUG]: Handle version creation with empty file paths
1 parent 4f042b3 commit ec35f46

File tree

4 files changed

+361
-26
lines changed

4 files changed

+361
-26
lines changed

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package model
22

33
import (
4+
"time"
5+
46
"github.com/chroma-core/chroma/go/pkg/types"
57
)
68

@@ -21,6 +23,10 @@ type Collection struct {
2123
TotalRecordsPostCompaction uint64
2224
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
2325
LastCompactionTimeSecs uint64
26+
IsDeleted bool
27+
VersionFileName string
28+
CreatedAt time.Time
29+
DatabaseId types.UniqueID
2430
}
2531

2632
type CollectionToGc struct {

go/pkg/sysdb/coordinator/model_db_convert.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
3333
LastCompactionTimeSecs: collectionAndMetadata.Collection.LastCompactionTimeSecs,
3434
RootCollectionID: rootCollectionID,
3535
LineageFileName: collectionAndMetadata.Collection.LineageFileName,
36+
IsDeleted: collectionAndMetadata.Collection.IsDeleted,
37+
VersionFileName: collectionAndMetadata.Collection.VersionFileName,
38+
CreatedAt: collectionAndMetadata.Collection.CreatedAt,
39+
DatabaseId: types.MustParse(collectionAndMetadata.Collection.DatabaseID),
3640
}
3741
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
3842
collections = append(collections, collection)

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,20 +1389,35 @@ func (tc *Catalog) ListCollectionVersions(ctx context.Context,
13891389
return filteredVersions, nil
13901390
}
13911391

1392-
func (tc *Catalog) updateVersionFileInS3(ctx context.Context, existingVersionFilePb *coordinatorpb.CollectionVersionFile, flushCollectionCompaction *model.FlushCollectionCompaction, ts_secs int64) (string, error) {
1392+
func (tc *Catalog) updateVersionFileInS3(existingVersionFilePb *coordinatorpb.CollectionVersionFile, flushCollectionCompaction *model.FlushCollectionCompaction, previousSegmentInfo []*model.Segment, ts_secs int64) (string, error) {
13931393
segmentCompactionInfos := make([]*coordinatorpb.FlushSegmentCompactionInfo, 0, len(flushCollectionCompaction.FlushSegmentCompactions))
1394-
for _, compaction := range flushCollectionCompaction.FlushSegmentCompactions {
1395-
// Convert map[string][]string to map[string]*coordinatorpb.FilePaths
1396-
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
1397-
for k, v := range compaction.FilePaths {
1398-
convertedPaths[k] = &coordinatorpb.FilePaths{Paths: v}
1394+
// If flushCollectionCompaction.FlushSegmentCompactions is empty then use previousSegmentInfo.
1395+
if len(flushCollectionCompaction.FlushSegmentCompactions) == 0 {
1396+
for _, segment := range previousSegmentInfo {
1397+
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
1398+
for k, v := range segment.FilePaths {
1399+
convertedPaths[k] = &coordinatorpb.FilePaths{Paths: v}
1400+
}
1401+
info := &coordinatorpb.FlushSegmentCompactionInfo{
1402+
SegmentId: segment.ID.String(),
1403+
FilePaths: convertedPaths,
1404+
}
1405+
segmentCompactionInfos = append(segmentCompactionInfos, info)
13991406
}
1407+
} else {
1408+
for _, compaction := range flushCollectionCompaction.FlushSegmentCompactions {
1409+
// Convert map[string][]string to map[string]*coordinatorpb.FilePaths
1410+
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
1411+
for k, v := range compaction.FilePaths {
1412+
convertedPaths[k] = &coordinatorpb.FilePaths{Paths: v}
1413+
}
14001414

1401-
info := &coordinatorpb.FlushSegmentCompactionInfo{
1402-
SegmentId: compaction.ID.String(),
1403-
FilePaths: convertedPaths,
1415+
info := &coordinatorpb.FlushSegmentCompactionInfo{
1416+
SegmentId: compaction.ID.String(),
1417+
FilePaths: convertedPaths,
1418+
}
1419+
segmentCompactionInfos = append(segmentCompactionInfos, info)
14041420
}
1405-
segmentCompactionInfos = append(segmentCompactionInfos, info)
14061421
}
14071422

14081423
existingVersionFilePb.GetVersionHistory().Versions = append(existingVersionFilePb.GetVersionHistory().Versions, &coordinatorpb.CollectionVersionInfo{
@@ -1544,7 +1559,8 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
15441559
for numAttempts < maxAttempts {
15451560
numAttempts++
15461561
// Get the current version info and the version file from the table.
1547-
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(types.FromUniqueID(flushCollectionCompaction.ID), nil)
1562+
collectionEntry, segments, err := tc.GetCollectionWithSegments(ctx, flushCollectionCompaction.ID)
1563+
// collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(types.FromUniqueID(flushCollectionCompaction.ID), nil)
15481564
if err != nil {
15491565
return nil, err
15501566
}
@@ -1574,15 +1590,16 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
15741590
}
15751591

15761592
existingVersionFileName := collectionEntry.VersionFileName
1593+
// existingSegments := segments
15771594
var existingVersionFilePb *coordinatorpb.CollectionVersionFile
15781595
if existingVersionFileName == "" {
15791596
// The VersionFile has not been created.
15801597
existingVersionFilePb = &coordinatorpb.CollectionVersionFile{
15811598
CollectionInfoImmutable: &coordinatorpb.CollectionInfoImmutable{
1582-
TenantId: collectionEntry.Tenant,
1583-
DatabaseId: collectionEntry.DatabaseID,
1584-
CollectionId: collectionEntry.ID,
1585-
CollectionName: *collectionEntry.Name,
1599+
TenantId: collectionEntry.TenantID,
1600+
DatabaseId: collectionEntry.DatabaseId.String(),
1601+
CollectionId: collectionEntry.ID.String(),
1602+
CollectionName: collectionEntry.Name,
15861603
CollectionCreationSecs: collectionEntry.CreatedAt.Unix(),
15871604
},
15881605
VersionHistory: &coordinatorpb.CollectionVersionHistory{
@@ -1598,10 +1615,10 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
15981615

15991616
// There was previously a bug that resulted in the tenant ID missing from some version files (https://github.com/chroma-core/chroma/pull/4408).
16001617
// This line can be removed once all corrupted version files are fixed.
1601-
existingVersionFilePb.CollectionInfoImmutable.TenantId = collectionEntry.Tenant
1618+
existingVersionFilePb.CollectionInfoImmutable.TenantId = collectionEntry.TenantID
16021619

16031620
// Do a simple validation of the version file.
1604-
err = tc.validateVersionFile(existingVersionFilePb, collectionEntry.ID, existingVersion)
1621+
err = tc.validateVersionFile(existingVersionFilePb, collectionEntry.ID.String(), existingVersion)
16051622
if err != nil {
16061623
log.Error("version file validation failed", zap.Error(err))
16071624
return nil, err
@@ -1611,7 +1628,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
16111628
// The update function takes the content of the existing version file,
16121629
// and the set of segments that are part of the new version file.
16131630
// NEW VersionFile is created in S3 at this step.
1614-
newVersionFileName, err := tc.updateVersionFileInS3(ctx, existingVersionFilePb, flushCollectionCompaction, time.Now().Unix())
1631+
newVersionFileName, err := tc.updateVersionFileInS3(existingVersionFilePb, flushCollectionCompaction, segments, time.Now().Unix())
16151632
if err != nil {
16161633
return nil, err
16171634
}

0 commit comments

Comments
 (0)