Skip to content

Commit fc9ac03

Browse files
committed
added gc writer check
Signed-off-by: JustinRush80 <[email protected]>
1 parent e13080b commit fc9ac03

File tree

3 files changed

+74
-32
lines changed

3 files changed

+74
-32
lines changed

crates/core/src/operations/merge/mod.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ use crate::operations::cdc::*;
8484
use crate::operations::merge::barrier::find_node;
8585
use crate::operations::write::execution::write_execution_plan_v2;
8686
use crate::operations::write::generated_columns::{
87-
add_generated_columns, add_missing_generated_columns,
87+
add_generated_columns, add_missing_generated_columns, should_gc,
8888
};
8989
use crate::operations::write::WriterStatsConfig;
9090
use crate::protocol::{DeltaOperation, MergePredicate};
@@ -779,17 +779,27 @@ async fn execute(
779779
None => TableReference::bare(UNNAMED_TABLE),
780780
};
781781

782-
let generated_col_expressions = snapshot
783-
.schema()
784-
.get_generated_columns()
785-
.unwrap_or_default();
782+
let mut generated_col_exp = None;
783+
let mut missing_generated_col = None;
784+
let mut source_with_gc = None;
786785

787-
let (source, missing_generated_columns) =
788-
add_missing_generated_columns(source, &generated_col_expressions)?;
786+
if should_gc(&snapshot)? {
787+
let generated_col_expressions = snapshot
788+
.schema()
789+
.get_generated_columns()
790+
.unwrap_or_default();
791+
792+
let (source, missing_generated_columns) =
793+
add_missing_generated_columns(source.clone(), &generated_col_expressions)?;
794+
795+
source_with_gc = Some(source);
796+
generated_col_exp = Some(generated_col_expressions);
797+
missing_generated_col = Some(missing_generated_columns);
798+
}
789799
// This is only done to provide the source columns with a correct table reference. Just renaming the columns does not work
790800
let source = LogicalPlanBuilder::scan(
791801
source_name.clone(),
792-
provider_as_source(source.into_view()),
802+
provider_as_source(source_with_gc.unwrap_or(source).into_view()),
793803
None,
794804
)?
795805
.build()?;
@@ -1345,12 +1355,16 @@ async fn execute(
13451355
.select(write_projection)?
13461356
};
13471357

1348-
projected = add_generated_columns(
1349-
projected,
1350-
&generated_col_expressions,
1351-
&missing_generated_columns,
1352-
&state,
1353-
)?;
1358+
if let Some(generated_col_expressions) = generated_col_exp {
1359+
if let Some(missing_generated_columns) = missing_generated_col {
1360+
projected = add_generated_columns(
1361+
projected,
1362+
&generated_col_expressions,
1363+
&missing_generated_columns,
1364+
&state,
1365+
)?;
1366+
}
1367+
}
13541368

13551369
let merge_final = &projected.into_unoptimized_plan();
13561370
let write = state.create_physical_plan(merge_final).await?;

crates/core/src/operations/write/generated_columns.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1+
use crate::table::state::DeltaTableState;
12
use datafusion::{execution::SessionState, prelude::DataFrame};
23
use datafusion_common::ScalarValue;
34
use datafusion_expr::{col, when, Expr, ExprSchemable};
45
use tracing::debug;
56

67
use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult};
78

9+
/// check whether not the g
10+
pub fn should_gc(snapshot: &DeltaTableState) -> DeltaResult<bool> {
11+
if let Some(features) = &snapshot.protocol().writer_features {
12+
if snapshot.protocol().min_writer_version < 4 {
13+
return Ok(false);
14+
}
15+
if snapshot.protocol().min_writer_version == 7
16+
&& !features.contains(&delta_kernel::table_features::WriterFeatures::GeneratedColumns)
17+
{
18+
return Ok(false);
19+
}
20+
}
21+
Ok(true)
22+
}
23+
824
/// Add generated column expressions to a dataframe
925
pub fn add_missing_generated_columns(
1026
mut df: DataFrame,

crates/core/src/operations/write/mod.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub mod writer;
3434
use arrow_schema::Schema;
3535
pub use configs::WriterStatsConfig;
3636
use datafusion::execution::SessionStateBuilder;
37-
use generated_columns::{add_generated_columns, add_missing_generated_columns};
37+
use generated_columns::{add_generated_columns, add_missing_generated_columns, should_gc};
3838
use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
3939
use std::collections::HashMap;
4040
use std::str::FromStr;
@@ -445,18 +445,26 @@ impl std::future::IntoFuture for WriteBuilder {
445445
state
446446
}
447447
};
448-
let generated_col_expressions = this
449-
.snapshot
450-
.as_ref()
451-
.map(|v| v.schema().get_generated_columns().unwrap_or_default())
452-
.unwrap_or_default();
453-
454448
let mut schema_drift = false;
455-
let source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
456-
457-
// Add missing generated columns to source_df
458-
let (mut source, missing_generated_columns) =
459-
add_missing_generated_columns(source, &generated_col_expressions)?;
449+
let mut generated_col_exp = None;
450+
let mut missing_gen_col = None;
451+
let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
452+
if let Some(snapshot) = &this.snapshot {
453+
if should_gc(snapshot)? {
454+
let generated_col_expressions = this
455+
.snapshot
456+
.as_ref()
457+
.map(|v| v.schema().get_generated_columns().unwrap_or_default())
458+
.unwrap_or_default();
459+
460+
// Add missing generated columns to source_df
461+
let (new_source, missing_generated_columns) =
462+
add_missing_generated_columns(source, &generated_col_expressions)?;
463+
source = new_source;
464+
missing_gen_col = Some(missing_generated_columns);
465+
generated_col_exp = Some(generated_col_expressions);
466+
}
467+
}
460468

461469
let source_schema: Arc<Schema> = Arc::new(source.schema().as_arrow().clone());
462470

@@ -527,12 +535,16 @@ impl std::future::IntoFuture for WriteBuilder {
527535
source = source.select(schema_evolution_projection)?;
528536
}
529537

530-
source = add_generated_columns(
531-
source,
532-
&generated_col_expressions,
533-
&missing_generated_columns,
534-
&state,
535-
)?;
538+
if let Some(generated_columns_exp) = generated_col_exp {
539+
if let Some(missing_generated_col) = missing_gen_col {
540+
source = add_generated_columns(
541+
source,
542+
&generated_columns_exp,
543+
&missing_generated_col,
544+
&state,
545+
)?;
546+
}
547+
}
536548

537549
let source = LogicalPlan::Extension(Extension {
538550
node: Arc::new(MetricObserver {

0 commit comments

Comments
 (0)