Skip to content

Commit 71c953d

Browse files
authored
[ENH]: SysDb should return lineage, version file paths and root collection ID on collections (#4557)
## Description of changes See title. Needed for garbage collector tests. ## Test plan _How are these changes tested?_ Updated existing Go tests to cover changes on Go side. Rust changes will be consumed in a future PR. ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_ n/a
1 parent bbd92f6 commit 71c953d

File tree

16 files changed

+98
-45
lines changed

16 files changed

+98
-45
lines changed

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
319319
suite.True(created)
320320
suite.Equal(newCollection.ID, createdCollection.ID)
321321
suite.Equal(newCollection.Name, createdCollection.Name)
322+
suite.NotNil(createdCollection.VersionFileName)
322323
// suite.Equal(len(segments), len(createdSegments))
323324

324325
// Verify the collection was created
@@ -352,8 +353,8 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
352353
suite.ElementsMatch(expected_ids, actual_ids)
353354

354355
// Validate version file
355-
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, newCollection.ID.String(), "0")
356-
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
356+
suite.NotNil(collection.VersionFileName)
357+
versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName)
357358
suite.NoError(err)
358359
suite.NotNil(versionFile)
359360
v0 := versionFile.VersionHistory.Versions[0]
@@ -1608,8 +1609,9 @@ func (suite *APIsTestSuite) TestForkCollection() {
16081609
}
16091610

