Skip to content

Commit 1f941aa

Browse files
authored
Feat/MinMaxToTopK (#304)
* chore: add the MinMaxToTopK rule and extend EliminateRedundantSort to support TopK
* perf: adjust TupleKey encoding to default to NULLS LAST, allowing Min/Max to be rewritten as TopK without filtering NULLs
1 parent fa4473f commit 1f941aa

File tree

15 files changed

+385
-378
lines changed

15 files changed

+385
-378
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ Run `make tpcc-dual` to mirror every TPCC statement to an in-memory SQLite datab
9393
All cases have been fully optimized.
9494
```shell
9595
<90th Percentile RT (MaxRT)>
96-
New-Order : 0.002 (0.006)
97-
Payment : 0.001 (0.019)
98-
Order-Status : 0.001 (0.003)
99-
Delivery : 0.022 (0.038)
100-
Stock-Level : 0.002 (0.005)
96+
New-Order : 0.002 (0.005)
97+
Payment : 0.001 (0.013)
98+
Order-Status : 0.002 (0.006)
99+
Delivery : 0.010 (0.023)
100+
Stock-Level : 0.002 (0.017)
101101
<TpmC>
102-
18432 Tpmc
102+
27226 Tpmc
103103
```
104104
#### 👉[check more](tpcc/README.md)
105105

src/binder/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
9898
return_orderby.push(SortField::new(
9999
expr,
100100
asc.is_none_or(|asc| asc),
101-
nulls_first.unwrap_or(true),
101+
nulls_first.unwrap_or(false),
102102
));
103103
}
104104
Some(return_orderby)

src/db.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,14 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
188188
NormalizationRuleImpl::PushLimitIntoTableScan,
189189
],
190190
)
191+
.before_batch(
192+
"TopK".to_string(),
193+
HepBatchStrategy::once_topdown(),
194+
vec![
195+
NormalizationRuleImpl::MinMaxToTopK,
196+
NormalizationRuleImpl::TopK,
197+
],
198+
)
191199
.before_batch(
192200
"Combine Operators".to_string(),
193201
HepBatchStrategy::fix_point_topdown(10),
@@ -197,11 +205,6 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
197205
NormalizationRuleImpl::CombineFilter,
198206
],
199207
)
200-
.before_batch(
201-
"TopK".to_string(),
202-
HepBatchStrategy::once_topdown(),
203-
vec![NormalizationRuleImpl::TopK],
204-
)
205208
.after_batch(
206209
"Eliminate Aggregate".to_string(),
207210
HepBatchStrategy::once_topdown(),
@@ -379,8 +382,16 @@ impl<S: Storage> Database<S> {
379382
};
380383
let transaction = Box::into_raw(Box::new(self.storage.transaction()?));
381384
let (schema, executor) =
382-
self.state
383-
.execute(unsafe { &mut (*transaction) }, statement, params)?;
385+
match self
386+
.state
387+
.execute(unsafe { &mut (*transaction) }, statement, params)
388+
{
389+
Ok(result) => result,
390+
Err(err) => {
391+
unsafe { drop(Box::from_raw(transaction)) };
392+
return Err(err);
393+
}
394+
};
384395
let inner = Box::into_raw(Box::new(TransactionIter::new(schema, executor)));
385396
Ok(DatabaseIter { transaction, inner })
386397
}

src/optimizer/core/memo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ mod tests {
191191
let sort_fields = vec![SortField::new(
192192
ScalarExpression::column_expr(c1_column.clone()),
193193
true,
194-
true,
194+
false,
195195
)];
196196
let scala_functions = Default::default();
197197
let table_functions = Default::default();

src/optimizer/rule/normalization/agg_elimination.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ use crate::expression::ScalarExpression;
1818
use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate};
1919
use crate::optimizer::core::rule::{MatchPattern, NormalizationRule};
2020
use crate::optimizer::plan_utils::{only_child_mut, replace_with_only_child};
21+
use crate::planner::operator::limit::LimitOperator;
2122
use crate::planner::operator::sort::SortField;
2223
use crate::planner::operator::{Operator, PhysicalOption, PlanImpl, SortOption};
2324
use crate::planner::{Childrens, LogicalPlan};
2425
use std::sync::LazyLock;
2526

2627
static REDUNDANT_SORT_PATTERN: LazyLock<Pattern> = LazyLock::new(|| Pattern {
27-
predicate: |op| matches!(op, Operator::Sort(_)),
28+
predicate: |op| matches!(op, Operator::Sort(_) | Operator::TopK(_)),
2829
children: PatternChildrenPredicate::None,
2930
});
3031

