Skip to content

Commit 93be17d

Browse files
committed
[ENH]: add batch get version file paths method to Sysdb
1 parent 7202f83 commit 93be17d

File tree

10 files changed

+175
-3
lines changed

10 files changed

+175
-3
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,10 @@ func (s *Coordinator) DeleteCollectionVersion(ctx context.Context, req *coordina
263263
return s.catalog.DeleteCollectionVersion(ctx, req)
264264
}
265265

266+
func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
267+
return s.catalog.BatchGetCollectionVersionFilePaths(ctx, req)
268+
}
269+
266270
// SetDeleteMode sets the delete mode for testing
267271
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
268272
c.deleteMode = mode

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,37 @@ func (suite *APIsTestSuite) TestForkCollection() {
15581558
suite.True(exists, "Lineage file should exist in S3")
15591559
}
15601560

1561+
func (suite *APIsTestSuite) TestBatchGetCollectionVersionFilePaths() {
1562+
ctx := context.Background()
1563+
1564+
// Create a new collection
1565+
newCollection := &model.CreateCollection{
1566+
ID: types.NewUniqueID(),
1567+
Name: "test_batch_get_collection_version_file_paths",
1568+
TenantID: suite.tenantName,
1569+
DatabaseName: suite.databaseName,
1570+
}
1571+
1572+
newSegments := []*model.CreateSegment{}
1573+
1574+
// Create the collection
1575+
suite.coordinator.catalog.versionFileEnabled = true
1576+
_, _, err := suite.coordinator.CreateCollectionAndSegments(ctx, newCollection, newSegments)
1577+
suite.NoError(err)
1578+
1579+
// Get the version file paths for the collection
1580+
versionFilePaths, err := suite.coordinator.BatchGetCollectionVersionFilePaths(ctx, &coordinatorpb.BatchGetCollectionVersionFilePathsRequest{
1581+
CollectionIds: []string{newCollection.ID.String()},
1582+
})
1583+
suite.NoError(err)
1584+
suite.Len(versionFilePaths.CollectionIdToVersionFilePath, 1)
1585+
1586+
// Verify version file exists in S3
1587+
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, versionFilePaths.CollectionIdToVersionFilePath[newCollection.ID.String()])
1588+
suite.NoError(err)
1589+
suite.True(exists, "Version file should exist in S3")
1590+
}
1591+
15611592
func (suite *APIsTestSuite) TestCountForks() {
15621593
ctx := context.Background()
15631594

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,6 +2000,20 @@ func (tc *Catalog) DeleteCollectionVersion(ctx context.Context, req *coordinator
20002000
return &result, nil
20012001
}
20022002

2003+
func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
2004+
result := coordinatorpb.BatchGetCollectionVersionFilePathsResponse{
2005+
CollectionIdToVersionFilePath: make(map[string]string),
2006+
}
2007+
2008+
paths, err := tc.metaDomain.CollectionDb(ctx).BatchGetCollectionVersionFilePaths(req.CollectionIds)
2009+
if err != nil {
2010+
return nil, err
2011+
}
2012+
result.CollectionIdToVersionFilePath = paths
2013+
2014+
return &result, nil
2015+
}
2016+
20032017
func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
20042018
collectionIDPtr := &collectionID
20052019
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(collectionIDPtr, nil)

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,3 +545,11 @@ func (s *Server) DeleteCollectionVersion(ctx context.Context, req *coordinatorpb
545545
}
546546
return res, nil
547547
}
548+
549+
func (s *Server) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
550+
res, err := s.coordinator.BatchGetCollectionVersionFilePaths(ctx, req)
551+
if err != nil {
552+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
553+
}
554+
return res, nil
555+
}

go/pkg/sysdb/metastore/db/dao/collection.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,20 @@ func (s *collectionDb) UpdateCollectionLineageFilePath(collectionID string, curr
527527
}).Error
528528

529529
}
530+
531+
func (s *collectionDb) BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error) {
532+
var collections []dbmodel.Collection
533+
err := s.read_db.Model(&dbmodel.Collection{}).
534+
Select("id, version_file_name").
535+
Where("id IN ?", collectionIDs).
536+
Find(&collections).Error
537+
if err != nil {
538+
return nil, err
539+
}
540+
541+
result := make(map[string]string)
542+
for _, collection := range collections {
543+
result[collection.ID] = collection.VersionFileName
544+
}
545+
return result, nil
546+
}

go/pkg/sysdb/metastore/db/dbmodel/collection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,5 @@ type ICollectionDb interface {
6868
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
6969
LockCollection(collectionID string) error
7070
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error
71+
BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error)
7172
}

go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/chromadb/proto/coordinator.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,14 @@ message DeleteCollectionVersionResponse {
467467
map<string, bool> collection_id_to_success = 1;
468468
}
469469

