Skip to content

[ENH]: SysDb should return lineage, version file paths and root collection ID on collections #4557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions go/pkg/sysdb/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
suite.True(created)
suite.Equal(newCollection.ID, createdCollection.ID)
suite.Equal(newCollection.Name, createdCollection.Name)
suite.NotNil(createdCollection.VersionFileName)
// suite.Equal(len(segments), len(createdSegments))

// Verify the collection was created
Expand Down Expand Up @@ -352,8 +353,8 @@ func (suite *APIsTestSuite) TestCreateCollectionAndSegments() {
suite.ElementsMatch(expected_ids, actual_ids)

// Validate version file
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, newCollection.ID.String(), "0")
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
suite.NotNil(collection.VersionFileName)
versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName)
suite.NoError(err)
suite.NotNil(versionFile)
v0 := versionFile.VersionHistory.Versions[0]
Expand Down Expand Up @@ -1515,8 +1516,9 @@ func (suite *APIsTestSuite) TestForkCollection() {
}

// Check version file of forked collection
versionFilePathPrefix := suite.s3MetaStore.GetVersionFilePath(collection.TenantID, suite.databaseId, forkCollection.TargetCollectionID.String(), "0")
versionFile, err := suite.s3MetaStore.GetVersionFile(versionFilePathPrefix)
suite.Equal(collection.RootCollectionID, &sourceCreateCollection.ID)
suite.NotNil(collection.VersionFileName)
versionFile, err := suite.s3MetaStore.GetVersionFile(collection.VersionFileName)
suite.NoError(err)
suite.NotNil(versionFile)
v0 := versionFile.VersionHistory.Versions[0]
Expand Down Expand Up @@ -1553,7 +1555,11 @@ func (suite *APIsTestSuite) TestForkCollection() {
// ListCollectionsToGc groups by fork trees and should always return the root of the tree
suite.Equal(forkCollectionWithSameName.SourceCollectionID, res[0].ID)

exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *res[0].LineageFilePath)
// Get source collection to grab lineage path and validate it exists
sourceCollection, err := suite.coordinator.GetCollections(ctx, sourceCreateCollection.ID, nil, sourceCreateCollection.TenantID, sourceCreateCollection.DatabaseName, nil, nil)
suite.NoError(err)
suite.Equal(1, len(sourceCollection))
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *sourceCollection[0].LineageFileName)
suite.NoError(err)
suite.True(exists, "Lineage file should exist in S3")
}
Expand Down
5 changes: 3 additions & 2 deletions go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ type Collection struct {
LogPosition int64
Version int32
RootCollectionID *types.UniqueID
LineageFileName string
LineageFileName *string
UpdatedAt types.Timestamp
TotalRecordsPostCompaction uint64
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
LastCompactionTimeSecs uint64
VersionFileName string
}

