Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 250 additions & 1 deletion kernel/src/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::debug;

/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
/// of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub(crate) struct FileActionKey {
pub(crate) path: String,
pub(crate) dv_unique_id: Option<String>,
Expand Down Expand Up @@ -357,3 +357,252 @@ pub(crate) trait HasSelectionVector {
/// Check if the selection vector contains at least one selected row
fn has_selected_rows(&self) -> bool;
}

#[cfg(test)]
mod tests {
use super::*;
use crate::engine_data::GetData;
use crate::DeltaResult;
use std::collections::{HashMap, HashSet};

/// Mock GetData implementation for testing
struct MockGetData {
string_values: HashMap<(usize, String), String>,
int_values: HashMap<(usize, String), i32>,
errors: HashMap<(usize, String), String>,
}

impl MockGetData {
fn new() -> Self {
Self {
string_values: HashMap::new(),
int_values: HashMap::new(),
errors: HashMap::new(),
}
}

fn add_string(&mut self, row: usize, field: &str, value: &str) {
self.string_values
.insert((row, field.to_string()), value.to_string());
}

fn add_int(&mut self, row: usize, field: &str, value: i32) {
self.int_values.insert((row, field.to_string()), value);
}
}

impl<'a> GetData<'a> for MockGetData {
fn get_str(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<&'a str>> {
if let Some(error_msg) = self.errors.get(&(row_index, field_name.to_string())) {
return Err(crate::Error::Generic(error_msg.clone()));
}
Ok(self
.string_values
.get(&(row_index, field_name.to_string()))
.map(|s| s.as_str()))
}

fn get_int(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<i32>> {
if let Some(error_msg) = self.errors.get(&(row_index, field_name.to_string())) {
return Err(crate::Error::Generic(error_msg.clone()));
}
Ok(self
.int_values
.get(&(row_index, field_name.to_string()))
.cloned())
}
}

/// Helper to create a FileActionDeduplicator with standard indices
fn create_deduplicator(
seen: &mut HashSet<FileActionKey>,
is_log_batch: bool,
) -> FileActionDeduplicator<'_> {
FileActionDeduplicator::new(
seen,
is_log_batch,
0, // add_path_index
5, // remove_path_index
2, // add_dv_start_index
6, // remove_dv_start_index
)
}

/// Helper to create a getters array with mocks at specific positions
fn create_getters_with_mocks<'a>(
add_mock: Option<&'a MockGetData>,
remove_mock: Option<&'a MockGetData>,
) -> Vec<&'a dyn GetData<'a>> {
use std::sync::LazyLock;
static EMPTY: LazyLock<MockGetData> = LazyLock::new(MockGetData::new);

let empty_ref = &*EMPTY;
vec![
add_mock.unwrap_or(empty_ref), // 0: add.path
empty_ref, // 1: (unused)
add_mock.unwrap_or(empty_ref), // 2: add.dv.storageType
add_mock.unwrap_or(empty_ref), // 3: add.dv.pathOrInlineDv
add_mock.unwrap_or(empty_ref), // 4: add.dv.offset
remove_mock.unwrap_or(empty_ref), // 5: remove.path
remove_mock.unwrap_or(empty_ref), // 6: remove.dv.storageType
remove_mock.unwrap_or(empty_ref), // 7: remove.dv.pathOrInlineDv
remove_mock.unwrap_or(empty_ref), // 8: remove.dv.offset
]
}

#[test]
fn test_extract_file_action_add() -> DeltaResult<()> {
let mut seen = HashSet::new();
let deduplicator = create_deduplicator(&mut seen, true);

let mut mock_add = MockGetData::new();
mock_add.add_string(0, "add.path", "file1.parquet");
let getters = create_getters_with_mocks(Some(&mock_add), None);
let result = deduplicator.extract_file_action(0, &getters, false)?;

assert!(result.is_some());
let (key, is_add) = result.unwrap();
assert_eq!(key.path, "file1.parquet");
assert!(key.dv_unique_id.is_none());
assert!(is_add);

Ok(())
}

#[test]
fn test_extract_file_action_remove() -> DeltaResult<()> {
let mut seen = HashSet::new();
let deduplicator = create_deduplicator(&mut seen, true);

let mut mock_remove = MockGetData::new();
mock_remove.add_string(0, "remove.path", "file2.parquet");
let getters = create_getters_with_mocks(None, Some(&mock_remove));
let result = deduplicator.extract_file_action(0, &getters, false)?;

assert!(result.is_some());
let (key, is_add) = result.unwrap();
assert_eq!(key.path, "file2.parquet");
assert!(!is_add);

Ok(())
}

