2 changes: 1 addition & 1 deletion ffi/src/scan.rs
@@ -74,7 +74,7 @@ pub unsafe extern "C" fn selection_vector_from_scan_metadata(
 fn selection_vector_from_scan_metadata_impl(
     scan_metadata: &ScanMetadata,
 ) -> DeltaResult<KernelBoolSlice> {
-    Ok(scan_metadata.scan_files.selection_vector.clone().into())
+    Ok(scan_metadata.scan_files.selection_vector().to_vec().into())
 }

 /// Drops a scan.
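
Aside, not part of the diff: the change above moves from direct field access to an accessor that evidently returns a borrowed slice (the call site clones with to_vec() before converting into a KernelBoolSlice). A minimal Rust stand-in for that shape; the struct and signature here are assumptions for illustration, not the kernel's actual definition:

    // Hypothetical stand-in; only the field-to-accessor shape matters.
    struct FilteredEngineData {
        selection_vector: Vec<bool>,
    }

    impl FilteredEngineData {
        // Assumed: the field is private now, exposed as a borrowed slice.
        fn selection_vector(&self) -> &[bool] {
            &self.selection_vector
        }
    }

    fn main() {
        let fd = FilteredEngineData { selection_vector: vec![true, false, true] };
        // What the FFI shim above now does: borrow, then clone to take ownership.
        let owned: Vec<bool> = fd.selection_vector().to_vec();
        assert_eq!(owned, vec![true, false, true]);
    }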
2 changes: 1 addition & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
@@ -113,7 +113,7 @@ fn try_main() -> DeltaResult<()> {

     if cli.metadata {
         let (scan_metadata_batches, scan_metadata_rows) = scan_metadata
-            .map(|res| res.unwrap().scan_files.data.len())
+            .map(|res| res.unwrap().scan_files.data().len())
             .fold((0, 0), |(batches, rows), len| (batches + 1, rows + len));
         println!("Scan metadata: {scan_metadata_batches} chunks, {scan_metadata_rows} files",);
         return Ok(());
27 changes: 15 additions & 12 deletions kernel/src/action_reconciliation/log_replay.rs
@@ -114,10 +114,7 @@ impl LogReplayProcessor for ActionReconciliationProcessor {
         self.seen_protocol = visitor.seen_protocol;
         self.seen_metadata = visitor.seen_metadata;

-        let filtered_data = FilteredEngineData {
-            data: actions,
-            selection_vector: visitor.selection_vector,
-        };
+        let filtered_data = FilteredEngineData::try_new(actions, visitor.selection_vector)?;

         Ok(ActionReconciliationBatch {
             filtered_data,
@@ -770,8 +767,11 @@ mod tests {

         // Verify results
         assert_eq!(results.len(), 2, "Expected two batches in results");
-        assert_eq!(results[0].selection_vector, vec![true, true, true],);
-        assert_eq!(results[1].selection_vector, vec![false, false, false, true],);
+        assert_eq!(results[0].selection_vector(), &vec![true, true, true]);
+        assert_eq!(
+            results[1].selection_vector(),
+            &vec![false, false, false, true]
+        );
         assert_eq!(actions_count, 4);
         assert_eq!(add_actions, 0);

@@ -809,8 +809,8 @@ mod tests {

         // Verify results
         assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions
-        assert_eq!(results[0].selection_vector, vec![true]);
-        assert_eq!(results[1].selection_vector, vec![false, true]);
+        assert_eq!(results[0].selection_vector(), &vec![true]);
+        assert_eq!(results[1].selection_vector(), &vec![false, true]);
         assert_eq!(actions_count, 2);
         assert_eq!(add_actions, 1);

@@ -845,8 +845,8 @@ mod tests {

         // Verify results
         assert_eq!(results.len(), 2);
-        assert_eq!(results[0].selection_vector, vec![true, true]);
-        assert_eq!(results[1].selection_vector, vec![false, false, true]);
+        assert_eq!(results[0].selection_vector(), &vec![true, true]);
+        assert_eq!(results[1].selection_vector(), &vec![false, false, true]);
         assert_eq!(actions_count, 3);
         assert_eq!(add_actions, 2);

@@ -927,13 +927,16 @@ mod tests {

         // First batch: protocol, metadata, and one recent txn (old_app filtered out)
         assert_eq!(
-            results[0].filtered_data.selection_vector,
+            results[0].filtered_data.selection_vector(),
             vec![true, true, false, true]
         );
         assert_eq!(results[0].actions_count, 3);

         // Second batch: timeless_app kept, another_old filtered out
-        assert_eq!(results[1].filtered_data.selection_vector, vec![true, false]);
+        assert_eq!(
+            results[1].filtered_data.selection_vector(),
+            vec![true, false]
+        );
         assert_eq!(results[1].actions_count, 1);

         Ok(())
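
Aside, not part of the diff: the struct literal gives way to a fallible constructor, and the trailing ? shows try_new returns a DeltaResult. The tests later in this PR accept a selection vector shorter than the batch (uncovered rows count as selected), so a plausible invariant is that the vector may not be longer than the row count. The sketch below is a guess at that check, not the kernel's real code:

    // Hypothetical validation; the real constructor takes Box<dyn EngineData>
    // and a Vec<bool>, and its exact error type and checks may differ.
    fn try_new(num_rows: usize, selection_vector: Vec<bool>) -> Result<Vec<bool>, String> {
        // Shorter is fine (missing entries are treated as selected); longer is not.
        if selection_vector.len() > num_rows {
            return Err(format!(
                "selection vector has {} entries but the data has only {num_rows} rows",
                selection_vector.len()
            ));
        }
        Ok(selection_vector)
    }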
8 changes: 3 additions & 5 deletions kernel/src/checkpoint/mod.rs
@@ -336,9 +336,10 @@ impl CheckpointWriter {
         let last_checkpoint_path = LastCheckpointHint::path(&self.snapshot.log_segment().log_root)?;

         // Write the `_last_checkpoint` file to `table/_delta_log/_last_checkpoint`
+        let filtered_data = FilteredEngineData::with_all_rows_selected(data?);
         engine.json_handler().write_json_file(
             &last_checkpoint_path,
-            Box::new(std::iter::once(data)),
+            Box::new(std::iter::once(Ok(filtered_data))),
             true,
         )?;

@@ -370,10 +371,7 @@ impl CheckpointWriter {
             &[Scalar::from(self.version)],
         )?;

-        let filtered_data = FilteredEngineData {
-            data: checkpoint_metadata_batch,
-            selection_vector: vec![true], // Include the action in the checkpoint
-        };
+        let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);

         Ok(ActionReconciliationBatch {
             filtered_data,
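
Aside, not part of the diff: both hunks above replace a hand-written selection_vector: vec![true] with with_all_rows_selected. Per the test comment later in this PR, that constructor produces an empty selection vector, which consumers such as to_json_bytes read as "every row selected". A sketch under those assumptions:

    // Stand-in type; the real payload is Box<dyn EngineData>, not Vec<String>.
    struct FilteredEngineData {
        data: Vec<String>,
        selection_vector: Vec<bool>,
    }

    impl FilteredEngineData {
        fn with_all_rows_selected(data: Vec<String>) -> Self {
            // Empty vector == all rows selected, however many rows `data` holds.
            Self { data, selection_vector: Vec::new() }
        }
    }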
23 changes: 12 additions & 11 deletions kernel/src/checkpoint/tests.rs
@@ -9,6 +9,7 @@ use crate::arrow::datatypes::{DataType, Schema};
 use crate::checkpoint::create_last_checkpoint_data;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
+use crate::log_replay::HasSelectionVector;
 use crate::schema::{DataType as KernelDataType, StructField, StructType};
 use crate::utils::test_utils::Action;
 use crate::{DeltaResult, FileMeta, Snapshot};
@@ -77,13 +78,11 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> {
     let writer = snapshot.checkpoint()?;

     let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?;
-
-    // Check selection vector has one true value
-    assert_eq!(checkpoint_batch.filtered_data.selection_vector, vec![true]);
+    assert!(checkpoint_batch.filtered_data.has_selected_rows());

     // Verify the underlying EngineData contains the expected CheckpointMetadata action
-    let arrow_engine_data =
-        ArrowEngineData::try_from_engine_data(checkpoint_batch.filtered_data.data)?;
+    let (underlying_data, _) = checkpoint_batch.filtered_data.into_parts();
+    let arrow_engine_data = ArrowEngineData::try_from_engine_data(underlying_data)?;
     let record_batch = arrow_engine_data.record_batch();

     // Build the expected RecordBatch
@@ -314,11 +313,11 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
     let mut data_iter = writer.checkpoint_data(&engine)?;
     // The first batch should be the metadata and protocol actions.
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true, true]);
+    assert_eq!(batch.selection_vector(), &[true, true]);

     // The second batch should include both the add action and the remove action
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true, true]);
+    assert_eq!(batch.selection_vector(), &[true, true]);

     // The third batch should not be included as the selection vector does not
     // contain any true values, as the file added is removed in a following commit.
@@ -384,7 +383,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {
     let mut data_iter = writer.checkpoint_data(&engine)?;
     // The first batch should be the metadata and protocol actions.
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true, true]);
+    assert_eq!(batch.selection_vector(), &[true, true]);

     // No more data should exist because we only requested version 0
     assert!(data_iter.next().is_none());
@@ -488,15 +487,17 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
     let mut data_iter = writer.checkpoint_data(&engine)?;
     // The first batch should be the metadata and protocol actions.
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true, true]);
+    assert_eq!(batch.selection_vector(), &[true, true]);

     // The second batch should include both the add action and the remove action
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true, true]);
+    assert_eq!(batch.selection_vector(), &[true, true]);

     // The third batch should be the CheckpointMetaData action.
     let batch = data_iter.next().unwrap()?;
-    assert_eq!(batch.selection_vector, [true]);
+    // According to the new contract, with_all_rows_selected creates an empty selection vector
+    assert_eq!(batch.selection_vector(), &[] as &[bool]);
+    assert!(batch.has_selected_rows());

     // No more data should exist
     assert!(data_iter.next().is_none());
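
Aside, not part of the diff: the new HasSelectionVector import and the last two assertions above show how the empty-vector contract composes: selection_vector() can be empty while has_selected_rows() still reports true. A hedged sketch of a trait with that behavior; the kernel's actual definition may differ:

    trait HasSelectionVector {
        fn selection_vector(&self) -> &[bool];

        // Assumed semantics: an empty vector means "all rows selected", so it
        // counts as having selected rows; otherwise some entry must be true.
        fn has_selected_rows(&self) -> bool {
            self.selection_vector().is_empty() || self.selection_vector().contains(&true)
        }
    }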
103 changes: 95 additions & 8 deletions kernel/src/engine/arrow_utils.rs
@@ -7,20 +7,22 @@ use std::sync::{Arc, OnceLock};

 use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
 use crate::engine::ensure_data_types::DataTypeCompat;
+use crate::engine_data::FilteredEngineData;
 use crate::schema::{ColumnMetadataKey, MetadataValue};
 use crate::{
-    engine::arrow_data::ArrowEngineData,
+    engine::arrow_data::{extract_record_batch, ArrowEngineData},
     schema::{DataType, MetadataColumnSpec, Schema, SchemaRef, StructField, StructType},
     utils::require,
     DeltaResult, EngineData, Error,
 };

 use crate::arrow::array::{
-    cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray, MapArray,
-    OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
+    cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
+    MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
 };
 use crate::arrow::buffer::NullBuffer;
 use crate::arrow::compute::concat_batches;
+use crate::arrow::compute::filter_record_batch;
 use crate::arrow::datatypes::{
     DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef,
     Fields as ArrowFields, Int64Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
@@ -1076,13 +1078,30 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
 // TODO (zach): this should stream data to the JSON writer and output an iterator.
 #[internal_api]
 pub(crate) fn to_json_bytes(
-    data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
+    data: impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send,
 ) -> DeltaResult<Vec<u8>> {
     let mut writer = LineDelimitedWriter::new(Vec::new());
     for chunk in data {
-        let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
-        let record_batch = arrow_data.record_batch();
-        writer.write(record_batch)?;
+        let filtered_data = chunk?;
+        // Honor the new contract: if selection vector is shorter than the number of rows,
+        // then all rows not covered by the selection vector are assumed to be selected
+        let (underlying_data, mut selection_vector) = filtered_data.into_parts();
+        let batch = extract_record_batch(&*underlying_data)?;
+        let num_rows = batch.num_rows();
+
+        if selection_vector.is_empty() {
+            // If selection vector is empty, write all rows per contract.
+            writer.write(batch)?;
+        } else {
+            // Extend the selection vector with `true` for uncovered rows
+            if selection_vector.len() < num_rows {
+                selection_vector.resize(num_rows, true);
+            }
+
+            let filtered_batch = filter_record_batch(batch, &BooleanArray::from(selection_vector))
+                .map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?;
+            writer.write(&filtered_batch)?
+        };
     }
     writer.finish()?;
     Ok(writer.into_inner())
@@ -2767,14 +2786,82 @@ mod tests {
             vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
         )?;
         let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
-        let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
+        let filtered_data = FilteredEngineData::with_all_rows_selected(data);
+        let json = to_json_bytes(Box::new(std::iter::once(Ok(filtered_data))))?;
         assert_eq!(
             json,
             "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
         );
         Ok(())
     }
+
+    #[test]
+    fn test_to_json_bytes_filters_data() -> DeltaResult<()> {
+        // Create test data with 4 rows
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "value",
+            ArrowDataType::Utf8,
+            true,
+        )]));
+        let record_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(vec![
+                "row0", "row1", "row2", "row3",
+            ]))],
+        )?;
+
+        // Helper function to create EngineData from the same record batch
+        let create_engine_data =
+            || -> Box<dyn EngineData> { Box::new(ArrowEngineData::new(record_batch.clone())) };
+
+        // Test case 1: All rows selected (should include all 4 rows)
+        let all_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![true, true, true, true])?;
+        let json_all = to_json_bytes(Box::new(std::iter::once(Ok(all_selected))))?;
+        assert_eq!(
+            json_all,
+            "{\"value\":\"row0\"}\n{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n{\"value\":\"row3\"}\n".as_bytes()
+        );
+
+        // Test case 2: Only first and last rows selected (should include only 2 rows)
+        let partial_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![true, false, false, true])?;
+        let json_partial = to_json_bytes(Box::new(std::iter::once(Ok(partial_selected))))?;
+        assert_eq!(
+            json_partial,
+            "{\"value\":\"row0\"}\n{\"value\":\"row3\"}\n".as_bytes()
+        );
+
+        // Test case 3: Only middle rows selected (should include only 2 rows)
+        let middle_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![false, true, true, false])?;
+        let json_middle = to_json_bytes(Box::new(std::iter::once(Ok(middle_selected))))?;
+        assert_eq!(
+            json_middle,
+            "{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n".as_bytes()
+        );
+
+        // Test case 4: No rows selected (should produce empty output)
+        let none_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![false, false, false, false])?;
+        let json_none = to_json_bytes(Box::new(std::iter::once(Ok(none_selected))))?;
+        assert_eq!(json_none, "".as_bytes());
+
+        // Test case 5: Only one row selected (should include only 1 row)
+        let one_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![false, true, false, false])?;
+        let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?;
+        assert_eq!(json_one, "{\"value\":\"row1\"}\n".as_bytes());
+
+        // Test case 6: Only one row selected implicitly by short vector
+        let one_selected =
+            FilteredEngineData::try_new(create_engine_data(), vec![false, false, false])?;
+        let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?;
+        assert_eq!(json_one, "{\"value\":\"row3\"}\n".as_bytes());
+
+        Ok(())
+    }
+
     #[test]
     fn test_arrow_broken_nested_null_masks() {
         use crate::arrow::datatypes::{DataType, Field, Schema};
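
Aside, not part of the diff: test case 6 above exercises the padding rule inside to_json_bytes — a non-empty selection vector shorter than the batch is extended with true for the uncovered tail. A runnable walk-through of just that step:

    fn main() {
        let num_rows = 4; // the batch holds row0..row3
        let mut selection_vector = vec![false, false, false];
        // Mirrors the resize in to_json_bytes: uncovered rows default to selected.
        if !selection_vector.is_empty() && selection_vector.len() < num_rows {
            selection_vector.resize(num_rows, true);
        }
        // Only row3 survives the filter, matching the expected "row3" output line.
        assert_eq!(selection_vector, vec![false, false, false, true]);
    }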
11 changes: 8 additions & 3 deletions kernel/src/engine/default/json.rs
@@ -21,6 +21,7 @@ use crate::engine::arrow_conversion::TryFromKernel as _;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::parse_json as arrow_parse_json;
 use crate::engine::arrow_utils::to_json_bytes;
+use crate::engine_data::FilteredEngineData;
 use crate::schema::SchemaRef;
 use crate::{
     DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, JsonHandler, PredicateRef,
@@ -136,7 +137,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
     fn write_json_file(
         &self,
         path: &Url,
-        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
+        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
         overwrite: bool,
     ) -> DeltaResult<()> {
         let buffer = to_json_bytes(data)?;
@@ -774,7 +775,9 @@ mod tests {

         // First write with no existing file
         let data = create_test_data(vec!["remi", "wilson"])?;
-        let result = handler.write_json_file(&path, Box::new(std::iter::once(Ok(data))), overwrite);
+        let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data));
+        let result =
+            handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite);

         // Verify the first write is successful
         assert!(result.is_ok());
@@ -783,7 +786,9 @@

         // Second write with existing file
         let data = create_test_data(vec!["seb", "tia"])?;
-        let result = handler.write_json_file(&path, Box::new(std::iter::once(Ok(data))), overwrite);
+        let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data));
+        let result =
+            handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite);

         if overwrite {
             // Verify the second write is successful
7 changes: 6 additions & 1 deletion kernel/src/engine/mod.rs
@@ -37,6 +37,7 @@ mod tests {
     use crate::arrow::array::{RecordBatch, StringArray};
     use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
     use crate::engine::arrow_data::ArrowEngineData;
+    use crate::engine_data::FilteredEngineData;
     use crate::{Engine, EngineData};

     use test_utils::delta_path_for_version;
@@ -47,7 +48,11 @@
         engine_data: impl Fn() -> Box<dyn EngineData>,
     ) {
         let json = engine.json_handler();
-        let get_data = || Box::new(std::iter::once(Ok(engine_data())));
+        let get_data = || {
+            let data = engine_data();
+            let filtered_data = FilteredEngineData::with_all_rows_selected(data);
+            Box::new(std::iter::once(Ok(filtered_data)))
+        };

         let expected_names: Vec<Path> = (1..4)
             .map(|i| delta_path_for_version(i, "json"))