type CollectionToGc struct {
Expand All @@ -42,7 +43,7 @@ type CreateCollection struct {
DatabaseName string
Ts types.Timestamp
LogPosition int64
RootCollectionId string
RootCollectionId *string
TotalRecordsPostCompaction uint64
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
LastCompactionTimeSecs uint64
Expand Down
7 changes: 5 additions & 2 deletions go/pkg/sysdb/coordinator/model_db_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
collections := make([]*model.Collection, 0, len(collectionAndMetadataList))
for _, collectionAndMetadata := range collectionAndMetadataList {
var rootCollectionID *types.UniqueID
if id, err := types.Parse(collectionAndMetadata.Collection.RootCollectionId); err == nil {
rootCollectionID = &id
if collectionAndMetadata.Collection.RootCollectionId != nil {
if id, err := types.Parse(*collectionAndMetadata.Collection.RootCollectionId); err == nil {
rootCollectionID = &id
}
}
collection := &model.Collection{
ID: types.MustParse(collectionAndMetadata.Collection.ID),
Expand All @@ -33,6 +35,7 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
LastCompactionTimeSecs: collectionAndMetadata.Collection.LastCompactionTimeSecs,
RootCollectionID: rootCollectionID,
LineageFileName: collectionAndMetadata.Collection.LineageFileName,
VersionFileName: collectionAndMetadata.Collection.VersionFileName,
}
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
collections = append(collections, collection)
Expand Down
14 changes: 7 additions & 7 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,14 +819,14 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
}

func (tc *Catalog) getLineageFile(ctx context.Context, collection *model.Collection) (*coordinatorpb.CollectionLineageFile, error) {
if len(collection.LineageFileName) == 0 {
if collection.LineageFileName == nil {
// There is no lineage file for the given collection
return &coordinatorpb.CollectionLineageFile{
Dependencies: []*coordinatorpb.CollectionVersionDependency{},
}, nil
}

return tc.s3Store.GetLineageFile(collection.LineageFileName)
return tc.s3Store.GetLineageFile(*collection.LineageFileName)
}

func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
Expand All @@ -852,8 +852,8 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
return err
}

if len(sourceCollectionDb.RootCollectionId) > 0 {
rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId)
if sourceCollectionDb.RootCollectionId != nil {
rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId)
if err != nil {
return err
}
Expand Down Expand Up @@ -926,7 +926,7 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
DatabaseName: sourceCollection.DatabaseName,
Ts: ts.Unix(),
LogPosition: sourceCollection.LogPosition,
RootCollectionId: rootCollectionIDStr,
RootCollectionId: &rootCollectionIDStr,
TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction,
SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction,
LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs,
Expand Down Expand Up @@ -1005,8 +1005,8 @@ func (tc *Catalog) CountForks(ctx context.Context, sourceCollectionID types.Uniq
return 0, common.ErrCollectionNotFound
}

if len(sourceCollectionDb.RootCollectionId) > 0 {
rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId)
if sourceCollectionDb.RootCollectionId != nil {
rootCollectionID, err = types.Parse(*sourceCollectionDb.RootCollectionId)
if err != nil {
return 0, err
}
Expand Down
8 changes: 8 additions & 0 deletions go/pkg/sysdb/grpc/proto_model_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,15 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
TotalRecordsPostCompaction: collection.TotalRecordsPostCompaction,
SizeBytesPostCompaction: collection.SizeBytesPostCompaction,
LastCompactionTimeSecs: collection.LastCompactionTimeSecs,
VersionFilePath: &collection.VersionFileName,
LineageFilePath: collection.LineageFileName,
}

if collection.RootCollectionID != nil {
rootCollectionId := collection.RootCollectionID.String()
collectionpb.RootCollectionId = &rootCollectionId
}

if collection.Metadata == nil {
return collectionpb
}
Expand Down
10 changes: 5 additions & 5 deletions go/pkg/sysdb/metastore/db/dao/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *collectionDb) DeleteAll() error {
func (s *collectionDb) GetCollectionEntry(collectionID *string, databaseName *string) (*dbmodel.Collection, error) {
var collections []*dbmodel.Collection
query := s.db.Table("collections").
Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id").
Select("collections.id, collections.name, collections.database_id, collections.is_deleted, collections.tenant, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name").
Joins("INNER JOIN databases ON collections.database_id = databases.id").
Where("collections.id = ?", collectionID)

Expand Down Expand Up @@ -116,8 +116,8 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string,
LogPosition int64 `gorm:"column:log_position"`
Version int32 `gorm:"column:version"`
VersionFileName string `gorm:"column:version_file_name"`
RootCollectionId string `gorm:"column:root_collection_id"`
LineageFileName string `gorm:"column:lineage_file_name"`
RootCollectionId *string `gorm:"column:root_collection_id"`
LineageFileName *string `gorm:"column:lineage_file_name"`
TotalRecordsPostCompaction uint64 `gorm:"column:total_records_post_compaction"`
SizeBytesPostCompaction uint64 `gorm:"column:size_bytes_post_compaction"`
LastCompactionTimeSecs uint64 `gorm:"column:last_compaction_time_secs"`
Expand All @@ -136,7 +136,7 @@ func (s *collectionDb) getCollections(id *string, name *string, tenantID string,
}

query := s.db.Table("collections").
Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, collections.lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant").
Select("collections.id as collection_id, collections.name as collection_name, collections.configuration_json_str, collections.dimension, collections.database_id, collections.ts as collection_ts, collections.is_deleted, collections.created_at as collection_created_at, collections.updated_at as collection_updated_at, collections.log_position, collections.version, collections.version_file_name, collections.root_collection_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name, collections.total_records_post_compaction, collections.size_bytes_post_compaction, collections.last_compaction_time_secs, databases.name as database_name, databases.tenant_id as db_tenant_id, collections.tenant as tenant").
Joins("INNER JOIN databases ON collections.database_id = databases.id").
Order("collections.created_at ASC")

Expand Down Expand Up @@ -519,7 +519,7 @@ func (s *collectionDb) LockCollection(collectionID string) error {
return nil
}

func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName string, newLineageFileName string) error {
func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, currentLineageFileName *string, newLineageFileName string) error {
return s.db.Model(&dbmodel.Collection{}).
Where("id = ? AND (lineage_file_name IS NULL OR lineage_file_name = ?)", collectionID, currentLineageFileName).
Updates(map[string]interface{}{
Expand Down
6 changes: 3 additions & 3 deletions go/pkg/sysdb/metastore/db/dbmodel/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Collection struct {
LogPosition int64 `gorm:"log_position;default:0"`
Version int32 `gorm:"version;default:0"`
VersionFileName string `gorm:"version_file_name"`
RootCollectionId string `gorm:"column:root_collection_id"`
LineageFileName string `gorm:"column:lineage_file_name"`
RootCollectionId *string `gorm:"column:root_collection_id"`
LineageFileName *string `gorm:"column:lineage_file_name"`
TotalRecordsPostCompaction uint64 `gorm:"total_records_post_compaction;default:0"`
SizeBytesPostCompaction uint64 `gorm:"size_bytes_post_compaction;default:0"`
LastCompactionTimeSecs uint64 `gorm:"last_compaction_time_secs;default:0"`
Expand Down Expand Up @@ -67,5 +67,5 @@ type ICollectionDb interface {
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error)
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
LockCollection(collectionID string) error
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error
}
4 changes: 2 additions & 2 deletions go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions idl/chromadb/proto/chroma.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ message Collection {
uint64 total_records_post_compaction = 10;
uint64 size_bytes_post_compaction = 11;
uint64 last_compaction_time_secs = 12;
optional string version_file_path = 13;
optional string root_collection_id = 14;
optional string lineage_file_path = 15;
}

message Database {
Expand Down
3 changes: 3 additions & 0 deletions rust/garbage_collector/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ impl ChromaGrpcClients {
total_records_post_compaction: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: None,
root_collection_id: None,
lineage_file_path: None,
}),
knn: None,
metadata: None,
Expand Down
6 changes: 1 addition & 5 deletions rust/segment/src/distributed_spann.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,7 @@ mod test {
dimension: None,
tenant: "test".to_string(),
database: "test".to_string(),
log_position: 0,
version: 0,
total_records_post_compaction: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
..Default::default()
};

let spann_writer = SpannSegmentWriter::from_segment(
Expand Down
6 changes: 6 additions & 0 deletions rust/sysdb/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ impl SqliteSysDb {
version: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: None,
root_collection_id: None,
lineage_file_path: None,
})
}

Expand Down Expand Up @@ -724,6 +727,9 @@ impl SqliteSysDb {
database: first_row.get(5),
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: None,
root_collection_id: None,
lineage_file_path: None,
}))
})
.collect::<Result<Vec<_>, GetCollectionsError>>()?;
Expand Down
3 changes: 3 additions & 0 deletions rust/sysdb/src/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ impl SysDb {
total_records_post_compaction: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: None,
root_collection_id: None,
lineage_file_path: None,
};

test_sysdb.add_collection(collection.clone());
Expand Down
34 changes: 34 additions & 0 deletions rust/types/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ pub struct Collection {
pub size_bytes_post_compaction: u64,
#[serde(skip)]
pub last_compaction_time_secs: u64,
#[serde(skip)]
pub version_file_path: Option<String>,
#[serde(skip)]
pub root_collection_id: Option<CollectionUuid>,
#[serde(skip)]
pub lineage_file_path: Option<String>,
}

impl Default for Collection {
Expand All @@ -111,6 +117,9 @@ impl Default for Collection {
total_records_post_compaction: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: None,
root_collection_id: None,
lineage_file_path: None,
}
}
}
Expand Down Expand Up @@ -226,6 +235,11 @@ impl TryFrom<chroma_proto::Collection> for Collection {
total_records_post_compaction: proto_collection.total_records_post_compaction,
size_bytes_post_compaction: proto_collection.size_bytes_post_compaction,
last_compaction_time_secs: proto_collection.last_compaction_time_secs,
version_file_path: proto_collection.version_file_path,
root_collection_id: proto_collection
.root_collection_id
.map(|uuid| CollectionUuid(Uuid::try_parse(&uuid).unwrap())),
lineage_file_path: proto_collection.lineage_file_path,
})
}
}
Expand Down Expand Up @@ -261,6 +275,9 @@ impl TryFrom<Collection> for chroma_proto::Collection {
total_records_post_compaction: value.total_records_post_compaction,
size_bytes_post_compaction: value.size_bytes_post_compaction,
last_compaction_time_secs: value.last_compaction_time_secs,
version_file_path: value.version_file_path,
root_collection_id: value.root_collection_id.map(|uuid| uuid.0.to_string()),
lineage_file_path: value.lineage_file_path,
})
}
}
Expand Down Expand Up @@ -306,6 +323,9 @@ mod test {
total_records_post_compaction: 0,
size_bytes_post_compaction: 0,
last_compaction_time_secs: 0,
version_file_path: Some("version_file_path".to_string()),
root_collection_id: Some("00000000-0000-0000-0000-000000000000".to_string()),
lineage_file_path: Some("lineage_file_path".to_string()),
};
let converted_collection: Collection = proto_collection.try_into().unwrap();
assert_eq!(
Expand All @@ -318,5 +338,19 @@ mod test {
assert_eq!(converted_collection.tenant, "baz".to_string());
assert_eq!(converted_collection.database, "qux".to_string());
assert_eq!(converted_collection.total_records_post_compaction, 0);
assert_eq!(converted_collection.size_bytes_post_compaction, 0);
assert_eq!(converted_collection.last_compaction_time_secs, 0);
assert_eq!(
converted_collection.version_file_path,
Some("version_file_path".to_string())
);
assert_eq!(
converted_collection.root_collection_id,
Some(CollectionUuid(Uuid::nil()))
);
assert_eq!(
converted_collection.lineage_file_path,
Some("lineage_file_path".to_string())
);
}
}
Loading
Loading