16101611
// Check version file of forked collection
1611-
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, forkCollection.TargetCollectionID.String(), "0")
1612-
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
1612+
suite.Equal(collection.RootCollectionID, &sourceCreateCollection.ID)
1613+
suite.NotNil(collection.VersionFileName)
1614+
versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName)
16131615
suite.NoError(err)
16141616
suite.NotNil(versionFile)
16151617
v0 := versionFile.VersionHistory.Versions[0]
@@ -1646,7 +1648,11 @@ func (suite *APIsTestSuite) TestForkCollection() {
16461648
// ListCollectionsToGc groups by fork trees and should always return the root of the tree
16471649
suite.Equal(forkCollectionWithSameName.SourceCollectionID, res[0].ID)
16481650

1649-
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *res[0].LineageFilePath)
1651+
// Get source collection to grab lineage path and validate it exists
1652+
sourceCollection, err := suite.coordinator.GetCollections(ctx, sourceCreateCollection.ID, nil, sourceCreateCollection.TenantID, sourceCreateCollection.DatabaseName, nil, nil)
1653+
suite.NoError(err)
1654+
suite.Equal(1, len(sourceCollection))
1655+
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *sourceCollection[0].LineageFileName)
16501656
suite.NoError(err)
16511657
suite.True(exists, "Lineage file should exist in S3")
16521658
}

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Collection struct {
1818
LogPosition int64
1919
Version int32
2020
RootCollectionID *types.UniqueID
21-
LineageFileName string
21+
LineageFileName *string
2222
UpdatedAt types.Timestamp
2323
TotalRecordsPostCompaction uint64
2424
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
@@ -48,7 +48,7 @@ type CreateCollection struct {
4848
DatabaseName string
4949
Ts types.Timestamp
5050
LogPosition int64
51-
RootCollectionId string
51+
RootCollectionId *string
5252
TotalRecordsPostCompaction uint64
5353
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
5454
LastCompactionTimeSecs uint64

go/pkg/sysdb/coordinator/model_db_convert.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
1515
collections := make([]*model.Collection, 0, len(collectionAndMetadataList))
1616
for _, collectionAndMetadata := range collectionAndMetadataList {
1717
var rootCollectionID *types.UniqueID
18-
if id, err := types.Parse(collectionAndMetadata.Collection.RootCollectionId); err == nil {
19-
rootCollectionID = &id
18+
if collectionAndMetadata.Collection.RootCollectionId != nil {
19+
if id, err := types.Parse(*collectionAndMetadata.Collection.RootCollectionId); err == nil {
20+
rootCollectionID = &id
21+
}
2022
}
2123
collection := &model.Collection{
2224
ID: types.MustParse(collectionAndMetadata.Collection.ID),

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -865,14 +865,14 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
865865
}
866866

867867
func (tc *Catalog) getLineageFile(ctx context.Context, collection *model.Collection) (*coordinatorpb.CollectionLineageFile, error) {
868-
if len(collection.LineageFileName) == 0 {
868+
if collection.LineageFileName == nil {
869869
// There is no lineage file for the given collection
870870
return &coordinatorpb.CollectionLineageFile{
871871
Dependencies: []*coordinatorpb.CollectionVersionDependency{},
872872
}, nil
873873
}
874874

875-
return tc.s3Store.GetLineageFile(collection.LineageFileName)
875+
return tc.s3Store.GetLineageFile(*collection.LineageFileName)
876876
}
877877

878878
func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
@@ -898,8 +898,8 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
898898
return err
899899
}
900900

901-
if len(sourceCollectionDb.RootCollectionId) > 0 {
902-
rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId)
901+
if sourceCollectionDb.RootCollectionId != nil {
902+
rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId)
903903
if err != nil {
904904
return err
905905
}
@@ -972,7 +972,7 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
972972
DatabaseName: sourceCollection.DatabaseName,
973973
Ts: ts.Unix(),
974974
LogPosition: sourceCollection.LogPosition,
975-
RootCollectionId: rootCollectionIDStr,
975+
RootCollectionId: &rootCollectionIDStr,
976976
TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction,
977977
SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction,
978978
LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs,
@@ -1051,8 +1051,8 @@ func (tc *Catalog) CountForks(ctx context.Context, sourceCollectionID types.Uniq
10511051
return 0, common.ErrCollectionNotFound
10521052
}
10531053

1054-
if len(sourceCollectionDb.RootCollectionId) > 0 {
1055-
rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId)
1054+
if sourceCollectionDb.RootCollectionId != nil {
1055+
rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId)
10561056
if err != nil {
10571057
return 0, err
10581058
}

go/pkg/sysdb/coordinator/table_catalog_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,8 @@ func TestCatalog_FlushCollectionCompactionForVersionedCollection(t *testing.T) {
308308
LogPosition: 10,
309309
Version: int32(currentVersion),
310310
VersionFileName: "version_1.pb",
311-
RootCollectionId: "",
312-
LineageFileName: "",
311+
RootCollectionId: nil,
312+
LineageFileName: nil,
313313
TotalRecordsPostCompaction: 10,
314314
SizeBytesPostCompaction: 100,
315315
LastCompactionTimeSecs: 0,
@@ -512,8 +512,8 @@ func TestCatalog_FlushCollectionCompactionForVersionedCollectionWithEmptyFilePat
512512
LogPosition: 10,
513513
Version: int32(currentVersion),
514514
VersionFileName: "version_1.pb",
515-
RootCollectionId: "",
516-
LineageFileName: "",
515+
RootCollectionId: nil,
516+
LineageFileName: nil,
517517
TotalRecordsPostCompaction: 10,
518518
SizeBytesPostCompaction: 100,
519519
LastCompactionTimeSecs: 0,

go/pkg/sysdb/grpc/proto_model_convert.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,15 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
5151
TotalRecordsPostCompaction: collection.TotalRecordsPostCompaction,
5252
SizeBytesPostCompaction: collection.SizeBytesPostCompaction,
5353
LastCompactionTimeSecs: collection.LastCompactionTimeSecs,
54+
VersionFilePath: &collection.VersionFileName,
55+
LineageFilePath: collection.LineageFileName,
5456
}
57+
58+
if collection.RootCollectionID != nil {
59+
rootCollectionId := collection.RootCollectionID.String()
60+
collectionpb.RootCollectionId = &rootCollectionId
61+
}
62+
5563
if collection.Metadata == nil {
5664
return collectionpb
5765
}

go/pkg/sysdb/metastore/db/dao/collection.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (s *collectionDb) DeleteAll() error {
3030
func (s *collectionDb) GetCollectionEntry(collectionID *string, databaseName *string) (*dbmodel.Collection, error) {
3131
var collections []*dbmodel.Collection
3232
query := s.db.Table("collections").
33-
Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id").
33+
Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name").
3434
Joins("INNER JOIN databases ON collections.database_id = databases.id").
3535
Where("collections.id = ?", collectionID)
3636

@@ -117,8 +117,8 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string,
117117
LogPosition int64 `gorm:"column:log_position"`
118118
Version int32 `gorm:"column:version"`
119119
VersionFileName string `gorm:"column:version_file_name"`
120-
RootCollectionId string `gorm:"column:root_collection_id"`
121-
LineageFileName string `gorm:"column:lineage_file_name"`
120+
RootCollectionId *string `gorm:"column:root_collection_id"`
121+
LineageFileName *string `gorm:"column:lineage_file_name"`
122122
TotalRecordsPostCompaction uint64 `gorm:"column:total_records_post_compaction"`
123123
SizeBytesPostCompaction uint64 `gorm:"column:size_bytes_post_compaction"`
124124
LastCompactionTimeSecs uint64 `gorm:"column:last_compaction_time_secs"`
@@ -137,7 +137,7 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string,
137137
}
138138

139139
query := s.db.Table("collections").
140-
Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, collections.lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant").
140+
Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant").
141141
Joins("INNER JOIN databases ON collections.database_id = databases.id").
142142
Order("collections.created_at ASC")
143143

@@ -546,7 +546,7 @@ func (s *collectionDb) LockCollection(collectionID string) error {
546546
return nil
547547
}
548548

549-
func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName string, newLineageFileName string) error {
549+
func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName *string, newLineageFileName string) error {
550550
return s.db.Model(&dbmodel.Collection{}).
551551
Where("id = ? AND (lineage_file_name IS NULL OR lineage_file_name = ?)", collectionID, currentLineageFileName).
552552
Updates(map[string]interface{}{

go/pkg/sysdb/metastore/db/dbmodel/collection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ type Collection struct {
1919
LogPosition int64 `gorm:"log_position;default:0"`
2020
Version int32 `gorm:"version;default:0"`
2121
VersionFileName string `gorm:"version_file_name"`
22-
RootCollectionId string `gorm:"column:root_collection_id"`
23-
LineageFileName string `gorm:"column:lineage_file_name"`
22+
RootCollectionId *string `gorm:"column:root_collection_id"`
23+
LineageFileName *string `gorm:"column:lineage_file_name"`
2424
TotalRecordsPostCompaction uint64 `gorm:"total_records_post_compaction;default:0"`
2525
SizeBytesPostCompaction uint64 `gorm:"size_bytes_post_compaction;default:0"`
2626
LastCompactionTimeSecs uint64 `gorm:"last_compaction_time_secs;default:0"`
@@ -68,5 +68,5 @@ type ICollectionDb interface {
6868
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error)
6969
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
7070
LockCollection(collectionID string) error
71-
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error
71+
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error
7272
}

go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/chromadb/proto/chroma.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ message Collection {
5858
uint64 total_records_post_compaction = 10;
5959
uint64 size_bytes_post_compaction = 11;
6060
uint64 last_compaction_time_secs = 12;
61+
optional string version_file_path = 13;
62+
optional string root_collection_id = 14;
63+
optional string lineage_file_path = 15;
6164
}
6265

6366
message Database {

rust/garbage_collector/src/helper.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ impl ChromaGrpcClients {
190190
total_records_post_compaction: 0,
191191
size_bytes_post_compaction: 0,
192192
last_compaction_time_secs: 0,
193+
version_file_path: None,
194+
root_collection_id: None,
195+
lineage_file_path: None,
193196
}),
194197
knn: None,
195198
metadata: None,

rust/segment/src/distributed_spann.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,7 @@ mod test {
634634
dimension: None,
635635
tenant: "test".to_string(),
636636
database: "test".to_string(),
637-
log_position: 0,
638-
version: 0,
639-
total_records_post_compaction: 0,
640-
size_bytes_post_compaction: 0,
641-
last_compaction_time_secs: 0,
637+
..Default::default()
642638
};
643639

644640
let spann_writer = SpannSegmentWriter::from_segment(

rust/sysdb/src/sqlite.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,9 @@ impl SqliteSysDb {
338338
version: 0,
339339
size_bytes_post_compaction: 0,
340340
last_compaction_time_secs: 0,
341+
version_file_path: None,
342+
root_collection_id: None,
343+
lineage_file_path: None,
341344
})
342345
}
343346

@@ -724,6 +727,9 @@ impl SqliteSysDb {
724727
database: first_row.get(5),
725728
size_bytes_post_compaction: 0,
726729
last_compaction_time_secs: 0,
730+
version_file_path: None,
731+
root_collection_id: None,
732+
lineage_file_path: None,
727733
}))
728734
})
729735
.collect::<Result<Vec<_>, GetCollectionsError>>()?;

rust/sysdb/src/sysdb.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ impl SysDb {
257257
total_records_post_compaction: 0,
258258
size_bytes_post_compaction: 0,
259259
last_compaction_time_secs: 0,
260+
version_file_path: None,
261+
root_collection_id: None,
262+
lineage_file_path: None,
260263
};
261264

262265
test_sysdb.add_collection(collection.clone());

rust/types/src/collection.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ pub struct Collection {
9494
pub size_bytes_post_compaction: u64,
9595
#[serde(skip)]
9696
pub last_compaction_time_secs: u64,
97+
#[serde(skip)]
98+
pub version_file_path: Option<String>,
99+
#[serde(skip)]
100+
pub root_collection_id: Option<CollectionUuid>,
101+
#[serde(skip)]
102+
pub lineage_file_path: Option<String>,
97103
}
98104

99105
impl Default for Collection {
@@ -111,6 +117,9 @@ impl Default for Collection {
111117
total_records_post_compaction: 0,
112118
size_bytes_post_compaction: 0,
113119
last_compaction_time_secs: 0,
120+
version_file_path: None,
121+
root_collection_id: None,
122+
lineage_file_path: None,
114123
}
115124
}
116125
}
@@ -226,6 +235,11 @@ impl TryFrom<chroma_proto::Collection> for Collection {
226235
total_records_post_compaction: proto_collection.total_records_post_compaction,
227236
size_bytes_post_compaction: proto_collection.size_bytes_post_compaction,
228237
last_compaction_time_secs: proto_collection.last_compaction_time_secs,
238+
version_file_path: proto_collection.version_file_path,
239+
root_collection_id: proto_collection
240+
.root_collection_id
241+
.map(|uuid| CollectionUuid(Uuid::try_parse(&uuid).unwrap())),
242+
lineage_file_path: proto_collection.lineage_file_path,
229243
})
230244
}
231245
}
@@ -261,6 +275,9 @@ impl TryFrom<Collection> for chroma_proto::Collection {
261275
total_records_post_compaction: value.total_records_post_compaction,
262276
size_bytes_post_compaction: value.size_bytes_post_compaction,
263277
last_compaction_time_secs: value.last_compaction_time_secs,
278+
version_file_path: value.version_file_path,
279+
root_collection_id: value.root_collection_id.map(|uuid| uuid.0.to_string()),
280+
lineage_file_path: value.lineage_file_path,
264281
})
265282
}
266283
}
@@ -306,6 +323,9 @@ mod test {
306323
total_records_post_compaction: 0,
307324
size_bytes_post_compaction: 0,
308325
last_compaction_time_secs: 0,
326+
version_file_path: Some("version_file_path".to_string()),
327+
root_collection_id: Some("00000000-0000-0000-0000-000000000000".to_string()),
328+
lineage_file_path: Some("lineage_file_path".to_string()),
309329
};
310330
let converted_collection: Collection = proto_collection.try_into().unwrap();
311331
assert_eq!(
@@ -318,5 +338,19 @@ mod test {
318338
assert_eq!(converted_collection.tenant, "baz".to_string());
319339
assert_eq!(converted_collection.database, "qux".to_string());
320340
assert_eq!(converted_collection.total_records_post_compaction, 0);
341+
assert_eq!(converted_collection.size_bytes_post_compaction, 0);
342+
assert_eq!(converted_collection.last_compaction_time_secs, 0);
343+
assert_eq!(
344+
converted_collection.version_file_path,
345+
Some("version_file_path".to_string())
346+
);
347+
assert_eq!(
348+
converted_collection.root_collection_id,
349+
Some(CollectionUuid(Uuid::nil()))
350+
);
351+
assert_eq!(
352+
converted_collection.lineage_file_path,
353+
Some("lineage_file_path".to_string())
354+
);
321355
}
322356
}

0 commit comments

Comments
 (0)