Skip to content

Commit 0e2236f

Browse files
committed
[ENH]: garbage collector v2 orchestrator (supports forking)
1 parent b989fec commit 0e2236f

23 files changed

+2458
-347
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/sysdb/coordinator/coordinator.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ func (s *Coordinator) GetTenant(ctx context.Context, getTenant *model.GetTenant)
9797
return tenant, nil
9898
}
9999

100-
func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment) (*model.Collection, bool, error) {
101-
collection, created, err := s.catalog.CreateCollectionAndSegments(ctx, createCollection, createSegments, createCollection.Ts)
100+
func (s *Coordinator) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, initialFilePaths []*model.FlushSegmentCompaction) (*model.Collection, bool, error) {
101+
collection, created, err := s.catalog.CreateCollectionAndSegments(ctx, createCollection, createSegments, initialFilePaths, createCollection.Ts)
102102
if err != nil {
103103
return nil, false, err
104104
}
@@ -251,6 +251,10 @@ func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *u
251251
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID)
252252
}
253253

254+
// GetCollectionToGcInfo returns the garbage-collection info for a single
// collection, identified by collectionID and tenantID.
//
// It is a pass-through: the call is forwarded to the catalog's method of the
// same name and the catalog's result and error are returned unchanged.
func (s *Coordinator) GetCollectionToGcInfo(ctx context.Context, collectionID types.UniqueID, tenantID string) (*model.CollectionToGc, error) {
255+
return s.catalog.GetCollectionToGcInfo(ctx, collectionID, tenantID)
256+
}
257+
254258
func (s *Coordinator) ListCollectionVersions(ctx context.Context, collectionID types.UniqueID, tenantID string, maxCount *int64, versionsBefore *int64, versionsAtOrAfter *int64, includeMarkedForDeletion bool) ([]*coordinatorpb.CollectionVersionInfo, error) {
255259
return s.catalog.ListCollectionVersions(ctx, collectionID, tenantID, maxCount, versionsBefore, versionsAtOrAfter, includeMarkedForDeletion)
256260
}

go/pkg/sysdb/coordinator/model_db_convert.go

+7-15
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,14 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
4141
return collections
4242
}
4343

