Skip to content

Commit 4a9f784

Browse files
authored
update equivalence properties when rewriting schema (#507)
1 parent 8b2d747 commit 4a9f784

2 files changed

Lines changed: 17 additions & 4 deletions

File tree

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ duckdb = [
120120
"dep:async-stream",
121121
"dep:arrow-schema",
122122
"dep:byte-unit",
123+
"dep:datafusion-physical-expr",
123124
]
124125
duckdb-federation = ["duckdb", "federation"]
125126
federation = ["dep:datafusion-federation"]

core/src/duckdb/sql_table.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use futures::TryStreamExt;
2626
use std::collections::HashMap;
2727
use std::fmt::Display;
2828
use std::{any::Any, fmt, sync::Arc};
29+
use datafusion_physical_expr::EquivalenceProperties;
2930

3031
pub struct DuckDBTable<T: 'static, P: 'static> {
3132
pub(crate) base_table: SqlTable<T, P>,
@@ -146,6 +147,7 @@ pub struct DuckSqlExec<T, P> {
146147
indexes: Vec<(ColumnReference, IndexType)>,
147148
optimized_sql: Option<String>,
148149
optimized_sql_schema: Option<SchemaRef>,
150+
optimized_sql_properties: Option<PlanProperties>,
149151
}
150152

151153
impl<T, P> Clone for DuckSqlExec<T, P> {
@@ -156,11 +158,12 @@ impl<T, P> Clone for DuckSqlExec<T, P> {
156158
indexes: self.indexes.clone(),
157159
optimized_sql: self.optimized_sql.clone(),
158160
optimized_sql_schema: self.optimized_sql_schema.clone(),
161+
optimized_sql_properties: self.optimized_sql_properties.clone(),
159162
}
160163
}
161164
}
162165

163-
impl<T, P> DuckSqlExec<T, P> {
166+
impl<T: 'static, P: 'static> DuckSqlExec<T, P> {
164167
#[allow(clippy::too_many_arguments)]
165168
fn new(
166169
projections: Option<&Vec<usize>>,
@@ -188,6 +191,7 @@ impl<T, P> DuckSqlExec<T, P> {
188191
indexes,
189192
optimized_sql: None,
190193
optimized_sql_schema: None,
194+
optimized_sql_properties: None,
191195
})
192196
}
193197

@@ -225,18 +229,26 @@ impl<T, P> DuckSqlExec<T, P> {
225229
) -> Self {
226230
self.optimized_sql = Some(sql.into());
227231
self.optimized_sql_schema = new_schema;
232+
233+
if let Some(schema) = self.optimized_sql_schema.as_ref() {
234+
let mut properties = self.base_exec.properties().clone();
235+
let eq_properties = EquivalenceProperties::new(Arc::clone(schema));
236+
properties.eq_properties = eq_properties;
237+
self.optimized_sql_properties = Some(properties);
238+
}
239+
228240
self
229241
}
230242
}
231243

232-
impl<T, P> std::fmt::Debug for DuckSqlExec<T, P> {
244+
impl<T: 'static, P: 'static> std::fmt::Debug for DuckSqlExec<T, P> {
233245
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
234246
let sql = self.sql().unwrap_or_default();
235247
write!(f, "DuckSqlExec sql={sql}")
236248
}
237249
}
238250

239-
impl<T, P> DisplayAs for DuckSqlExec<T, P> {
251+
impl<T: 'static, P: 'static> DisplayAs for DuckSqlExec<T, P> {
240252
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
241253
let sql = self.sql().unwrap_or_default();
242254
write!(f, "DuckSqlExec sql={sql}")
@@ -260,7 +272,7 @@ impl<T: 'static, P: 'static> ExecutionPlan for DuckSqlExec<T, P> {
260272
}
261273

262274
fn properties(&self) -> &PlanProperties {
263-
self.base_exec.properties()
275+
self.optimized_sql_properties.as_ref().unwrap_or(self.base_exec.properties())
264276
}
265277

266278
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

0 commit comments

Comments
 (0)