Skip to content

feat: added a check for whether the generated-columns (gc) code should run #3419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
37 changes: 23 additions & 14 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use crate::operations::cdc::*;
use crate::operations::merge::barrier::find_node;
use crate::operations::write::execution::write_execution_plan_v2;
use crate::operations::write::generated_columns::{
add_generated_columns, add_missing_generated_columns,
able_to_gc, add_generated_columns, add_missing_generated_columns,
};
use crate::operations::write::WriterStatsConfig;
use crate::protocol::{DeltaOperation, MergePredicate};
Expand Down Expand Up @@ -722,7 +722,7 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
#[allow(clippy::too_many_arguments)]
async fn execute(
predicate: Expression,
source: DataFrame,
mut source: DataFrame,
log_store: LogStoreRef,
snapshot: DeltaTableState,
_state: SessionState,
Expand Down Expand Up @@ -779,13 +779,18 @@ async fn execute(
None => TableReference::bare(UNNAMED_TABLE),
};

let generated_col_expressions = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();
let mut generated_col_exp = None;
let mut missing_generated_col = None;

if able_to_gc(&snapshot)? {
let generated_col_expressions = snapshot.schema().get_generated_columns()?;
let (source_with_gc, missing_generated_columns) =
add_missing_generated_columns(source, &generated_col_expressions)?;

let (source, missing_generated_columns) =
add_missing_generated_columns(source, &generated_col_expressions)?;
source = source_with_gc;
generated_col_exp = Some(generated_col_expressions);
missing_generated_col = Some(missing_generated_columns);
}
// This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work
let source = LogicalPlanBuilder::scan(
source_name.clone(),
Expand Down Expand Up @@ -1345,12 +1350,16 @@ async fn execute(
.select(write_projection)?
};

projected = add_generated_columns(
projected,
&generated_col_expressions,
&missing_generated_columns,
&state,
)?;
if let Some(generated_col_expressions) = generated_col_exp {
if let Some(missing_generated_columns) = missing_generated_col {
projected = add_generated_columns(
projected,
&generated_col_expressions,
&missing_generated_columns,
&state,
)?;
}
}

let merge_final = &projected.into_unoptimized_plan();
let write = state.create_physical_plan(merge_final).await?;
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/operations/write/generated_columns.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
use crate::table::state::DeltaTableState;
use datafusion::{execution::SessionState, prelude::DataFrame};
use datafusion_common::ScalarValue;
use datafusion_expr::{col, when, Expr, ExprSchemable};
use tracing::debug;

use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult};

/// Check whether the table's writer protocol allows writing generated columns.
///
/// The decision is based on the snapshot protocol's `min_writer_version` and
/// `writer_features`:
///
/// * writer version below 4: `Ok(false)`;
/// * writer version 7: `Ok(true)` only when `writer_features` is present and
///   contains `WriterFeature::GeneratedColumns`;
/// * any other writer version: `Ok(true)`.
///
/// The version check is applied whether or not `writer_features` is set, so
/// tables without a feature list are still rejected when their writer version
/// is below 4.
///
/// # Errors
///
/// This function does not currently produce an error; the `DeltaResult`
/// wrapper is kept so existing callers can continue to use `?`.
pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult<bool> {
    let protocol = snapshot.protocol();
    let writer_version = protocol.min_writer_version;

    // Must not depend on `writer_features` being set: older protocol versions
    // carry no feature list at all.
    if writer_version < 4 {
        return Ok(false);
    }

    // At version 7 support is opt-in through the feature list; a missing list
    // means the feature is not enabled.
    if writer_version == 7 {
        let has_generated_columns = protocol.writer_features.as_ref().is_some_and(|features| {
            features.contains(&delta_kernel::table_features::WriterFeature::GeneratedColumns)
        });
        return Ok(has_generated_columns);
    }

    Ok(true)
}

/// Add generated column expressions to a dataframe
pub fn add_missing_generated_columns(
mut df: DataFrame,
Expand Down
43 changes: 25 additions & 18 deletions crates/core/src/operations/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod writer;
use arrow_schema::Schema;
pub use configs::WriterStatsConfig;
use datafusion::execution::SessionStateBuilder;
use generated_columns::{add_generated_columns, add_missing_generated_columns};
use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns};
use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
use std::collections::HashMap;
use std::str::FromStr;
Expand Down Expand Up @@ -445,18 +445,21 @@ impl std::future::IntoFuture for WriteBuilder {
state
}
};
let generated_col_expressions = this
.snapshot
.as_ref()
.map(|v| v.schema().get_generated_columns().unwrap_or_default())
.unwrap_or_default();

let mut schema_drift = false;
let source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());

// Add missing generated columns to source_df
let (mut source, missing_generated_columns) =
add_missing_generated_columns(source, &generated_col_expressions)?;
let mut generated_col_exp = None;
let mut missing_gen_col = None;
let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
if let Some(snapshot) = &this.snapshot {
if able_to_gc(snapshot)? {
let generated_col_expressions = snapshot.schema().get_generated_columns()?;
// Add missing generated columns to source_df
let (source_with_gc, missing_generated_columns) =
add_missing_generated_columns(source, &generated_col_expressions)?;
source = source_with_gc;
missing_gen_col = Some(missing_generated_columns);
generated_col_exp = Some(generated_col_expressions);
}
}

let source_schema: Arc<Schema> = Arc::new(source.schema().as_arrow().clone());

Expand Down Expand Up @@ -527,12 +530,16 @@ impl std::future::IntoFuture for WriteBuilder {
source = source.select(schema_evolution_projection)?;
}

source = add_generated_columns(
source,
&generated_col_expressions,
&missing_generated_columns,
&state,
)?;
if let Some(generated_columns_exp) = generated_col_exp {
if let Some(missing_generated_col) = missing_gen_col {
source = add_generated_columns(
source,
&generated_columns_exp,
&missing_generated_col,
&state,
)?;
}
}

let source = LogicalPlan::Extension(Extension {
node: Arc::new(MetricObserver {
Expand Down
Loading