44-
func convertCollectionToGcToModel(collectionToGc []*dbmodel.CollectionToGc) []*model.CollectionToGc {
45-
if collectionToGc == nil {
46-
return nil
47-
}
48-
collections := make([]*model.CollectionToGc, 0, len(collectionToGc))
49-
for _, collectionInfo := range collectionToGc {
50-
collection := model.CollectionToGc{
51-
ID: types.MustParse(collectionInfo.ID),
52-
Name: collectionInfo.Name,
53-
VersionFilePath: collectionInfo.VersionFileName,
54-
TenantID: collectionInfo.TenantID,
55-
LineageFilePath: collectionInfo.LineageFileName,
56-
}
57-
collections = append(collections, &collection)
44+
// convertCollectionToGcToModel maps a single dbmodel.CollectionToGc row onto a
// model.CollectionToGc value, copying the ID, name, tenant ID and the
// version/lineage file names. It converts one element only; callers holding a
// slice convert element by element.
//
// NOTE(review): the ID goes through types.MustParse — presumably this panics
// on a malformed ID instead of returning an error; confirm before feeding it
// unvalidated input.
func convertCollectionToGcToModel(collectionToGc dbmodel.CollectionToGc) model.CollectionToGc {
45+
return model.CollectionToGc{
46+
ID: types.MustParse(collectionToGc.ID),
47+
Name: collectionToGc.Name,
48+
VersionFilePath: collectionToGc.VersionFileName,
49+
TenantID: collectionToGc.TenantID,
50+
LineageFilePath: collectionToGc.LineageFileName,
5851
}
59-
return collections
6052
}
6153

6254
func convertCollectionMetadataToModel(collectionMetadataList []*dbmodel.CollectionMetadata) *model.CollectionMetadata[model.CollectionMetadataValueType] {

go/pkg/sysdb/coordinator/table_catalog.go

+51-5
Original file line numberDiff line numberDiff line change
@@ -456,10 +456,38 @@ func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint
456456
if err != nil {
457457
return nil, err
458458
}
459-
collections := convertCollectionToGcToModel(collectionsToGc)
459+
460+
if collectionsToGc == nil {
461+
return nil, nil
462+
}
463+
464+
collections := make([]*model.CollectionToGc, 0, len(collectionsToGc))
465+
for _, collectionInfo := range collectionsToGc {
466+
collection := convertCollectionToGcToModel(*collectionInfo)
467+
collections = append(collections, &collection)
468+
}
469+
460470
return collections, nil
461471
}
462472

473+
// GetCollectionToGcInfo looks up the garbage-collection info for one
// collection, scoped to tenantID, and returns it as a model value.
//
// The lookup is done through tc.metaDomain.CollectionDb(ctx) using the string
// form of collectionID. A lookup error is returned unchanged; a nil row is
// reported as common.ErrCollectionNotFound.
//
// NOTE(review): the collectionDb implementation shown in this commit ends its
// query with First(...) and returns (nil, err) on any error, so it never
// yields (nil, nil). The nil check below is therefore only reachable through
// other ICollectionDb implementations, and a missing row presumably surfaces
// as the ORM's own not-found error, not common.ErrCollectionNotFound —
// confirm whether that error should be mapped here.
func (tc *Catalog) GetCollectionToGcInfo(ctx context.Context, collectionID types.UniqueID, tenantID string) (*model.CollectionToGc, error) {
474+
tracer := otel.Tracer
475+
if tracer != nil {
476+
// The context returned by Start is discarded; only the span is kept. The
// deferred End runs when this function returns, not at the end of the if.
_, span := tracer.Start(ctx, "Catalog.GetCollectionToGcInfo")
477+
defer span.End()
478+
}
479+

480+
collectionToGc, err := tc.metaDomain.CollectionDb(ctx).GetCollectionToGcInfo(collectionID.String(), tenantID)
481+
if err != nil {
482+
return nil, err
483+
}
484+
if collectionToGc == nil {
485+
return nil, common.ErrCollectionNotFound
486+
}
487+
// Convert by value, then hand back a pointer to the local copy.
result := convertCollectionToGcToModel(*collectionToGc)
488+
return &result, nil
489+
}
490+
463491
func (tc *Catalog) GetCollectionWithSegments(ctx context.Context, collectionID types.UniqueID) (*model.Collection, []*model.Segment, error) {
464492
tracer := otel.Tracer
465493
if tracer != nil {
@@ -952,7 +980,7 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
952980
flushFilePaths = append(flushFilePaths, flushFilePath)
953981
}
954982

955-
_, _, err = tc.CreateCollectionAndSegments(txCtx, createCollection, createSegments, ts.Unix())
983+
_, _, err = tc.CreateCollectionAndSegments(txCtx, createCollection, createSegments, flushFilePaths, ts.Unix())
956984
if err != nil {
957985
return err
958986
}
@@ -1090,7 +1118,22 @@ func (tc *Catalog) createSegmentImpl(txCtx context.Context, createSegment *model
10901118
return result, nil
10911119
}
10921120

1093-
func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (string, error) {
1121+
func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, initialFilePaths []*model.FlushSegmentCompaction, ts types.Timestamp) (string, error) {
1122+
segmentCompactionInfos := make([]*coordinatorpb.FlushSegmentCompactionInfo, 0, len(createSegments))
1123+
for _, compaction := range initialFilePaths {
1124+
// Convert map[string][]string to map[string]*coordinatorpb.FilePaths
1125+
convertedPaths := make(map[string]*coordinatorpb.FilePaths)
1126+
for k, v := range compaction.FilePaths {
1127+
convertedPaths[k] = &coordinatorpb.FilePaths{Paths: v}
1128+
}
1129+
1130+
info := &coordinatorpb.FlushSegmentCompactionInfo{
1131+
SegmentId: compaction.ID.String(),
1132+
FilePaths: convertedPaths,
1133+
}
1134+
segmentCompactionInfos = append(segmentCompactionInfos, info)
1135+
}
1136+
10941137
collectionVersionFilePb := &coordinatorpb.CollectionVersionFile{
10951138
CollectionInfoImmutable: &coordinatorpb.CollectionInfoImmutable{
10961139
TenantId: createCollection.TenantID,
@@ -1104,6 +1147,9 @@ func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string
11041147
{
11051148
Version: 0,
11061149
CreatedAtSecs: int64(ts),
1150+
SegmentInfo: &coordinatorpb.CollectionSegmentInfo{
1151+
SegmentCompactionInfo: segmentCompactionInfos,
1152+
},
11071153
},
11081154
},
11091155
},
@@ -1117,7 +1163,7 @@ func (tc *Catalog) createFirstVersionFile(ctx context.Context, databaseID string
11171163
return fullFilePath, nil
11181164
}
11191165

1120-
func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, ts types.Timestamp) (*model.Collection, bool, error) {
1166+
func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollection *model.CreateCollection, createSegments []*model.CreateSegment, initialFilePaths []*model.FlushSegmentCompaction, ts types.Timestamp) (*model.Collection, bool, error) {
11211167
var resultCollection *model.Collection
11221168
created := false
11231169

@@ -1139,7 +1185,7 @@ func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollec
11391185
return nil, false, common.ErrDatabaseNotFound
11401186
}
11411187

1142-
versionFileName, err = tc.createFirstVersionFile(ctx, databases[0].ID, createCollection, createSegments, ts)
1188+
versionFileName, err = tc.createFirstVersionFile(ctx, databases[0].ID, createCollection, createSegments, initialFilePaths, ts)
11431189
if err != nil {
11441190
return nil, false, err
11451191
}

go/pkg/sysdb/grpc/collection_service.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func (s *Server) CreateCollection(ctx context.Context, req *coordinatorpb.Create
7878

7979
// Convert the request segments to create segment models
8080
createSegments := []*model.CreateSegment{}
81+
initialFilePaths := []*model.FlushSegmentCompaction{}
8182
for _, segment := range req.Segments {
8283
createSegment, err := convertSegmentToModel(segment)
8384
if err != nil {
@@ -91,10 +92,19 @@ func (s *Server) CreateCollection(ctx context.Context, req *coordinatorpb.Create
9192
return res, grpcutils.BuildInternalGrpcError(err.Error())
9293
}
9394
createSegments = append(createSegments, createSegment)
95+
filePaths := make(map[string][]string)
96+
for key, filePath := range segment.FilePaths {
97+
filePaths[key] = filePath.Paths
98+
}
99+
flushSegmentCompaction := &model.FlushSegmentCompaction{
100+
ID: createSegment.ID,
101+
FilePaths: filePaths,
102+
}
103+
initialFilePaths = append(initialFilePaths, flushSegmentCompaction)
94104
}
95105

96106
// Create the collection and segments
97-
collection, created, err := s.coordinator.CreateCollectionAndSegments(ctx, createCollection, createSegments)
107+
collection, created, err := s.coordinator.CreateCollectionAndSegments(ctx, createCollection, createSegments, initialFilePaths)
98108
if err != nil {
99109
log.Error("CreateCollection failed. error creating collection", zap.Error(err), zap.String("collection_id", req.Id), zap.String("collection_name", req.Name))
100110
res.Collection = &coordinatorpb.Collection{
@@ -513,6 +523,29 @@ func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.Lis
513523
return res, nil
514524
}
515525

526+
// GetCollectionToGcInfo is the gRPC handler for the GetCollectionToGcInfo RPC.
//
// It parses req.CollectionId into a unique ID, asks the coordinator for the
// collection's garbage-collection info scoped to req.TenantId, and copies the
// ID, name, tenant ID and version/lineage file paths into the response.
//
// Both failure paths (unparsable collection ID, coordinator error) log the
// error with the collection ID and return a nil response together with an
// error built by grpcutils.BuildInternalGrpcError.
//
// NOTE(review): a malformed collection ID is a caller mistake but is reported
// through the same internal-error builder as coordinator failures — confirm
// whether an invalid-argument status is wanted here.
func (s *Server) GetCollectionToGcInfo(ctx context.Context, req *coordinatorpb.GetCollectionToGcInfoRequest) (*coordinatorpb.GetCollectionToGcInfoResponse, error) {
527+
collectionID, err := types.ToUniqueID(&req.CollectionId)
528+
if err != nil {
529+
// Log under the handler's real name so the message can be found by searching for the RPC.
log.Error("GetCollectionToGcInfo failed. error parsing collection id", zap.Error(err), zap.String("collection_id", req.CollectionId))
530+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
531+
}
532+
collection, err := s.coordinator.GetCollectionToGcInfo(ctx, collectionID, req.TenantId)
533+
if err != nil {
534+
log.Error("GetCollectionToGcInfo failed", zap.Error(err), zap.String("collection_id", req.CollectionId))
535+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
536+
}
537+
res := &coordinatorpb.GetCollectionToGcInfoResponse{
538+
Collection: &coordinatorpb.CollectionToGcInfo{
539+
Id: collection.ID.String(),
540+
Name: collection.Name,
541+
VersionFilePath: collection.VersionFilePath,
542+
TenantId: collection.TenantID,
543+
LineageFilePath: collection.LineageFilePath,
544+
},
545+
}
546+
return res, nil
547+
}
548+
516549
// Mark the versions for deletion.
517550
// GC mimics a 2PC protocol.
518551
// 1. Mark the versions for deletion by calling MarkVersionForDeletion.

go/pkg/sysdb/metastore/db/dao/collection.go

+14
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64
101101
return collections, nil
102102
}
103103

104+
// GetCollectionToGcInfo loads the garbage-collection info row for one
// collection, restricted to the given tenant.
//
// The query runs against s.read_db on the collections table. It inner-joins
// databases to obtain tenant_id and left-joins collections again (aliased as
// root) through root_collection_id. The selected lineage_file_name is the
// root's value when that is non-empty, otherwise the collection's own value
// when that is non-empty, otherwise NULL (COALESCE over NULLIF(..., '')).
// collectionID and tenantID are bound through placeholders.
//
// Any error from First is returned unchanged together with a nil result; on
// success a pointer to the populated row is returned.
//
// NOTE(review): First presumably reports the ORM's record-not-found error
// when no row matches, so this implementation never returns (nil, nil) —
// confirm against the ORM documentation.
func (s *collectionDb) GetCollectionToGcInfo(collectionID string, tenantID string) (*dbmodel.CollectionToGc, error) {
105+
var collection dbmodel.CollectionToGc
106+
err := s.read_db.Table("collections").
107+
Select("collections.id, collections.name, collections.version_file_name, collections.oldest_version_ts, databases.tenant_id, COALESCE(NULLIF(root.lineage_file_name, ''), NULLIF(collections.lineage_file_name, '')) AS lineage_file_name").
108+
Joins("INNER JOIN databases ON collections.database_id = databases.id").
109+
Joins("LEFT JOIN collections AS root ON collections.root_collection_id = root.id").
110+
Where("collections.id = ? AND databases.tenant_id = ?", collectionID, tenantID).
111+
First(&collection).Error
112+
if err != nil {
113+
return nil, err
114+
}
115+
return &collection, nil
116+
}
117+
104118
func (s *collectionDb) getCollections(id *string, name *string, tenantID string, databaseName string, limit *int32, offset *int32, is_deleted bool) (collectionWithMetdata []*dbmodel.CollectionAndMetadata, err error) {
105119
type Result struct {
106120
// Collection fields

go/pkg/sysdb/metastore/db/dbmodel/collection.go

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type ICollectionDb interface {
6565
GetCollectionEntry(collectionID *string, databaseName *string) (*Collection, error)
6666
GetCollectionSize(collectionID string) (uint64, error)
6767
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error)
68+
GetCollectionToGcInfo(collectionID string, tenantID string) (*CollectionToGc, error)
6869
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
6970
LockCollection(collectionID string) error
7071
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error

idl/chromadb/proto/coordinator.proto

+10
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,15 @@ message ListCollectionsToGcResponse {
449449
repeated CollectionToGcInfo collections = 1;
450450
}
451451

452+
// Request for the SysDB GetCollectionToGcInfo RPC: identifies the single
// collection whose garbage-collection info is wanted.
message GetCollectionToGcInfoRequest {
453+
// ID of the collection, as a string; the Go handler parses it into a unique ID.
string collection_id = 1;
454+
// Tenant the lookup is scoped to.
string tenant_id = 2;
455+
}
456+
457+
// Response for the SysDB GetCollectionToGcInfo RPC.
message GetCollectionToGcInfoResponse {
458+
// Garbage-collection info for the requested collection; the same message
// type that ListCollectionsToGcResponse returns as a repeated field.
CollectionToGcInfo collection = 1;
459+
}
460+
452461
message MarkVersionForDeletionRequest {
453462
int64 epoch_id = 1;
454463
repeated VersionListForCollection versions = 2;
@@ -504,6 +513,7 @@ service SysDB {
504513
rpc ListCollectionVersions(ListCollectionVersionsRequest) returns (ListCollectionVersionsResponse) {}
505514
rpc GetCollectionSize(GetCollectionSizeRequest) returns (GetCollectionSizeResponse) {}
506515
rpc ListCollectionsToGc(ListCollectionsToGcRequest) returns (ListCollectionsToGcResponse) {}
516+
rpc GetCollectionToGcInfo(GetCollectionToGcInfoRequest) returns (GetCollectionToGcInfoResponse) {}
507517
rpc MarkVersionForDeletion(MarkVersionForDeletionRequest) returns (MarkVersionForDeletionResponse) {}
508518
rpc DeleteCollectionVersion(DeleteCollectionVersionRequest) returns (DeleteCollectionVersionResponse) {}
509519
rpc BatchGetCollectionVersionFilePaths(BatchGetCollectionVersionFilePathsRequest) returns (BatchGetCollectionVersionFilePathsResponse) {}

rust/blockstore/src/test_utils/sparse_index_test_utils.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use uuid::Uuid;
2121
/// * `Result<Uuid, Box<dyn ChromaError>>` - The UUID of the created sparse index file
2222
pub async fn create_test_sparse_index(
2323
storage: &Storage,
24+
root_id: Uuid,
2425
block_ids: Vec<Uuid>,
2526
prefix: Option<String>,
2627
) -> Result<Uuid, Box<dyn ChromaError>> {
@@ -43,7 +44,6 @@ pub async fn create_test_sparse_index(
4344
sparse_index.set_count(block_ids[0], 1)?;
4445

4546
// Create and save the sparse index file
46-
let root_id = Uuid::new_v4();
4747
let root_writer = RootWriter::new(Version::V1_1, root_id, sparse_index);
4848
let root_manager = RootManager::new(storage.clone(), Box::new(NopCache));
4949
root_manager.flush::<&str>(&root_writer).await?;
@@ -87,7 +87,8 @@ mod tests {
8787
let storage = Storage::Local(LocalStorage::new(temp_dir.path().to_str().unwrap()));
8888

8989
let block_ids = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
90-
let result = create_test_sparse_index(&storage, block_ids.clone(), None).await;
90+
let result =
91+
create_test_sparse_index(&storage, Uuid::new_v4(), block_ids.clone(), None).await;
9192
assert!(result.is_ok());
9293

9394
// Verify the sparse index was created by trying to read it
@@ -107,7 +108,7 @@ mod tests {
107108
let temp_dir = TempDir::new().unwrap();
108109
let storage = Storage::Local(LocalStorage::new(temp_dir.path().to_str().unwrap()));
109110

110-
let result = create_test_sparse_index(&storage, vec![], None).await;
111+
let result = create_test_sparse_index(&storage, Uuid::new_v4(), vec![], None).await;
111112
assert!(matches!(
112113
result,
113114
Err(e) if e.to_string().contains("Cannot create sparse index with empty block IDs")
@@ -121,7 +122,7 @@ mod tests {
121122

122123
let block_ids = vec![Uuid::new_v4(), Uuid::new_v4()];
123124
let prefix = Some("custom".to_string());
124-
let result = create_test_sparse_index(&storage, block_ids, prefix).await;
125+
let result = create_test_sparse_index(&storage, Uuid::new_v4(), block_ids, prefix).await;
125126
assert!(result.is_ok());
126127
}
127128
}

rust/garbage_collector/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,4 @@ tokio-test = "0.4"
5757
rand = { workspace = true }
5858
itertools = { workspace = true }
5959
tracing-test = { version = "0.2.5" }
60+
tracing-subscriber = { workspace = true }

0 commit comments

Comments
 (0)