@@ -38,8 +39,12 @@ impl MatchPattern for EliminateRedundantSort {
3839

3940
impl NormalizationRule for EliminateRedundantSort {
4041
fn apply(&self, plan: &mut LogicalPlan) -> Result<bool, DatabaseError> {
41-
let sort_fields = match &plan.operator {
42-
Operator::Sort(sort_op) => sort_op.sort_fields.clone(),
42+
let (sort_fields, topk_limit) = match &plan.operator {
43+
Operator::Sort(sort_op) => (sort_op.sort_fields.clone(), None),
44+
Operator::TopK(topk_op) => (
45+
topk_op.sort_fields.clone(),
46+
Some((topk_op.limit, topk_op.offset)),
47+
),
4348
_ => return Ok(false),
4449
};
4550

@@ -54,6 +59,15 @@ impl NormalizationRule for EliminateRedundantSort {
5459
return Ok(false);
5560
}
5661

62+
if let Some((limit, offset)) = topk_limit {
63+
plan.operator = Operator::Limit(LimitOperator {
64+
offset,
65+
limit: Some(limit),
66+
});
67+
plan.physical_option = Some(PhysicalOption::new(PlanImpl::Limit, SortOption::Follow));
68+
return Ok(true);
69+
}
70+
5771
Ok(replace_with_only_child(plan))
5872
}
5973
}
@@ -165,7 +179,7 @@ fn distinct_sort_fields(groupby_exprs: &[ScalarExpression]) -> Vec<SortField> {
165179
groupby_exprs
166180
.iter()
167181
.cloned()
168-
.map(|expr| SortField::new(expr, true, true))
182+
.map(|expr| SortField::new(expr, true, false))
169183
.collect()
170184
}
171185

@@ -322,6 +336,7 @@ mod tests {
322336
use crate::planner::operator::filter::FilterOperator;
323337
use crate::planner::operator::sort::{SortField, SortOperator};
324338
use crate::planner::operator::table_scan::TableScanOperator;
339+
use crate::planner::operator::top_k::TopKOperator;
325340
use crate::planner::operator::{Operator, PhysicalOption, PlanImpl, SortOption};
326341
use crate::planner::{Childrens, LogicalPlan};
327342
use crate::types::index::{IndexInfo, IndexMeta, IndexType};
@@ -334,7 +349,7 @@ mod tests {
334349

335350
fn make_sort_field(name: &str) -> SortField {
336351
let column = ColumnRef::from(ColumnCatalog::new_dummy(name.to_string()));
337-
SortField::new(ScalarExpression::column_expr(column), true, true)
352+
SortField::new(ScalarExpression::column_expr(column), true, false)
338353
}
339354

340355
fn build_plan(
@@ -412,7 +427,7 @@ mod tests {
412427
let sort_fields = vec![SortField::new(
413428
ScalarExpression::column_expr(c1.clone()),
414429
true,
415-
true,
430+
false,
416431
)];
417432
let sort_option = SortOption::OrderBy {
418433
fields: sort_fields.clone(),
@@ -471,6 +486,28 @@ mod tests {
471486
Ok(())
472487
}
473488

489+
#[test]
490+
fn remove_topk_when_index_matches_order() -> Result<(), DatabaseError> {
491+
let sort_field = make_sort_field("c1");
492+
let mut plan = build_plan(vec![sort_field.clone()], vec![sort_field.clone()], 0);
493+
plan.operator = Operator::TopK(TopKOperator {
494+
sort_fields: vec![sort_field],
495+
limit: 10,
496+
offset: Some(5),
497+
});
498+
let rule = EliminateRedundantSort;
499+
500+
assert!(rule.apply(&mut plan)?);
501+
match plan.operator {
502+
Operator::Limit(limit_op) => {
503+
assert_eq!(limit_op.limit, Some(10));
504+
assert_eq!(limit_op.offset, Some(5));
505+
}
506+
_ => unreachable!("expected limit operator after removing topk"),
507+
}
508+
Ok(())
509+
}
510+
474511
#[test]
475512
fn remove_sort_when_prefix_can_be_ignored() -> Result<(), DatabaseError> {
476513
let c1 = make_sort_field("c1");
@@ -486,7 +523,7 @@ mod tests {
486523
#[test]
487524
fn annotate_sets_sort_hint_on_table_scan() -> Result<(), DatabaseError> {
488525
let column = ColumnRef::from(ColumnCatalog::new_dummy("c1".to_string()));
489-
let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, true);
526+
let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, false);
490527
let (index_info, _) = build_index_info(vec![sort_field.clone()], 0);
491528

492529
let mut columns = BTreeMap::new();
@@ -588,7 +625,7 @@ mod tests {
588625
#[test]
589626
fn promote_index_to_remove_sort() -> Result<(), DatabaseError> {
590627
let column = ColumnRef::from(ColumnCatalog::new_dummy("c_first".to_string()));
591-
let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, true);
628+
let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, false);
592629
let (mut index_info, _) = build_index_info(vec![sort_field.clone()], 0);
593630
index_info.range = Some(Range::Scope {
594631
min: Bound::Unbounded,

0 commit comments

Comments
 (0)