Skip to content

[ENH]: add batch get version file paths method to Sysdb #4432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ func (s *Coordinator) DeleteCollectionVersion(ctx context.Context, req *coordina
return s.catalog.DeleteCollectionVersion(ctx, req)
}

func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
return s.catalog.BatchGetCollectionVersionFilePaths(ctx, req.CollectionIds)
}

// SetDeleteMode sets the delete mode for testing
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
c.deleteMode = mode
Expand Down
31 changes: 31 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,37 @@ func (suite *APIsTestSuite) TestForkCollection() {
suite.True(exists, "Lineage file should exist in S3")
}

func (suite *APIsTestSuite) TestBatchGetCollectionVersionFilePaths() {
ctx := context.Background()

// Create a new collection
newCollection := &model.CreateCollection{
ID: types.NewUniqueID(),
Name: "test_batch_get_collection_version_file_paths",
TenantID: suite.tenantName,
DatabaseName: suite.databaseName,
}

newSegments := []*model.Segment{}

// Create the collection
suite.coordinator.catalog.versionFileEnabled = true
_, _, err := suite.coordinator.CreateCollectionAndSegments(ctx, newCollection, newSegments)
suite.NoError(err)

// Get the version file paths for the collection
versionFilePaths, err := suite.coordinator.BatchGetCollectionVersionFilePaths(ctx, &coordinatorpb.BatchGetCollectionVersionFilePathsRequest{
CollectionIds: []string{newCollection.ID.String()},
})
suite.NoError(err)
suite.Len(versionFilePaths.CollectionIdToVersionFilePath, 1)

// Verify version file exists in S3
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, versionFilePaths.CollectionIdToVersionFilePath[newCollection.ID.String()])
suite.NoError(err)
suite.True(exists, "Version file should exist in S3")
}

func (suite *APIsTestSuite) TestCountForks() {
ctx := context.Background()

Expand Down
14 changes: 14 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,20 @@ func (tc *Catalog) DeleteCollectionVersion(ctx context.Context, req *coordinator
return &result, nil
}

func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, collectionIds []string) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
result := coordinatorpb.BatchGetCollectionVersionFilePathsResponse{
CollectionIdToVersionFilePath: make(map[string]string),
}

paths, err := tc.metaDomain.CollectionDb(ctx).BatchGetCollectionVersionFilePaths(collectionIds)
if err != nil {
return nil, err
}
result.CollectionIdToVersionFilePath = paths

return &result, nil
}

func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
collectionIDPtr := &collectionID
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(collectionIDPtr, nil)
Expand Down
8 changes: 8 additions & 0 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,11 @@ func (s *Server) DeleteCollectionVersion(ctx context.Context, req *coordinatorpb
}
return res, nil
}

func (s *Server) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
res, err := s.coordinator.BatchGetCollectionVersionFilePaths(ctx, req)
if err != nil {
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}
return res, nil
}
17 changes: 17 additions & 0 deletions go/pkg/sysdb/metastore/db/dao/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,20 @@ func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, curr
}).Error

}

func (s *collectionDb) BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error) {
var collections []dbmodel.Collection
err := s.read_db.Model(&dbmodel.Collection{}).
Select("id, version_file_name").
Where("id IN ?", collectionIDs).
Find(&collections).Error
if err != nil {
return nil, err
}

result := make(map[string]string)
for _, collection := range collections {
result[collection.ID] = collection.VersionFileName
}
return result, nil
}
1 change: 1 addition & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ type ICollectionDb interface {
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
LockCollection(collectionID string) error
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error
BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error)
}
30 changes: 30 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ message DeleteCollectionVersionResponse {
map<string, bool> collection_id_to_success = 1;
}

message BatchGetCollectionVersionFilePathsRequest {
repeated string collection_ids = 1;
}

message BatchGetCollectionVersionFilePathsResponse {
map<string, string> collection_id_to_version_file_path = 1;
}

service SysDB {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
Expand Down Expand Up @@ -498,4 +506,5 @@ service SysDB {
rpc ListCollectionsToGc(ListCollectionsToGcRequest) returns (ListCollectionsToGcResponse) {}
rpc MarkVersionForDeletion(MarkVersionForDeletionRequest) returns (MarkVersionForDeletionResponse) {}
rpc DeleteCollectionVersion(DeleteCollectionVersionRequest) returns (DeleteCollectionVersionResponse) {}
rpc BatchGetCollectionVersionFilePaths(BatchGetCollectionVersionFilePathsRequest) returns (BatchGetCollectionVersionFilePathsResponse) {}
}
50 changes: 47 additions & 3 deletions rust/sysdb/src/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use chroma_types::{
UpdateCollectionConfiguration, UpdateCollectionError, VectorIndexConfiguration,
};
use chroma_types::{
Collection, CollectionConversionError, CollectionUuid, CountForksError,
FlushCompactionResponse, FlushCompactionResponseConversionError, ForkCollectionError, Segment,
SegmentConversionError, SegmentScope, Tenant,
BatchGetCollectionVersionFilePathsError, Collection, CollectionConversionError, CollectionUuid,
CountForksError, FlushCompactionResponse, FlushCompactionResponseConversionError,
ForkCollectionError, Segment, SegmentConversionError, SegmentScope, Tenant,
};
use std::collections::HashMap;
use std::fmt::Debug;
Expand Down Expand Up @@ -400,6 +400,23 @@ impl SysDb {
}
}

