16 changes: 13 additions & 3 deletions crates/core/src/operations/optimize.rs
@@ -331,6 +331,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.preserve_insertion_order,
)?;
let metrics = plan
.execute(
@@ -877,12 +878,15 @@ pub fn create_merge_plan(
filters: &[PartitionFilter],
target_size: Option<i64>,
writer_properties: WriterProperties,
preserve_insertion_order: bool,
Comment on lines 880 to +881

Member

I'm curious what you think about this going into WriterProperties rather than all these functions growing an additional argument.

Does this only benefit call paths for the create_merge_plan flow?

Collaborator

The writer properties are used in other operations, so we should only add it if it makes sense for those ops as well.

Contributor Author

That was my thinking as well.

Contributor Author

Do you mind if I mark this resolved?

) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
let partitions_keys = &snapshot.metadata().partition_columns;

let (operations, metrics) = match optimize_type {
OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?,
OptimizeType::Compact => {
build_compaction_plan(snapshot, filters, target_size, preserve_insertion_order)?
}
OptimizeType::ZOrder(zorder_columns) => {
build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)?
}
@@ -958,6 +962,7 @@ fn build_compaction_plan(
snapshot: &DeltaTableState,
filters: &[PartitionFilter],
target_size: i64,
preserve_insertion_order: bool,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

@@ -985,8 +990,13 @@
}

for (_, file) in partition_files.values_mut() {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
if preserve_insertion_order {
// Sort files by modification date: newest to oldest
file.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
} else {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
}
}

let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
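For context on the planner change above: the new flag only switches the comparator used when binning files for compaction — size-descending by default, modification-time-descending when insertion order is preserved. A minimal, self-contained sketch of the two orderings, using a hypothetical `FileMeta` stand-in rather than the crate's actual file metadata type:

```rust
// Hypothetical stand-in for the per-file metadata used during planning.
struct FileMeta {
    size: i64,
    last_modified: i64, // e.g. epoch millis
}

fn sort_for_compaction(files: &mut [FileMeta], preserve_insertion_order: bool) {
    if preserve_insertion_order {
        // Newest to oldest, matching `b.last_modified.cmp(&a.last_modified)` above.
        files.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
    } else {
        // Largest to smallest, the previous (and default) behavior.
        files.sort_by(|a, b| b.size.cmp(&a.size));
    }
}

fn main() {
    let mut files = vec![
        FileMeta { size: 10, last_modified: 2 },
        FileMeta { size: 30, last_modified: 1 },
    ];
    sort_for_compaction(&mut files, true);
    // With the flag set, the file written last (last_modified = 2) comes first.
    assert_eq!(files[0].last_modified, 2);
}
```

Note that with the flag set, files are binned newest-first, so the most recently written file leads each merge bin; the new test below asserts exactly this ordering in the merged output.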
58 changes: 58 additions & 0 deletions crates/core/tests/command_optimize.rs
@@ -289,6 +289,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -351,6 +352,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -410,6 +412,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
&[],
None,
WriterProperties::builder().build(),
false,
)?;

let metrics = plan
@@ -867,6 +870,61 @@ async fn test_zorder_respects_target_size() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
async fn test_preserve_insertion_order() -> Result<(), Box<dyn Error>> {
let context = setup_test(true).await?;
let mut dt = context.table;
let mut writer = RecordBatchWriter::for_table(&dt)?;

// first file
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(1, 1), (1, 2), (1, 3), (1, 4)], "2022-05-22")?,
)
.await?;

// later file
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(2, 5), (2, 6), (2, 7), (2, 8)], "2022-05-22")?,
)
.await?;

let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

let optimize = DeltaOps(dt)
.optimize()
.with_target_size(2_000_000)
.with_filters(&filter)
.with_preserve_insertion_order(true);
let (dt, metrics) = optimize.await?;

assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
assert_eq!(metrics.total_files_skipped, 0);
assert_eq!(metrics.total_considered_files, 2);

// Check data
let files = dt.get_files_iter()?.collect::<Vec<_>>();
assert_eq!(files.len(), 1);

let actual = read_parquet_file(&files[0], dt.object_store()).await?;
let expected = RecordBatch::try_new(
actual.schema(),
// file created later is merged first
vec![
Arc::new(Int32Array::from(vec![2, 2, 2, 2, 1, 1, 1, 1])),
Arc::new(Int32Array::from(vec![5, 6, 7, 8, 1, 2, 3, 4])),
],
)?;

assert_eq!(actual, expected);

Ok(())
}

async fn read_parquet_file(
path: &Path,
object_store: ObjectStoreRef,
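One piece the diff above doesn't show is the builder method the new test calls. Assuming it follows the existing `with_*` pattern on `OptimizeBuilder` and sets the `preserve_insertion_order` field threaded through `IntoFuture` above, it presumably looks something like this sketch (an assumption, not the PR's verbatim code):

```rust
impl<'a> OptimizeBuilder<'a> {
    /// When true, compaction bins files by modification time (newest first)
    /// instead of by size, as exercised by `test_preserve_insertion_order`.
    /// (Sketch of the assumed builder method, mirroring the test's usage.)
    pub fn with_preserve_insertion_order(mut self, preserve_insertion_order: bool) -> Self {
        self.preserve_insertion_order = preserve_insertion_order;
        self
    }
}
```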