Skip to content

[ENH] support garbage collection on forked collections #4447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 4 additions & 30 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,35 +1,7 @@
[workspace]
resolver = "2"

members = [
"rust/benchmark",
"rust/blockstore",
"rust/cache",
"rust/chroma",
"rust/config",
"rust/distance",
"rust/error",
"rust/frontend",
"rust/garbage_collector",
"rust/index",
"rust/load",
"rust/log",
"rust/log-service",
"rust/memberlist",
"rust/storage",
"rust/system",
"rust/sysdb",
"rust/types",
"rust/worker",
"rust/segment",
"rust/python_bindings",
"rust/mdac",
"rust/tracing",
"rust/sqlite",
"rust/cli",
"rust/wal3",
"rust/js_bindings"
]
members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/frontend", "rust/garbage_collector", "rust/index", "rust/load", "rust/log", "rust/log-service", "rust/memberlist", "rust/storage", "rust/system", "rust/sysdb", "rust/types", "rust/worker", "rust/segment", "rust/python_bindings", "rust/mdac", "rust/tracing", "rust/sqlite", "rust/cli", "rust/wal3", "rust/js_bindings"]

[workspace.dependencies]
arrow = "52.2.0"
Expand Down Expand Up @@ -86,10 +58,12 @@ rayon = "1.10.0"
validator = { version = "0.19", features = ["derive"] }
rust-embed = { version = "8.5.0", features = ["include-exclude", "debug-embed"] }
hnswlib = { version = "0.8.1", git = "https://github.com/chroma-core/hnswlib.git" }
reqwest = { version = "0.12.9", features = ["rustls-tls-native-roots", "http2"], default-features = false }
reqwest = { version = "0.12.9", features = ["rustls-tls-native-roots", "http2"], default-features = false }
random-port = "0.1.1"
ndarray = { version = "0.16.1", features = ["approx"] }
humantime = { version = "2.2.0" }
petgraph = { version = "0.8.1" }
base64 = "0.22"

chroma-benchmark = { path = "rust/benchmark" }
chroma-blockstore = { path = "rust/blockstore" }
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (s *Coordinator) DeleteCollectionVersion(ctx context.Context, req *coordina
return s.catalog.DeleteCollectionVersion(ctx, req)
}

// BatchGetCollectionVersionFilePaths forwards the batched version-file-path
// lookup to the catalog and hands its response and error back unchanged.
func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
	resp, err := s.catalog.BatchGetCollectionVersionFilePaths(ctx, req)
	return resp, err
}

// SetDeleteMode sets the delete mode for testing
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
c.deleteMode = mode
Expand Down
29 changes: 29 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao"
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
"github.com/pingcap/log"
Expand Down Expand Up @@ -1484,6 +1485,34 @@ func (suite *APIsTestSuite) TestForkCollection() {
suite.Empty(collections)
}

// TestBatchGetCollectionVersionFilePaths creates a collection, fetches its
// version file path through the batch API, and verifies that the returned
// path refers to an object present in the S3 metastore.
func (suite *APIsTestSuite) TestBatchGetCollectionVersionFilePaths() {
	ctx := context.Background()

	// Create a new collection
	newCollection := &model.CreateCollection{
		ID:           types.NewUniqueID(),
		Name:         "test_batch_get_collection_version_file_paths",
		TenantID:     suite.tenantName,
		DatabaseName: suite.databaseName,
	}
	collectionID := newCollection.ID.String()

	// Create the collection. Use Require so that a failed setup aborts the
	// test instead of cascading into unrelated failures below.
	_, _, err := suite.coordinator.CreateCollection(ctx, newCollection)
	suite.Require().NoError(err)

	// Get the version file paths for the collection
	versionFilePaths, err := suite.coordinator.BatchGetCollectionVersionFilePaths(ctx, &coordinatorpb.BatchGetCollectionVersionFilePathsRequest{
		CollectionIds: []string{collectionID},
	})
	// The response is dereferenced below, so both checks must be fatal.
	suite.Require().NoError(err)
	suite.Require().NotNil(versionFilePaths)

	// Len has to be applied to the map inside the response: the response
	// itself is a pointer to a struct, which has no length.
	suite.Len(versionFilePaths.CollectionIdToVersionFilePath, 1)
	suite.Contains(versionFilePaths.CollectionIdToVersionFilePath, collectionID)

	// Verify version file exists in S3
	exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, versionFilePaths.CollectionIdToVersionFilePath[collectionID])
	suite.NoError(err)
	suite.True(exists, "Version file should exist in S3")
}

