diff --git a/go/pkg/sysdb/coordinator/coordinator_test.go b/go/pkg/sysdb/coordinator/coordinator_test.go index 634e8b0395c..c9f59b9da07 100644 --- a/go/pkg/sysdb/coordinator/coordinator_test.go +++ b/go/pkg/sysdb/coordinator/coordinator_test.go @@ -319,6 +319,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() { suite.True(created) suite.Equal(newCollection.ID, createdCollection.ID) suite.Equal(newCollection.Name, createdCollection.Name) + suite.NotNil(createdCollection.VersionFileName) // suite.Equal(len(segments), len(createdSegments)) // Verify the collection was created @@ -352,8 +353,8 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() { suite.ElementsMatch(expected_ids, actual_ids) // Validate version file - versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, newCollection.ID.String(), "0") - versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix) + suite.NotNil(collection.VersionFileName) + versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName) suite.NoError(err) suite.NotNil(versionFile) v0 := versionFile.VersionHistory.Versions[0] @@ -1515,8 +1516,9 @@ func (suite *APIsTestSuite) TestForkCollection() { } // Check version file of forked collection - versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, forkCollection.TargetCollectionID.String(), "0") - versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix) + suite.Equal(collection.RootCollectionID, &sourceCreateCollection.ID) + suite.NotNil(collection.VersionFileName) + versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName) suite.NoError(err) suite.NotNil(versionFile) v0 := versionFile.VersionHistory.Versions[0] @@ -1553,7 +1555,11 @@ func (suite *APIsTestSuite) TestForkCollection() { // ListCollectionsToGc groups by fork trees and should always return the root of the tree suite.Equal(forkCollectionWithSameName.SourceCollectionID, res[0].ID) - exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *res[0].LineageFilePath) + // Get source collection to grab lineage path and validate it exists + sourceCollection, err := suite.coordinator.GetCollections(ctx, sourceCreateCollection.ID, nil, sourceCreateCollection.TenantID, sourceCreateCollection.DatabaseName, nil, nil) + suite.NoError(err) + suite.Equal(1, len(sourceCollection)) + exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *sourceCollection[0].LineageFileName) suite.NoError(err) suite.True(exists, "Lineage file should exist in S3") } diff --git a/go/pkg/sysdb/coordinator/model/collection.go b/go/pkg/sysdb/coordinator/model/collection.go index 603e3efdf4a..7d6519c63b1 100644 --- a/go/pkg/sysdb/coordinator/model/collection.go +++ b/go/pkg/sysdb/coordinator/model/collection.go @@ -16,11 +16,12 @@ type Collection struct { LogPosition int64 Version int32 RootCollectionID *types.UniqueID - LineageFileName string + LineageFileName *string UpdatedAt types.Timestamp TotalRecordsPostCompaction uint64 SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log LastCompactionTimeSecs uint64 + VersionFileName string } type CollectionToGc struct { @@ -42,7 +43,7 @@ type CreateCollection struct { DatabaseName string Ts types.Timestamp LogPosition int64 - RootCollectionId string + RootCollectionId *string TotalRecordsPostCompaction uint64 SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log LastCompactionTimeSecs uint64 diff --git a/go/pkg/sysdb/coordinator/model_db_convert.go b/go/pkg/sysdb/coordinator/model_db_convert.go index 8ab6395f21e..f20ff35e549 100644 --- a/go/pkg/sysdb/coordinator/model_db_convert.go +++ b/go/pkg/sysdb/coordinator/model_db_convert.go @@ -15,8 +15,10 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd collections := make([]*model.Collection, 0, len(collectionAndMetadataList)) for _, collectionAndMetadata := range collectionAndMetadataList { var rootCollectionID *types.UniqueID - if id, err := types.Parse(collectionAndMetadata.Collection.RootCollectionId); err == nil { - rootCollectionID = &id + if collectionAndMetadata.Collection.RootCollectionId != nil { + if id, err := types.Parse(*collectionAndMetadata.Collection.RootCollectionId); err == nil { + rootCollectionID = &id + } } collection := &model.Collection{ ID: types.MustParse(collectionAndMetadata.Collection.ID), @@ -33,6 +35,7 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd LastCompactionTimeSecs: collectionAndMetadata.Collection.LastCompactionTimeSecs, RootCollectionID: rootCollectionID, LineageFileName: collectionAndMetadata.Collection.LineageFileName, + VersionFileName: collectionAndMetadata.Collection.VersionFileName, } collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata) collections = append(collections, collection) diff --git a/go/pkg/sysdb/coordinator/table_catalog.go b/go/pkg/sysdb/coordinator/table_catalog.go index 852d86d79c0..1e25a2efcd6 100644 --- a/go/pkg/sysdb/coordinator/table_catalog.go +++ b/go/pkg/sysdb/coordinator/table_catalog.go @@ -819,14 +819,14 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model } func (tc *Catalog) getLineageFile(ctx context.Context, collection *model.Collection) (*coordinatorpb.CollectionLineageFile, error) { - if len(collection.LineageFileName) == 0 { + if collection.LineageFileName == nil { // There is no lineage file for the given collection return &coordinatorpb.CollectionLineageFile{ Dependencies: []*coordinatorpb.CollectionVersionDependency{}, }, nil } - return tc.s3Store.GetLineageFile(collection.LineageFileName) + return tc.s3Store.GetLineageFile(*collection.LineageFileName) } func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) { @@ -852,8 +852,8 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For return err } - if len(sourceCollectionDb.RootCollectionId) > 0 { - rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId) + if sourceCollectionDb.RootCollectionId != nil { + rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId) if err != nil { return err } @@ -926,7 +926,7 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For DatabaseName: sourceCollection.DatabaseName, Ts: ts.Unix(), LogPosition: sourceCollection.LogPosition, - RootCollectionId: rootCollectionIDStr, + RootCollectionId: &rootCollectionIDStr, TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction, SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction, LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs, @@ -1005,8 +1005,8 @@ func (tc *Catalog) CountForks(ctx context.Context, sourceCollectionID types.Uniq return 0, common.ErrCollectionNotFound } - if len(sourceCollectionDb.RootCollectionId) > 0 { - rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId) + if sourceCollectionDb.RootCollectionId != nil { + rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId) if err != nil { return 0, err } diff --git a/go/pkg/sysdb/grpc/proto_model_convert.go b/go/pkg/sysdb/grpc/proto_model_convert.go index 9cade1c5cd2..1290309ee9e 100644 --- a/go/pkg/sysdb/grpc/proto_model_convert.go +++ b/go/pkg/sysdb/grpc/proto_model_convert.go @@ -51,7 +51,15 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle TotalRecordsPostCompaction: collection.TotalRecordsPostCompaction, SizeBytesPostCompaction: collection.SizeBytesPostCompaction, LastCompactionTimeSecs: collection.LastCompactionTimeSecs, + VersionFilePath: &collection.VersionFileName, + LineageFilePath: collection.LineageFileName, } + + if collection.RootCollectionID != nil { + rootCollectionId := collection.RootCollectionID.String() + collectionpb.RootCollectionId = &rootCollectionId + } + if collection.Metadata == nil { return collectionpb } diff --git a/go/pkg/sysdb/metastore/db/dao/collection.go b/go/pkg/sysdb/metastore/db/dao/collection.go index 0e380ed3935..5084debf806 100644 --- a/go/pkg/sysdb/metastore/db/dao/collection.go +++ b/go/pkg/sysdb/metastore/db/dao/collection.go @@ -30,7 +30,7 @@ func (s *collectionDb) DeleteAll() error { func (s *collectionDb) GetCollectionEntry(collectionID *string, databaseName *string) (*dbmodel.Collection, error) { var collections []*dbmodel.Collection query := s.db.Table("collections"). - Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id"). + Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name"). Joins("INNER JOIN databases ON collections.database_id = databases.id"). Where("collections.id = ?", collectionID) @@ -116,8 +116,8 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string, LogPosition int64 `gorm:"column:log_position"` Version int32 `gorm:"column:version"` VersionFileName string `gorm:"column:version_file_name"` - RootCollectionId string `gorm:"column:root_collection_id"` - LineageFileName string `gorm:"column:lineage_file_name"` + RootCollectionId *string `gorm:"column:root_collection_id"` + LineageFileName *string `gorm:"column:lineage_file_name"` TotalRecordsPostCompaction uint64 `gorm:"column:total_records_post_compaction"` SizeBytesPostCompaction uint64 `gorm:"column:size_bytes_post_compaction"` LastCompactionTimeSecs uint64 `gorm:"column:last_compaction_time_secs"` @@ -136,7 +136,7 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string, } query := s.db.Table("collections"). - Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, collections.lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant"). + Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant"). Joins("INNER JOIN databases ON collections.database_id = databases.id"). Order("collections.created_at ASC") @@ -519,7 +519,7 @@ func (s *collectionDb) LockCollection(collectionID string) error { return nil } -func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName string, newLineageFileName string) error { +func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName *string, newLineageFileName string) error { return s.db.Model(&dbmodel.Collection{}). Where("id = ? AND (lineage_file_name IS NULL OR lineage_file_name = ?)", collectionID, currentLineageFileName). Updates(map[string]interface{}{ diff --git a/go/pkg/sysdb/metastore/db/dbmodel/collection.go b/go/pkg/sysdb/metastore/db/dbmodel/collection.go index 8eea555d53d..9afc3eb0742 100644 --- a/go/pkg/sysdb/metastore/db/dbmodel/collection.go +++ b/go/pkg/sysdb/metastore/db/dbmodel/collection.go @@ -19,8 +19,8 @@ type Collection struct { LogPosition int64 `gorm:"log_position;default:0"` Version int32 `gorm:"version;default:0"` VersionFileName string `gorm:"version_file_name"` - RootCollectionId string `gorm:"column:root_collection_id"` - LineageFileName string `gorm:"column:lineage_file_name"` + RootCollectionId *string `gorm:"column:root_collection_id"` + LineageFileName *string `gorm:"column:lineage_file_name"` TotalRecordsPostCompaction uint64 `gorm:"total_records_post_compaction;default:0"` SizeBytesPostCompaction uint64 `gorm:"size_bytes_post_compaction;default:0"` LastCompactionTimeSecs uint64 `gorm:"last_compaction_time_secs;default:0"` @@ -67,5 +67,5 @@ type ICollectionDb interface { ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error) UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error) LockCollection(collectionID string) error - UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error + UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error } diff --git a/go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go b/go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go index eb09e6be0d2..9d5878b6541 100644 --- a/go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go +++ b/go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go @@ -321,7 +321,7 @@ func (_m *ICollectionDb) Update(in *dbmodel.Collection) error { } // UpdateCollectionLineageFilePath provides a mock function with given fields: collectionID, currentLineageFilePath, newLineageFilePath -func (_m *ICollectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error { +func (_m *ICollectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error { ret := _m.Called(collectionID, currentLineageFilePath, newLineageFilePath) if len(ret) == 0 { @@ -329,7 +329,7 @@ func (_m *ICollectionDb) UpdateCollectionLineageFilePath(collectionID string, cu } var r0 error - if rf, ok := ret.Get(0).(func(string, string, string) error); ok { + if rf, ok := ret.Get(0).(func(string, *string, string) error); ok { r0 = rf(collectionID, currentLineageFilePath, newLineageFilePath) } else { r0 = ret.Error(0) diff --git a/idl/chromadb/proto/chroma.proto b/idl/chromadb/proto/chroma.proto index 82c2a202490..18c761df3ba 100644 --- a/idl/chromadb/proto/chroma.proto +++ b/idl/chromadb/proto/chroma.proto @@ -58,6 +58,9 @@ message Collection { uint64 total_records_post_compaction = 10; uint64 size_bytes_post_compaction = 11; uint64 last_compaction_time_secs = 12; + optional string version_file_path = 13; + optional string root_collection_id = 14; + optional string lineage_file_path = 15; } message Database { diff --git a/rust/garbage_collector/src/helper.rs b/rust/garbage_collector/src/helper.rs index 71d297c790e..41d3eddf4bb 100644 --- a/rust/garbage_collector/src/helper.rs +++ b/rust/garbage_collector/src/helper.rs @@ -190,6 +190,9 @@ impl ChromaGrpcClients { total_records_post_compaction: 0, size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: None, + root_collection_id: None, + lineage_file_path: None, }), knn: None, metadata: None, diff --git a/rust/segment/src/distributed_spann.rs b/rust/segment/src/distributed_spann.rs index 51956fd8a78..db74e5d4206 100644 --- a/rust/segment/src/distributed_spann.rs +++ b/rust/segment/src/distributed_spann.rs @@ -634,11 +634,7 @@ mod test { dimension: None, tenant: "test".to_string(), database: "test".to_string(), - log_position: 0, - version: 0, - total_records_post_compaction: 0, - size_bytes_post_compaction: 0, - last_compaction_time_secs: 0, + ..Default::default() }; let spann_writer = SpannSegmentWriter::from_segment( diff --git a/rust/sysdb/src/sqlite.rs b/rust/sysdb/src/sqlite.rs index a2107ef7dcc..a0b9b9d294d 100644 --- a/rust/sysdb/src/sqlite.rs +++ b/rust/sysdb/src/sqlite.rs @@ -338,6 +338,9 @@ impl SqliteSysDb { version: 0, size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: None, + root_collection_id: None, + lineage_file_path: None, }) } @@ -724,6 +727,9 @@ impl SqliteSysDb { database: first_row.get(5), size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: None, + root_collection_id: None, + lineage_file_path: None, })) }) .collect::, GetCollectionsError>>()?; diff --git a/rust/sysdb/src/sysdb.rs b/rust/sysdb/src/sysdb.rs index 4627150c9c6..ef83c064094 100644 --- a/rust/sysdb/src/sysdb.rs +++ b/rust/sysdb/src/sysdb.rs @@ -257,6 +257,9 @@ impl SysDb { total_records_post_compaction: 0, size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: None, + root_collection_id: None, + lineage_file_path: None, }; test_sysdb.add_collection(collection.clone()); diff --git a/rust/types/src/collection.rs b/rust/types/src/collection.rs index 5153bbb964b..e1c9ccfae28 100644 --- a/rust/types/src/collection.rs +++ b/rust/types/src/collection.rs @@ -94,6 +94,12 @@ pub struct Collection { pub size_bytes_post_compaction: u64, #[serde(skip)] pub last_compaction_time_secs: u64, + #[serde(skip)] + pub version_file_path: Option, + #[serde(skip)] + pub root_collection_id: Option, + #[serde(skip)] + pub lineage_file_path: Option, } impl Default for Collection { @@ -111,6 +117,9 @@ impl Default for Collection { total_records_post_compaction: 0, size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: None, + root_collection_id: None, + lineage_file_path: None, } } } @@ -226,6 +235,11 @@ impl TryFrom for Collection { total_records_post_compaction: proto_collection.total_records_post_compaction, size_bytes_post_compaction: proto_collection.size_bytes_post_compaction, last_compaction_time_secs: proto_collection.last_compaction_time_secs, + version_file_path: proto_collection.version_file_path, + root_collection_id: proto_collection + .root_collection_id + .map(|uuid| CollectionUuid(Uuid::try_parse(&uuid).unwrap())), + lineage_file_path: proto_collection.lineage_file_path, }) } } @@ -261,6 +275,9 @@ impl TryFrom for chroma_proto::Collection { total_records_post_compaction: value.total_records_post_compaction, size_bytes_post_compaction: value.size_bytes_post_compaction, last_compaction_time_secs: value.last_compaction_time_secs, + version_file_path: value.version_file_path, + root_collection_id: value.root_collection_id.map(|uuid| uuid.0.to_string()), + lineage_file_path: value.lineage_file_path, }) } } @@ -306,6 +323,9 @@ mod test { total_records_post_compaction: 0, size_bytes_post_compaction: 0, last_compaction_time_secs: 0, + version_file_path: Some("version_file_path".to_string()), + root_collection_id: Some("00000000-0000-0000-0000-000000000000".to_string()), + lineage_file_path: Some("lineage_file_path".to_string()), }; let converted_collection: Collection = proto_collection.try_into().unwrap(); assert_eq!( @@ -318,5 +338,19 @@ mod test { assert_eq!(converted_collection.tenant, "baz".to_string()); assert_eq!(converted_collection.database, "qux".to_string()); assert_eq!(converted_collection.total_records_post_compaction, 0); + assert_eq!(converted_collection.size_bytes_post_compaction, 0); + assert_eq!(converted_collection.last_compaction_time_secs, 0); + assert_eq!( + converted_collection.version_file_path, + Some("version_file_path".to_string()) + ); + assert_eq!( + converted_collection.root_collection_id, + Some(CollectionUuid(Uuid::nil())) + ); + assert_eq!( + converted_collection.lineage_file_path, + Some("lineage_file_path".to_string()) + ); } } diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index 84e332a976e..9af81f76944 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -501,11 +501,7 @@ mod tests { dimension: None, tenant: "test-tenant".to_string(), database: "test-database".to_string(), - log_position: 0, - version: 0, - total_records_post_compaction: 0, - size_bytes_post_compaction: 0, - last_compaction_time_secs: 0, + ..Default::default() }), knn: Some(chroma_proto::Segment { id: Uuid::new_v4().to_string(), @@ -606,11 +602,7 @@ mod tests { dimension: None, tenant: "test-tenant".to_string(), database: "test-database".to_string(), - log_position: 0, - version: 0, - total_records_post_compaction: 0, - size_bytes_post_compaction: 0, - last_compaction_time_secs: 0, + ..Default::default() }); let request = chroma_proto::GetPlan { scan: Some(scan_operator.clone()),