From fc9ac03797a9d60ca2bb020d3a63eef58829ec60 Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 6 Apr 2025 20:17:11 -0400 Subject: [PATCH 01/12] added gc writer check Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 42 ++++++++++------ .../src/operations/write/generated_columns.rs | 16 +++++++ crates/core/src/operations/write/mod.rs | 48 ++++++++++++------- 3 files changed, 74 insertions(+), 32 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index c03ea4f95c..349c9d8626 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -84,7 +84,7 @@ use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; use crate::operations::write::execution::write_execution_plan_v2; use crate::operations::write::generated_columns::{ - add_generated_columns, add_missing_generated_columns, + add_generated_columns, add_missing_generated_columns, should_gc, }; use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, MergePredicate}; @@ -779,17 +779,27 @@ async fn execute( None => TableReference::bare(UNNAMED_TABLE), }; - let generated_col_expressions = snapshot - .schema() - .get_generated_columns() - .unwrap_or_default(); + let mut generated_col_exp = None; + let mut missing_generated_col = None; + let mut source_with_gc = None; - let (source, missing_generated_columns) = - add_missing_generated_columns(source, &generated_col_expressions)?; + if should_gc(&snapshot)? { + let generated_col_expressions = snapshot + .schema() + .get_generated_columns() + .unwrap_or_default(); + + let (source, missing_generated_columns) = + add_missing_generated_columns(source.clone(), &generated_col_expressions)?; + + source_with_gc = Some(source); + generated_col_exp = Some(generated_col_expressions); + missing_generated_col = Some(missing_generated_columns); + } // This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work let source = LogicalPlanBuilder::scan( source_name.clone(), - provider_as_source(source.into_view()), + provider_as_source(source_with_gc.unwrap_or(source).into_view()), None, )? .build()?; @@ -1345,12 +1355,16 @@ async fn execute( .select(write_projection)? }; - projected = add_generated_columns( - projected, - &generated_col_expressions, - &missing_generated_columns, - &state, - )?; + if let Some(generated_col_expressions) = generated_col_exp { + if let Some(missing_generated_columns) = missing_generated_col { + projected = add_generated_columns( + projected, + &generated_col_expressions, + &missing_generated_columns, + &state, + )?; + } + } let merge_final = &projected.into_unoptimized_plan(); let write = state.create_physical_plan(merge_final).await?; diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index 582571afaa..f8da8e5fe5 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -1,3 +1,4 @@ +use crate::table::state::DeltaTableState; use datafusion::{execution::SessionState, prelude::DataFrame}; use datafusion_common::ScalarValue; use datafusion_expr::{col, when, Expr, ExprSchemable}; @@ -5,6 +6,21 @@ use tracing::debug; use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult}; +/// check whether not the g +pub fn should_gc(snapshot: &DeltaTableState) -> DeltaResult { + if let Some(features) = &snapshot.protocol().writer_features { + if snapshot.protocol().min_writer_version < 4 { + return Ok(false); + } + if snapshot.protocol().min_writer_version == 7 + && !features.contains(&delta_kernel::table_features::WriterFeatures::GeneratedColumns) + { + return Ok(false); + } + } + Ok(true) +} + /// Add generated column expressions to a dataframe pub fn add_missing_generated_columns( mut df: DataFrame, diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index c0efefa023..fb9280c233 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -34,7 +34,7 @@ pub mod writer; use arrow_schema::Schema; pub use configs::WriterStatsConfig; use datafusion::execution::SessionStateBuilder; -use generated_columns::{add_generated_columns, add_missing_generated_columns}; +use generated_columns::{add_generated_columns, add_missing_generated_columns, should_gc}; use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; @@ -445,18 +445,26 @@ impl std::future::IntoFuture for WriteBuilder { state } }; - let generated_col_expressions = this - .snapshot - .as_ref() - .map(|v| v.schema().get_generated_columns().unwrap_or_default()) - .unwrap_or_default(); - let mut schema_drift = false; - let source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); - - // Add missing generated columns to source_df - let (mut source, missing_generated_columns) = - add_missing_generated_columns(source, &generated_col_expressions)?; + let mut generated_col_exp = None; + let mut missing_gen_col = None; + let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); + if let Some(snapshot) = &this.snapshot { + if should_gc(snapshot)? { + let generated_col_expressions = this + .snapshot + .as_ref() + .map(|v| v.schema().get_generated_columns().unwrap_or_default()) + .unwrap_or_default(); + + // Add missing generated columns to source_df + let (new_source, missing_generated_columns) = + add_missing_generated_columns(source, &generated_col_expressions)?; + source = new_source; + missing_gen_col = Some(missing_generated_columns); + generated_col_exp = Some(generated_col_expressions); + } + } let source_schema: Arc = Arc::new(source.schema().as_arrow().clone()); @@ -527,12 +535,16 @@ impl std::future::IntoFuture for WriteBuilder { source = source.select(schema_evolution_projection)?; } - source = add_generated_columns( - source, - &generated_col_expressions, - &missing_generated_columns, - &state, - )?; + if let Some(generated_columns_exp) = generated_col_exp { + if let Some(missing_generated_col) = missing_gen_col { + source = add_generated_columns( + source, + &generated_columns_exp, + &missing_generated_col, + &state, + )?; + } + } let source = LogicalPlan::Extension(Extension { node: Arc::new(MetricObserver { From b82914fd82e11eb7b40e29424d7bbdd77a5b4193 Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 13 Apr 2025 20:22:11 -0400 Subject: [PATCH 02/12] change the docstring Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 4 ++-- crates/core/src/operations/write/generated_columns.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 349c9d8626..b5b0b0e160 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -84,7 +84,7 @@ use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; use crate::operations::write::execution::write_execution_plan_v2; use crate::operations::write::generated_columns::{ - add_generated_columns, add_missing_generated_columns, should_gc, + able_to_gc, add_generated_columns, add_missing_generated_columns, }; use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, MergePredicate}; @@ -783,7 +783,7 @@ async fn execute( let mut missing_generated_col = None; let mut source_with_gc = None; - if should_gc(&snapshot)? { + if able_to_gc(&snapshot)? { let generated_col_expressions = snapshot .schema() .get_generated_columns() diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index f8da8e5fe5..252cfdc243 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -6,8 +6,8 @@ use tracing::debug; use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult}; -/// check whether not the g -pub fn should_gc(snapshot: &DeltaTableState) -> DeltaResult { +/// check if the writer version is able to write generated columns +pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult { if let Some(features) = &snapshot.protocol().writer_features { if snapshot.protocol().min_writer_version < 4 { return Ok(false); From 1875637b5e5de7769712c6281d97945c7fcd79d9 Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 13 Apr 2025 20:34:10 -0400 Subject: [PATCH 03/12] change the function name Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/write/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index fb9280c233..c00336620d 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -34,7 +34,7 @@ pub mod writer; use arrow_schema::Schema; pub use configs::WriterStatsConfig; use datafusion::execution::SessionStateBuilder; -use generated_columns::{add_generated_columns, add_missing_generated_columns, should_gc}; +use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns}; use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; @@ -450,7 +450,7 @@ impl std::future::IntoFuture for WriteBuilder { let mut missing_gen_col = None; let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); if let Some(snapshot) = &this.snapshot { - if should_gc(snapshot)? { + if able_to_gc(snapshot)? { let generated_col_expressions = this .snapshot .as_ref() From 43251fefe7bb07d9fe2d829ba836b760081cd6eb Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Tue, 6 May 2025 21:15:08 -0400 Subject: [PATCH 04/12] change to writerfeature Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/write/generated_columns.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index 252cfdc243..f4d55da690 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -13,7 +13,7 @@ pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult { return Ok(false); } if snapshot.protocol().min_writer_version == 7 - && !features.contains(&delta_kernel::table_features::WriterFeatures::GeneratedColumns) + && !features.contains(&delta_kernel::table_features::WriterFeature::GeneratedColumns) { return Ok(false); } From ecaa3371f0f1f389f484bd09cd066de9d7cd3fb8 Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 6 Apr 2025 20:17:11 -0400 Subject: [PATCH 05/12] added gc writer check Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 42 ++++++++++------ .../src/operations/write/generated_columns.rs | 16 +++++++ crates/core/src/operations/write/mod.rs | 48 ++++++++++++------- 3 files changed, 74 insertions(+), 32 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index c03ea4f95c..349c9d8626 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -84,7 +84,7 @@ use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; use crate::operations::write::execution::write_execution_plan_v2; use crate::operations::write::generated_columns::{ - add_generated_columns, add_missing_generated_columns, + add_generated_columns, add_missing_generated_columns, should_gc, }; use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, MergePredicate}; @@ -779,17 +779,27 @@ async fn execute( None => TableReference::bare(UNNAMED_TABLE), }; - let generated_col_expressions = snapshot - .schema() - .get_generated_columns() - .unwrap_or_default(); + let mut generated_col_exp = None; + let mut missing_generated_col = None; + let mut source_with_gc = None; - let (source, missing_generated_columns) = - add_missing_generated_columns(source, &generated_col_expressions)?; + if should_gc(&snapshot)? { + let generated_col_expressions = snapshot + .schema() + .get_generated_columns() + .unwrap_or_default(); + + let (source, missing_generated_columns) = + add_missing_generated_columns(source.clone(), &generated_col_expressions)?; + + source_with_gc = Some(source); + generated_col_exp = Some(generated_col_expressions); + missing_generated_col = Some(missing_generated_columns); + } // This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work let source = LogicalPlanBuilder::scan( source_name.clone(), - provider_as_source(source.into_view()), + provider_as_source(source_with_gc.unwrap_or(source).into_view()), None, )? .build()?; @@ -1345,12 +1355,16 @@ async fn execute( .select(write_projection)? }; - projected = add_generated_columns( - projected, - &generated_col_expressions, - &missing_generated_columns, - &state, - )?; + if let Some(generated_col_expressions) = generated_col_exp { + if let Some(missing_generated_columns) = missing_generated_col { + projected = add_generated_columns( + projected, + &generated_col_expressions, + &missing_generated_columns, + &state, + )?; + } + } let merge_final = &projected.into_unoptimized_plan(); let write = state.create_physical_plan(merge_final).await?; diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index 582571afaa..f8da8e5fe5 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -1,3 +1,4 @@ +use crate::table::state::DeltaTableState; use datafusion::{execution::SessionState, prelude::DataFrame}; use datafusion_common::ScalarValue; use datafusion_expr::{col, when, Expr, ExprSchemable}; @@ -5,6 +6,21 @@ use tracing::debug; use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult}; +/// check whether not the g +pub fn should_gc(snapshot: &DeltaTableState) -> DeltaResult { + if let Some(features) = &snapshot.protocol().writer_features { + if snapshot.protocol().min_writer_version < 4 { + return Ok(false); + } + if snapshot.protocol().min_writer_version == 7 + && !features.contains(&delta_kernel::table_features::WriterFeatures::GeneratedColumns) + { + return Ok(false); + } + } + Ok(true) +} + /// Add generated column expressions to a dataframe pub fn add_missing_generated_columns( mut df: DataFrame, diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index c0efefa023..fb9280c233 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -34,7 +34,7 @@ pub mod writer; use arrow_schema::Schema; pub use configs::WriterStatsConfig; use datafusion::execution::SessionStateBuilder; -use generated_columns::{add_generated_columns, add_missing_generated_columns}; +use generated_columns::{add_generated_columns, add_missing_generated_columns, should_gc}; use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; @@ -445,18 +445,26 @@ impl std::future::IntoFuture for WriteBuilder { state } }; - let generated_col_expressions = this - .snapshot - .as_ref() - .map(|v| v.schema().get_generated_columns().unwrap_or_default()) - .unwrap_or_default(); - let mut schema_drift = false; - let source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); - - // Add missing generated columns to source_df - let (mut source, missing_generated_columns) = - add_missing_generated_columns(source, &generated_col_expressions)?; + let mut generated_col_exp = None; + let mut missing_gen_col = None; + let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); + if let Some(snapshot) = &this.snapshot { + if should_gc(snapshot)? { + let generated_col_expressions = this + .snapshot + .as_ref() + .map(|v| v.schema().get_generated_columns().unwrap_or_default()) + .unwrap_or_default(); + + // Add missing generated columns to source_df + let (new_source, missing_generated_columns) = + add_missing_generated_columns(source, &generated_col_expressions)?; + source = new_source; + missing_gen_col = Some(missing_generated_columns); + generated_col_exp = Some(generated_col_expressions); + } + } let source_schema: Arc = Arc::new(source.schema().as_arrow().clone()); @@ -527,12 +535,16 @@ impl std::future::IntoFuture for WriteBuilder { source = source.select(schema_evolution_projection)?; } - source = add_generated_columns( - source, - &generated_col_expressions, - &missing_generated_columns, - &state, - )?; + if let Some(generated_columns_exp) = generated_col_exp { + if let Some(missing_generated_col) = missing_gen_col { + source = add_generated_columns( + source, + &generated_columns_exp, + &missing_generated_col, + &state, + )?; + } + } let source = LogicalPlan::Extension(Extension { node: Arc::new(MetricObserver { From fc2ce51b49b9c2703d78fc7d70d6ce166a2c127e Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 13 Apr 2025 20:22:11 -0400 Subject: [PATCH 06/12] change the docstring Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 4 ++-- crates/core/src/operations/write/generated_columns.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 349c9d8626..b5b0b0e160 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -84,7 +84,7 @@ use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; use crate::operations::write::execution::write_execution_plan_v2; use crate::operations::write::generated_columns::{ - add_generated_columns, add_missing_generated_columns, should_gc, + able_to_gc, add_generated_columns, add_missing_generated_columns, }; use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, MergePredicate}; @@ -783,7 +783,7 @@ async fn execute( let mut missing_generated_col = None; let mut source_with_gc = None; - if should_gc(&snapshot)? { + if able_to_gc(&snapshot)? { let generated_col_expressions = snapshot .schema() .get_generated_columns() diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index f8da8e5fe5..252cfdc243 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -6,8 +6,8 @@ use tracing::debug; use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult}; -/// check whether not the g -pub fn should_gc(snapshot: &DeltaTableState) -> DeltaResult { +/// check if the writer version is able to write generated columns +pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult { if let Some(features) = &snapshot.protocol().writer_features { if snapshot.protocol().min_writer_version < 4 { return Ok(false); From a789d08c275d2d83eed6d78445496577178fc35a Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Sun, 13 Apr 2025 20:34:10 -0400 Subject: [PATCH 07/12] change the function name Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/write/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index fb9280c233..c00336620d 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -34,7 +34,7 @@ pub mod writer; use arrow_schema::Schema; pub use configs::WriterStatsConfig; use datafusion::execution::SessionStateBuilder; -use generated_columns::{add_generated_columns, add_missing_generated_columns, should_gc}; +use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns}; use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; @@ -450,7 +450,7 @@ impl std::future::IntoFuture for WriteBuilder { let mut missing_gen_col = None; let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); if let Some(snapshot) = &this.snapshot { - if should_gc(snapshot)? { + if able_to_gc(snapshot)? { let generated_col_expressions = this .snapshot .as_ref() From 0295d01a47e4bc160e92f66f6359d91dda71bd3d Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Tue, 6 May 2025 21:15:08 -0400 Subject: [PATCH 08/12] change to writerfeature Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/write/generated_columns.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index 252cfdc243..f4d55da690 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -13,7 +13,7 @@ pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult { return Ok(false); } if snapshot.protocol().min_writer_version == 7 - && !features.contains(&delta_kernel::table_features::WriterFeatures::GeneratedColumns) + && !features.contains(&delta_kernel::table_features::WriterFeature::GeneratedColumns) { return Ok(false); } From 3542cafa10cb4ac20f85bcd2f1d8abc3b062ab73 Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Thu, 8 May 2025 21:23:34 -0400 Subject: [PATCH 09/12] small changes Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 16 ++++++---------- crates/core/src/operations/write/mod.rs | 11 +++-------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index b5b0b0e160..d2f0146d07 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -781,25 +781,21 @@ async fn execute( let mut generated_col_exp = None; let mut missing_generated_col = None; - let mut source_with_gc = None; + let mut source = source; if able_to_gc(&snapshot)? { - let generated_col_expressions = snapshot - .schema() - .get_generated_columns() - .unwrap_or_default(); - - let (source, missing_generated_columns) = - add_missing_generated_columns(source.clone(), &generated_col_expressions)?; + let generated_col_expressions = snapshot.schema().get_generated_columns()?; + let (source_with_gc, missing_generated_columns) = + add_missing_generated_columns(source, &generated_col_expressions)?; - source_with_gc = Some(source); + source = source_with_gc; generated_col_exp = Some(generated_col_expressions); missing_generated_col = Some(missing_generated_columns); } // This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work let source = LogicalPlanBuilder::scan( source_name.clone(), - provider_as_source(source_with_gc.unwrap_or(source).into_view()), + provider_as_source(source.into_view()), None, )? .build()?; diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index c00336620d..d01f15647e 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -451,16 +451,11 @@ impl std::future::IntoFuture for WriteBuilder { let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); if let Some(snapshot) = &this.snapshot { if able_to_gc(snapshot)? { - let generated_col_expressions = this - .snapshot - .as_ref() - .map(|v| v.schema().get_generated_columns().unwrap_or_default()) - .unwrap_or_default(); - + let generated_col_expressions = snapshot.schema().get_generated_columns()?; // Add missing generated columns to source_df - let (new_source, missing_generated_columns) = + let (source_with_gc, missing_generated_columns) = add_missing_generated_columns(source, &generated_col_expressions)?; - source = new_source; + source = source_with_gc; missing_gen_col = Some(missing_generated_columns); generated_col_exp = Some(generated_col_expressions); } From bd4eaeb968e2ba1388412f887b5d0a7b89dfd93b Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Thu, 8 May 2025 21:36:40 -0400 Subject: [PATCH 10/12] cleaning Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index b5b0b0e160..8df12c0ea6 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -784,11 +784,7 @@ async fn execute( let mut source_with_gc = None; if able_to_gc(&snapshot)? { - let generated_col_expressions = snapshot - .schema() - .get_generated_columns() - .unwrap_or_default(); - + let generated_col_expressions = snapshot.schema().get_generated_columns()?; let (source, missing_generated_columns) = add_missing_generated_columns(source.clone(), &generated_col_expressions)?; From 76ffc94d459896b069703c41ab2c266ff247daea Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Thu, 8 May 2025 21:44:35 -0400 Subject: [PATCH 11/12] change to use mut source instead Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 8df12c0ea6..d2f0146d07 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -781,21 +781,21 @@ async fn execute( let mut generated_col_exp = None; let mut missing_generated_col = None; - let mut source_with_gc = None; + let mut source = source; if able_to_gc(&snapshot)? { let generated_col_expressions = snapshot.schema().get_generated_columns()?; - let (source, missing_generated_columns) = - add_missing_generated_columns(source.clone(), &generated_col_expressions)?; + let (source_with_gc, missing_generated_columns) = + add_missing_generated_columns(source, &generated_col_expressions)?; - source_with_gc = Some(source); + source = source_with_gc; generated_col_exp = Some(generated_col_expressions); missing_generated_col = Some(missing_generated_columns); } // This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work let source = LogicalPlanBuilder::scan( source_name.clone(), - provider_as_source(source_with_gc.unwrap_or(source).into_view()), + provider_as_source(source.into_view()), None, )? .build()?; From 886eeb03f8737686ef2eba9b33ca6c2e436770ca Mon Sep 17 00:00:00 2001 From: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> Date: Thu, 8 May 2025 21:46:22 -0400 Subject: [PATCH 12/12] refactoring Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com> --- crates/core/src/operations/merge/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index d2f0146d07..3a1bec4bbc 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -722,7 +722,7 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { #[allow(clippy::too_many_arguments)] async fn execute( predicate: Expression, - source: DataFrame, + mut source: DataFrame, log_store: LogStoreRef, snapshot: DeltaTableState, _state: SessionState, @@ -781,7 +781,6 @@ async fn execute( let mut generated_col_exp = None; let mut missing_generated_col = None; - let mut source = source; if able_to_gc(&snapshot)? { let generated_col_expressions = snapshot.schema().get_generated_columns()?;