Skip to content

Commit 7e6889e

Browse files
committed
[ENH]: add batch get version file paths method to Sysdb
1 parent 8fa5a73 commit 7e6889e

File tree

7 files changed

+131
-3
lines changed

7 files changed

+131
-3
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,10 @@ func (s *Coordinator) DeleteCollectionVersion(ctx context.Context, req *coordina
259259
return s.catalog.DeleteCollectionVersion(ctx, req)
260260
}
261261

262+
func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
263+
return s.catalog.BatchGetCollectionVersionFilePaths(ctx, req)
264+
}
265+
262266
// SetDeleteMode sets the delete mode for testing
263267
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
264268
c.deleteMode = mode

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
1213
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao"
1314
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1415
"github.com/pingcap/log"
@@ -1484,6 +1485,34 @@ func (suite *APIsTestSuite) TestForkCollection() {
14841485
suite.Empty(collections)
14851486
}
14861487

1488+
func (suite *APIsTestSuite) TestBatchGetCollectionVersionFilePaths() {
1489+
ctx := context.Background()
1490+
1491+
// Create a new collection
1492+
newCollection := &model.CreateCollection{
1493+
ID: types.NewUniqueID(),
1494+
Name: "test_batch_get_collection_version_file_paths",
1495+
TenantID: suite.tenantName,
1496+
DatabaseName: suite.databaseName,
1497+
}
1498+
1499+
// Create the collection
1500+
_, _, err := suite.coordinator.CreateCollection(ctx, newCollection)
1501+
suite.NoError(err)
1502+
1503+
// Get the version file paths for the collection
1504+
versionFilePaths, err := suite.coordinator.BatchGetCollectionVersionFilePaths(ctx, &coordinatorpb.BatchGetCollectionVersionFilePathsRequest{
1505+
CollectionIds: []string{newCollection.ID.String()},
1506+
})
1507+
suite.NoError(err)
1508+
suite.Len(versionFilePaths, 1)
1509+
1510+
// Verify version file exists in S3
1511+
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, versionFilePaths.CollectionIdToVersionFilePath[newCollection.ID.String()])
1512+
suite.NoError(err)
1513+
suite.True(exists, "Version file should exist in S3")
1514+
}
1515+
14871516
func TestAPIsTestSuite(t *testing.T) {
14881517
testSuite := new(APIsTestSuite)
14891518
suite.Run(t, testSuite)

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1941,6 +1941,26 @@ func (tc *Catalog) DeleteCollectionVersion(ctx context.Context, req *coordinator
19411941
return &result, nil
19421942
}
19431943

1944+
func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
1945+
result := coordinatorpb.BatchGetCollectionVersionFilePathsResponse{
1946+
CollectionIdToVersionFilePath: make(map[string]string),
1947+
}
1948+
1949+
for _, collectionId := range req.CollectionIds {
1950+
collectionIDPtr := &collectionId
1951+
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(collectionIDPtr, nil)
1952+
if err != nil {
1953+
return nil, err
1954+
}
1955+
if collectionEntry == nil {
1956+
return nil, common.ErrCollectionNotFound
1957+
}
1958+
1959+
result.CollectionIdToVersionFilePath[collectionId] = collectionEntry.VersionFileName
1960+
}
1961+
return &result, nil
1962+
}
1963+
19441964
func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
19451965
collectionIDPtr := &collectionID
19461966
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(collectionIDPtr, nil)

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,11 @@ func (s *Server) DeleteCollectionVersion(ctx context.Context, req *coordinatorpb
517517
}
518518
return res, nil
519519
}
520+
521+
func (s *Server) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
522+
res, err := s.coordinator.BatchGetCollectionVersionFilePaths(ctx, req)
523+
if err != nil {
524+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
525+
}
526+
return res, nil
527+
}

idl/chromadb/proto/coordinator.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,14 @@ message DeleteCollectionVersionResponse {
456456
map<string, bool> collection_id_to_success = 1;
457457
}
458458

