|
| 1 | +use crate::construct_version_graph_orchestrator::{VersionGraph, VersionStatus}; |
| 2 | +use async_trait::async_trait; |
| 3 | +use chroma_error::{ChromaError, ErrorCodes}; |
| 4 | +use chroma_system::{Operator, OperatorType}; |
| 5 | +use chroma_types::CollectionUuid; |
| 6 | +use chrono::{DateTime, Utc}; |
| 7 | +use petgraph::visit::Topo; |
| 8 | +use std::collections::HashMap; |
| 9 | +use thiserror::Error; |
| 10 | + |
| 11 | +#[derive(Clone, Debug)] |
| 12 | +pub struct ComputeVersionsToDeleteOperator {} |
| 13 | + |
| 14 | +#[derive(Debug)] |
| 15 | +pub struct ComputeVersionsToDeleteInput { |
| 16 | + pub graph: VersionGraph, |
| 17 | + pub cutoff_time: DateTime<Utc>, |
| 18 | + pub min_versions_to_keep: u32, |
| 19 | +} |
| 20 | + |
| 21 | +#[derive(Debug, PartialEq)] |
| 22 | +pub enum CollectionVersionAction { |
| 23 | + Keep, |
| 24 | + Delete, |
| 25 | +} |
| 26 | + |
| 27 | +#[derive(Debug)] |
| 28 | +pub struct ComputeVersionsToDeleteOutput { |
| 29 | + pub versions: HashMap<CollectionUuid, HashMap<i64, CollectionVersionAction>>, |
| 30 | +} |
| 31 | + |
| 32 | +#[derive(Error, Debug)] |
| 33 | +pub enum ComputeVersionsToDeleteError { |
| 34 | + #[error("Error computing versions to delete: {0}")] |
| 35 | + ComputeError(String), |
| 36 | + #[error("Invalid timestamp in version file")] |
| 37 | + InvalidTimestamp, |
| 38 | + #[error("Error parsing version file: {0}")] |
| 39 | + ParseError(#[from] prost::DecodeError), |
| 40 | + #[error("Graph is missing expected node")] |
| 41 | + MissingVersionGraphNode, |
| 42 | +} |
| 43 | + |
| 44 | +impl ChromaError for ComputeVersionsToDeleteError { |
| 45 | + fn code(&self) -> ErrorCodes { |
| 46 | + ErrorCodes::Internal |
| 47 | + } |
| 48 | +} |
| 49 | + |
| 50 | +#[async_trait] |
| 51 | +impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput> |
| 52 | + for ComputeVersionsToDeleteOperator |
| 53 | +{ |
| 54 | + type Error = ComputeVersionsToDeleteError; |
| 55 | + |
| 56 | + fn get_type(&self) -> OperatorType { |
| 57 | + OperatorType::Other |
| 58 | + } |
| 59 | + |
| 60 | + async fn run( |
| 61 | + &self, |
| 62 | + input: &ComputeVersionsToDeleteInput, |
| 63 | + ) -> Result<ComputeVersionsToDeleteOutput, ComputeVersionsToDeleteError> { |
| 64 | + let mut visitor = Topo::new(&input.graph); |
| 65 | + |
| 66 | + let mut versions_by_collection: HashMap< |
| 67 | + CollectionUuid, |
| 68 | + Vec<(i64, DateTime<Utc>, CollectionVersionAction)>, |
| 69 | + > = HashMap::new(); |
| 70 | + |
| 71 | + while let Some(node_i) = visitor.next(&input.graph) { |
| 72 | + let node = input |
| 73 | + .graph |
| 74 | + .node_weight(node_i) |
| 75 | + .ok_or(ComputeVersionsToDeleteError::MissingVersionGraphNode)?; |
| 76 | + |
| 77 | + match node.status { |
| 78 | + VersionStatus::Alive { created_at } => { |
| 79 | + versions_by_collection |
| 80 | + .entry(node.collection_id) |
| 81 | + .or_default() |
| 82 | + .push((node.version, created_at, CollectionVersionAction::Keep)); |
| 83 | + } |
| 84 | + VersionStatus::Deleted => {} |
| 85 | + } |
| 86 | + } |
| 87 | + |
| 88 | + for versions in versions_by_collection.values_mut() { |
| 89 | + for (version, created_at, mode) in versions |
| 90 | + .iter_mut() |
| 91 | + .rev() |
| 92 | + .skip(input.min_versions_to_keep as usize) |
| 93 | + { |
| 94 | + // Always keep version 0 |
| 95 | + if *version == 0 { |
| 96 | + tracing::debug!("Keeping version 0"); |
| 97 | + continue; |
| 98 | + } |
| 99 | + |
| 100 | + if *created_at < input.cutoff_time { |
| 101 | + *mode = CollectionVersionAction::Delete; |
| 102 | + } else { |
| 103 | + tracing::debug!( |
| 104 | + version = *version, |
| 105 | + created_at = %created_at, |
| 106 | + cutoff_time = %input.cutoff_time, |
| 107 | + "Keeping version {version} created at {created_at} after cutoff time {}", input.cutoff_time |
| 108 | + ); |
| 109 | + } |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + // todo: rename |
| 114 | + Ok(ComputeVersionsToDeleteOutput { |
| 115 | + versions: versions_by_collection |
| 116 | + .into_iter() |
| 117 | + .map(|(collection_id, versions)| { |
| 118 | + let versions: HashMap<_, _> = versions |
| 119 | + .into_iter() |
| 120 | + .map(|(version, _, mode)| (version, mode)) |
| 121 | + .collect(); |
| 122 | + let num_versions = versions.len(); |
| 123 | + tracing::debug!( |
| 124 | + collection_id = %collection_id, |
| 125 | + versions_to_delete = ?versions, |
| 126 | + "Processed {num_versions} versions from collection {collection_id}" |
| 127 | + ); |
| 128 | + |
| 129 | + (collection_id, versions) |
| 130 | + }) |
| 131 | + .collect(), |
| 132 | + }) |
| 133 | + } |
| 134 | +} |
| 135 | + |
| 136 | +#[cfg(test)] |
| 137 | +mod tests { |
| 138 | + use super::*; |
| 139 | + use crate::construct_version_graph_orchestrator::{VersionGraphNode, VersionStatus}; |
| 140 | + use chrono::{Duration, Utc}; |
| 141 | + use tracing_test::traced_test; |
| 142 | + |
| 143 | + #[tokio::test] |
| 144 | + #[traced_test] |
| 145 | + async fn test_compute_versions_to_delete() { |
| 146 | + let now = Utc::now(); |
| 147 | + |
| 148 | + let collection_id = CollectionUuid::new(); |
| 149 | + |
| 150 | + let mut graph = VersionGraph::new(); |
| 151 | + let v0 = graph.add_node(VersionGraphNode { |
| 152 | + collection_id, |
| 153 | + version: 0, |
| 154 | + status: VersionStatus::Alive { |
| 155 | + created_at: (now - Duration::hours(48)), |
| 156 | + }, |
| 157 | + }); |
| 158 | + let v1 = graph.add_node(VersionGraphNode { |
| 159 | + collection_id, |
| 160 | + version: 1, |
| 161 | + status: VersionStatus::Alive { |
| 162 | + created_at: (now - Duration::hours(24)), |
| 163 | + }, |
| 164 | + }); |
| 165 | + let v2 = graph.add_node(VersionGraphNode { |
| 166 | + collection_id, |
| 167 | + version: 2, |
| 168 | + status: VersionStatus::Alive { |
| 169 | + created_at: (now - Duration::hours(12)), |
| 170 | + }, |
| 171 | + }); |
| 172 | + let v3 = graph.add_node(VersionGraphNode { |
| 173 | + collection_id, |
| 174 | + version: 3, |
| 175 | + status: VersionStatus::Alive { |
| 176 | + created_at: (now - Duration::hours(1)), |
| 177 | + }, |
| 178 | + }); |
| 179 | + let v4 = graph.add_node(VersionGraphNode { |
| 180 | + collection_id, |
| 181 | + version: 4, |
| 182 | + status: VersionStatus::Alive { created_at: now }, |
| 183 | + }); |
| 184 | + graph.add_edge(v0, v1, ()); |
| 185 | + graph.add_edge(v1, v2, ()); |
| 186 | + graph.add_edge(v2, v3, ()); |
| 187 | + graph.add_edge(v3, v4, ()); |
| 188 | + |
| 189 | + let input = ComputeVersionsToDeleteInput { |
| 190 | + graph, |
| 191 | + cutoff_time: now - Duration::hours(6), |
| 192 | + min_versions_to_keep: 1, |
| 193 | + }; |
| 194 | + |
| 195 | + let mut result = ComputeVersionsToDeleteOperator {} |
| 196 | + .run(&input) |
| 197 | + .await |
| 198 | + .unwrap(); |
| 199 | + |
| 200 | + // v0 is always kept, and the most recent version (v4) is kept. v3 is not eligible for deletion because it is after the cutoff time. So v1 and v2 are marked for deletion. |
| 201 | + assert_eq!(result.versions.len(), 1); |
| 202 | + let versions = result.versions.remove(&collection_id).unwrap(); |
| 203 | + assert_eq!( |
| 204 | + versions.into_iter().collect::<Vec<_>>(), |
| 205 | + vec![ |
| 206 | + (0, CollectionVersionAction::Keep), |
| 207 | + (1, CollectionVersionAction::Delete), |
| 208 | + (2, CollectionVersionAction::Delete), |
| 209 | + (3, CollectionVersionAction::Keep), |
| 210 | + (4, CollectionVersionAction::Keep) |
| 211 | + ] |
| 212 | + ); |
| 213 | + } |
| 214 | + |
| 215 | + #[tokio::test] |
| 216 | + #[traced_test] |
| 217 | + async fn test_compute_versions_to_delete_fork_tree() { |
| 218 | + let now = Utc::now(); |
| 219 | + |
| 220 | + let a_collection_id = CollectionUuid::new(); |
| 221 | + |
| 222 | + let mut graph = VersionGraph::new(); |
| 223 | + let a_v0 = graph.add_node(VersionGraphNode { |
| 224 | + collection_id: a_collection_id, |
| 225 | + version: 0, |
| 226 | + status: VersionStatus::Alive { |
| 227 | + created_at: (now - Duration::hours(48)), |
| 228 | + }, |
| 229 | + }); |
| 230 | + let a_v1 = graph.add_node(VersionGraphNode { |
| 231 | + collection_id: a_collection_id, |
| 232 | + version: 1, |
| 233 | + status: VersionStatus::Alive { |
| 234 | + created_at: (now - Duration::hours(24)), |
| 235 | + }, |
| 236 | + }); |
| 237 | + let a_v2 = graph.add_node(VersionGraphNode { |
| 238 | + collection_id: a_collection_id, |
| 239 | + version: 2, |
| 240 | + status: VersionStatus::Alive { |
| 241 | + created_at: (now - Duration::hours(12)), |
| 242 | + }, |
| 243 | + }); |
| 244 | + let a_v3 = graph.add_node(VersionGraphNode { |
| 245 | + collection_id: a_collection_id, |
| 246 | + version: 3, |
| 247 | + status: VersionStatus::Alive { |
| 248 | + created_at: (now - Duration::hours(1)), |
| 249 | + }, |
| 250 | + }); |
| 251 | + let a_v4 = graph.add_node(VersionGraphNode { |
| 252 | + collection_id: a_collection_id, |
| 253 | + version: 4, |
| 254 | + status: VersionStatus::Alive { created_at: now }, |
| 255 | + }); |
| 256 | + graph.add_edge(a_v0, a_v1, ()); |
| 257 | + graph.add_edge(a_v1, a_v2, ()); |
| 258 | + graph.add_edge(a_v2, a_v3, ()); |
| 259 | + graph.add_edge(a_v3, a_v4, ()); |
| 260 | + |
| 261 | + let b_collection_id = CollectionUuid::new(); |
| 262 | + let b_v0 = graph.add_node(VersionGraphNode { |
| 263 | + collection_id: b_collection_id, |
| 264 | + version: 0, |
| 265 | + status: VersionStatus::Alive { |
| 266 | + created_at: (now - Duration::hours(23)), |
| 267 | + }, |
| 268 | + }); |
| 269 | + let b_v1 = graph.add_node(VersionGraphNode { |
| 270 | + collection_id: b_collection_id, |
| 271 | + version: 1, |
| 272 | + status: VersionStatus::Alive { |
| 273 | + created_at: (now - Duration::hours(12)), |
| 274 | + }, |
| 275 | + }); |
| 276 | + let b_v2 = graph.add_node(VersionGraphNode { |
| 277 | + collection_id: b_collection_id, |
| 278 | + version: 2, |
| 279 | + status: VersionStatus::Alive { |
| 280 | + created_at: (now - Duration::hours(1)), |
| 281 | + }, |
| 282 | + }); |
| 283 | + graph.add_edge(b_v0, b_v1, ()); |
| 284 | + graph.add_edge(b_v1, b_v2, ()); |
| 285 | + // B was forked from A |
| 286 | + graph.add_edge(a_v1, b_v0, ()); |
| 287 | + |
| 288 | + let c_collection_id = CollectionUuid::new(); |
| 289 | + let c_v0 = graph.add_node(VersionGraphNode { |
| 290 | + collection_id: c_collection_id, |
| 291 | + version: 0, |
| 292 | + status: VersionStatus::Alive { |
| 293 | + created_at: (now - Duration::hours(1)), |
| 294 | + }, |
| 295 | + }); |
| 296 | + // C was forked from B |
| 297 | + graph.add_edge(b_v2, c_v0, ()); |
| 298 | + |
| 299 | + let input = ComputeVersionsToDeleteInput { |
| 300 | + graph, |
| 301 | + cutoff_time: now - Duration::hours(6), |
| 302 | + min_versions_to_keep: 1, |
| 303 | + }; |
| 304 | + |
| 305 | + let mut result = ComputeVersionsToDeleteOperator {} |
| 306 | + .run(&input) |
| 307 | + .await |
| 308 | + .unwrap(); |
| 309 | + |
| 310 | + // Only collections A and B should have versions to delete |
| 311 | + assert_eq!(result.versions.len(), 2); |
| 312 | + |
| 313 | + // For collection A: v0 is always kept, and the most recent version (v4) is kept. v3 is not eligible for deletion because it is after the cutoff time. So v1 and v2 are marked for deletion. |
| 314 | + let a_versions = result.versions.remove(&a_collection_id).unwrap(); |
| 315 | + assert_eq!( |
| 316 | + a_versions.into_iter().collect::<Vec<_>>(), |
| 317 | + vec![ |
| 318 | + (0, CollectionVersionAction::Keep), |
| 319 | + (1, CollectionVersionAction::Delete), |
| 320 | + (2, CollectionVersionAction::Delete), |
| 321 | + (3, CollectionVersionAction::Keep), |
| 322 | + (4, CollectionVersionAction::Keep) |
| 323 | + ] |
| 324 | + ); |
| 325 | + |
| 326 | + // For collection B: v0 is always kept, and the most recent version (v2) is kept. So v1 is marked for deletion. |
| 327 | + let b_versions = result.versions.remove(&b_collection_id).unwrap(); |
| 328 | + assert_eq!( |
| 329 | + b_versions.into_iter().collect::<Vec<_>>(), |
| 330 | + vec![ |
| 331 | + (0, CollectionVersionAction::Keep), |
| 332 | + (1, CollectionVersionAction::Delete), |
| 333 | + (2, CollectionVersionAction::Keep) |
| 334 | + ] |
| 335 | + ); |
| 336 | + } |
| 337 | +} |
0 commit comments