Skip to content

Commit 9e0b9d6

Browse files
committed
fix: preserve explain directives in federated SQL
1 parent a0a9090 commit 9e0b9d6

4 files changed

Lines changed: 237 additions & 41 deletions

File tree

datafusion-federation/src/analyzer/mod.rs

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ impl AnalyzerRule for FederationAnalyzerRule {
4646

4747
// Find all federation providers for TableReferences that appear in the plan, to resolve OuterRefColumns
4848
let providers = get_plan_provider_recursively(&plan)?;
49+
let explain_context = Self::explain_context_template(&plan);
4950

50-
match self.analyze_plan_recursively(&plan, true, config, &providers)? {
51+
match self.analyze_plan_recursively(&plan, true, config, &providers, explain_context)? {
5152
(Some(optimized_plan), _) => Ok(optimized_plan),
5253
(None, _) => Ok(plan),
5354
}
@@ -85,6 +86,27 @@ impl FederationAnalyzerRule {
8586
self
8687
}
8788

89+
fn explain_context_template(plan: &LogicalPlan) -> Option<LogicalPlan> {
90+
match plan {
91+
LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) => Some(plan.clone()),
92+
_ => None,
93+
}
94+
}
95+
96+
fn wrap_federated_plan(
97+
plan: LogicalPlan,
98+
explain_context: Option<&LogicalPlan>,
99+
) -> Result<LogicalPlan> {
100+
if matches!(plan, LogicalPlan::Explain(_) | LogicalPlan::Analyze(_)) {
101+
return Ok(plan);
102+
}
103+
104+
match explain_context {
105+
Some(wrapper) => wrapper.with_new_exprs(wrapper.expressions(), vec![plan]),
106+
None => Ok(plan),
107+
}
108+
}
109+
88110
/// Scans a plan to see if it belongs to a single [`FederationProvider`].
89111
fn scan_plan_recursively(
90112
&self,
@@ -184,7 +206,9 @@ impl FederationAnalyzerRule {
184206
is_root: bool,
185207
config: &ConfigOptions,
186208
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
209+
explain_context: Option<LogicalPlan>,
187210
) -> Result<(Option<LogicalPlan>, ScanResult)> {
211+
let explain_context = explain_context.or_else(|| Self::explain_context_template(plan));
188212
let mut sole_provider: ScanResult = ScanResult::None;
189213

190214
if let LogicalPlan::Extension(Extension { ref node }) = plan {
@@ -217,7 +241,9 @@ impl FederationAnalyzerRule {
217241
// Recursively analyze inputs
218242
let input_results = inputs
219243
.iter()
220-
.map(|i| self.analyze_plan_recursively(i, false, config, providers))
244+
.map(|i| {
245+
self.analyze_plan_recursively(i, false, config, providers, explain_context.clone())
246+
})
221247
.collect::<Result<Vec<_>>>()?;
222248

223249
// Aggregate the input providers
@@ -238,23 +264,25 @@ impl FederationAnalyzerRule {
238264

239265
// If all sources are federated to the same provider
240266
if let ScanResult::Distinct(provider) = sole_provider {
241-
// Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by the
242-
// Unparser, so they must not be federated as a whole. Only the inner
243-
// query should be federated; DataFusion's AnalyzeExec will handle
244-
// executing it and collecting metrics.
245-
let provider_analyzer = if matches!(plan, LogicalPlan::Analyze(_)) {
246-
None
247-
} else {
248-
provider.analyzer(plan)
249-
};
267+
// Explain and Analyze wrappers stay in the DataFusion plan so their
268+
// physical operators can still run. The corresponding directive is
269+
// injected into the federated subquery instead.
270+
let federated_plan = Self::wrap_federated_plan(plan.clone(), explain_context.as_ref())?;
271+
let provider_analyzer =
272+
if matches!(plan, LogicalPlan::Analyze(_) | LogicalPlan::Explain(_)) {
273+
None
274+
} else {
275+
provider.analyzer(&federated_plan)
276+
};
250277
match (is_root, provider_analyzer) {
251278
(false, Some(_)) => {
252279
// The largest sub-plan is higher up.
253280
return Ok((None, ScanResult::Distinct(provider)));
254281
}
255282
(true, Some(FederationAnalyzerForLogicalPlan::With(analyzer))) => {
256283
// If this is the root plan node; federate the entire plan
257-
let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?;
284+
let optimized =
285+
analyzer.execute_and_check(federated_plan, config, |_, _| {})?;
258286
return Ok((Some(optimized), ScanResult::None));
259287
}
260288
(_, None | Some(FederationAnalyzerForLogicalPlan::Unable)) => {
@@ -291,22 +319,26 @@ impl FederationAnalyzerRule {
291319
return Ok(original_input);
292320
};
293321

322+
let federated_input = Self::wrap_federated_plan(
323+
wrap_projection(original_input.clone())?,
324+
explain_context.as_ref(),
325+
)?;
326+
294327
let Some(FederationAnalyzerForLogicalPlan::With(analyzer)) =
295-
provider.analyzer(&original_input)
328+
provider.analyzer(&federated_input)
296329
else {
297330
// Either provider has no analyzer, or cannot federate [`LogicalPlan`].
298331
return Ok(original_input);
299332
};
300333

301334
// Replace the input with the federated counterpart
302-
let wrapped = wrap_projection(original_input)?;
303-
analyzer.execute_and_check(wrapped, config, |_, _| {})
335+
analyzer.execute_and_check(federated_input, config, |_, _| {})
304336
})
305337
.collect::<Result<Vec<_>>>()?;
306338

307339
// Optimize expressions if needed
308340
let new_expressions = if optimize_expressions {
309-
self.analyze_plan_exprs(plan, config, providers)?
341+
self.analyze_plan_exprs(plan, config, providers, explain_context)?
310342
} else {
311343
plan.expressions()
312344
};
@@ -324,13 +356,14 @@ impl FederationAnalyzerRule {
324356
plan: &LogicalPlan,
325357
config: &ConfigOptions,
326358
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
359+
explain_context: Option<LogicalPlan>,
327360
) -> Result<Vec<Expr>> {
328361
plan.expressions()
329362
.iter()
330363
.map(|expr| {
331-
let transformed = expr
332-
.clone()
333-
.transform(&|e| self.analyze_expr_recursively(e, config, providers))?;
364+
let transformed = expr.clone().transform(&|e| {
365+
self.analyze_expr_recursively(e, config, providers, explain_context.clone())
366+
})?;
334367
Ok(transformed.data)
335368
})
336369
.collect::<Result<Vec<_>>>()
@@ -343,12 +376,18 @@ impl FederationAnalyzerRule {
343376
expr: Expr,
344377
_config: &ConfigOptions,
345378
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
379+
explain_context: Option<LogicalPlan>,
346380
) -> Result<Transformed<Expr>> {
347381
match expr {
348382
Expr::ScalarSubquery(ref subquery) => {
349383
// Analyze as root to force federating the sub-query
350-
let (new_subquery, _) =
351-
self.analyze_plan_recursively(&subquery.subquery, true, _config, providers)?;
384+
let (new_subquery, _) = self.analyze_plan_recursively(
385+
&subquery.subquery,
386+
true,
387+
_config,
388+
providers,
389+
explain_context.clone(),
390+
)?;
352391
let Some(new_subquery) = new_subquery else {
353392
return Ok(Transformed::no(expr));
354393
};
@@ -382,6 +421,7 @@ impl FederationAnalyzerRule {
382421
true,
383422
_config,
384423
providers,
424+
explain_context,
385425
)?;
386426
let Some(new_subquery) = new_subquery else {
387427
return Ok(Transformed::no(expr));

datafusion-federation/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use datafusion::{
2424

2525
pub use analyzer::{get_table_source, FederationAnalyzerRule};
2626
pub use plan_node::{
27-
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederationPlanner,
27+
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederatedQueryType,
28+
FederationPlanner,
2829
};
2930
pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource};
3031

datafusion-federation/src/plan_node.rs

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,49 @@ use datafusion::{
1010
common::DFSchemaRef,
1111
error::{DataFusionError, Result},
1212
execution::context::{QueryPlanner, SessionState},
13-
logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore},
13+
logical_expr::{
14+
Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
15+
},
1416
physical_plan::ExecutionPlan,
1517
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
1618
};
1719

20+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
21+
pub enum FederatedQueryType {
22+
Explain,
23+
Analyze,
24+
}
25+
26+
impl FederatedQueryType {
27+
pub fn prefix(self) -> &'static str {
28+
match self {
29+
Self::Explain => "EXPLAIN",
30+
Self::Analyze => "EXPLAIN ANALYZE",
31+
}
32+
}
33+
}
34+
1835
pub struct FederatedPlanNode {
1936
pub(crate) plan: LogicalPlan,
2037
pub(crate) planner: Arc<dyn FederationPlanner>,
38+
pub(crate) query_type: Option<FederatedQueryType>,
2139
}
2240

2341
impl FederatedPlanNode {
2442
pub fn new(plan: LogicalPlan, planner: Arc<dyn FederationPlanner>) -> Self {
25-
Self { plan, planner }
43+
Self::new_with_query_type(plan, planner, None)
44+
}
45+
46+
pub fn new_with_query_type(
47+
plan: LogicalPlan,
48+
planner: Arc<dyn FederationPlanner>,
49+
query_type: Option<FederatedQueryType>,
50+
) -> Self {
51+
Self {
52+
plan,
53+
planner,
54+
query_type,
55+
}
2656
}
2757

2858
pub fn plan(&self) -> &LogicalPlan {
@@ -32,6 +62,10 @@ impl FederatedPlanNode {
3262
pub fn planner(&self) -> &Arc<dyn FederationPlanner> {
3363
&self.planner
3464
}
65+
66+
pub fn query_type(&self) -> Option<FederatedQueryType> {
67+
self.query_type
68+
}
3569
}
3670

3771
impl Debug for FederatedPlanNode {
@@ -72,6 +106,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode {
72106
Ok(Self {
73107
plan: self.plan.clone(),
74108
planner: Arc::clone(&self.planner),
109+
query_type: self.query_type,
75110
})
76111
}
77112
}
@@ -83,6 +118,64 @@ impl FederatedQueryPlanner {
83118
pub fn new() -> Self {
84119
Self::default()
85120
}
121+
122+
fn annotate_query_type(
123+
plan: &LogicalPlan,
124+
query_type: FederatedQueryType,
125+
) -> Result<LogicalPlan> {
126+
let new_inputs = plan
127+
.inputs()
128+
.into_iter()
129+
.map(|input| Self::annotate_query_type(input, query_type))
130+
.collect::<Result<Vec<_>>>()?;
131+
let plan = if new_inputs.is_empty() {
132+
plan.clone()
133+
} else {
134+
plan.with_new_exprs(plan.expressions(), new_inputs)?
135+
};
136+
137+
if let LogicalPlan::Extension(Extension { node }) = &plan {
138+
if let Some(federated_node) = node.as_any().downcast_ref::<FederatedPlanNode>() {
139+
return Ok(LogicalPlan::Extension(Extension {
140+
node: Arc::new(FederatedPlanNode::new_with_query_type(
141+
federated_node.plan.clone(),
142+
Arc::clone(&federated_node.planner),
143+
Some(federated_node.query_type.unwrap_or(query_type)),
144+
)),
145+
}));
146+
}
147+
}
148+
149+
Ok(plan)
150+
}
151+
152+
fn annotate_query_directives(plan: &LogicalPlan) -> Result<LogicalPlan> {
153+
match plan {
154+
LogicalPlan::Explain(_) => {
155+
let inputs = plan.inputs();
156+
let [input] = inputs.as_slice() else {
157+
return Err(DataFusionError::Plan(
158+
"Explain plan must have exactly one input".into(),
159+
));
160+
};
161+
let annotated_input =
162+
Self::annotate_query_type(input, FederatedQueryType::Explain)?;
163+
plan.with_new_exprs(plan.expressions(), vec![annotated_input])
164+
}
165+
LogicalPlan::Analyze(_) => {
166+
let inputs = plan.inputs();
167+
let [input] = inputs.as_slice() else {
168+
return Err(DataFusionError::Plan(
169+
"Analyze plan must have exactly one input".into(),
170+
));
171+
};
172+
let annotated_input =
173+
Self::annotate_query_type(input, FederatedQueryType::Analyze)?;
174+
plan.with_new_exprs(plan.expressions(), vec![annotated_input])
175+
}
176+
_ => Ok(plan.clone()),
177+
}
178+
}
86179
}
87180

88181
#[async_trait]
@@ -92,14 +185,14 @@ impl QueryPlanner for FederatedQueryPlanner {
92185
logical_plan: &LogicalPlan,
93186
session_state: &SessionState,
94187
) -> Result<Arc<dyn ExecutionPlan>> {
95-
// Get provider here?
188+
let logical_plan = Self::annotate_query_directives(logical_plan)?;
96189

97190
let physical_planner =
98191
DefaultPhysicalPlanner::with_extension_planners(vec![
99192
Arc::new(FederatedPlanner::new()),
100193
]);
101194
physical_planner
102-
.create_physical_plan(logical_plan, session_state)
195+
.create_physical_plan(&logical_plan, session_state)
103196
.await
104197
}
105198
}
@@ -122,13 +215,16 @@ impl std::fmt::Debug for dyn FederationPlanner {
122215
impl PartialEq<FederatedPlanNode> for FederatedPlanNode {
123216
/// Comparing name, args and return_type
124217
fn eq(&self, other: &FederatedPlanNode) -> bool {
125-
self.plan == other.plan
218+
self.plan == other.plan && self.query_type == other.query_type
126219
}
127220
}
128221

129222
impl PartialOrd<FederatedPlanNode> for FederatedPlanNode {
130223
fn partial_cmp(&self, other: &FederatedPlanNode) -> Option<std::cmp::Ordering> {
131-
self.plan.partial_cmp(&other.plan)
224+
match self.plan.partial_cmp(&other.plan) {
225+
Some(std::cmp::Ordering::Equal) => self.query_type.partial_cmp(&other.query_type),
226+
ordering => ordering,
227+
}
132228
}
133229
}
134230

@@ -137,6 +233,7 @@ impl Eq for FederatedPlanNode {}
137233
impl Hash for FederatedPlanNode {
138234
fn hash<H: Hasher>(&self, state: &mut H) {
139235
self.plan.hash(state);
236+
self.query_type.hash(state);
140237
}
141238
}
142239

0 commit comments

Comments
 (0)