459+
message BatchGetCollectionVersionFilePathsRequest {
460+
repeated string collection_ids = 1;
461+
}
462+
463+
message BatchGetCollectionVersionFilePathsResponse {
464+
map<string, string> collection_id_to_version_file_path = 1;
465+
}
466+
459467
service SysDB {
460468
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
461469
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -486,4 +494,5 @@ service SysDB {
486494
rpc ListCollectionsToGc(ListCollectionsToGcRequest) returns (ListCollectionsToGcResponse) {}
487495
rpc MarkVersionForDeletion(MarkVersionForDeletionRequest) returns (MarkVersionForDeletionResponse) {}
488496
rpc DeleteCollectionVersion(DeleteCollectionVersionRequest) returns (DeleteCollectionVersionResponse) {}
497+
rpc BatchGetCollectionVersionFilePaths(BatchGetCollectionVersionFilePathsRequest) returns (BatchGetCollectionVersionFilePathsResponse) {}
489498
}

rust/sysdb/src/sysdb.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use chroma_types::{
1919
UpdateCollectionConfiguration, UpdateCollectionError, VectorIndexConfiguration,
2020
};
2121
use chroma_types::{
22-
Collection, CollectionConversionError, CollectionUuid, FlushCompactionResponse,
23-
FlushCompactionResponseConversionError, ForkCollectionError, Segment, SegmentConversionError,
24-
SegmentScope, Tenant,
22+
BatchGetCollectionVersionFilePathsError, Collection, CollectionConversionError, CollectionUuid,
23+
FlushCompactionResponse, FlushCompactionResponseConversionError, ForkCollectionError, Segment,
24+
SegmentConversionError, SegmentScope, Tenant,
2525
};
2626
use std::collections::HashMap;
2727
use std::fmt::Debug;
@@ -385,6 +385,20 @@ impl SysDb {
385385
}
386386
}
387387

388+
pub async fn batch_get_collection_version_file_paths(
389+
&mut self,
390+
collection_ids: Vec<CollectionUuid>,
391+
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
392+
match self {
393+
SysDb::Grpc(grpc) => {
394+
grpc.batch_get_collection_version_file_paths(collection_ids)
395+
.await
396+
}
397+
SysDb::Sqlite(_) => todo!(),
398+
SysDb::Test(_) => todo!(),
399+
}
400+
}
401+
388402
pub async fn get_last_compaction_time(
389403
&mut self,
390404
tanant_ids: Vec<String>,
@@ -1077,6 +1091,33 @@ impl GrpcSysDb {
10771091
})
10781092
}
10791093

1094+
async fn batch_get_collection_version_file_paths(
1095+
&mut self,
1096+
collection_ids: Vec<CollectionUuid>,
1097+
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
1098+
let res = self
1099+
.client
1100+
.batch_get_collection_version_file_paths(
1101+
chroma_proto::BatchGetCollectionVersionFilePathsRequest {
1102+
collection_ids: collection_ids
1103+
.into_iter()
1104+
.map(|id| id.0.to_string())
1105+
.collect(),
1106+
},
1107+
)
1108+
.await?;
1109+
let collection_id_to_path = res.into_inner().collection_id_to_version_file_path;
1110+
let mut result = HashMap::new();
1111+
for (key, value) in collection_id_to_path {
1112+
let collection_id = CollectionUuid(
1113+
Uuid::try_parse(&key)
1114+
.map_err(|err| BatchGetCollectionVersionFilePathsError::Uuid(err, key))?,
1115+
);
1116+
result.insert(collection_id, value);
1117+
}
1118+
Ok(result)
1119+
}
1120+
10801121
async fn get_last_compaction_time(
10811122
&mut self,
10821123
tenant_ids: Vec<String>,

rust/types/src/api_types.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,23 @@ impl ChromaError for GetCollectionWithSegmentsError {
9797
}
9898
}
9999

100+
#[derive(Debug, Error)]
101+
pub enum BatchGetCollectionVersionFilePathsError {
102+
#[error("Grpc error: {0}")]
103+
Grpc(#[from] Status),
104+
#[error("Could not parse UUID from string {1}: {0}")]
105+
Uuid(uuid::Error, String),
106+
}
107+
108+
impl ChromaError for BatchGetCollectionVersionFilePathsError {
109+
fn code(&self) -> ErrorCodes {
110+
match self {
111+
BatchGetCollectionVersionFilePathsError::Grpc(status) => status.code().into(),
112+
BatchGetCollectionVersionFilePathsError::Uuid(_, _) => ErrorCodes::InvalidArgument,
113+
}
114+
}
115+
}
116+
100117
#[derive(Serialize, ToSchema)]
101118
pub struct ResetResponse {}
102119

0 commit comments

Comments
 (0)