Skip to content

Commit abcb351

Browse files
committed
feat: impl StreamDistinct & elimination plan constant cast
1 parent a8f96b1 commit abcb351

File tree

26 files changed

+1063
-106
lines changed

26 files changed

+1063
-106
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ sqlite_bench
2424
kite_sql_tpcc
2525
copy.csv
2626

27-
tests/data/row_20000.csv
27+
tests/data/row_20000.csv
28+
tests/data/distinct_rows.csv

Cargo.lock

Lines changed: 43 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ tokio = { version = "1.36", features = ["full"], optional = true
7171

7272

7373
[target.'cfg(unix)'.dev-dependencies]
74-
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
74+
pprof = { version = "0.15", features = ["flamegraph", "criterion"] }
7575

7676
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
7777
criterion = { version = "0.5", features = ["html_reports"] }

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ Run `make tpcc-dual` to mirror every TPCC statement to an in-memory SQLite datab
9393
All cases have been fully optimized.
9494
```shell
9595
<90th Percentile RT (MaxRT)>
96-
New-Order : 0.002 (0.012)
97-
Payment : 0.001 (0.002)
98-
Order-Status : 0.002 (0.019)
99-
Delivery : 0.001 (0.001)
100-
Stock-Level : 0.002 (0.018)
96+
New-Order : 0.002 (0.006)
97+
Payment : 0.001 (0.019)
98+
Order-Status : 0.001 (0.003)
99+
Delivery : 0.022 (0.038)
100+
Stock-Level : 0.002 (0.005)
101101
<TpmC>
102-
37166 Tpmc
102+
18432 TpmC
103103
```
104104
#### 👉[check more](tpcc/README.md)
105105

src/db.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,12 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
203203
vec![NormalizationRuleImpl::TopK],
204204
)
205205
.after_batch(
206-
"Eliminate Redundant Sort".to_string(),
206+
"Eliminate Aggregate".to_string(),
207207
HepBatchStrategy::once_topdown(),
208-
vec![NormalizationRuleImpl::EliminateRedundantSort],
208+
vec![
209+
NormalizationRuleImpl::EliminateRedundantSort,
210+
NormalizationRuleImpl::UseStreamDistinct,
211+
],
209212
)
210213
.after_batch(
211214
"Expression Remapper".to_string(),

src/execution/dql/aggregate/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod count;
1717
pub mod hash_agg;
1818
mod min_max;
1919
pub mod simple_agg;
20+
pub mod stream_distinct;
2021
mod sum;
2122

2223
use crate::errors::DatabaseError;
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// Copyright 2024 KipData/KiteSQL
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::execution::{build_read, spawn_executor, Executor, ReadExecutor};
16+
use crate::expression::ScalarExpression;
17+
use crate::planner::operator::aggregate::AggregateOperator;
18+
use crate::planner::LogicalPlan;
19+
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
20+
use crate::throw;
21+
use crate::types::tuple::Tuple;
22+
use crate::types::value::DataValue;
23+
use itertools::Itertools;
24+
/// Streaming implementation of `SELECT DISTINCT`.
///
/// Evaluates the group-by expressions for each input tuple and emits a tuple
/// only when the evaluated keys differ from the previous row's keys (see
/// `execute`). Because only the previous row is remembered, duplicates are
/// removed correctly only when equal keys arrive adjacent — NOTE(review):
/// presumably the `UseStreamDistinct` optimizer rule guarantees the input is
/// ordered on these expressions; confirm against the planner.
pub struct StreamDistinctExecutor {
    // Expressions whose evaluated values form the distinct key of each row.
    groupby_exprs: Vec<ScalarExpression>,
    // Child plan producing the (assumed key-ordered) input tuples.
    input: LogicalPlan,
}
29+
30+
impl From<(AggregateOperator, LogicalPlan)> for StreamDistinctExecutor {
31+
fn from((op, input): (AggregateOperator, LogicalPlan)) -> Self {
32+
StreamDistinctExecutor {
33+
groupby_exprs: op.groupby_exprs,
34+
input,
35+
}
36+
}
37+
}
38+
impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for StreamDistinctExecutor {
    /// Streams the child plan and yields one tuple per distinct group-key value.
    ///
    /// Deduplication compares each row's evaluated keys only against the keys of
    /// the previously emitted row, so correct DISTINCT output requires equal keys
    /// to arrive adjacent (i.e. the input is grouped/sorted on the expressions).
    fn execute(
        self,
        cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
        transaction: *mut T,
    ) -> Executor<'a> {
        spawn_executor(move |co| async move {
            let StreamDistinctExecutor {
                groupby_exprs,
                mut input,
            } = self;

            // Capture the child's output schema before `input` is consumed below.
            let schema_ref = input.output_schema().clone();
            let mut executor = build_read(input, cache, transaction);
            // Keys of the most recently emitted row; None until the first row.
            let mut last_keys: Option<Vec<DataValue>> = None;

            for result in executor.by_ref() {
                let tuple = throw!(co, result);
                // Evaluate every group-by expression against the current tuple;
                // try_collect short-circuits on the first evaluation error.
                let group_keys: Vec<DataValue> = throw!(
                    co,
                    groupby_exprs
                        .iter()
                        .map(|expr| expr.eval(Some((&tuple, &schema_ref))))
                        .try_collect()
                );

                // Emit only when the keys changed relative to the previous row —
                // adjacent duplicates are suppressed, non-adjacent ones are not.
                if last_keys.as_ref() != Some(&group_keys) {
                    last_keys = Some(group_keys.clone());
                    co.yield_(Ok(Tuple::new(None, group_keys))).await;
                }
            }
        })
    }
}
73+
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef};
    use crate::errors::DatabaseError;
    use crate::execution::dql::aggregate::stream_distinct::StreamDistinctExecutor;
    use crate::execution::{try_collect, ReadExecutor};
    use crate::expression::ScalarExpression;
    use crate::planner::operator::aggregate::AggregateOperator;
    use crate::planner::operator::values::ValuesOperator;
    use crate::planner::operator::Operator;
    use crate::planner::{Childrens, LogicalPlan};
    use crate::storage::rocksdb::RocksStorage;
    use crate::storage::{StatisticsMetaCache, Storage, TableCache, ViewCache};
    use crate::types::value::DataValue;
    use crate::types::LogicalType;
    use crate::utils::lru::SharedLruCache;
    use itertools::Itertools;
    use std::hash::RandomState;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Creates the three executor caches plus a throwaway RocksDB instance
    /// backed by a temporary directory (returned so it outlives the storage).
    #[allow(clippy::type_complexity)]
    fn build_test_storage() -> Result<
        (
            Arc<TableCache>,
            Arc<ViewCache>,
            Arc<StatisticsMetaCache>,
            TempDir,
            RocksStorage,
        ),
        DatabaseError,
    > {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let storage = RocksStorage::new(temp_dir.path())?;

        let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?);
        let view_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?);
        let meta_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?);

        Ok((table_cache, view_cache, meta_cache, temp_dir, storage))
    }

    /// Sorted single-column input: adjacent duplicates collapse to one row each.
    #[test]
    fn stream_distinct_single_column_sorted() -> Result<(), DatabaseError> {
        let column_desc = ColumnDesc::new(LogicalType::Integer, None, false, None)?;
        let schema = Arc::new(vec![ColumnRef::from(ColumnCatalog::new(
            "c1".to_string(),
            true,
            column_desc,
        ))]);

        let values_plan = LogicalPlan::new(
            Operator::Values(ValuesOperator {
                rows: vec![
                    vec![DataValue::Int32(1)],
                    vec![DataValue::Int32(1)],
                    vec![DataValue::Int32(2)],
                    vec![DataValue::Int32(2)],
                    vec![DataValue::Int32(3)],
                ],
                schema_ref: schema.clone(),
            }),
            Childrens::None,
        );
        let distinct_op = AggregateOperator {
            groupby_exprs: vec![ScalarExpression::column_expr(schema[0].clone())],
            agg_calls: vec![],
            is_distinct: true,
        };

        let (table_cache, view_cache, meta_cache, _temp_dir, storage) = build_test_storage()?;
        let mut transaction = storage.transaction()?;
        let tuples = try_collect(
            StreamDistinctExecutor::from((distinct_op, values_plan))
                .execute((&table_cache, &view_cache, &meta_cache), &mut transaction),
        )?;

        let mut emitted = Vec::new();
        for tuple in tuples {
            for value in tuple.values {
                if let Some(v) = value.i32() {
                    emitted.push(v);
                }
            }
        }
        assert_eq!(emitted, vec![1, 2, 3]);

        Ok(())
    }

    /// Sorted two-column input: a row is distinct when EITHER key differs
    /// from the previous row's keys.
    #[test]
    fn stream_distinct_multi_column_sorted() -> Result<(), DatabaseError> {
        let column_desc = ColumnDesc::new(LogicalType::Integer, None, false, None)?;
        let schema = Arc::new(vec![
            ColumnRef::from(ColumnCatalog::new(
                "c1".to_string(),
                true,
                column_desc.clone(),
            )),
            ColumnRef::from(ColumnCatalog::new("c2".to_string(), true, column_desc)),
        ]);

        let values_plan = LogicalPlan::new(
            Operator::Values(ValuesOperator {
                rows: vec![
                    vec![DataValue::Int32(1), DataValue::Int32(1)],
                    vec![DataValue::Int32(1), DataValue::Int32(1)],
                    vec![DataValue::Int32(1), DataValue::Int32(2)],
                    vec![DataValue::Int32(2), DataValue::Int32(1)],
                    vec![DataValue::Int32(2), DataValue::Int32(1)],
                ],
                schema_ref: schema.clone(),
            }),
            Childrens::None,
        );
        let distinct_op = AggregateOperator {
            groupby_exprs: vec![
                ScalarExpression::column_expr(schema[0].clone()),
                ScalarExpression::column_expr(schema[1].clone()),
            ],
            agg_calls: vec![],
            is_distinct: true,
        };

        let (table_cache, view_cache, meta_cache, _temp_dir, storage) = build_test_storage()?;
        let mut transaction = storage.transaction()?;
        let tuples = try_collect(
            StreamDistinctExecutor::from((distinct_op, values_plan))
                .execute((&table_cache, &view_cache, &meta_cache), &mut transaction),
        )?;

        let emitted = tuples
            .into_iter()
            .map(|tuple| {
                tuple
                    .values
                    .into_iter()
                    .filter_map(|value| value.i32())
                    .collect_vec()
            })
            .collect_vec();
        assert_eq!(emitted, vec![vec![1, 1], vec![1, 2], vec![2, 1]]);

        Ok(())
    }
}

0 commit comments

Comments
 (0)