Skip to content

Commit 34ed231

Browse files
committed
[ENH]: add operator to compute versions to garbage collect from version graph
1 parent fafe998 commit 34ed231

File tree

2 files changed

+305
-0
lines changed

2 files changed

+305
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
use crate::construct_version_graph_orchestrator::VersionGraph;
2+
use async_trait::async_trait;
3+
use chroma_error::{ChromaError, ErrorCodes};
4+
use chroma_system::{Operator, OperatorType};
5+
use chroma_types::CollectionUuid;
6+
use chrono::{DateTime, Utc};
7+
use petgraph::visit::Topo;
8+
use std::collections::HashMap;
9+
use thiserror::Error;
10+
11+
/// Stateless operator that walks a version graph and decides, per collection,
/// which versions should be kept and which may be garbage collected.
#[derive(Debug, Clone)]
pub struct ComputeVersionsToDeleteOperator {}
13+
14+
#[derive(Debug)]
15+
pub struct ComputeVersionsToDeleteInput {
16+
pub graph: VersionGraph,
17+
pub cutoff_time: DateTime<Utc>,
18+
pub min_versions_to_keep: u32,
19+
}
20+
21+
/// Decision made for a single collection version.
///
/// The enum is a plain, field-less tag, so it is `Copy` and fully `Eq`; this
/// lets consumers of the operator output duplicate or compare actions freely.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CollectionVersionAction {
    /// The version must be retained.
    Keep,
    /// The version may be garbage collected.
    Delete,
}
26+
27+
#[derive(Debug)]
28+
pub struct ComputeVersionsToDeleteOutput {
29+
pub versions: HashMap<CollectionUuid, Vec<(i64, CollectionVersionAction)>>,
30+
}
31+
32+
#[derive(Error, Debug)]
33+
pub enum ComputeVersionsToDeleteError {
34+
#[error("Error computing versions to delete: {0}")]
35+
ComputeError(String),
36+
#[error("Invalid timestamp in version file")]
37+
InvalidTimestamp,
38+
#[error("Error parsing version file: {0}")]
39+
ParseError(#[from] prost::DecodeError),
40+
#[error("Graph is missing expected node")]
41+
MissingVersionGraphNode,
42+
}
43+
44+
impl ChromaError for ComputeVersionsToDeleteError {
45+
fn code(&self) -> ErrorCodes {
46+
ErrorCodes::Internal
47+
}
48+
}
49+
50+
#[async_trait]
51+
impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput>
52+
for ComputeVersionsToDeleteOperator
53+
{
54+
type Error = ComputeVersionsToDeleteError;
55+
56+
fn get_type(&self) -> OperatorType {
57+
OperatorType::Other
58+
}
59+
60+
async fn run(
61+
&self,
62+
input: &ComputeVersionsToDeleteInput,
63+
) -> Result<ComputeVersionsToDeleteOutput, ComputeVersionsToDeleteError> {
64+
let mut visitor = Topo::new(&input.graph);
65+
66+
let mut versions_by_collection: HashMap<
67+
CollectionUuid,
68+
Vec<(i64, DateTime<Utc>, CollectionVersionAction)>,
69+
> = HashMap::new();
70+
71+
while let Some(node_i) = visitor.next(&input.graph) {
72+
let node = input
73+
.graph
74+
.node_weight(node_i)
75+
.ok_or(ComputeVersionsToDeleteError::MissingVersionGraphNode)?;
76+
77+
versions_by_collection
78+
.entry(node.collection_id)
79+
.or_default()
80+
.push((node.version, node.created_at, CollectionVersionAction::Keep));
81+
}
82+
83+
for versions in versions_by_collection.values_mut() {
84+
for (version, created_at, mode) in versions
85+
.iter_mut()
86+
.rev()
87+
.skip(input.min_versions_to_keep as usize)
88+
{
89+
// Always keep version 0
90+
if *version == 0 {
91+
continue;
92+
}
93+
94+
if *created_at < input.cutoff_time {
95+
*mode = CollectionVersionAction::Delete;
96+
}
97+
}
98+
}
99+
100+
Ok(ComputeVersionsToDeleteOutput {
101+
versions: versions_by_collection
102+
.into_iter()
103+
.filter(|(_, versions)| {
104+
versions
105+
.iter()
106+
.any(|(_, _, mode)| *mode == CollectionVersionAction::Delete)
107+
})
108+
.map(|(collection_id, versions)| {
109+
let versions: Vec<_> = versions
110+
.into_iter()
111+
.map(|(version, _, mode)| (version, mode))
112+
.collect();
113+
let num_versions = versions.len();
114+
tracing::debug!(
115+
collection_id = %collection_id,
116+
versions_to_delete = ?versions,
117+
"Deleting {num_versions} versions from collection {collection_id}"
118+
);
119+
120+
(collection_id, versions)
121+
})
122+
.collect(),
123+
})
124+
}
125+
}
126+
127+
#[cfg(test)]
mod tests {
    use super::CollectionVersionAction::{Delete, Keep};
    use super::*;
    use crate::construct_version_graph_orchestrator::VersionGraphNode;
    use chrono::{Duration, Utc};
    use tracing_test::traced_test;