470+
message BatchGetCollectionVersionFilePathsRequest {
471+
repeated string collection_ids = 1;
472+
}
473+
474+
message BatchGetCollectionVersionFilePathsResponse {
475+
map<string, string> collection_id_to_version_file_path = 1;
476+
}
477+
470478
service SysDB {
471479
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
472480
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -498,4 +506,5 @@ service SysDB {
498506
rpc ListCollectionsToGc(ListCollectionsToGcRequest) returns (ListCollectionsToGcResponse) {}
499507
rpc MarkVersionForDeletion(MarkVersionForDeletionRequest) returns (MarkVersionForDeletionResponse) {}
500508
rpc DeleteCollectionVersion(DeleteCollectionVersionRequest) returns (DeleteCollectionVersionResponse) {}
509+
rpc BatchGetCollectionVersionFilePaths(BatchGetCollectionVersionFilePathsRequest) returns (BatchGetCollectionVersionFilePathsResponse) {}
501510
}

rust/sysdb/src/sysdb.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use chroma_types::{
1919
UpdateCollectionConfiguration, UpdateCollectionError, VectorIndexConfiguration,
2020
};
2121
use chroma_types::{
22-
Collection, CollectionConversionError, CollectionUuid, CountForksError,
23-
FlushCompactionResponse, FlushCompactionResponseConversionError, ForkCollectionError, Segment,
24-
SegmentConversionError, SegmentScope, Tenant,
22+
BatchGetCollectionVersionFilePathsError, Collection, CollectionConversionError, CollectionUuid,
23+
CountForksError, FlushCompactionResponse, FlushCompactionResponseConversionError,
24+
ForkCollectionError, Segment, SegmentConversionError, SegmentScope, Tenant,
2525
};
2626
use std::collections::HashMap;
2727
use std::fmt::Debug;
@@ -397,6 +397,20 @@ impl SysDb {
397397
}
398398
}
399399

400+
pub async fn batch_get_collection_version_file_paths(
401+
&mut self,
402+
collection_ids: Vec<CollectionUuid>,
403+
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
404+
match self {
405+
SysDb::Grpc(grpc) => {
406+
grpc.batch_get_collection_version_file_paths(collection_ids)
407+
.await
408+
}
409+
SysDb::Sqlite(_) => todo!(),
410+
SysDb::Test(_) => todo!(),
411+
}
412+
}
413+
400414
pub async fn get_last_compaction_time(
401415
&mut self,
402416
tanant_ids: Vec<String>,
@@ -1110,6 +1124,33 @@ impl GrpcSysDb {
11101124
})
11111125
}
11121126

1127+
async fn batch_get_collection_version_file_paths(
1128+
&mut self,
1129+
collection_ids: Vec<CollectionUuid>,
1130+
) -> Result<HashMap<CollectionUuid, String>, BatchGetCollectionVersionFilePathsError> {
1131+
let res = self
1132+
.client
1133+
.batch_get_collection_version_file_paths(
1134+
chroma_proto::BatchGetCollectionVersionFilePathsRequest {
1135+
collection_ids: collection_ids
1136+
.into_iter()
1137+
.map(|id| id.0.to_string())
1138+
.collect(),
1139+
},
1140+
)
1141+
.await?;
1142+
let collection_id_to_path = res.into_inner().collection_id_to_version_file_path;
1143+
let mut result = HashMap::new();
1144+
for (key, value) in collection_id_to_path {
1145+
let collection_id = CollectionUuid(
1146+
Uuid::try_parse(&key)
1147+
.map_err(|err| BatchGetCollectionVersionFilePathsError::Uuid(err, key))?,
1148+
);
1149+
result.insert(collection_id, value);
1150+
}
1151+
Ok(result)
1152+
}
1153+
11131154
async fn get_last_compaction_time(
11141155
&mut self,
11151156
tenant_ids: Vec<String>,

rust/types/src/api_types.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,23 @@ impl ChromaError for GetCollectionWithSegmentsError {
9797
}
9898
}
9999

100+
#[derive(Debug, Error)]
101+
pub enum BatchGetCollectionVersionFilePathsError {
102+
#[error("Grpc error: {0}")]
103+
Grpc(#[from] Status),
104+
#[error("Could not parse UUID from string {1}: {0}")]
105+
Uuid(uuid::Error, String),
106+
}
107+
108+
impl ChromaError for BatchGetCollectionVersionFilePathsError {
109+
fn code(&self) -> ErrorCodes {
110+
match self {
111+
BatchGetCollectionVersionFilePathsError::Grpc(status) => status.code().into(),
112+
BatchGetCollectionVersionFilePathsError::Uuid(_, _) => ErrorCodes::InvalidArgument,
113+
}
114+
}
115+
}
116+
100117
#[derive(Serialize, ToSchema)]
101118
pub struct ResetResponse {}
102119

0 commit comments

Comments
 (0)