Skip to content

Commit 8ef4203

Browse files
committed
feat(sql): cost cluster-key join ordering
1 parent 9420180 commit 8ef4203

27 files changed

Lines changed: 1345 additions & 36 deletions

src/query/settings/src/settings_default.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static COST_FACTOR_COMPUTE_PER_ROW: u64 = 1;
3636
static COST_FACTOR_HASH_TABLE_PER_ROW: u64 = 10;
3737
static COST_FACTOR_AGGREGATE_PER_ROW: u64 = 5;
3838
static COST_FACTOR_NETWORK_PER_ROW: u64 = 50;
39+
// Default value of the `cost_factor_cluster_key` setting. That setting describes
// itself as a percentage for which 100 disables the cluster-key discount.
static COST_FACTOR_CLUSTER_KEY: u64 = 100;
3940

4041
// Settings for readability and writability of tags.
4142
// we will not be able to safely get its value when set to only write.
@@ -1240,6 +1241,13 @@ impl DefaultSettings {
12401241
scope: SettingScope::Both,
12411242
range: Some(SettingRange::Numeric(0..=u64::MAX)),
12421243
}),
1244+
("cost_factor_cluster_key", DefaultSettingValue {
1245+
value: UserSettingValue::UInt64(COST_FACTOR_CLUSTER_KEY),
1246+
desc: "Cost factor percentage for clustered keys in join ordering. Set to 100 to disable this discount.",
1247+
mode: SettingMode::Both,
1248+
scope: SettingScope::Both,
1249+
range: Some(SettingRange::Numeric(0..=100)),
1250+
}),
12431251
// This setting has been deprecated, retained to prevent set errors.
12441252
("enable_geo_create_table", DefaultSettingValue {
12451253
value: UserSettingValue::UInt64(1),

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,10 @@ impl Settings {
903903
self.try_get_u64("cost_factor_network_per_row")
904904
}
905905

906+
/// Looks up the `cost_factor_cluster_key` setting through `try_get_u64`
/// and returns whatever that lookup yields.
pub fn get_cost_factor_cluster_key(&self) -> Result<u64> {
    let setting_name = "cost_factor_cluster_key";
    self.try_get_u64(setting_name)
}
909+
906910
/// Looks up the `idle_transaction_timeout_secs` setting through `try_get_u64`
/// and returns whatever that lookup yields.
pub fn get_idle_transaction_timeout_secs(&self) -> Result<u64> {
    let setting_name = "idle_transaction_timeout_secs";
    self.try_get_u64(setting_name)
}

src/query/sql/src/planner/expression/expression_parser.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617

1718
use databend_common_ast::ast::Expr as AExpr;
@@ -25,6 +26,7 @@ use databend_common_catalog::table_context::TableContext;
2526
use databend_common_exception::ErrorCode;
2627
use databend_common_exception::Result;
2728
use databend_common_expression::ColumnId;
29+
use databend_common_expression::ColumnIndex;
2830
use databend_common_expression::Constant;
2931
use databend_common_expression::DataSchemaRef;
3032
use databend_common_expression::Expr;
@@ -454,9 +456,39 @@ pub fn analyze_cluster_keys(
454456
ctx: Arc<dyn TableContext>,
455457
table_meta: Arc<dyn Table>,
456458
sql: &str,
457-
) -> Result<(String, Vec<Expr<Symbol>>)> {
459+
) -> Result<(String, Vec<Expr<FieldIndex>>)> {
460+
analyze_cluster_keys_impl(
461+
ctx,
462+
table_meta,
463+
sql,
464+
|_, column| Ok(column.as_field_index()),
465+
)
466+
}
467+
468+
pub fn analyze_cluster_key_order(
469+
ctx: Arc<dyn TableContext>,
470+
table_meta: Arc<dyn Table>,
471+
sql: &str,
472+
column_id_to_symbol: &HashMap<ColumnId, Symbol>,
473+
) -> Result<Vec<Expr<Symbol>>> {
474+
let (_, exprs) = analyze_cluster_keys_impl(ctx, table_meta, sql, |column_id, _| {
475+
column_id_to_symbol
476+
.get(&column_id)
477+
.copied()
478+
.ok_or_else(|| ErrorCode::Internal("Cluster key column should exist in table metadata"))
479+
})?;
480+
Ok(exprs)
481+
}
482+
483+
fn analyze_cluster_keys_impl<Index: ColumnIndex + Copy>(
484+
ctx: Arc<dyn TableContext>,
485+
table_meta: Arc<dyn Table>,
486+
sql: &str,
487+
mut project_column: impl FnMut(ColumnId, Symbol) -> Result<Index>,
488+
) -> Result<(String, Vec<Expr<Index>>)> {
458489
let ast_exprs = parse_cluster_key_exprs(sql)?;
459490
let (mut bind_context, metadata) = bind_table(table_meta)?;
491+
let metadata_ref = metadata.clone();
460492
let name_resolution_ctx = NameResolutionContext::try_from(ctx.get_settings().as_ref())?;
461493
let mut type_checker = TypeChecker::try_create(
462494
&mut bind_context,
@@ -478,13 +510,30 @@ pub fn analyze_cluster_keys(
478510
let mut cluster_keys = Vec::with_capacity(exprs.len());
479511
for ast in ast_exprs {
480512
let (scalar, _) = *type_checker.resolve(&ast)?;
481-
if scalar.used_columns().len() != 1 || !scalar.evaluable() {
513+
let used_columns = scalar.used_columns();
514+
if used_columns.len() != 1 || !scalar.evaluable() {
482515
return Err(ErrorCode::InvalidClusterKeys(format!(
483516
"Cluster by expression `{:#}` is invalid",
484517
ast
485518
)));
486519
}
487520

521+
let column = *used_columns
522+
.iter()
523+
.next()
524+
.expect("cluster key should use one column");
525+
let column_id = {
526+
let metadata = metadata_ref.read();
527+
let ColumnEntry::BaseTableColumn(BaseTableColumn { column_id, .. }) =
528+
metadata.column(column)
529+
else {
530+
return Err(ErrorCode::InvalidClusterKeys(format!(
531+
"Cluster by expression `{:#}` is invalid",
532+
ast
533+
)));
534+
};
535+
*column_id
536+
};
488537
let expr = scalar.as_symbol_expr()?;
489538
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
490539
return Err(ErrorCode::InvalidClusterKeys(format!(
@@ -501,6 +550,8 @@ pub fn analyze_cluster_keys(
501550
)));
502551
}
503552

553+
let target_column = project_column(column_id, column)?;
554+
let expr = expr.project_column_ref(|_| Ok(target_column))?;
504555
exprs.push(expr);
505556

506557
let mut cluster_by = ast.clone();

src/query/sql/src/planner/optimizer/ir/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub use expr::VisitAction;
3232
pub use group::Group;
3333
pub use group::GroupState;
3434
pub use memo::Memo;
35+
pub use property::ClusterKeyStatistics;
3536
pub use property::Distribution;
3637
pub use property::DistributionEnforcer;
3738
pub use property::Enforcer;

src/query/sql/src/planner/optimizer/ir/property/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub use builder::RelExpr;
2121
pub use enforcer::DistributionEnforcer;
2222
pub use enforcer::Enforcer;
2323
pub use enforcer::PropertyEnforcer;
24+
pub use property::ClusterKeyStatistics;
2425
pub use property::Distribution;
2526
pub use property::PhysicalProperty;
2627
pub use property::RelationalProperty;

src/query/sql/src/planner/optimizer/ir/property/property.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::fmt::Display;
1617
use std::fmt::Formatter;
1718

19+
use databend_common_exception::Result;
20+
use databend_common_expression::Expr;
21+
1822
use crate::ColumnSet;
23+
use crate::IndexType;
24+
use crate::Symbol;
1925
use crate::optimizer::ir::ColumnStatSet;
2026
use crate::plans::ScalarExpr;
2127
use crate::plans::ScalarItem;
@@ -38,13 +44,66 @@ impl Display for RequiredProperty {
3844
}
3945
}
4046

47+
#[derive(Default, Clone, Debug)]
48+
pub struct ClusterKeyStatistics {
49+
/// Table index -> cluster-key expressions in that table's cluster-key order.
50+
pub keys: BTreeMap<IndexType, Vec<Expr<Symbol>>>,
51+
/// Expressions filtered by equality with constants.
52+
pub filter_keys: Vec<Expr<Symbol>>,
53+
}
54+
55+
impl ClusterKeyStatistics {
56+
pub fn collect_filter_keys<'a>(
57+
predicates: impl IntoIterator<Item = &'a ScalarExpr>,
58+
) -> Result<Vec<Expr<Symbol>>> {
59+
let mut filter_keys = Vec::new();
60+
for predicate in predicates {
61+
Self::collect_equality_filter_key(predicate, &mut filter_keys)?;
62+
}
63+
Ok(filter_keys)
64+
}
65+
66+
fn collect_equality_filter_key(
67+
predicate: &ScalarExpr,
68+
filter_keys: &mut Vec<Expr<Symbol>>,
69+
) -> Result<()> {
70+
let mut stack = vec![predicate];
71+
while let Some(predicate) = stack.pop() {
72+
let ScalarExpr::FunctionCall(function) = predicate else {
73+
continue;
74+
};
75+
76+
match function.func_name.as_str() {
77+
"and" | "and_filters" => {
78+
stack.extend(function.arguments.iter().rev());
79+
}
80+
"eq" if function.arguments.len() == 2 => {
81+
let left = &function.arguments[0];
82+
let right = &function.arguments[1];
83+
match (
84+
left.used_columns().is_empty(),
85+
right.used_columns().is_empty(),
86+
) {
87+
(true, false) => filter_keys.push(right.as_symbol_expr()?),
88+
(false, true) => filter_keys.push(left.as_symbol_expr()?),
89+
_ => {}
90+
}
91+
}
92+
_ => {}
93+
}
94+
}
95+
Ok(())
96+
}
97+
}
98+
4199
#[derive(Default, Clone, Debug)]
42100
pub struct Statistics {
43101
// We can get the precise row count of a table in databend,
44102
// which information is useful to optimize some queries like `COUNT(*)`.
45103
pub precise_cardinality: Option<u64>,
46104
/// Statistics of columns, column index -> column stat
47105
pub column_stats: ColumnStatSet,
106+
pub cluster_key_stats: ClusterKeyStatistics,
48107
}
49108

50109
#[derive(Default, Clone, Debug)]

src/query/sql/src/planner/optimizer/ir/stats/join.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ mod tests {
714714
})),
715715
}),
716716
]),
717+
cluster_key_stats: Default::default(),
717718
};
718719

719720
JoinKeyStatUpdate::finish_join_histograms(&mut statistics, Symbol::new(0), true)?;
@@ -758,6 +759,7 @@ mod tests {
758759
})),
759760
}),
760761
]),
762+
cluster_key_stats: Default::default(),
761763
};
762764
let mut statistics = original_statistics.clone();
763765
let join_stat = statistics.column_stats.get_mut(&Symbol::new(0)).unwrap();

src/query/sql/src/planner/optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ pub use optimizer::optimize;
2828
pub use optimizer::optimize_query;
2929
pub use optimizer_api::Optimizer;
3030
pub use optimizer_context::OptimizerContext;
31+
pub use statistics::CollectStatisticsOptimizer;

0 commit comments

Comments
 (0)