Skip to content

Commit 8242dc4

Browse files
committed
[ENH]: add operator to compute versions to garbage collect from version graph
1 parent 721349a commit 8242dc4

File tree

2 files changed

+338
-0
lines changed

2 files changed

+338
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
use crate::construct_version_graph_orchestrator::{VersionGraph, VersionStatus};
2+
use async_trait::async_trait;
3+
use chroma_error::{ChromaError, ErrorCodes};
4+
use chroma_system::{Operator, OperatorType};
5+
use chroma_types::CollectionUuid;
6+
use chrono::{DateTime, Utc};
7+
use petgraph::visit::Topo;
8+
use std::collections::HashMap;
9+
use thiserror::Error;
10+
11+
#[derive(Clone, Debug)]
12+
pub struct ComputeVersionsToDeleteOperator {}
13+
14+
#[derive(Debug)]
15+
pub struct ComputeVersionsToDeleteInput {
16+
pub graph: VersionGraph,
17+
pub cutoff_time: DateTime<Utc>,
18+
pub min_versions_to_keep: u32,
19+
}
20+
21+
#[derive(Debug, PartialEq)]
22+
pub enum CollectionVersionAction {
23+
Keep,
24+
Delete,
25+
}
26+
27+
#[derive(Debug)]
28+
pub struct ComputeVersionsToDeleteOutput {
29+
pub versions: HashMap<CollectionUuid, HashMap<i64, CollectionVersionAction>>,
30+
}
31+
32+
#[derive(Error, Debug)]
33+
pub enum ComputeVersionsToDeleteError {
34+
#[error("Error computing versions to delete: {0}")]
35+
ComputeError(String),
36+
#[error("Invalid timestamp in version file")]
37+
InvalidTimestamp,
38+
#[error("Error parsing version file: {0}")]
39+
ParseError(#[from] prost::DecodeError),
40+
#[error("Graph is missing expected node")]
41+
MissingVersionGraphNode,
42+
}
43+
44+
impl ChromaError for ComputeVersionsToDeleteError {
45+
fn code(&self) -> ErrorCodes {
46+
ErrorCodes::Internal
47+
}
48+
}
49+
50+
#[async_trait]
51+
impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput>
52+
for ComputeVersionsToDeleteOperator
53+
{
54+
type Error = ComputeVersionsToDeleteError;
55+
56+
fn get_type(&self) -> OperatorType {
57+
OperatorType::Other
58+
}
59+
60+
async fn run(
61+
&self,
62+
input: &ComputeVersionsToDeleteInput,
63+
) -> Result<ComputeVersionsToDeleteOutput, ComputeVersionsToDeleteError> {
64+
let mut visitor = Topo::new(&input.graph);
65+
66+
let mut versions_by_collection: HashMap<
67+
CollectionUuid,
68+
Vec<(i64, DateTime<Utc>, CollectionVersionAction)>,
69+
> = HashMap::new();
70+
71+
while let Some(node_i) = visitor.next(&input.graph) {
72+
let node = input
73+
.graph
74+
.node_weight(node_i)
75+
.ok_or(ComputeVersionsToDeleteError::MissingVersionGraphNode)?;
76+
77+
match node.status {
78+
VersionStatus::Alive { created_at } => {
79+
versions_by_collection
80+
.entry(node.collection_id)
81+
.or_default()
82+
.push((node.version, created_at, CollectionVersionAction::Keep));
83+
}
84+
VersionStatus::Deleted => {}
85+
}
86+
}
87+
88+
for versions in versions_by_collection.values_mut() {
89+
for (version, created_at, mode) in versions
90+
.iter_mut()
91+
.rev()
92+
.skip(input.min_versions_to_keep as usize)
93+
{
94+
// Always keep version 0
95+
if *version == 0 {
96+
tracing::debug!("Keeping version 0");
97+
continue;
98+
}
99+
100+
if *created_at < input.cutoff_time {
101+
*mode = CollectionVersionAction::Delete;
102+
} else {
103+
tracing::debug!(
104+
version = *version,
105+
created_at = %created_at,
106+
cutoff_time = %input.cutoff_time,
107+
"Keeping version {version} created at {created_at} after cutoff time {}", input.cutoff_time
108+
);
109+
}
110+
}
111+
}
112+
113+
// todo: rename
114+
Ok(ComputeVersionsToDeleteOutput {
115+
versions: versions_by_collection
116+
.into_iter()
117+
.map(|(collection_id, versions)| {
118+
let versions: HashMap<_, _> = versions
119+
.into_iter()
120+
.map(|(version, _, mode)| (version, mode))
121+
.collect();
122+
let num_versions = versions.len();
123+
tracing::debug!(
124+
collection_id = %collection_id,
125+
versions_to_delete = ?versions,
126+
"Processed {num_versions} versions from collection {collection_id}"
127+
);
128+
129+
(collection_id, versions)
130+
})
131+
.collect(),
132+
})
133+
}
134+
}
135+
136+
#[cfg(test)]
137+
mod tests {
138+
use super::*;
139+
use crate::construct_version_graph_orchestrator::{VersionGraphNode, VersionStatus};
140+
use chrono::{Duration, Utc};
141+
use tracing_test::traced_test;
142+
143+
#[tokio::test]
144+
#[traced_test]
145+
async fn test_compute_versions_to_delete() {
146+
let now = Utc::now();
147+
148+
let collection_id = CollectionUuid::new();
149+
150+
let mut graph = VersionGraph::new();
151+
let v0 = graph.add_node(VersionGraphNode {
152+
collection_id,
153+
version: 0,
154+
status: VersionStatus::Alive {
155+
created_at: (now - Duration::hours(48)),
156+
},
157+
});
158+
let v1 = graph.add_node(VersionGraphNode {
159+
collection_id,
160+
version: 1,
161+
status: VersionStatus::Alive {
162+
created_at: (now - Duration::hours(24)),
163+
},
164+
});
165+
let v2 = graph.add_node(VersionGraphNode {
166+
collection_id,
167+
version: 2,
168+
status: VersionStatus::Alive {
169+
created_at: (now - Duration::hours(12)),
170+
},
171+
});
172+
let v3 = graph.add_node(VersionGraphNode {
173+
collection_id,
174+
version: 3,
175+
status: VersionStatus::Alive {
176+
created_at: (now - Duration::hours(1)),
177+
},
178+
});
179+
let v4 = graph.add_node(VersionGraphNode {
180+
collection_id,
181+
version: 4,
182+
status: VersionStatus::Alive { created_at: now },
183+
});
184+
graph.add_edge(v0, v1, ());
185+
graph.add_edge(v1, v2, ());
186+
graph.add_edge(v2, v3, ());
187+
graph.add_edge(v3, v4, ());
188+
189+
let input = ComputeVersionsToDeleteInput {
190+
graph,
191+
cutoff_time: now - Duration::hours(6),
192+
min_versions_to_keep: 1,
193+
};
194+
195+
let mut result = ComputeVersionsToDeleteOperator {}
196+
.run(&input)
197+
.await
198+
.unwrap();
199+
200+
// v0 is always kept, and the most recent version (v4) is kept. v3 is not eligible for deletion because it is after the cutoff time. So v1 and v2 are marked for deletion.
201+
assert_eq!(result.versions.len(), 1);
202+
let versions = result.versions.remove(&collection_id).unwrap();
203+
assert_eq!(
204+
versions.into_iter().collect::<Vec<_>>(),
205+
vec![
206+
(0, CollectionVersionAction::Keep),
207+
(1, CollectionVersionAction::Delete),
208+
(2, CollectionVersionAction::Delete),
209+
(3, CollectionVersionAction::Keep),
210+
(4, CollectionVersionAction::Keep)
211+
]
212+
);
213+
}
214+
215+
#[tokio::test]
216+
#[traced_test]
217+
async fn test_compute_versions_to_delete_fork_tree() {
218+
let now = Utc::now();
219+
220+
let a_collection_id = CollectionUuid::new();
221+
222+
let mut graph = VersionGraph::new();
223+
let a_v0 = graph.add_node(VersionGraphNode {
224+
collection_id: a_collection_id,
225+
version: 0,
226+
status: VersionStatus::Alive {
227+
created_at: (now - Duration::hours(48)),
228+
},
229+
});
230+
let a_v1 = graph.add_node(VersionGraphNode {
231+
collection_id: a_collection_id,
232+
version: 1,
233+
status: VersionStatus::Alive {
234+
created_at: (now - Duration::hours(24)),
235+
},
236+
});
237+
let a_v2 = graph.add_node(VersionGraphNode {
238+
collection_id: a_collection_id,
239+
version: 2,
240+
status: VersionStatus::Alive {
241+
created_at: (now - Duration::hours(12)),
242+
},
243+
});
244+
let a_v3 = graph.add_node(VersionGraphNode {
245+
collection_id: a_collection_id,
246+
version: 3,
247+
status: VersionStatus::Alive {
248+
created_at: (now - Duration::hours(1)),
249+
},
250+
});
251+
let a_v4 = graph.add_node(VersionGraphNode {
252+
collection_id: a_collection_id,
253+
version: 4,
254+
status: VersionStatus::Alive { created_at: now },
255+
});
256+
graph.add_edge(a_v0, a_v1, ());
257+
graph.add_edge(a_v1, a_v2, ());
258+
graph.add_edge(a_v2, a_v3, ());
259+
graph.add_edge(a_v3, a_v4, ());
260+
261+
let b_collection_id = CollectionUuid::new();
262+
let b_v0 = graph.add_node(VersionGraphNode {
263+
collection_id: b_collection_id,
264+
version: 0,
265+
status: VersionStatus::Alive {
266+
created_at: (now - Duration::hours(23)),
267+
},
268+
});
269+
let b_v1 = graph.add_node(VersionGraphNode {
270+
collection_id: b_collection_id,
271+
version: 1,
272+
status: VersionStatus::Alive {
273+
created_at: (now - Duration::hours(12)),
274+
},
275+
});
276+
let b_v2 = graph.add_node(VersionGraphNode {
277+
collection_id: b_collection_id,
278+
version: 2,
279+
status: VersionStatus::Alive {
280+
created_at: (now - Duration::hours(1)),
281+
},
282+
});
283+
graph.add_edge(b_v0, b_v1, ());
284+
graph.add_edge(b_v1, b_v2, ());
285+
// B was forked from A
286+
graph.add_edge(a_v1, b_v0, ());
287+
288+
let c_collection_id = CollectionUuid::new();
289+
let c_v0 = graph.add_node(VersionGraphNode {
290+
collection_id: c_collection_id,
291+
version: 0,
292+
status: VersionStatus::Alive {
293+
created_at: (now - Duration::hours(1)),
294+
},
295+
});
296+
// C was forked from B
297+
graph.add_edge(b_v2, c_v0, ());
298+
299+
let input = ComputeVersionsToDeleteInput {
300+
graph,
301+
cutoff_time: now - Duration::hours(6),
302+
min_versions_to_keep: 1,
303+
};
304+
305+
let mut result = ComputeVersionsToDeleteOperator {}
306+
.run(&input)
307+
.await
308+
.unwrap();
309+
310+
// Only collections A and B should have versions to delete
311+
assert_eq!(result.versions.len(), 2);
312+
313+
// For collection A: v0 is always kept, and the most recent version (v4) is kept. v3 is not eligible for deletion because it is after the cutoff time. So v1 and v2 are marked for deletion.
314+
let a_versions = result.versions.remove(&a_collection_id).unwrap();
315+
assert_eq!(
316+
a_versions.into_iter().collect::<Vec<_>>(),
317+
vec![
318+
(0, CollectionVersionAction::Keep),
319+
(1, CollectionVersionAction::Delete),
320+
(2, CollectionVersionAction::Delete),
321+
(3, CollectionVersionAction::Keep),
322+
(4, CollectionVersionAction::Keep)
323+
]
324+
);
325+
326+
// For collection B: v0 is always kept, and the most recent version (v2) is kept. So v1 is marked for deletion.
327+
let b_versions = result.versions.remove(&b_collection_id).unwrap();
328+
assert_eq!(
329+
b_versions.into_iter().collect::<Vec<_>>(),
330+
vec![
331+
(0, CollectionVersionAction::Keep),
332+
(1, CollectionVersionAction::Delete),
333+
(2, CollectionVersionAction::Keep)
334+
]
335+
);
336+
}
337+
}

rust/garbage_collector/src/operators/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod compute_unused_between_versions;
22
pub mod compute_unused_files;
33
pub mod compute_versions_to_delete;
4+
pub mod compute_versions_to_delete_from_graph;
45
pub mod delete_unused_files;
56
pub mod delete_versions_at_sysdb;
67
pub mod fetch_lineage_file;

0 commit comments

Comments
 (0)