Skip to content

Commit 8d65178

Browse files
committed
wip
1 parent 1473971 commit 8d65178

File tree

5 files changed

+169
-41
lines changed

5 files changed

+169
-41
lines changed

Cargo.lock

+21-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/garbage_collector/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ tracing = { workspace = true }
3232
thiserror = { workspace = true }
3333
humantime = { workspace = true }
3434
opentelemetry = { workspace = true }
35+
petgraph = "0.8.1"
3536

3637
chroma-config = { workspace = true }
3738
chroma-error = { workspace = true }

rust/garbage_collector/src/garbage_collector_orchestrator.rs

+94-30
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,21 @@
3636
//! - Permanently deletes marked versions from the system database
3737
//! - Input: Version file, versions to delete, unused S3 files
3838
//! - Output: Deletion confirmation
39+
//!
40+
//!
41+
//!
42+
//! ListFilesAtVersionOperator
43+
//! - Input: Version file, versions to check
44+
//! - Output: Full paths of all S3 files used by the specified versions
45+
//!
3946
47+
use std::collections::HashSet;
4048
use std::fmt::{Debug, Formatter};
49+
use std::str::FromStr;
4150

51+
use crate::operators::fetch_lineage_file::{
52+
FetchLineageFileError, FetchLineageFileInput, FetchLineageFileOperator, FetchLineageFileOutput,
53+
};
4254
use crate::types::CleanupMode;
4355
use async_trait::async_trait;
4456
use chroma_error::{ChromaError, ErrorCodes};
@@ -83,6 +95,7 @@ use prost::Message;
8395

8496
pub struct GarbageCollectorOrchestrator {
8597
collection_id: CollectionUuid,
98+
lineage_file_path: Option<String>,
8699
version_file_path: String,
87100
absolute_cutoff_time: DateTime<Utc>,
88101
sysdb_client: SysDb,
@@ -114,9 +127,11 @@ pub struct GarbageCollectorResponse {
114127

115128
#[allow(clippy::too_many_arguments)]
116129
impl GarbageCollectorOrchestrator {
130+
/// `lineage_file_path` must be `Some` if this collection is part of a fork tree.
117131
pub fn new(
118132
collection_id: CollectionUuid,
119133
version_file_path: String,
134+
lineage_file_path: Option<String>,
120135
absolute_cutoff_time: DateTime<Utc>,
121136
sysdb_client: SysDb,
122137
dispatcher: ComponentHandle<Dispatcher>,
@@ -126,6 +141,7 @@ impl GarbageCollectorOrchestrator {
126141
Self {
127142
collection_id,
128143
version_file_path,
144+
lineage_file_path,
129145
absolute_cutoff_time,
130146
sysdb_client,
131147
dispatcher,
@@ -165,6 +181,8 @@ pub enum GarbageCollectorError {
165181
Aborted,
166182
#[error("DeleteUnusedFiles error: {0}")]
167183
DeleteUnusedFiles(#[from] DeleteUnusedFilesError),
184+
#[error("FetchLineageFile error: {0}")]
185+
FetchLineageFile(#[from] FetchLineageFileError),
168186
}
169187

170188
impl ChromaError for GarbageCollectorError {
@@ -201,14 +219,15 @@ impl Orchestrator for GarbageCollectorOrchestrator {
201219
"Creating initial fetch version file task"
202220
);
203221

204-
vec![wrap(
205-
Box::new(FetchVersionFileOperator {}),
206-
FetchVersionFileInput {
207-
version_file_path: self.version_file_path.clone(),
208-
storage: self.storage.clone(),
209-
},
210-
ctx.receiver(),
211-
)]
222+
// vec![wrap(
223+
// Box::new(FetchVersionFileOperator {}),
224+
// FetchVersionFileInput {
225+
// version_file_path: self.version_file_path.clone(),
226+
// storage: self.storage.clone(),
227+
// },
228+
// ctx.receiver(),
229+
// )]
230+
vec![]
212231
}
213232

214233
fn set_result_channel(
@@ -369,34 +388,76 @@ impl Handler<TaskResult<MarkVersionsAtSysDbOutput, MarkVersionsAtSysDbError>>
369388
message: TaskResult<MarkVersionsAtSysDbOutput, MarkVersionsAtSysDbError>,
370389
ctx: &ComponentContext<GarbageCollectorOrchestrator>,
371390
) {
372-
// Stage 3: After marking versions, compute unused files
373391
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
374392
Some(output) => output,
375393
None => return,
376394
};
377395

378-
let compute_task = wrap(
379-
Box::new(ComputeUnusedFilesOperator::new(
380-
self.collection_id.to_string(),
381-
self.storage.clone(),
382-
2, // min_versions_to_keep
383-
)),
384-
ComputeUnusedFilesInput {
385-
version_file: output.version_file,
386-
versions_to_delete: output.versions_to_delete,
387-
oldest_version_to_keep: output.oldest_version_to_keep,
388-
},
389-
ctx.receiver(),
390-
);
396+
if let Some(lineage_file_path) = self.lineage_file_path.clone() {
397+
let fetch_lineage_file_task = wrap(
398+
Box::new(FetchLineageFileOperator::new()),
399+
FetchLineageFileInput::new(self.storage.clone(), lineage_file_path),
400+
ctx.receiver(),
401+
);
391402

392-
if let Err(e) = self
393-
.dispatcher()
394-
.send(compute_task, Some(Span::current()))
395-
.await
396-
{
397-
self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx)
398-
.await;
399-
return;
403+
if let Err(e) = self
404+
.dispatcher()
405+
.send(fetch_lineage_file_task, Some(Span::current()))
406+
.await
407+
{
408+
self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx)
409+
.await;
410+
return;
411+
}
412+
}
413+
}
414+
}
415+
416+
#[async_trait]
417+
impl Handler<TaskResult<FetchLineageFileOutput, FetchLineageFileError>>
418+
for GarbageCollectorOrchestrator
419+
{
420+
type Result = ();
421+
422+
async fn handle(
423+
&mut self,
424+
message: TaskResult<FetchLineageFileOutput, FetchLineageFileError>,
425+
ctx: &ComponentContext<GarbageCollectorOrchestrator>,
426+
) {
427+
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
428+
Some(output) => output,
429+
None => return,
430+
};
431+
432+
let mut collection_ids_in_tree = HashSet::new();
433+
for dependency in output.0.dependencies {
434+
// TODO: replace the `expect` calls below with proper error handling so a malformed collection ID does not panic.
435+
let source_id = CollectionUuid::from_str(&dependency.source_collection_id)
436+
.expect("Failed to parse source ID");
437+
let target_id = CollectionUuid::from_str(&dependency.target_collection_id)
438+
.expect("Failed to parse target ID");
439+
collection_ids_in_tree.insert(source_id);
440+
collection_ids_in_tree.insert(target_id);
441+
}
442+
443+
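// TODO: decide whether this orchestrator's own `collection_id` needs to be removed from `collection_ids_in_tree` before fetching version files.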
444+
445+
for collection_id in collection_ids_in_tree {
446+
let fetch_version_file_task = wrap(
447+
Box::new(FetchVersionFileOperator::new()),
448+
FetchVersionFileInput::new(collection_id, self.storage.clone()),
449+
ctx.receiver(),
450+
);
451+
452+
if let Err(e) = self
453+
.dispatcher()
454+
.send(fetch_version_file_task, Some(Span::current()))
455+
.await
456+
{
457+
self.terminate_with_result(Err(GarbageCollectorError::Channel(e)), ctx)
458+
.await;
459+
return;
460+
}
400461
}
401462
}
402463
}
@@ -871,6 +932,7 @@ mod tests {
871932
let orchestrator = GarbageCollectorOrchestrator::new(
872933
collection_id,
873934
collection_info.version_file_path.clone(),
935+
None, // TODO: `lineage_file_path` is not exercised by this test yet
874936
SystemTime::now().into(), // immediately expire versions
875937
sysdb,
876938
dispatcher_handle,
@@ -1027,6 +1089,7 @@ mod tests {
10271089
let orchestrator = GarbageCollectorOrchestrator::new(
10281090
collection_id,
10291091
collection_info.version_file_path.clone(),
1092+
None, // TODO: `lineage_file_path` is not exercised by this test yet
10301093
SystemTime::now().into(), // immediately expire versions
10311094
sysdb,
10321095
dispatcher_handle,
@@ -1185,6 +1248,7 @@ mod tests {
11851248
let orchestrator = GarbageCollectorOrchestrator::new(
11861249
collection_id,
11871250
collection_info.version_file_path.clone(),
1251+
None, // TODO: `lineage_file_path` is not exercised by this test yet
11881252
SystemTime::now().into(), // immediately expire versions
11891253
sysdb,
11901254
dispatcher_handle,

rust/garbage_collector/src/operators/fetch_lineage_file.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,31 @@ use chroma_types::chroma_proto::CollectionLineageFile;
55
use prost::Message;
66
use thiserror::Error;
77

8-
struct FetchLineageFileInput {
8+
#[derive(Clone)]
9+
pub struct FetchLineageFileInput {
910
storage: Storage,
1011
lineage_file_path: String,
1112
}
1213

13-
struct FetchLineageFileOutput(CollectionLineageFile);
14+
impl std::fmt::Debug for FetchLineageFileInput {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
f.debug_struct("FetchLineageFileInput")
17+
.field("lineage_file_path", &self.lineage_file_path)
18+
.finish()
19+
}
20+
}
21+
22+
impl FetchLineageFileInput {
23+
pub fn new(storage: Storage, lineage_file_path: String) -> Self {
24+
Self {
25+
storage,
26+
lineage_file_path,
27+
}
28+
}
29+
}
30+
31+
#[derive(Debug)]
32+
pub struct FetchLineageFileOutput(pub CollectionLineageFile);
1433

1534
#[derive(Debug, Error)]
1635
pub enum FetchLineageFileError {
@@ -23,6 +42,12 @@ pub enum FetchLineageFileError {
2342
#[derive(Clone, Debug)]
2443
pub struct FetchLineageFileOperator {}
2544

45+
impl FetchLineageFileOperator {
46+
pub fn new() -> Self {
47+
Self {}
48+
}
49+
}
50+
2651
#[async_trait]
2752
impl Operator<FetchLineageFileInput, FetchLineageFileOutput> for FetchLineageFileOperator {
2853
type Error = FetchLineageFileError;

rust/garbage_collector/src/operators/fetch_version_file.rs

+26-7
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,40 @@ use async_trait::async_trait;
1414
use chroma_error::{ChromaError, ErrorCodes};
1515
use chroma_storage::admissioncontrolleds3::StorageRequestPriority;
1616
use chroma_storage::{GetOptions, Storage, StorageError};
17+
use chroma_sysdb::SysDb;
1718
use chroma_system::{Operator, OperatorType};
19+
use chroma_types::CollectionUuid;
1820
use thiserror::Error;
1921

2022
#[derive(Clone, Debug)]
2123
pub struct FetchVersionFileOperator {}
2224

25+
impl FetchVersionFileOperator {
26+
pub fn new() -> Self {
27+
Self {}
28+
}
29+
}
30+
2331
pub struct FetchVersionFileInput {
24-
pub version_file_path: String,
25-
pub storage: Storage,
32+
collection_id: CollectionUuid,
33+
storage: Storage,
34+
sysdb: SysDb,
35+
}
36+
37+
impl FetchVersionFileInput {
38+
pub fn new(collection_id: CollectionUuid, storage: Storage, sysdb: SysDb) -> Self {
39+
Self {
40+
collection_id,
41+
storage,
42+
sysdb,
43+
}
44+
}
2645
}
2746

2847
impl Debug for FetchVersionFileInput {
2948
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3049
f.debug_struct("FetchVersionFileInput")
31-
.field("version_file_path", &self.version_file_path)
50+
.field("collection_id", &self.collection_id)
3251
.finish()
3352
}
3453
}
@@ -83,10 +102,10 @@ impl Operator<FetchVersionFileInput, FetchVersionFileOutput> for FetchVersionFil
83102
&self,
84103
input: &FetchVersionFileInput,
85104
) -> Result<FetchVersionFileOutput, FetchVersionFileError> {
86-
tracing::info!(
87-
path = %input.version_file_path,
88-
"Starting to fetch version file"
89-
);
105+
let collection =
106+
input
107+
.sysdb
108+
.get_collections(collection_id, name, tenant, database, limit, offset);
90109

91110
let content = input
92111
.storage

0 commit comments

Comments
 (0)