func TestAPIsTestSuite(t *testing.T) {
testSuite := new(APIsTestSuite)
suite.Run(t, testSuite)
Expand Down
20 changes: 20 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,26 @@ func (tc *Catalog) DeleteCollectionVersion(ctx context.Context, req *coordinator
return &result, nil
}

// BatchGetCollectionVersionFilePaths looks up the collection entry for every
// ID in req.CollectionIds and returns each entry's VersionFileName keyed by
// collection ID.
//
// The lookup is all-or-nothing: the first lookup error is returned as-is, a
// missing entry yields common.ErrCollectionNotFound, and in either case no
// partial response is returned.
func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
	versionFilePaths := make(map[string]string)

	for _, id := range req.CollectionIds {
		entry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(&id, nil)
		switch {
		case err != nil:
			return nil, err
		case entry == nil:
			return nil, common.ErrCollectionNotFound
		}

		versionFilePaths[id] = entry.VersionFileName
	}

	return &coordinatorpb.BatchGetCollectionVersionFilePathsResponse{
		CollectionIdToVersionFilePath: versionFilePaths,
	}, nil
}

func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
collectionIDPtr := &collectionID
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionEntry(collectionIDPtr, nil)
Expand Down
8 changes: 8 additions & 0 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,11 @@ func (s *Server) DeleteCollectionVersion(ctx context.Context, req *coordinatorpb
}
return res, nil
}

// BatchGetCollectionVersionFilePaths is the gRPC entry point for the batched
// version-file-path lookup. It delegates to the coordinator; on failure the
// error's message is passed through grpcutils.BuildInternalGrpcError and no
// response is returned.
func (s *Server) BatchGetCollectionVersionFilePaths(ctx context.Context, req *coordinatorpb.BatchGetCollectionVersionFilePathsRequest) (*coordinatorpb.BatchGetCollectionVersionFilePathsResponse, error) {
	response, lookupErr := s.coordinator.BatchGetCollectionVersionFilePaths(ctx, req)
	if lookupErr != nil {
		// Only the message survives; the original error value is not wrapped.
		message := lookupErr.Error()
		return nil, grpcutils.BuildInternalGrpcError(message)
	}

	return response, nil
}
10 changes: 10 additions & 0 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ message CollectionToGcInfo {
string version_file_path = 3;
int64 latest_version = 4;
string tenant_id = 5;
optional string lineage_file_path = 6;
}

message ListCollectionsToGcResponse {
Expand All @@ -456,6 +457,14 @@ message DeleteCollectionVersionResponse {
map<string, bool> collection_id_to_success = 1;
}

// Request for the SysDB BatchGetCollectionVersionFilePaths RPC.
message BatchGetCollectionVersionFilePathsRequest {
// IDs of the collections whose version file paths are requested.
repeated string collection_ids = 1;
}

// Response for the SysDB BatchGetCollectionVersionFilePaths RPC.
message BatchGetCollectionVersionFilePathsResponse {
// Version file path for each collection, keyed by collection ID.
map<string, string> collection_id_to_version_file_path = 1;
}

service SysDB {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
Expand Down Expand Up @@ -486,4 +495,5 @@ service SysDB {
rpc ListCollectionsToGc(ListCollectionsToGcRequest) returns (ListCollectionsToGcResponse) {}
rpc MarkVersionForDeletion(MarkVersionForDeletionRequest) returns (MarkVersionForDeletionResponse) {}
rpc DeleteCollectionVersion(DeleteCollectionVersionRequest) returns (DeleteCollectionVersionResponse) {}
rpc BatchGetCollectionVersionFilePaths(BatchGetCollectionVersionFilePathsRequest) returns (BatchGetCollectionVersionFilePathsResponse) {}
}
6 changes: 6 additions & 0 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,12 @@ pub struct RootManager {
prefetched_roots: Arc<parking_lot::Mutex<HashMap<Uuid, Duration>>>,
}

// Opaque `Debug` implementation: only the type name is written and none of
// the struct's fields are formatted.
impl std::fmt::Debug for RootManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No `.field(..)` calls on purpose, so the output is just "RootManager".
        let mut builder = f.debug_struct("RootManager");
        builder.finish()
    }
}

impl RootManager {
pub fn new(storage: Storage, cache: Box<dyn PersistentCache<Uuid, RootReader>>) -> Self {
let cache: Arc<dyn PersistentCache<Uuid, RootReader>> = cache.into();
Expand Down
3 changes: 3 additions & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ tracing = { workspace = true }
thiserror = { workspace = true }
humantime = { workspace = true }
opentelemetry = { workspace = true }
petgraph = { workspace = true }
itertools = { workspace = true }
base64 = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
service_name: "garbage-collector"
otel_endpoint: "http://otel-collector:4317"
relative_cutoff_time_seconds: 43200 # GC all versions created at time < now() - relative_cutoff_time_seconds (12 hours)
relative_cutoff_time_seconds: 10 # GC all versions created at time < now() - relative_cutoff_time_seconds (10 seconds)
max_collections_to_gc: 1000 # Maximum number of collections to GC in one run
gc_interval_mins: 120 # Run GC every x mins
gc_interval_mins: 1 # Run GC every x mins
disallow_collections: [] # collection ids to disable GC on
sysdb_config:
host: "sysdb.chroma"
Expand Down
Binary file added rust/garbage_collector/graph.txt.gz
Binary file not shown.
Loading
Loading