Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ readme = "README.md"
repository = "https://github.com/datafusion-contrib/datafusion-federation"

[workspace.dependencies]
arrow-json = "57.2"
arrow-json = "58.0"
async-stream = "0.3"
async-trait = "0.1"
datafusion = "52.0"
# TODO
#datafusion = "52.0"
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
Comment thread
nuno-faria marked this conversation as resolved.
Outdated
datafusion-federation = { path = "./datafusion-federation", version = "0.5.0" }
futures = "0.3"
tokio = { version = "1.41", features = ["full"] }
8 changes: 4 additions & 4 deletions datafusion-federation/src/schema_cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod struct_cast;
pub struct SchemaCastScanExec {
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
properties: PlanProperties,
properties: Arc<PlanProperties>,
metrics_set: ExecutionPlanMetricsSet,
}

Expand All @@ -36,12 +36,12 @@ impl SchemaCastScanExec {
let eq_properties = input.equivalence_properties().clone();
let emission_type = input.pipeline_behavior();
let boundedness = input.boundedness();
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
eq_properties,
input.output_partitioning().clone(),
emission_type,
boundedness,
);
));
Self {
input,
schema,
Expand All @@ -66,7 +66,7 @@ impl ExecutionPlan for SchemaCastScanExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
23 changes: 22 additions & 1 deletion datafusion-federation/src/sql/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use datafusion::{
logical_expr::{
expr::{
AggregateFunction, AggregateFunctionParams, Alias, Exists, InList, InSubquery,
PlannedReplaceSelectItem, ScalarFunction, Sort, Unnest, WildcardOptions,
PlannedReplaceSelectItem, ScalarFunction, SetComparison, Sort, Unnest, WildcardOptions,
WindowFunction, WindowFunctionParams,
},
Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Limit, LogicalPlan, Subquery,
Expand Down Expand Up @@ -563,6 +563,27 @@ fn rewrite_table_scans_in_expr(
Ok(Expr::Unnest(Unnest::new(expr)))
}
Expr::ScalarVariable(_, _) | Expr::Literal(_, _) | Expr::Placeholder(_) => Ok(expr),
Expr::SetComparison(sc) => {
let expr = rewrite_table_scans_in_expr(*sc.expr, known_rewrites)?;
let subquery_plan = rewrite_table_scans(&sc.subquery.subquery, known_rewrites)?;
let outer_ref_columns = sc
.subquery
.outer_ref_columns
.into_iter()
.map(|e| rewrite_table_scans_in_expr(e, known_rewrites))
.collect::<Result<Vec<Expr>>>()?;
let subquery = Subquery {
subquery: Arc::new(subquery_plan),
outer_ref_columns,
spans: Spans::new(),
};
Ok(Expr::SetComparison(SetComparison::new(
Box::new(expr),
subquery,
sc.op,
sc.quantifier,
)))
}
Comment on lines +566 to +586
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to Expr::InSubquery, as it also contains an expr and a subquery.

}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion-federation/src/sql/ast_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl VisitorMut for TableArgReplace {
*args = Some(arg.clone());
if alias.is_none() {
*alias = Some(TableAlias {
explicit: true,
name: Ident::new(table.table()),
columns: vec![],
})
Expand Down
8 changes: 4 additions & 4 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,20 @@ impl FederationPlanner for SQLFederationPlanner {
pub struct VirtualExecutionPlan {
plan: LogicalPlan,
executor: Arc<dyn SQLExecutor>,
props: PlanProperties,
props: Arc<PlanProperties>,
statistics: Statistics,
filters: Vec<Arc<dyn PhysicalExpr>>,
}

impl VirtualExecutionPlan {
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>, statistics: Statistics) -> Self {
let schema: Schema = plan.schema().as_arrow().clone();
let props = PlanProperties::new(
let props = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::new(schema)),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
));
Self {
plan,
executor,
Expand Down Expand Up @@ -371,7 +371,7 @@ impl ExecutionPlan for VirtualExecutionPlan {
.execute(&self.final_sql()?, self.schema(), &self.filters)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.props
}

Expand Down
Loading