From f4c63212a80f171895dd5db7dcc0eea84cc6a98f Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 19 Oct 2025 16:03:03 +0200
Subject: [PATCH 1/3] refactor: use single writer globally

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 crates/core/src/operations/write/execution.rs | 369 ++++++++----------
 1 file changed, 168 insertions(+), 201 deletions(-)

diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 5cc9e15cc7..3e1c389991 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -3,11 +3,12 @@ use std::vec;
 
 use arrow::compute::concat_batches;
 use arrow::datatypes::Schema;
+use arrow_array::RecordBatch;
 use datafusion::catalog::{Session, TableProvider};
 use datafusion::datasource::{provider_as_source, MemTable};
 use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::{col, lit, when, Expr, LogicalPlanBuilder};
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::{execute_stream_partitioned, ExecutionPlan};
 use datafusion::prelude::DataFrame;
 use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
 use futures::StreamExt;
@@ -278,216 +279,182 @@ pub(crate) async fn write_execution_plan_v2(
     }
 
     // Write data to disk
-    let mut tasks = vec![];
-    if !contains_cdc {
-        for i in 0..plan.properties().output_partitioning().partition_count() {
-            let inner_plan = plan.clone();
-            let inner_schema = schema.clone();
-            let config = WriterConfig::new(
-                inner_schema.clone(),
-                partition_columns.clone(),
-                writer_properties.clone(),
-                target_file_size,
-                write_batch_size,
-                writer_stats_config.num_indexed_cols,
-                writer_stats_config.stats_columns.clone(),
-            );
-            let mut writer = DeltaWriter::new(object_store.clone(), config);
-            let checker_stream = checker.clone();
-            let scan_start = std::time::Instant::now();
-            let mut stream = inner_plan.execute(i, session.task_ctx())?;
-
-            let handle: tokio::task::JoinHandle<
-                DeltaResult<(Vec<Action>, WriteExecutionPlanMetrics)>,
-            > = tokio::task::spawn(async move {
-                let mut write_time_ms = 0;
-                while let Some(maybe_batch) = stream.next().await {
-                    let batch = maybe_batch?;
-                    let write_start = std::time::Instant::now();
-                    checker_stream.check_batch(&batch).await?;
-                    writer.write(&batch).await?;
-                    write_time_ms += write_start.elapsed().as_millis() as u64;
-                }
-                let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
-
-                let write_start = std::time::Instant::now();
-                let add_actions = writer.close().await;
-                write_time_ms += write_start.elapsed().as_millis() as u64;
-                let metrics = WriteExecutionPlanMetrics {
-                    scan_time_ms,
-                    write_time_ms,
-                };
-                match add_actions {
-                    Ok(actions) => Ok((
-                        actions.into_iter().map(Action::Add).collect::<Vec<_>>(),
-                        metrics,
-                    )),
-                    Err(err) => Err(err),
-                }
-            });
-            tasks.push(handle);
+
+    let (actions, metrics) = if !contains_cdc {
+        let config = WriterConfig::new(
+            schema.clone(),
+            partition_columns.clone(),
+            writer_properties.clone(),
+            target_file_size,
+            write_batch_size,
+            writer_stats_config.num_indexed_cols,
+            writer_stats_config.stats_columns.clone(),
+        );
+        let mut writer = DeltaWriter::new(object_store.clone(), config);
+        let checker_stream = checker.clone();
+
+        let data = execute_stream_partitioned(plan, session.task_ctx())?;
+
+        let write_start = std::time::Instant::now();
+        let scan_start = std::time::Instant::now();
+
+        for mut partition_stream in data {
+            while let Some(batch) = partition_stream.next().await {
+                let batch = batch?;
+                checker_stream.check_batch(&batch).await?;
+                writer.write(&batch).await?;
+            }
         }
+        let write_time_ms = write_start.elapsed().as_millis() as u64;
+        let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
+
+        let metrics = WriteExecutionPlanMetrics {
+            scan_time_ms,
+            write_time_ms,
+        };
+        (
+            writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|v| Action::Add(v))
+                .collect::<Vec<_>>(),
+            metrics,
+        )
     } else {
         // Incoming plan contains the normal write_plan unioned with the cdf plan
         // we split these batches during the write
         let cdf_store = Arc::new(PrefixStore::new(object_store.clone(), "_change_data"));
-        for i in 0..plan.properties().output_partitioning().partition_count() {
-            let inner_plan = plan.clone();
-            let write_schema = Arc::new(Schema::new(
-                schema
-                    .clone()
-                    .fields()
-                    .into_iter()
-                    .filter_map(|f| {
-                        if f.name() != CDC_COLUMN_NAME {
-                            Some(f.as_ref().clone())
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<Vec<_>>(),
-            ));
-            let cdf_schema = schema.clone();
-            let normal_config = WriterConfig::new(
-                write_schema.clone(),
-                partition_columns.clone(),
-                writer_properties.clone(),
-                target_file_size,
-                write_batch_size,
-                writer_stats_config.num_indexed_cols,
-                writer_stats_config.stats_columns.clone(),
-            );
-
-            let cdf_config = WriterConfig::new(
-                cdf_schema.clone(),
-                partition_columns.clone(),
-                writer_properties.clone(),
-                target_file_size,
-                write_batch_size,
-                writer_stats_config.num_indexed_cols,
-                writer_stats_config.stats_columns.clone(),
-            );
-
-            let mut writer = DeltaWriter::new(object_store.clone(), normal_config);
-
-            let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config);
-
-            let checker_stream = checker.clone();
-            let scan_start = std::time::Instant::now();
-            let mut stream = inner_plan.execute(i, session.task_ctx())?;
-
-            let session_context = SessionContext::new();
-
-            let handle: tokio::task::JoinHandle<
-                DeltaResult<(Vec<Action>, WriteExecutionPlanMetrics)>,
-            > = tokio::task::spawn(async move {
-                let mut write_time_ms = 0;
-                while let Some(maybe_batch) = stream.next().await {
-                    let batch = maybe_batch?;
-
-                    let write_start = std::time::Instant::now();
-                    // split batch since we unioned upstream the operation write and cdf plan
-                    let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
-                        batch.schema(),
-                        vec![vec![batch.clone()]],
-                    )?);
-                    let batch_df = session_context.read_table(table_provider).unwrap();
-
-                    let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
-                        vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
-                        true,
-                    ))?;
-
-                    let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
-                        vec![
-                            lit("delete"),
-                            lit("insert"),
-                            lit("update_preimage"),
-                            lit("update_postimage"),
-                        ],
-                        false,
-                    ))?;
-
-                    // Concatenate with the CDF_schema, since we need to keep the _change_type col
-                    let mut normal_batch =
-                        concat_batches(&cdf_schema, &normal_df.collect().await?)?;
-                    checker_stream.check_batch(&normal_batch).await?;
-
-                    // Drop the CDC_COLUMN ("_change_type")
-                    let mut idx: Option<usize> = None;
-                    for (i, field) in normal_batch.schema_ref().fields().iter().enumerate() {
-                        if field.name() == CDC_COLUMN_NAME {
-                            idx = Some(i);
-                            break;
-                        }
-                    }
-                    normal_batch.remove_column(idx.ok_or(DeltaTableError::generic(
-                        "idx of _change_type col not found. This shouldn't have happened.",
-                    ))?);
+        let data = execute_stream_partitioned(plan, session.task_ctx())?;
+
+        let write_schema = Arc::new(Schema::new(
+            schema
+                .clone()
+                .fields()
+                .into_iter()
+                .filter_map(|f| {
+                    if f.name() != CDC_COLUMN_NAME {
+                        Some(f.as_ref().clone())
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>(),
+        ));
+        let cdf_schema = schema.clone();
+        let normal_config = WriterConfig::new(
+            write_schema.clone(),
+            partition_columns.clone(),
+            writer_properties.clone(),
+            target_file_size,
+            write_batch_size,
+            writer_stats_config.num_indexed_cols,
+            writer_stats_config.stats_columns.clone(),
+        );
 
-                    let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?;
-                    checker_stream.check_batch(&cdf_batch).await?;
+        let cdf_config = WriterConfig::new(
+            cdf_schema.clone(),
+            partition_columns.clone(),
+            writer_properties.clone(),
+            target_file_size,
+            write_batch_size,
+            writer_stats_config.num_indexed_cols,
+            writer_stats_config.stats_columns.clone(),
+        );
 
-                    writer.write(&normal_batch).await?;
-                    cdf_writer.write(&cdf_batch).await?;
-                    write_time_ms += write_start.elapsed().as_millis() as u64;
+        let mut writer = DeltaWriter::new(object_store.clone(), normal_config);
+        let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config);
+        let checker_stream = checker.clone();
+        let write_start = std::time::Instant::now();
+        let scan_start = std::time::Instant::now();
+
+        for mut partition_stream in data {
+            while let Some(maybe_batch) = partition_stream.next().await {
+                let session_context = SessionContext::new();
+                let batch = maybe_batch?;
+
+                // split batch since we unioned upstream the operation write and cdf plan
+                let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
+                    batch.schema(),
+                    vec![vec![batch.clone()]],
+                )?);
+                let batch_df = session_context.read_table(table_provider).unwrap();
+
+                let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
+                    vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
+                    true,
+                ))?;
+
+                let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
+                    vec![
+                        lit("delete"),
+                        lit("insert"),
+                        lit("update_preimage"),
+                        lit("update_postimage"),
+                    ],
+                    false,
+                ))?;
+
+                // Concatenate with the CDF_schema, since we need to keep the _change_type col
+                let mut normal_batch = concat_batches(&cdf_schema, &normal_df.collect().await?)?;
+                checker_stream.check_batch(&normal_batch).await?;
+
+                // Drop the CDC_COLUMN ("_change_type")
+                let mut idx: Option<usize> = None;
+                for (i, field) in normal_batch.schema_ref().fields().iter().enumerate() {
+                    if field.name() == CDC_COLUMN_NAME {
+                        idx = Some(i);
+                        break;
+                    }
                 }
-                let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
-
-                let write_start = std::time::Instant::now();
-                let mut add_actions = writer
-                    .close()
-                    .await?
-                    .into_iter()
-                    .map(Action::Add)
-                    .collect::<Vec<_>>();
-                let cdf_actions = cdf_writer.close().await.map(|v| {
-                    v.into_iter()
-                        .map(|add| {
-                            {
-                                Action::Cdc(AddCDCFile {
-                                    // This is a gnarly hack, but the action needs the nested path, not the
-                                    // path inside the prefixed store
-                                    path: format!("_change_data/{}", add.path),
-                                    size: add.size,
-                                    partition_values: add.partition_values,
-                                    data_change: false,
-                                    tags: add.tags,
-                                })
-                            }
-                        })
-                        .collect::<Vec<_>>()
-                })?;
-                write_time_ms += write_start.elapsed().as_millis() as u64;
-                let metrics = WriteExecutionPlanMetrics {
-                    scan_time_ms,
-                    write_time_ms,
-                };
-
-                add_actions.extend(cdf_actions);
-                Ok((add_actions, metrics))
-            });
-            tasks.push(handle);
+
+                normal_batch.remove_column(idx.ok_or(DeltaTableError::generic(
+                    "idx of _change_type col not found. This shouldn't have happened.",
+                ))?);
+
+                let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?;
+                checker_stream.check_batch(&cdf_batch).await?;
+                writer.write(&normal_batch).await?;
+                cdf_writer.write(&cdf_batch).await?;
+            }
         }
-    }
-    let (actions, metrics) = futures::future::join_all(tasks)
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()
-        .map_err(|err| WriteError::WriteTask { source: err })?
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?
-        .into_iter()
-        .fold(
-            (vec![], WriteExecutionPlanMetrics::default()),
-            |mut acc, (actions, metrics)| {
-                acc.0.extend(actions);
-                acc.1.scan_time_ms += metrics.scan_time_ms;
-                acc.1.write_time_ms += metrics.write_time_ms;
-                acc
-            },
-        );
+        let write_time_ms = write_start.elapsed().as_millis() as u64;
+        let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
+
+        let metrics = WriteExecutionPlanMetrics {
+            scan_time_ms,
+            write_time_ms,
+        };
+
+        let mut actions = writer
+            .close()
+            .await?
+            .into_iter()
+            .map(|v| Action::Add(v))
+            .collect::<Vec<_>>();
+        let cdf_actions = cdf_writer.close().await.map(|v| {
+            v.into_iter()
+                .map(|add| {
+                    {
+                        Action::Cdc(AddCDCFile {
+                            // This is a gnarly hack, but the action needs the nested path, not the
+                            // path inside the prefixed store
+                            path: format!("_change_data/{}", add.path),
+                            size: add.size,
+                            partition_values: add.partition_values,
+                            data_change: false,
+                            tags: add.tags,
+                        })
+                    }
+                })
+                .collect::<Vec<_>>()
+        })?;
+        actions.extend(cdf_actions);
+
+        (actions, metrics)
+    };
+
     // Collect add actions to add to commit
     Ok((actions, metrics))
 }
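
The refactor above boils down to one pattern: execute_stream_partitioned hands back one stream per output partition, and a single writer drains them in sequence instead of one spawned writer per partition. A minimal, self-contained sketch of that pattern, with a toy Sink standing in for DeltaWriter and integer streams standing in for SendableRecordBatchStream (both are simplifications, not the delta-rs types):

use futures::{stream, StreamExt};

struct Sink {
    written: Vec<i32>,
}

impl Sink {
    // stand-in for DeltaWriter::write
    async fn write(&mut self, batch: i32) {
        self.written.push(batch);
    }

    // stand-in for DeltaWriter::close, returning the accumulated "actions"
    fn close(self) -> Vec<i32> {
        self.written
    }
}

#[tokio::main]
async fn main() {
    // two "partitions", as execute_stream_partitioned would hand back
    let partitions = vec![stream::iter(vec![1, 2]), stream::iter(vec![3, 4])];

    // one writer for all partitions, instead of one writer per partition
    let mut sink = Sink { written: vec![] };
    for mut partition in partitions {
        while let Some(batch) = partition.next().await {
            sink.write(batch).await;
        }
    }
    assert_eq!(sink.close(), vec![1, 2, 3, 4]);
}

The trade-off is that a single writer produces fewer small files, but draining the partitions one after another serializes the scan; patch 2 below addresses that by driving the streams concurrently.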
From ba896713895ca7e743aa3b668a69f34fa178224c Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 19 Oct 2025 16:54:51 +0200
Subject: [PATCH 2/3] refactor: use a tokio mpsc channel to send batches to the writer

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 crates/core/src/operations/write/execution.rs | 324 ++++++++++++------
 1 file changed, 215 insertions(+), 109 deletions(-)

diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 3e1c389991..1e7030cc41 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use std::vec;
 
 use arrow::compute::concat_batches;
@@ -7,6 +7,7 @@ use arrow_array::RecordBatch;
 use datafusion::catalog::{Session, TableProvider};
 use datafusion::datasource::{provider_as_source, MemTable};
 use datafusion::execution::context::SessionContext;
+use datafusion::execution::SendableRecordBatchStream;
 use datafusion::logical_expr::{col, lit, when, Expr, LogicalPlanBuilder};
 use datafusion::physical_plan::{execute_stream_partitioned, ExecutionPlan};
 use datafusion::prelude::DataFrame;
@@ -14,6 +15,7 @@ use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
 use futures::StreamExt;
 use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
+use tokio::sync::mpsc;
 use tracing::log::*;
 use uuid::Uuid;
 
@@ -27,11 +29,24 @@ use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, AddCDCFile, EagerSnapshot, Remove, StructType, StructTypeExt};
 use crate::logstore::{LogStoreRef, ObjectStoreRef};
 use crate::operations::cdc::{should_write_cdc, CDC_COLUMN_NAME};
-use crate::operations::write::{WriteError, WriterStatsConfig};
+use crate::operations::write::WriterStatsConfig;
 use crate::table::config::TablePropertiesExt as _;
 use crate::table::Constraint as DeltaConstraint;
 use crate::DeltaTableError;
 
+const DEFAULT_WRITER_BATCH_CHANNEL_SIZE: usize = 10;
+
+fn channel_size() -> usize {
+    static CHANNEL_SIZE: OnceLock<usize> = OnceLock::new();
+    *CHANNEL_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_WRITER_BATCH_CHANNEL_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .unwrap_or(DEFAULT_WRITER_BATCH_CHANNEL_SIZE)
+    })
+}
+
+/// Metrics captured from execution
 #[derive(Debug, Default)]
 pub(crate) struct WriteExecutionPlanMetrics {
     pub scan_time_ms: u64,
@@ -279,8 +294,8 @@ pub(crate) async fn write_execution_plan_v2(
     }
 
     // Write data to disk
-
-    let (actions, metrics) = if !contains_cdc {
+    // We drive partition streams concurrently and centralize writes via an mpsc channel.
+    if !contains_cdc {
         let config = WriterConfig::new(
             schema.clone(),
             partition_columns.clone(),
@@ -290,44 +305,75 @@ pub(crate) async fn write_execution_plan_v2(
             writer_stats_config.num_indexed_cols,
             writer_stats_config.stats_columns.clone(),
         );
-        let mut writer = DeltaWriter::new(object_store.clone(), config);
+
         let checker_stream = checker.clone();
 
-        let data = execute_stream_partitioned(plan, session.task_ctx())?;
+        let partition_streams: Vec<SendableRecordBatchStream> =
+            execute_stream_partitioned(plan, session.task_ctx())?;
 
-        let write_start = std::time::Instant::now();
-        let scan_start = std::time::Instant::now();
+        // bounded mpsc channel for batches produced by the partition streams
+        let (tx, mut rx) = mpsc::channel::<RecordBatch>(channel_size());
 
-        for mut partition_stream in data {
-            while let Some(batch) = partition_stream.next().await {
-                let batch = batch?;
-                checker_stream.check_batch(&batch).await?;
+        let writer_handle = tokio::task::spawn(async move {
+            let mut writer = DeltaWriter::new(object_store.clone(), config);
+            let mut total_write_ms: u64 = 0;
+            while let Some(batch) = rx.recv().await {
+                let wstart = std::time::Instant::now();
                 writer.write(&batch).await?;
+                total_write_ms += wstart.elapsed().as_millis() as u64;
             }
+            let adds = writer.close().await?;
+            Ok::<(Vec<Add>, u64), DeltaTableError>((adds, total_write_ms))
+        });
+
+        // spawn one worker per partition stream to drive DataFusion concurrently
+        let mut worker_handles = Vec::new();
+        let scan_start = std::time::Instant::now();
+        for mut partition_stream in partition_streams {
+            let tx_clone = tx.clone();
+            let checker_clone = checker_stream.clone();
+            let handle = tokio::task::spawn(async move {
+                while let Some(maybe_batch) = partition_stream.next().await {
+                    let batch = maybe_batch?;
+                    checker_clone.check_batch(&batch).await?;
+                    tx_clone.send(batch).await.map_err(|_| {
+                        DeltaTableError::Generic("Writer task closed unexpectedly".to_string())
+                    })?;
+                }
+                Ok::<(), DeltaTableError>(())
+            });
+            worker_handles.push(handle);
+        }
+
+        drop(tx);
+
+        let join_res = writer_handle
+            .await
+            .map_err(|e| DeltaTableError::Generic(format!("writer join error: {}", e)))?;
+        let (adds, write_time_ms) = join_res?;
+
+        for h in worker_handles {
+            let join_res = h.await.map_err(|e| {
+                DeltaTableError::Generic(format!("worker join error when driving partition: {}", e))
+            })?;
+            join_res?;
         }
-        let write_time_ms = write_start.elapsed().as_millis() as u64;
-        let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
+
+        let write_elapsed = write_time_ms;
+        let scan_time_ms = scan_start.elapsed().as_millis() as u64;
+        let scan_time_ms = scan_time_ms.saturating_sub(write_elapsed);
 
         let metrics = WriteExecutionPlanMetrics {
             scan_time_ms,
-            write_time_ms,
+            write_time_ms: write_elapsed,
         };
-        (
-            writer
-                .close()
-                .await?
-                .into_iter()
-                .map(|v| Action::Add(v))
-                .collect::<Vec<_>>(),
-            metrics,
-        )
+
+        let actions = adds.into_iter().map(Action::Add).collect::<Vec<_>>();
+        return Ok((actions, metrics));
     } else {
-        // Incoming plan contains the normal write_plan unioned with the cdf plan
-        // we split these batches during the write
+        // CDC branch: create two writer tasks (normal + cdf) and drive partition streams concurrently
         let cdf_store = Arc::new(PrefixStore::new(object_store.clone(), "_change_data"));
-        let data = execute_stream_partitioned(plan, session.task_ctx())?;
-
         let write_schema = Arc::new(Schema::new(
             schema
                 .clone()
                 .fields()
                 .into_iter()
                 .filter_map(|f| {
                     if f.name() != CDC_COLUMN_NAME {
                         Some(f.as_ref().clone())
                     } else {
                         None
                     }
                 })
                 .collect::<Vec<_>>(),
         ));
         let cdf_schema = schema.clone();
+
         let normal_config = WriterConfig::new(
             write_schema.clone(),
             partition_columns.clone(),
             writer_properties.clone(),
             target_file_size,
             write_batch_size,
             writer_stats_config.num_indexed_cols,
             writer_stats_config.stats_columns.clone(),
         );
@@ -363,98 +410,157 @@ pub(crate) async fn write_execution_plan_v2(
             writer_stats_config.stats_columns.clone(),
         );
 
-        let mut writer = DeltaWriter::new(object_store.clone(), normal_config);
-        let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config);
         let checker_stream = checker.clone();
-        let write_start = std::time::Instant::now();
-        let scan_start = std::time::Instant::now();
 
-        for mut partition_stream in data {
-            while let Some(maybe_batch) = partition_stream.next().await {
-                let session_context = SessionContext::new();
-                let batch = maybe_batch?;
-
-                // split batch since we unioned upstream the operation write and cdf plan
-                let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
-                    batch.schema(),
-                    vec![vec![batch.clone()]],
-                )?);
-                let batch_df = session_context.read_table(table_provider).unwrap();
-
-                let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
-                    vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
-                    true,
-                ))?;
-
-                let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
-                    vec![
-                        lit("delete"),
-                        lit("insert"),
-                        lit("update_preimage"),
-                        lit("update_postimage"),
-                    ],
-                    false,
-                ))?;
-
-                // Concatenate with the CDF_schema, since we need to keep the _change_type col
-                let mut normal_batch = concat_batches(&cdf_schema, &normal_df.collect().await?)?;
-                checker_stream.check_batch(&normal_batch).await?;
-
-                // Drop the CDC_COLUMN ("_change_type")
-                let mut idx: Option<usize> = None;
-                for (i, field) in normal_batch.schema_ref().fields().iter().enumerate() {
-                    if field.name() == CDC_COLUMN_NAME {
-                        idx = Some(i);
-                        break;
+        // partition streams
+        let partition_streams = execute_stream_partitioned(plan, session.task_ctx())?;
+
+        // bounded mpsc channels for the normal and cdf batches produced by the partition streams
+        let (tx_normal, mut rx_normal) = mpsc::channel::<RecordBatch>(channel_size());
+        let (tx_cdf, mut rx_cdf) = mpsc::channel::<RecordBatch>(channel_size());
+
+        let normal_writer_handle = tokio::task::spawn(async move {
+            let mut writer = DeltaWriter::new(object_store, normal_config);
+            let mut total_write_ms: u64 = 0;
+            while let Some(batch) = rx_normal.recv().await {
+                let wstart = std::time::Instant::now();
+                writer.write(&batch).await?;
+                total_write_ms += wstart.elapsed().as_millis() as u64;
             }
+            let adds = writer.close().await?;
+            Ok::<(Vec<Add>, u64), DeltaTableError>((adds, total_write_ms))
+        });
+
+        let cdf_writer_handle = tokio::task::spawn(async move {
+            let mut writer = DeltaWriter::new(cdf_store, cdf_config);
+            let mut total_write_ms: u64 = 0;
+            while let Some(batch) = rx_cdf.recv().await {
+                let wstart = std::time::Instant::now();
+                writer.write(&batch).await?;
+                total_write_ms += wstart.elapsed().as_millis() as u64;
+            }
+            let adds = writer.close().await?;
+            Ok::<(Vec<Add>, u64), DeltaTableError>((adds, total_write_ms))
+        });
+
+        // spawn partition workers that split batches and send to the appropriate writer channel
+        let mut worker_handles = Vec::new();
+        let scan_start = std::time::Instant::now();
+        for mut partition_stream in partition_streams {
+            let txn = tx_normal.clone();
+            let txc = tx_cdf.clone();
+            let checker_clone = checker_stream.clone();
+            let session_ctx = SessionContext::new();
+            let cdf_schema_clone = cdf_schema.clone();
+
+            let h = tokio::task::spawn(async move {
+                while let Some(maybe_batch) = partition_stream.next().await {
+                    let batch = maybe_batch?;
+
+                    // split batch since upstream unioned write and cdf plans
+                    let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
+                        batch.schema(),
+                        vec![vec![batch.clone()]],
+                    )?);
+                    let batch_df = session_ctx.read_table(table_provider).unwrap();
+
+                    let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
+                        vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
+                        true,
+                    ))?;
+
+                    let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
+                        vec![
+                            lit("delete"),
+                            lit("insert"),
+                            lit("update_preimage"),
+                            lit("update_postimage"),
+                        ],
+                        false,
+                    ))?;
+
+                    // Concatenate with the CDF_schema, since we need to keep the _change_type col
+                    let mut normal_batch =
+                        concat_batches(&cdf_schema_clone, &normal_df.collect().await?)?;
+                    checker_clone.check_batch(&normal_batch).await?;
+
+                    // Drop the CDC_COLUMN ("_change_type")
+                    let mut idx: Option<usize> = None;
+                    for (i_field, field) in normal_batch.schema_ref().fields().iter().enumerate() {
+                        if field.name() == CDC_COLUMN_NAME {
+                            idx = Some(i_field);
+                            break;
+                        }
                     }
+                    normal_batch.remove_column(idx.ok_or(DeltaTableError::generic(
+                        "idx of _change_type col not found. This shouldn't have happened.",
+                    ))?);
+
+                    let cdf_batch = concat_batches(&cdf_schema_clone, &cdf_df.collect().await?)?;
+                    checker_clone.check_batch(&cdf_batch).await?;
+
+                    // send to the writer channels
+                    txn.send(normal_batch).await.map_err(|_| {
+                        DeltaTableError::Generic("normal writer closed unexpectedly".to_string())
+                    })?;
+                    txc.send(cdf_batch).await.map_err(|_| {
+                        DeltaTableError::Generic("cdf writer closed unexpectedly".to_string())
+                    })?;
                 }
+                Ok::<(), DeltaTableError>(())
+            });
 
-                normal_batch.remove_column(idx.ok_or(DeltaTableError::generic(
-                    "idx of _change_type col not found. This shouldn't have happened.",
-                ))?);
+            worker_handles.push(h);
+        }
 
-                let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?;
-                checker_stream.check_batch(&cdf_batch).await?;
-                writer.write(&normal_batch).await?;
-                cdf_writer.write(&cdf_batch).await?;
-            }
+        // drop original senders so writer tasks exit after all workers finish
+        drop(tx_normal);
+        drop(tx_cdf);
+
+        // wait for the writer tasks to finish
+        let normal_join = normal_writer_handle
+            .await
+            .map_err(|e| DeltaTableError::Generic(format!("normal writer join error: {}", e)))?;
+        let (normal_adds, normal_write_ms) = normal_join?;
+
+        let cdf_join = cdf_writer_handle
+            .await
+            .map_err(|e| DeltaTableError::Generic(format!("cdf writer join error: {}", e)))?;
+        let (cdf_adds, cdf_write_ms) = cdf_join?;
+
+        // check that the workers driving the partition streams didn't fail
+        for h in worker_handles {
+            let join_res = h.await.map_err(|e| {
+                DeltaTableError::Generic(format!("worker join error when driving partition: {}", e))
+            })?;
+            join_res?;
         }
-        let write_time_ms = write_start.elapsed().as_millis() as u64;
-        let scan_time_ms = scan_start.elapsed().as_millis() as u64 - write_time_ms;
+
+        let mut actions = normal_adds.into_iter().map(Action::Add).collect::<Vec<_>>();
+        let mut cdf_actions = cdf_adds
+            .into_iter()
+            .map(|add| {
+                Action::Cdc(AddCDCFile {
+                    path: format!("_change_data/{}", add.path),
+                    size: add.size,
+                    partition_values: add.partition_values,
+                    data_change: false,
+                    tags: add.tags,
+                })
+            })
+            .collect::<Vec<_>>();
+
+        actions.append(&mut cdf_actions);
+
+        let write_time_ms = normal_write_ms + cdf_write_ms;
+        let scan_elapsed = scan_start.elapsed().as_millis() as u64;
+        let scan_time_ms = scan_elapsed.saturating_sub(write_time_ms);
 
         let metrics = WriteExecutionPlanMetrics {
             scan_time_ms,
             write_time_ms,
         };
 
-        let mut actions = writer
-            .close()
-            .await?
-            .into_iter()
-            .map(|v| Action::Add(v))
-            .collect::<Vec<_>>();
-        let cdf_actions = cdf_writer.close().await.map(|v| {
-            v.into_iter()
-                .map(|add| {
-                    {
-                        Action::Cdc(AddCDCFile {
-                            // This is a gnarly hack, but the action needs the nested path, not the
-                            // path inside the prefixed store
-                            path: format!("_change_data/{}", add.path),
-                            size: add.size,
-                            partition_values: add.partition_values,
-                            data_change: false,
-                            tags: add.tags,
-                        })
-                    }
-                })
-                .collect::<Vec<_>>()
-        })?;
-        actions.extend(cdf_actions);
-
-        (actions, metrics)
-    };
-
-    // Collect add actions to add to commit
-    Ok((actions, metrics))
+        return Ok((actions, metrics));
+    }
 }

From 03d860d7274d5131ed2b5b4fd2c2f614492e8745 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 19 Oct 2025 17:52:26 +0200
Subject: [PATCH 3/3] chore: bump version

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 python/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/Cargo.toml b/python/Cargo.toml
index 10df66b266..f392e1240c 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-python"
-version = "1.2.0"
+version = "1.2.1"
 authors = [
     "Qingping Hou <dave2008713@gmail.com>",
     "Will Jones <willjones127@gmail.com>",
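
Patch 2's core idea is a fan-in: one tokio task per partition stream feeds a bounded mpsc channel, and a single writer task is the channel's only consumer, so DataFusion partitions are driven concurrently while writes stay serialized and the bounded capacity provides backpressure. A minimal, self-contained sketch of the same pattern, with u64 values standing in for RecordBatch and a running sum standing in for DeltaWriter (the capacity of 10 mirrors DEFAULT_WRITER_BATCH_CHANNEL_SIZE; everything else is illustrative):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // bounded channel; send() suspends once 10 batches are in flight
    let (tx, mut rx) = mpsc::channel::<u64>(10);

    // the single writer task: the only consumer of the channel
    let writer = tokio::spawn(async move {
        let mut total = 0u64;
        while let Some(batch) = rx.recv().await {
            total += batch; // stand-in for writer.write(&batch).await
        }
        total // stand-in for writer.close()
    });

    // one producer task per "partition stream", driven concurrently
    let mut workers = Vec::new();
    for partition in 0..4u64 {
        let tx = tx.clone();
        workers.push(tokio::spawn(async move {
            for batch in 0..3u64 {
                tx.send(partition * 100 + batch).await.expect("writer task alive");
            }
        }));
    }
    // drop the original sender so the writer's recv() returns None at the end
    drop(tx);

    for w in workers {
        w.await.expect("worker panicked");
    }
    let total = writer.await.expect("writer panicked");
    println!("wrote batches totalling {total}");
}

Dropping every sender clone is what terminates the writer loop: recv() only returns None once all senders are gone, which is why the patch drops tx_normal and tx_cdf before awaiting the writer handles.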