Skip to content

Commit 9e02f53

Browse files
committed
[ENH]: garbage collector v2 orchestrator (supports forking)
1 parent 0c85b47 commit 9e02f53

File tree

6 files changed

+496
-25
lines changed

6 files changed

+496
-25
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

+17-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use chroma_system::{
2121
wrap, ChannelError, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator,
2222
PanicError, TaskError, TaskMessage, TaskResult,
2323
};
24-
use chroma_types::{chroma_proto::CollectionVersionInfo, CollectionUuid};
24+
use chroma_types::{
25+
chroma_proto::{CollectionVersionFile, CollectionVersionInfo},
26+
CollectionUuid,
27+
};
2528
use chrono::DateTime;
2629
use itertools::Itertools;
2730
use petgraph::{dot::Dot, graph::DiGraph, prelude::DiGraphMap};
@@ -58,6 +61,7 @@ pub struct ConstructVersionGraphOrchestrator {
5861

5962
graph: DiGraphMap<InternalVersionGraphNode, ()>,
6063
graph_data: HashMap<InternalVersionGraphNode, VersionGraphNodeData>,
64+
version_files: HashMap<CollectionUuid, CollectionVersionFile>,
6165
num_pending_tasks: usize,
6266
}
6367

@@ -82,13 +86,13 @@ impl ConstructVersionGraphOrchestrator {
8286

8387
graph: DiGraphMap::new(),
8488
graph_data: HashMap::new(),
89+
version_files: HashMap::new(),
8590
num_pending_tasks: 0,
8691
}
8792
}
8893
}
8994

9095
#[derive(Debug, Clone)]
91-
#[allow(dead_code)]
9296
pub struct VersionGraphNode {
9397
pub collection_id: CollectionUuid,
9498
pub version: i64,
@@ -98,8 +102,8 @@ pub struct VersionGraphNode {
98102
pub type VersionGraph = DiGraph<VersionGraphNode, ()>;
99103

100104
#[derive(Debug)]
101-
#[allow(dead_code)]
102105
pub struct ConstructVersionGraphResponse {
106+
pub version_files: HashMap<CollectionUuid, CollectionVersionFile>,
103107
pub graph: VersionGraph,
104108
}
105109

@@ -259,8 +263,14 @@ impl ConstructVersionGraphOrchestrator {
259263
None => return,
260264
};
261265

262-
self.terminate_with_result(Ok(ConstructVersionGraphResponse { graph }), ctx)
263-
.await;
266+
self.terminate_with_result(
267+
Ok(ConstructVersionGraphResponse {
268+
graph,
269+
version_files: self.version_files.clone(),
270+
}),
271+
ctx,
272+
)
273+
.await;
264274
}
265275
}
266276

@@ -368,6 +378,8 @@ impl Handler<TaskResult<FetchVersionFileOutput, FetchVersionFileError>>
368378
}
369379
};
370380
let collection_id = output.collection_id;
381+
self.version_files
382+
.insert(collection_id, output.file.clone());
371383

372384
let versions = match output.file.version_history {
373385
Some(versions) => versions.versions,

0 commit comments

Comments
 (0)