diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index f44f2cb5e6..8f2b5fa59b 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -331,6 +331,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 this.filters,
                 this.target_size.to_owned(),
                 writer_properties,
+                this.preserve_insertion_order,
             )?;
             let metrics = plan
                 .execute(
@@ -877,12 +878,15 @@ pub fn create_merge_plan(
     filters: &[PartitionFilter],
     target_size: Option<i64>,
     writer_properties: WriterProperties,
+    preserve_insertion_order: bool,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
     let partitions_keys = &snapshot.metadata().partition_columns;
 
     let (operations, metrics) = match optimize_type {
-        OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?,
+        OptimizeType::Compact => {
+            build_compaction_plan(snapshot, filters, target_size, preserve_insertion_order)?
+        }
         OptimizeType::ZOrder(zorder_columns) => {
             build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)?
         }
@@ -958,6 +962,7 @@ fn build_compaction_plan(
     snapshot: &DeltaTableState,
     filters: &[PartitionFilter],
     target_size: i64,
+    preserve_insertion_order: bool,
 ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
     let mut metrics = Metrics::default();
 
@@ -985,8 +990,13 @@ fn build_compaction_plan(
     }
 
     for (_, file) in partition_files.values_mut() {
-        // Sort files by size: largest to smallest
-        file.sort_by(|a, b| b.size.cmp(&a.size));
+        if preserve_insertion_order {
+            // Sort files by modification date: newest to oldest
+            file.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
+        } else {
+            // Sort files by size: largest to smallest
+            file.sort_by(|a, b| b.size.cmp(&a.size));
+        }
     }
 
     let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs
index 4826647750..64932f7072 100644
--- a/crates/core/tests/command_optimize.rs
+++ b/crates/core/tests/command_optimize.rs
@@ -289,6 +289,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
         &filter,
         None,
         WriterProperties::builder().build(),
+        false,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -351,6 +352,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
         &filter,
         None,
         WriterProperties::builder().build(),
+        false,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -410,6 +412,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
         &[],
         None,
         WriterProperties::builder().build(),
+        false,
     )?;
 
     let metrics = plan
@@ -867,6 +870,61 @@ async fn test_zorder_respects_target_size() -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_preserve_insertion_order() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(true).await?;
+    let mut dt = context.table;
+    let mut writer = RecordBatchWriter::for_table(&dt)?;
+
+    // first file
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(1, 1), (1, 2), (1, 3), (1, 4)], "2022-05-22")?,
+    )
+    .await?;
+
+    // later file
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(2, 5), (2, 6), (2, 7), (2, 8)], "2022-05-22")?,
+    )
+    .await?;
+
+    let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
+
+    let optimize = DeltaOps(dt)
+        .optimize()
+        .with_target_size(2_000_000)
+        .with_filters(&filter)
+        .with_preserve_insertion_order(true);
+    let (dt, metrics) = optimize.await?;
+
+    assert_eq!(metrics.num_files_added, 1);
+    assert_eq!(metrics.num_files_removed, 2);
+    assert_eq!(metrics.total_files_skipped, 0);
+    assert_eq!(metrics.total_considered_files, 2);
+
+    // Check data
+    let files = dt.get_files_iter()?.collect::<Vec<_>>();
+    assert_eq!(files.len(), 1);
+
+    let actual = read_parquet_file(&files[0], dt.object_store()).await?;
+    let expected = RecordBatch::try_new(
+        actual.schema(),
+        // The file created later is merged first
+        vec![
+            Arc::new(Int32Array::from(vec![2, 2, 2, 2, 1, 1, 1, 1])),
+            Arc::new(Int32Array::from(vec![5, 6, 7, 8, 1, 2, 3, 4])),
+        ],
+    )?;
+
+    assert_eq!(actual, expected);
+
+    Ok(())
+}
+
 async fn read_parquet_file(
     path: &Path,
     object_store: ObjectStoreRef,
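
Usage note (not part of the patch): a minimal sketch of how the new flag is driven through the public builder, mirroring the test above. The table URI, the function name, and the `deltalake` meta-crate re-exports are assumptions for illustration; only `with_preserve_insertion_order` and the metrics fields come from this patch.

    use deltalake::{open_table, DeltaOps, DeltaTableError};

    // Hypothetical helper: compact a table while keeping newest-file-first ordering.
    async fn compact_preserving_order(uri: &str) -> Result<(), DeltaTableError> {
        let table = open_table(uri).await?;
        // With the flag set, compaction orders files by modification time
        // (newest first) instead of packing largest-first.
        let (_table, metrics) = DeltaOps(table)
            .optimize()
            .with_preserve_insertion_order(true)
            .await?;
        println!(
            "files added: {}, removed: {}",
            metrics.num_files_added, metrics.num_files_removed
        );
        Ok(())
    }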