#[test]
fn test_extract_file_action_with_deletion_vector() -> DeltaResult<()> {
let mut seen = HashSet::new();
let deduplicator = create_deduplicator(&mut seen, true);

let mut mock_dv = MockGetData::new();
mock_dv.add_string(0, "add.path", "file_with_dv.parquet");
mock_dv.add_string(0, "deletionVector.storageType", "s3");
mock_dv.add_string(0, "deletionVector.pathOrInlineDv", "path/to/dv");
mock_dv.add_int(0, "deletionVector.offset", 100);
let getters = create_getters_with_mocks(Some(&mock_dv), None);
let result = deduplicator.extract_file_action(0, &getters, false)?;

assert!(result.is_some());
let (key, is_add) = result.unwrap();
assert!(key.dv_unique_id.is_some());
assert!(is_add);

Ok(())
}

#[test]
fn test_extract_file_action_skip_removes() -> DeltaResult<()> {
let mut seen = HashSet::new();
let deduplicator = create_deduplicator(&mut seen, true);

let mut mock_remove = MockGetData::new();
mock_remove.add_string(0, "remove.path", "file2.parquet");
let getters = create_getters_with_mocks(None, Some(&mock_remove));

// With skip_removes=true, should return None
assert!(deduplicator
.extract_file_action(0, &getters, true)?
.is_none());

// With skip_removes=false, should return Some
assert!(deduplicator
.extract_file_action(0, &getters, false)?
.is_some());

Ok(())
}

#[test]
fn test_extract_file_action_no_action_found() -> DeltaResult<()> {
let mut seen = HashSet::new();
let deduplicator = create_deduplicator(&mut seen, true);

let getters = create_getters_with_mocks(None, None);
assert!(deduplicator
.extract_file_action(0, &getters, false)?
.is_none());

Ok(())
}

#[test]
fn test_check_and_record_seen() {
let mut seen = HashSet::new();

// Pre-populate with an existing key
let pre_existing_key = FileActionKey::new("existing.parquet", None);
seen.insert(pre_existing_key.clone());

let key1 = FileActionKey::new("file1.parquet", None);
let key2 = FileActionKey::new("file2.parquet", None);
let key_with_dv = FileActionKey::new("file1.parquet", Some("dv1".to_string()));

// Test with log batch (should record keys)
{
let mut deduplicator = create_deduplicator(&mut seen, true);

// Pre-existing key should be detected as duplicate
assert!(deduplicator.check_and_record_seen(pre_existing_key.clone()));

// First time seeing keys, should return false and record them
assert!(!deduplicator.check_and_record_seen(key1.clone()));
assert!(!deduplicator.check_and_record_seen(key2.clone()));
assert!(!deduplicator.check_and_record_seen(key_with_dv.clone()));

// Second time seeing keys, should return true (duplicates)
assert!(deduplicator.check_and_record_seen(key1.clone()));
assert!(deduplicator.check_and_record_seen(key_with_dv.clone()));
}

// Keys should be recorded in seen set
assert!(seen.contains(&key1));
assert!(seen.contains(&key2));
assert!(seen.contains(&key_with_dv));

// Test with checkpoint batch (should NOT record keys)
{
let mut deduplicator = create_deduplicator(&mut seen, false);

let new_key = FileActionKey::new("new.parquet", None);

// First time seeing new_key in checkpoint, should return false but NOT record it
assert!(!deduplicator.check_and_record_seen(new_key.clone()));
// Still returns false on second call (not recorded)
assert!(!deduplicator.check_and_record_seen(new_key.clone()));

// Existing keys from seen set should still be detected
assert!(deduplicator.check_and_record_seen(key1.clone()));
}
}

#[test]
fn test_is_log_batch() {
let mut seen = HashSet::new();

// Test with is_log_batch = true
let deduplicator_log = create_deduplicator(&mut seen, true);
assert!(deduplicator_log.is_log_batch());

// Test with is_log_batch = false
let deduplicator_checkpoint = create_deduplicator(&mut seen, false);
assert!(!deduplicator_checkpoint.is_log_batch());
}
}
Loading