pub async fn batch_get_collection_version_file_paths(
&mut self,
collection_ids: Vec<CollectionUuid>,
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
match self {
SysDb::Grpc(grpc) => {
grpc.batch_get_collection_version_file_paths(collection_ids)
.await
}
SysDb::Sqlite(_) => todo!(),
SysDb::Test(test) => {
test.batch_get_collection_version_file_paths(collection_ids)
.await
}
}
}

pub async fn get_last_compaction_time(
&mut self,
tanant_ids: Vec<String>,
Expand Down Expand Up @@ -1114,6 +1131,33 @@ impl GrpcSysDb {
})
}

async fn batch_get_collection_version_file_paths(
&mut self,
collection_ids: Vec<CollectionUuid>,
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
let res = self
.client
.batch_get_collection_version_file_paths(
chroma_proto::BatchGetCollectionVersionFilePathsRequest {
collection_ids: collection_ids
.into_iter()
.map(|id| id.0.to_string())
.collect(),
},
)
.await?;
let collection_id_to_path = res.into_inner().collection_id_to_version_file_path;
let mut result = HashMap::new();
for (key, value) in collection_id_to_path {
let collection_id = CollectionUuid(
Uuid::try_parse(&key)
.map_err(|err| BatchGetCollectionVersionFilePathsError::Uuid(err, key))?,
);
result.insert(collection_id, value);
}
Ok(result)
}

async fn get_last_compaction_time(
&mut self,
tenant_ids: Vec<String>,
Expand Down
41 changes: 37 additions & 4 deletions rust/sysdb/src/test_sysdb.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use chroma_types::{
Collection, CollectionAndSegments, CollectionUuid, CountForksError, Database,
FlushCompactionResponse, GetCollectionSizeError, GetCollectionWithSegmentsError,
GetSegmentsError, ListDatabasesError, ListDatabasesResponse, Segment, SegmentFlushInfo,
SegmentScope, SegmentType, Tenant,
BatchGetCollectionVersionFilePathsError, Collection, CollectionAndSegments, CollectionUuid,
CountForksError, Database, FlushCompactionResponse, GetCollectionSizeError,
GetCollectionWithSegmentsError, GetSegmentsError, ListDatabasesError, ListDatabasesResponse,
Segment, SegmentFlushInfo, SegmentScope, SegmentType, Tenant,
};
use chroma_types::{GetCollectionsError, SegmentUuid};
use parking_lot::Mutex;
Expand Down Expand Up @@ -93,6 +93,18 @@ impl TestSysDb {
.insert(tenant, last_compaction_time);
}

pub fn set_collection_version_file_path(
&mut self,
collection_id: CollectionUuid,
version_file_path: String,
) {
let mut inner = self.inner.lock();

inner
.collection_to_version_file_name
.insert(collection_id, version_file_path);
}

fn filter_collections(
collection: &Collection,
collection_id: Option<CollectionUuid>,
Expand Down Expand Up @@ -549,4 +561,25 @@ impl TestSysDb {
) -> Result<usize, CountForksError> {
Ok(10)
}

pub(crate) async fn batch_get_collection_version_file_paths(
&self,
collection_ids: Vec<CollectionUuid>,
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
let inner = self.inner.lock();
let mut paths = HashMap::new();
for collection_id in collection_ids {
if let Some(path) = inner.collection_to_version_file_name.get(&collection_id) {
paths.insert(collection_id, path.clone());
} else {
return Err(BatchGetCollectionVersionFilePathsError::Grpc(
tonic::Status::not_found(format!(
"Version file not found for collection: {}",
collection_id
)),
));
}
}
Ok(paths)
}
}
17 changes: 17 additions & 0 deletions rust/types/src/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ impl ChromaError for GetCollectionWithSegmentsError {
}
}

#[derive(Debug, Error)]
pub enum BatchGetCollectionVersionFilePathsError {
#[error("Grpc error: {0}")]
Grpc(#[from] Status),
#[error("Could not parse UUID from string {1}: {0}")]
Uuid(uuid::Error, String),
}

impl ChromaError for BatchGetCollectionVersionFilePathsError {
fn code(&self) -> ErrorCodes {
match self {
BatchGetCollectionVersionFilePathsError::Grpc(status) => status.code().into(),
BatchGetCollectionVersionFilePathsError::Uuid(_, _) => ErrorCodes::InvalidArgument,
}
}
}

#[derive(Serialize, ToSchema)]
pub struct ResetResponse {}

Expand Down
Loading