Skip to content
Open
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ default-members = [

tokio = { version = "1.47", features = ["macros", "rt", "sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }

# DataFusion dependencies
datafusion = { version = "52.0.0", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dirs = "6.0.0"
regex = "1.8"
object_store = "0.12.4"
url = "2.5.4"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
tracing-subscriber = { workspace = true }
tracing = { workspace = true }

futures = "0.3.31"
Expand Down
2 changes: 2 additions & 0 deletions connectors/datafusion/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ extensions_options! {
pub optd_strict_mode: bool, default = false
/// Disable DataFusion optimizers and run optd optimization only.
pub optd_only: bool, default = false
/// Print per-pass timing metrics while optd is running.
pub profile_passes: bool, default = false
}
}

Expand Down
237 changes: 191 additions & 46 deletions connectors/datafusion/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ mod from_optd;
mod into_optd;
mod utils;

use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use datafusion::{
catalog::memory::DataSourceExec,
Expand All @@ -28,7 +31,7 @@ use optd_core::{
statistics::{ColumnStatistics, TableStatistics},
},
magic::{MagicCardinalityEstimator, MagicCostModel},
rules,
rules::{self, PassExtension, PassManager, PassProfilingExtension},
};
use optd_repository_api::optd_catalog::RepositoryCatalog;
use snafu::{OptionExt, ResultExt, Snafu};
Expand Down Expand Up @@ -192,6 +195,80 @@ fn warm_explain_properties(op: &Arc<optd_core::ir::Operator>, ctx: &IRContext) {
let _ = op.cardinality(ctx);
}

/// One rendered optd logical-plan snapshot captured after a named pass.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PassExplainSnapshot {
/// Stable pass name used to label the `EXPLAIN` section.
pass_name: &'static str,
/// Fully rendered optd plan after the pass completed.
rendered_plan: String,
}

/// Pass extension that records post-pass plans for `EXPLAIN` output.
#[derive(Default)]
struct ExplainPassExtension {
/// Captured snapshots in pass-execution order.
snapshots: Mutex<Vec<PassExplainSnapshot>>,
}

impl ExplainPassExtension {
/// Stores the initial optd logical plan.
fn record_initial_logical_plan(&self, rendered_plan: String) {
self.snapshots.lock().unwrap().insert(
0,
PassExplainSnapshot {
pass_name: "optd-initial",
rendered_plan,
},
);
}

/// Builds the optd logical-plan explain rows accumulated during planning.
fn logical_stringified_plans(&self) -> Vec<StringifiedPlan> {
let snapshots = self.snapshots.lock().unwrap().clone();
pass_explain_plans(&snapshots)
}
}

impl PassExtension for ExplainPassExtension {
fn after_pass(
&self,
pass_name: &'static str,
_before: &Arc<optd_core::ir::Operator>,
after: &Arc<optd_core::ir::Operator>,
ctx: &IRContext,
) -> optd_core::error::Result<()> {
warm_explain_properties(after, ctx);
self.snapshots.lock().unwrap().push(PassExplainSnapshot {
pass_name,
rendered_plan: quick_explain(after, ctx),
});
Ok(())
}
}

/// Converts captured pass snapshots into `EXPLAIN` rows.
fn pass_explain_plans(snapshots: &[PassExplainSnapshot]) -> Vec<StringifiedPlan> {
let mut plans = Vec::with_capacity(snapshots.len());
for snapshot in snapshots {
let optimizer_name = if snapshot.pass_name.starts_with("optd-") {
snapshot.pass_name.to_string()
} else {
format!("optd-{}", snapshot.pass_name)
};
let plan_type = if snapshot.pass_name == "cascades" {
PlanType::OptimizedPhysicalPlan { optimizer_name }
} else {
PlanType::OptimizedLogicalPlan { optimizer_name }
};
plans.push(StringifiedPlan::new(
plan_type,
snapshot.rendered_plan.clone(),
));
}
plans
}

impl OptdQueryPlanner {
fn optd_extension(session_state: &SessionState) -> Result<Arc<OptdExtension>> {
session_state
Expand Down Expand Up @@ -285,7 +362,29 @@ impl OptdQueryPlanner {

warm_explain_properties(&optd_logical, &ctx.inner);

let opt = Arc::new(Cascades::new(ctx.inner.clone(), rule_set));
let explain_pass_extension = explain
.as_ref()
.map(|_| Arc::new(ExplainPassExtension::default()));
let profile_passes = session_state
.config_options()
.extensions
.get::<OptdExtensionConfig>()
.map(|conf| conf.profile_passes)
.unwrap_or(false);
let mut pass_manager_builder = PassManager::builder();
if let Some(extension) = explain_pass_extension.as_ref() {
pass_manager_builder = pass_manager_builder.add_extension_arc(extension.clone());
}
if profile_passes {
pass_manager_builder =
pass_manager_builder.add_extension(PassProfilingExtension::default());
}
let pass_manager = pass_manager_builder.build();
let opt = Arc::new(Cascades::with_pass_manager(
ctx.inner.clone(),
rule_set,
pass_manager,
));

let optd_physical = match opt.optimize(&optd_logical, Arc::default()).await {
Ok(plan) => plan,
Expand Down Expand Up @@ -323,43 +422,17 @@ impl OptdQueryPlanner {
// for `(.output_columns)` and `(.cardinality)` in explain output.
warm_explain_properties(&optd_logical, &ctx.inner);
let s = quick_explain(&optd_logical, &ctx.inner);
x.stringified_plans.push(StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
optimizer_name: "optd-initial".to_string(),
},
s.clone(),
));
if let Some(extension) = explain_pass_extension.as_ref() {
extension.record_initial_logical_plan(s);
x.stringified_plans
.extend(extension.logical_stringified_plans());
}
x.stringified_plans.push(StringifiedPlan::new(
PlanType::FinalLogicalPlan,
logical_plan.display_indent().to_string(),
));
}

if let Some(x) = explain.as_mut() {
let s = quick_explain(&optd_logical, &opt.ctx);

x.stringified_plans.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-initial".to_string(),
},
s.clone(),
));
x.stringified_plans
.push(StringifiedPlan::new(PlanType::FinalLogicalPlan, s));
}

if let Some(x) = explain.as_mut() {
let s = quick_explain(&optd_physical, &opt.ctx);
x.stringified_plans.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-finalized".to_string(),
},
s.clone(),
));
x.stringified_plans
.push(StringifiedPlan::new(PlanType::FinalPhysicalPlan, s));
}

