Skip to content

Commit 3b30127

Browse files
authored
chore: upgrade datafusion to 44 (#103)
1 parent 065dbe8 commit 3b30127

4 files changed

Lines changed: 18 additions & 11 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ arrow-flight = { version = "53.3", features = ["flight-sql-experimental"] }
2020
arrow-json = "53.3"
2121
async-stream = "0.3.5"
2222
async-trait = "0.1.83"
23-
datafusion = "43.0.0"
23+
datafusion = "44.0.0"
2424
datafusion-federation = { path = "./datafusion-federation", version = "0.3.3" }
25-
datafusion-substrait = "43.0.0"
25+
datafusion-substrait = "44.0.0"
2626
futures = "0.3.31"
2727
tokio = { version = "1.41", features = ["full"] }
2828
tonic = { version = "0.12", features = [

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ pub struct SchemaCastScanExec {
2828
impl SchemaCastScanExec {
2929
pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
3030
let eq_properties = input.equivalence_properties().clone();
31-
let execution_mode = input.execution_mode();
31+
let emission_type = input.pipeline_behavior();
32+
let boundedness = input.boundedness();
3233
let properties = PlanProperties::new(
3334
eq_properties,
3435
input.output_partitioning().clone(),
35-
execution_mode,
36+
emission_type,
37+
boundedness,
3638
);
3739
Self {
3840
input,

datafusion-federation/src/sql/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use datafusion::{
2020
optimizer::{optimizer::Optimizer, OptimizerConfig, OptimizerRule},
2121
physical_expr::EquivalenceProperties,
2222
physical_plan::{
23-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
23+
execution_plan::{Boundedness, EmissionType},
24+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
2425
SendableRecordBatchStream,
2526
},
2627
sql::{
@@ -579,15 +580,18 @@ fn rewrite_table_scans_in_expr(
579580
})
580581
})
581582
.transpose()?,
582-
..options
583+
..*options
583584
};
584585
if let Some(rewrite) = qualifier.as_ref().and_then(|q| known_rewrites.get(q)) {
585586
Ok(Expr::Wildcard {
586587
qualifier: Some(rewrite.clone()),
587-
options,
588+
options: Box::new(options),
588589
})
589590
} else {
590-
Ok(Expr::Wildcard { qualifier, options })
591+
Ok(Expr::Wildcard {
592+
qualifier,
593+
options: Box::new(options),
594+
})
591595
}
592596
}
593597
Expr::GroupingSet(gs) => match gs {
@@ -676,7 +680,8 @@ impl VirtualExecutionPlan {
676680
let props = PlanProperties::new(
677681
EquivalenceProperties::new(Arc::new(schema)),
678682
Partitioning::UnknownPartitioning(1),
679-
ExecutionMode::Bounded,
683+
EmissionType::Incremental,
684+
Boundedness::Bounded,
680685
);
681686
Self {
682687
plan,
@@ -952,7 +957,7 @@ mod tests {
952957
// different tables in single aggregation expression
953958
(
954959
"SELECT COUNT(CASE WHEN appt.a > 0 THEN appt.a ELSE dft.a END) FROM app_table as appt, foo.df_table as dft",
955-
"SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt JOIN remote_table AS dft"
960+
"SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt CROSS JOIN remote_table AS dft"
956961
),
957962
];
958963

datafusion-flight-sql-server/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ async fn parse_substrait_bytes(
10021002
.await
10031003
.map_err(df_error_to_status)?;
10041004

1005-
from_substrait_plan(&ctx.inner, &substrait_plan)
1005+
from_substrait_plan(&ctx.inner.state(), &substrait_plan)
10061006
.await
10071007
.map_err(df_error_to_status)
10081008
}

0 commit comments

Comments
 (0)