diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs
index 9f4016da1..ffe4d44dc 100644
--- a/kernel/src/log_replay.rs
+++ b/kernel/src/log_replay.rs
@@ -26,7 +26,7 @@ use tracing::debug;
 
 /// The subset of file action fields that uniquely identifies it in the log, used for deduplication
 /// of adds and removes during log replay.
-#[derive(Debug, Hash, Eq, PartialEq)]
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
 pub(crate) struct FileActionKey {
     pub(crate) path: String,
     pub(crate) dv_unique_id: Option<String>,
@@ -357,3 +357,255 @@ pub(crate) trait HasSelectionVector {
     /// Check if the selection vector contains at least one selected row
     fn has_selected_rows(&self) -> bool;
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::engine_data::GetData;
+    use crate::DeltaResult;
+    use std::collections::{HashMap, HashSet};
+
+    /// Mock `GetData` implementation for testing, keyed by `(row index, field name)`.
+    /// A lookup with no entry yields `Ok(None)`; an entry in `errors` makes the getter fail.
+    struct MockGetData {
+        string_values: HashMap<(usize, String), String>,
+        int_values: HashMap<(usize, String), i32>,
+        errors: HashMap<(usize, String), String>,
+    }
+
+    impl MockGetData {
+        fn new() -> Self {
+            Self {
+                string_values: HashMap::new(),
+                int_values: HashMap::new(),
+                errors: HashMap::new(),
+            }
+        }
+
+        fn add_string(&mut self, row: usize, field: &str, value: &str) {
+            self.string_values
+                .insert((row, field.to_string()), value.to_string());
+        }
+
+        fn add_int(&mut self, row: usize, field: &str, value: i32) {
+            self.int_values.insert((row, field.to_string()), value);
+        }
+    }
+
+    impl<'a> GetData<'a> for MockGetData {
+        fn get_str(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<&'a str>> {
+            if let Some(error_msg) = self.errors.get(&(row_index, field_name.to_string())) {
+                return Err(crate::Error::Generic(error_msg.clone()));
+            }
+            Ok(self
+                .string_values
+                .get(&(row_index, field_name.to_string()))
+                .map(|s| s.as_str()))
+        }
+
+        fn get_int(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<i32>> {
+            if let Some(error_msg) = self.errors.get(&(row_index, field_name.to_string())) {
+                return Err(crate::Error::Generic(error_msg.clone()));
+            }
+            Ok(self
+                .int_values
+                .get(&(row_index, field_name.to_string()))
+                .cloned())
+        }
+    }
+
+    /// Helper to create a FileActionDeduplicator with standard indices
+    fn create_deduplicator(
+        seen: &mut HashSet<FileActionKey>,
+        is_log_batch: bool,
+    ) -> FileActionDeduplicator<'_> {
+        FileActionDeduplicator::new(
+            seen,
+            is_log_batch,
+            0, // add_path_index
+            5, // remove_path_index
+            2, // add_dv_start_index
+            6, // remove_dv_start_index
+        )
+    }
+
+    /// Helper to create a getters array with mocks at specific positions.
+    /// Slots 0 and 2-4 read from `add_mock`, slots 5-8 from `remove_mock`; slot 1 and any
+    /// missing mock use a shared empty mock, so every lookup there yields `Ok(None)`.
+    fn create_getters_with_mocks<'a>(
+        add_mock: Option<&'a MockGetData>,
+        remove_mock: Option<&'a MockGetData>,
+    ) -> Vec<&'a dyn GetData<'a>> {
+        use std::sync::LazyLock;
+        static EMPTY: LazyLock<MockGetData> = LazyLock::new(MockGetData::new);
+
+        let empty_ref = &*EMPTY;
+        vec![
+            add_mock.unwrap_or(empty_ref),    // 0: add.path
+            empty_ref,                        // 1: (unused)
+            add_mock.unwrap_or(empty_ref),    // 2: add.dv.storageType
+            add_mock.unwrap_or(empty_ref),    // 3: add.dv.pathOrInlineDv
+            add_mock.unwrap_or(empty_ref),    // 4: add.dv.offset
+            remove_mock.unwrap_or(empty_ref), // 5: remove.path
+            remove_mock.unwrap_or(empty_ref), // 6: remove.dv.storageType
+            remove_mock.unwrap_or(empty_ref), // 7: remove.dv.pathOrInlineDv
+            remove_mock.unwrap_or(empty_ref), // 8: remove.dv.offset
+        ]
+    }
+
+    #[test]
+    fn test_extract_file_action_add() -> DeltaResult<()> {
+        let mut seen = HashSet::new();
+        let deduplicator = create_deduplicator(&mut seen, true);
+
+        let mut mock_add = MockGetData::new();
+        mock_add.add_string(0, "add.path", "file1.parquet");
+        let getters = create_getters_with_mocks(Some(&mock_add), None);
+        let result = deduplicator.extract_file_action(0, &getters, false)?;
+
+        assert!(result.is_some());
+        let (key, is_add) = result.unwrap();
+        assert_eq!(key.path, "file1.parquet");
+        assert!(key.dv_unique_id.is_none());
+        assert!(is_add);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_extract_file_action_remove() -> DeltaResult<()> {
+        let mut seen = HashSet::new();
+        let deduplicator = create_deduplicator(&mut seen, true);
+
+        let mut mock_remove = MockGetData::new();
+        mock_remove.add_string(0, "remove.path", "file2.parquet");
+        let getters = create_getters_with_mocks(None, Some(&mock_remove));
+        let result = deduplicator.extract_file_action(0, &getters, false)?;
+
+        assert!(result.is_some());
+        let (key, is_add) = result.unwrap();
+        assert_eq!(key.path, "file2.parquet");
+        assert!(!is_add);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_extract_file_action_with_deletion_vector() -> DeltaResult<()> {
+        let mut seen = HashSet::new();
+        let deduplicator = create_deduplicator(&mut seen, true);
+
+        let mut mock_dv = MockGetData::new();
+        mock_dv.add_string(0, "add.path", "file_with_dv.parquet");
+        mock_dv.add_string(0, "deletionVector.storageType", "s3");
+        mock_dv.add_string(0, "deletionVector.pathOrInlineDv", "path/to/dv");
+        mock_dv.add_int(0, "deletionVector.offset", 100);
+        let getters = create_getters_with_mocks(Some(&mock_dv), None);
+        let result = deduplicator.extract_file_action(0, &getters, false)?;
+
+        assert!(result.is_some());
+        let (key, is_add) = result.unwrap();
+        assert!(key.dv_unique_id.is_some());
+        assert!(is_add);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_extract_file_action_skip_removes() -> DeltaResult<()> {
+        let mut seen = HashSet::new();
+        let deduplicator = create_deduplicator(&mut seen, true);
+
+        let mut mock_remove = MockGetData::new();
+        mock_remove.add_string(0, "remove.path", "file2.parquet");
+        let getters = create_getters_with_mocks(None, Some(&mock_remove));
+
+        // With skip_removes=true, should return None
+        assert!(deduplicator
+            .extract_file_action(0, &getters, true)?
+            .is_none());
+
+        // With skip_removes=false, should return Some
+        assert!(deduplicator
+            .extract_file_action(0, &getters, false)?
+            .is_some());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_extract_file_action_no_action_found() -> DeltaResult<()> {
+        let mut seen = HashSet::new();
+        let deduplicator = create_deduplicator(&mut seen, true);
+
+        let getters = create_getters_with_mocks(None, None);
+        assert!(deduplicator
+            .extract_file_action(0, &getters, false)?
+            .is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_check_and_record_seen() {
+        let mut seen = HashSet::new();
+
+        // Pre-populate with an existing key
+        let pre_existing_key = FileActionKey::new("existing.parquet", None);
+        seen.insert(pre_existing_key.clone());
+
+        let key1 = FileActionKey::new("file1.parquet", None);
+        let key2 = FileActionKey::new("file2.parquet", None);
+        let key_with_dv = FileActionKey::new("file1.parquet", Some("dv1".to_string()));
+
+        // Test with log batch (should record keys)
+        {
+            let mut deduplicator = create_deduplicator(&mut seen, true);
+
+            // Pre-existing key should be detected as duplicate
+            assert!(deduplicator.check_and_record_seen(pre_existing_key.clone()));
+
+            // First time seeing keys, should return false and record them
+            assert!(!deduplicator.check_and_record_seen(key1.clone()));
+            assert!(!deduplicator.check_and_record_seen(key2.clone()));
+            assert!(!deduplicator.check_and_record_seen(key_with_dv.clone()));
+
+            // Second time seeing keys, should return true (duplicates)
+            assert!(deduplicator.check_and_record_seen(key1.clone()));
+            assert!(deduplicator.check_and_record_seen(key_with_dv.clone()));
+        }
+
+        // Keys should be recorded in seen set
+        assert!(seen.contains(&key1));
+        assert!(seen.contains(&key2));
+        assert!(seen.contains(&key_with_dv));
+
+        // Test with checkpoint batch (should NOT record keys)
+        {
+            let mut deduplicator = create_deduplicator(&mut seen, false);
+
+            let new_key = FileActionKey::new("new.parquet", None);
+
+            // First time seeing new_key in checkpoint, should return false but NOT record it
+            assert!(!deduplicator.check_and_record_seen(new_key.clone()));
+            // Still returns false on second call (not recorded)
+            assert!(!deduplicator.check_and_record_seen(new_key.clone()));
+
+            // Existing keys from seen set should still be detected
+            assert!(deduplicator.check_and_record_seen(key1.clone()));
+        }
+    }
+
+    #[test]
+    fn test_is_log_batch() {
+        let mut seen = HashSet::new();
+
+        // Test with is_log_batch = true
+        let deduplicator_log = create_deduplicator(&mut seen, true);
+        assert!(deduplicator_log.is_log_batch());
+
+        // Test with is_log_batch = false
+        let deduplicator_checkpoint = create_deduplicator(&mut seen, false);
+        assert!(!deduplicator_checkpoint.is_log_batch());
+    }
+}