if let Some(x) = explain.as_mut() {
let config = &session_state.config_options().explain;
x.stringified_plans.push(StringifiedPlan::new(
Expand Down Expand Up @@ -485,18 +558,6 @@ impl OptdQueryPlanner {
// .push(StringifiedPlan::new(PlanType::FinalLogicalPlan, s));
// }

// if let Some(x) = explain.as_mut() {
// let s = quick_explain(&optd_physical, &opt.ctx);
// x.stringified_plans.push(StringifiedPlan::new(
// PlanType::OptimizedPhysicalPlan {
// optimizer_name: "optd-finalized".to_string(),
// },
// s.clone(),
// ));
// x.stringified_plans
// .push(StringifiedPlan::new(PlanType::FinalPhysicalPlan, s));
// }

// if let Some(x) = explain.as_mut() {
// let config = &session_state.config_options().explain;
// x.stringified_plans.push(StringifiedPlan::new(
Expand Down Expand Up @@ -759,3 +820,87 @@ fn get_join_order_from_df_exec(rel_node: &Arc<dyn ExecutionPlan>) -> Option<Join
}
None
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::{execution::runtime_env::RuntimeEnv, prelude::SessionConfig};
use tokio::runtime::Runtime;

use super::{PassExplainSnapshot, PlanType, pass_explain_plans};
use crate::{OptdExtensionConfig, create_optd_session_context};

#[test]
fn pass_explain_plans_labels_cascades_as_physical() {
let plans = pass_explain_plans(&[
PassExplainSnapshot {
pass_name: "optd-initial",
rendered_plan: "initial".to_string(),
},
PassExplainSnapshot {
pass_name: "decorrelation",
rendered_plan: "initial".to_string(),
},
PassExplainSnapshot {
pass_name: "simplification",
rendered_plan: "simplified".to_string(),
},
PassExplainSnapshot {
pass_name: "pruning",
rendered_plan: "simplified".to_string(),
},
PassExplainSnapshot {
pass_name: "cascades",
rendered_plan: "physical".to_string(),
},
]);

assert_eq!(plans.len(), 5);
assert_eq!(plans[0].plan.as_ref().as_str(), "initial");
assert_eq!(plans[1].plan.as_ref().as_str(), "initial");
assert_eq!(plans[2].plan.as_ref().as_str(), "simplified");
assert_eq!(plans[3].plan.as_ref().as_str(), "simplified");
assert_eq!(plans[4].plan.as_ref().as_str(), "physical");
assert_eq!(
plans[4].plan_type,
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-cascades".to_string()
}
);
}

#[test]
fn profile_passes_config_is_toggled_by_set_statement() {
let disabled =
create_optd_session_context(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
let disabled_state = disabled.state();
let disabled_config = &disabled_state.config_options().extensions;
assert_eq!(
disabled_config
.get::<OptdExtensionConfig>()
.map(|conf| conf.profile_passes),
Some(false)
);

let enabled =
create_optd_session_context(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
Runtime::new().unwrap().block_on(async {
enabled
.sql("set optd.profile_passes = true")
.await
.unwrap()
.collect()
.await
.unwrap();
});
let enabled_state = enabled.state();
let enabled_config = &enabled_state.config_options().extensions;
assert_eq!(
enabled_config
.get::<OptdExtensionConfig>()
.map(|conf| conf.profile_passes),
Some(true)
);
}
}
2 changes: 1 addition & 1 deletion optd/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ synopses = { path = "../../synopses" }
[dev-dependencies]
console-subscriber = "0.4.1"
tracing-test = "0.2.5"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing-subscriber = { workspace = true }
rand = "0.8.5"
36 changes: 33 additions & 3 deletions optd/core/src/cascades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,33 @@ use crate::{
rule::{OperatorPattern, Rule, RuleSet},
},
memo::{CostedExpr, Exploration, MemoGroupExpr, MemoTable, Optimization, Status, WithId},
rules::{EnforceTupleOrderingRule, SimplificationPass, UnnestingRule},
rules::{EnforceTupleOrderingRule, PassManager, SimplificationPass, UnnestingRule},
};

pub struct Cascades {
pub memo: tokio::sync::RwLock<MemoTable>,
pub ctx: Arc<IRContext>,
pub rule_set: RuleSet,
pub pass_manager: PassManager,
}

impl Cascades {
/// Creates a new Cascades optimizer instance.
pub fn new(ctx: Arc<IRContext>, rule_set: RuleSet) -> Self {
Self::with_pass_manager(ctx, rule_set, PassManager::default())
}

/// Creates a new Cascades optimizer instance with a custom pass manager.
pub fn with_pass_manager(
ctx: Arc<IRContext>,
rule_set: RuleSet,
pass_manager: PassManager,
) -> Self {
Self {
memo: tokio::sync::RwLock::new(MemoTable::new(ctx.clone())),
ctx,
rule_set,
pass_manager,
}
}

Expand All @@ -40,8 +51,27 @@ impl Cascades {
plan: &Arc<Operator>,
required: Arc<Required>,
) -> Result<Arc<Operator>> {
let decorrelated = UnnestingRule::new().apply(plan.clone(), &self.ctx)?;
let simplified = SimplificationPass::new().apply(decorrelated, &self.ctx)?;
let decorrelated = self
.pass_manager
.run(&UnnestingRule::new(), plan.clone(), &self.ctx)?;
let simplified =
self.pass_manager
.run(&SimplificationPass::new(), decorrelated, &self.ctx)?;
let cascades = self.clone();
self.pass_manager
.run_async("cascades", simplified, &self.ctx, move |simplified| {
let cascades = cascades.clone();
async move { cascades.optimize_cascades_phase(simplified, required).await }
})
.await
}

/// Runs the memo-based cascades search after pre-cascades passes finish.
async fn optimize_cascades_phase(
self: &Arc<Self>,
simplified: Arc<Operator>,
required: Arc<Required>,
) -> Result<Arc<Operator>> {
let group_id = self.insert_new_operator(&simplified).await;
let fut = self.find_best_costed_expr_for(group_id, required);
let rx = fut.await;
Expand Down
Loading
Loading