    /// Builds a graph node for `collection_id` at `version` that was created
    /// `hours_ago` hours before `now`.
    fn node(
        collection_id: CollectionUuid,
        version: i64,
        now: DateTime<Utc>,
        hours_ago: i64,
    ) -> VersionGraphNode {
        VersionGraphNode {
            collection_id,
            version,
            created_at: now - Duration::hours(hours_ago),
        }
    }

    /// A single collection with a linear history of five versions.
    #[tokio::test]
    #[traced_test]
    async fn test_compute_versions_to_delete() {
        let now = Utc::now();
        let collection_id = CollectionUuid::new();

        let mut graph = VersionGraph::new();
        let v0 = graph.add_node(node(collection_id, 0, now, 48));
        let v1 = graph.add_node(node(collection_id, 1, now, 24));
        let v2 = graph.add_node(node(collection_id, 2, now, 12));
        let v3 = graph.add_node(node(collection_id, 3, now, 1));
        let v4 = graph.add_node(node(collection_id, 4, now, 0));
        for (parent, child) in [(v0, v1), (v1, v2), (v2, v3), (v3, v4)] {
            graph.add_edge(parent, child, ());
        }

        let operator = ComputeVersionsToDeleteOperator {};
        let input = ComputeVersionsToDeleteInput {
            graph,
            cutoff_time: now - Duration::hours(6),
            min_versions_to_keep: 1,
        };
        let result = operator.run(&input).await.unwrap();

        // v0 is unconditionally retained and v4 is protected as the newest
        // version. v3 is newer than the cutoff, which leaves v1 and v2 as the
        // only versions marked for deletion.
        assert_eq!(result.versions.len(), 1);
        let versions = result.versions.get(&collection_id).unwrap();
        assert_eq!(
            *versions,
            vec![(0, Keep), (1, Delete), (2, Delete), (3, Keep), (4, Keep)]
        );
    }

    /// Three collections where B is forked from A and C is forked from B.
    #[tokio::test]
    #[traced_test]
    async fn test_compute_versions_to_delete_fork_tree() {
        let now = Utc::now();
        let mut graph = VersionGraph::new();

        // Collection A: linear history of five versions.
        let a_collection_id = CollectionUuid::new();
        let a_v0 = graph.add_node(node(a_collection_id, 0, now, 48));
        let a_v1 = graph.add_node(node(a_collection_id, 1, now, 24));
        let a_v2 = graph.add_node(node(a_collection_id, 2, now, 12));
        let a_v3 = graph.add_node(node(a_collection_id, 3, now, 1));
        let a_v4 = graph.add_node(node(a_collection_id, 4, now, 0));
        for (parent, child) in [(a_v0, a_v1), (a_v1, a_v2), (a_v2, a_v3), (a_v3, a_v4)] {
            graph.add_edge(parent, child, ());
        }

        // Collection B: three versions, branching off A at version 1.
        let b_collection_id = CollectionUuid::new();
        let b_v0 = graph.add_node(node(b_collection_id, 0, now, 23));
        let b_v1 = graph.add_node(node(b_collection_id, 1, now, 12));
        let b_v2 = graph.add_node(node(b_collection_id, 2, now, 1));
        for (parent, child) in [(b_v0, b_v1), (b_v1, b_v2)] {
            graph.add_edge(parent, child, ());
        }
        graph.add_edge(a_v1, b_v0, ());

        // Collection C: a single version, branching off B at version 2.
        let c_collection_id = CollectionUuid::new();
        let c_v0 = graph.add_node(node(c_collection_id, 0, now, 1));
        graph.add_edge(b_v2, c_v0, ());

        let operator = ComputeVersionsToDeleteOperator {};
        let input = ComputeVersionsToDeleteInput {
            graph,
            cutoff_time: now - Duration::hours(6),
            min_versions_to_keep: 1,
        };
        let result = operator.run(&input).await.unwrap();

        // C has nothing to delete, so only A and B show up in the output.
        assert_eq!(result.versions.len(), 2);

        // Collection A: v0 is unconditionally retained, v4 is protected as
        // the newest version and v3 is newer than the cutoff, so v1 and v2
        // are the versions marked for deletion.
        let a_versions = result.versions.get(&a_collection_id).unwrap();
        assert_eq!(
            *a_versions,
            vec![(0, Keep), (1, Delete), (2, Delete), (3, Keep), (4, Keep)]
        );

        // Collection B: v0 is unconditionally retained and v2 is protected as
        // the newest version, so only v1 is marked for deletion.
        let b_versions = result.versions.get(&b_collection_id).unwrap();
        assert_eq!(*b_versions, vec![(0, Keep), (1, Delete), (2, Keep)]);
    }
}

rust/garbage_collector/src/operators/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod compute_unused_between_versions;
22
pub mod compute_unused_files;
33
pub mod compute_versions_to_delete;
4+
pub mod compute_versions_to_delete_from_graph;
45
pub mod delete_unused_files;
56
pub mod delete_versions_at_sysdb;
67
pub mod fetch_lineage_file;

0 commit comments

Comments
 (0)