Skip to content

Commit b012bb2

Browse files
committed
chore: codefmt
1 parent abcb351 commit b012bb2

File tree

6 files changed

+61
-233
lines changed

6 files changed

+61
-233
lines changed

src/execution/dql/aggregate/stream_distinct.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for StreamDistinctExecutor {
6464

6565
if last_keys.as_ref() != Some(&group_keys) {
6666
last_keys = Some(group_keys.clone());
67-
co.yield_(Ok(Tuple::new(None, group_keys))).await;
67+
co.yield_(Ok(Tuple::new(tuple.pk, group_keys))).await;
6868
}
6969
}
7070
})
@@ -78,11 +78,14 @@ mod tests {
7878
use crate::execution::dql::aggregate::stream_distinct::StreamDistinctExecutor;
7979
use crate::execution::{try_collect, ReadExecutor};
8080
use crate::expression::ScalarExpression;
81+
use crate::optimizer::heuristic::batch::HepBatchStrategy;
82+
use crate::optimizer::heuristic::optimizer::HepOptimizerPipeline;
83+
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
8184
use crate::planner::operator::aggregate::AggregateOperator;
8285
use crate::planner::operator::values::ValuesOperator;
8386
use crate::planner::operator::Operator;
8487
use crate::planner::{Childrens, LogicalPlan};
85-
use crate::storage::rocksdb::RocksStorage;
88+
use crate::storage::rocksdb::{RocksStorage, RocksTransaction};
8689
use crate::storage::{StatisticsMetaCache, Storage, TableCache, ViewCache};
8790
use crate::types::value::DataValue;
8891
use crate::types::LogicalType;
@@ -113,6 +116,21 @@ mod tests {
113116
Ok((table_cache, view_cache, meta_cache, temp_dir, storage))
114117
}
115118

119+
fn optimize_exprs(plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
120+
HepOptimizerPipeline::builder()
121+
.before_batch(
122+
"Expression Remapper".to_string(),
123+
HepBatchStrategy::once_topdown(),
124+
vec![
125+
NormalizationRuleImpl::BindExpressionPosition,
126+
NormalizationRuleImpl::EvaluatorBind,
127+
],
128+
)
129+
.build()
130+
.instantiate(plan)
131+
.find_best::<RocksTransaction>(None)
132+
}
133+
116134
#[test]
117135
fn stream_distinct_single_column_sorted() -> Result<(), DatabaseError> {
118136
let desc = ColumnDesc::new(LogicalType::Integer, None, false, None)?;
@@ -140,11 +158,16 @@ mod tests {
140158
agg_calls: vec![],
141159
is_distinct: true,
142160
};
161+
let plan = LogicalPlan::new(Operator::Aggregate(agg), Childrens::Only(Box::new(input)));
162+
let plan = optimize_exprs(plan)?;
163+
let Operator::Aggregate(agg) = plan.operator else {
164+
unreachable!()
165+
};
143166

144167
let (table_cache, view_cache, meta_cache, _temp_dir, storage) = build_test_storage()?;
145168
let mut transaction = storage.transaction()?;
146169
let tuples = try_collect(
147-
StreamDistinctExecutor::from((agg, input))
170+
StreamDistinctExecutor::from((agg, plan.childrens.pop_only()))
148171
.execute((&table_cache, &view_cache, &meta_cache), &mut transaction),
149172
)?;
150173

@@ -187,11 +210,16 @@ mod tests {
187210
agg_calls: vec![],
188211
is_distinct: true,
189212
};
213+
let plan = LogicalPlan::new(Operator::Aggregate(agg), Childrens::Only(Box::new(input)));
214+
let plan = optimize_exprs(plan)?;
215+
let Operator::Aggregate(agg) = plan.operator else {
216+
unreachable!()
217+
};
190218

191219
let (table_cache, view_cache, meta_cache, _temp_dir, storage) = build_test_storage()?;
192220
let mut transaction = storage.transaction()?;
193221
let tuples = try_collect(
194-
StreamDistinctExecutor::from((agg, input))
222+
StreamDistinctExecutor::from((agg, plan.childrens.pop_only()))
195223
.execute((&table_cache, &view_cache, &meta_cache), &mut transaction),
196224
)?;
197225

src/expression/simplify.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ impl VisitorMut<'_> for ConstantCalculator {
100100
ScalarExpression::TypeCast { expr: arg_expr, ty } => {
101101
self.visit(arg_expr)?;
102102

103-
if let ScalarExpression::Constant(value) = arg_expr.as_ref() {
104-
let casted = value.clone().cast(ty)?;
103+
if let ScalarExpression::Constant(value) = arg_expr.as_mut() {
104+
let casted = mem::replace(value, DataValue::Null).cast(ty)?;
105105
let _ = mem::replace(expr, ScalarExpression::Constant(casted));
106106
}
107107
}

tests/slt/where_by_index.slt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ create index p_index on t1 (c1, c2);
1919
statement ok
2020
analyze table t1;
2121

22-
query IIT
22+
query IIT
2323
select * from t1 limit 10;
2424
----
2525
0 1 2
@@ -256,3 +256,26 @@ select c2, c1 from t1 where c1 < 10 and c1 > 0 and c2 > 0 and c2 < 10;
256256

257257
statement ok
258258
drop table t1;
259+
260+
statement ok
261+
create table t_cover(id int primary key, c1 int, c2 int, c3 int);
262+
263+
statement ok
264+
insert into t_cover values
265+
(1, 1, 10, 11),
266+
(2, 2, 20, 21),
267+
(3, 2, 22, 23),
268+
(4, 3, 30, 31);
269+
270+
statement ok
271+
create index idx_cover on t_cover (c1, c2, c3);
272+
273+
# composite index trailing columns cover (index columns > output columns)
274+
query II rowsort
275+
select c2, c3 from t_cover where c1 = 2;
276+
----
277+
20 21
278+
22 23
279+
280+
statement ok
281+
drop table t_cover;

tpcc/src/backend/dual.rs

Lines changed: 0 additions & 223 deletions
Original file line numberDiff line numberDiff line change
@@ -290,226 +290,3 @@ fn sqlite_statement_spec(spec: &StatementSpec) -> StatementSpec {
290290
spec.clone()
291291
}
292292
}
293-
294-
#[cfg(test)]
295-
mod tests {
296-
use super::*;
297-
use crate::backend::{
298-
BackendControl, BackendTransaction, ColumnType, DbParam, PreparedStatement, StatementSpec,
299-
};
300-
use chrono::NaiveDateTime;
301-
use kite_sql::types::value::DataValue;
302-
use rand::Rng;
303-
use std::fs;
304-
use std::path::PathBuf;
305-
306-
const DISTINCT_ORDER_BY_SQL: &str = "SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 AND ol_o_id < ?3 AND ol_o_id >= (?4 - 20) ORDER BY ol_i_id";
307-
const DISTINCT_DELIVERY_SQL: &str =
308-
"SELECT DISTINCT ol_delivery_d FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 ORDER BY ol_delivery_d";
309-
const INT32_RESULT: [ColumnType; 1] = [ColumnType::Int32];
310-
const DATETIME_RESULT: [ColumnType; 1] = [ColumnType::NullableDateTime];
311-
312-
struct TempKiteDir {
313-
path: PathBuf,
314-
}
315-
316-
impl TempKiteDir {
317-
fn new() -> Result<Self, TpccError> {
318-
let mut path = std::env::temp_dir();
319-
let suffix: u64 = rand::thread_rng().gen();
320-
path.push(format!("tpcc_dual_test_{suffix}"));
321-
if path.exists() {
322-
fs::remove_dir_all(&path)?;
323-
}
324-
Ok(Self { path })
325-
}
326-
}
327-
328-
impl Drop for TempKiteDir {
329-
fn drop(&mut self) {
330-
let _ = fs::remove_dir_all(&self.path);
331-
}
332-
}
333-
334-
fn prepare_single<B: BackendControl>(
335-
backend: &B,
336-
spec: StatementSpec,
337-
) -> Result<PreparedStatement, TpccError> {
338-
let mut groups = backend.prepare_statements(&[vec![spec]])?;
339-
Ok(groups
340-
.pop()
341-
.and_then(|mut group| group.pop())
342-
.expect("missing prepared statement"))
343-
}
344-
345-
fn collect_i32<B: BackendControl>(
346-
backend: &B,
347-
sql: &'static str,
348-
params: &[DbParam],
349-
) -> Result<Vec<i32>, TpccError> {
350-
let statement = prepare_single(
351-
backend,
352-
StatementSpec {
353-
sql,
354-
result_types: &INT32_RESULT,
355-
},
356-
)?;
357-
let mut tx = backend.new_transaction()?;
358-
let rows = {
359-
let mut iter = tx.execute(&statement, params)?;
360-
let mut rows = Vec::new();
361-
while let Some(row) = iter.next() {
362-
let tuple = row?;
363-
let value = tuple.values[0].i32().expect("expected Int32 column");
364-
rows.push(value);
365-
}
366-
rows
367-
};
368-
tx.commit()?;
369-
Ok(rows)
370-
}
371-
372-
fn collect_datetime<B: BackendControl>(
373-
backend: &B,
374-
sql: &'static str,
375-
params: &[DbParam],
376-
) -> Result<Vec<Option<NaiveDateTime>>, TpccError> {
377-
let statement = prepare_single(
378-
backend,
379-
StatementSpec {
380-
sql,
381-
result_types: &DATETIME_RESULT,
382-
},
383-
)?;
384-
let mut tx = backend.new_transaction()?;
385-
let rows = {
386-
let mut iter = tx.execute(&statement, params)?;
387-
let mut rows = Vec::new();
388-
while let Some(row) = iter.next() {
389-
let tuple = row?;
390-
rows.push(tuple.values[0].datetime());
391-
}
392-
rows
393-
};
394-
tx.commit()?;
395-
Ok(rows)
396-
}
397-
398-
fn parse_dt(value: &str) -> NaiveDateTime {
399-
NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S").expect("invalid datetime literal")
400-
}
401-
402-
#[test]
403-
fn dual_stock_level_distinct_ordering_mismatch() -> Result<(), TpccError> {
404-
let temp_dir = TempKiteDir::new()?;
405-
let backend = DualBackend::new(
406-
temp_dir
407-
.path
408-
.to_str()
409-
.expect("temporary path should be valid utf-8"),
410-
)?;
411-
412-
backend.execute_batch(
413-
"create table order_line (
414-
ol_o_id int not null,
415-
ol_d_id tinyint not null,
416-
ol_w_id smallint not null,
417-
ol_number tinyint not null,
418-
ol_i_id int,
419-
ol_supply_w_id smallint,
420-
ol_delivery_d datetime,
421-
ol_quantity tinyint,
422-
ol_amount decimal(6,2),
423-
ol_dist_info char(24),
424-
PRIMARY KEY(ol_w_id, ol_d_id, ol_o_id, ol_number)
425-
);",
426-
)?;
427-
backend.execute_batch(
428-
"CREATE INDEX fkey_order_line_1 ON order_line (ol_o_id, ol_d_id, ol_w_id);",
429-
)?;
430-
431-
// Large distinct set makes accidental order matches vanishingly unlikely.
432-
let i_ids = [
433-
87181, 5901, 26250, 58438, 92124, 84344, 72959, 10014, 46998, 2731, 11001, 12002,
434-
13003, 14004, 15005, 16006, 17007, 18008, 19009, 20010,
435-
];
436-
let delivery_samples = [
437-
None,
438-
Some("2024-01-02 00:00:00"),
439-
Some("2024-01-01 00:00:00"),
440-
Some("2023-12-31 00:00:00"),
441-
];
442-
443-
for (idx, i_id) in i_ids.iter().enumerate() {
444-
let o_id = idx + 1;
445-
let delivery = delivery_samples
446-
.get(idx)
447-
.and_then(|value| *value)
448-
.map(|value| format!("'{value}'"))
449-
.unwrap_or_else(|| "NULL".to_string());
450-
backend.execute_batch(&format!(
451-
"insert into order_line values ({o_id}, 1, 1, 1, {i_id}, 1, {delivery}, 1, 1.00, 'dist')"
452-
))?;
453-
}
454-
backend.execute_batch(
455-
"insert into order_line values (21, 1, 1, 1, 87181, 1, NULL, 1, 1.00, 'dist')",
456-
)?;
457-
backend.execute_batch(
458-
"insert into order_line values (22, 2, 1, 1, 99999, 1, NULL, 1, 1.00, 'dist')",
459-
)?;
460-
461-
let params: [DbParam; 4] = [
462-
("?1", DataValue::Int16(1)),
463-
("?2", DataValue::Int8(1)),
464-
("?3", DataValue::Int32(21)),
465-
("?4", DataValue::Int32(21)),
466-
];
467-
468-
let kite_unordered = collect_i32(&backend.kite, STOCK_LEVEL_DISTINCT_SQL, &params)?;
469-
let sqlite_unordered = collect_i32(&backend.sqlite, STOCK_LEVEL_DISTINCT_SQLITE, &params)?;
470-
let dual_unordered = collect_i32(&backend, STOCK_LEVEL_DISTINCT_SQL, &params)?;
471-
472-
let mut kite_sorted = kite_unordered.clone();
473-
let mut sqlite_sorted = sqlite_unordered.clone();
474-
kite_sorted.sort_unstable();
475-
sqlite_sorted.sort_unstable();
476-
assert_eq!(
477-
kite_sorted, sqlite_sorted,
478-
"distinct sets diverge; ordering is the only allowed difference"
479-
);
480-
assert_ne!(
481-
kite_unordered, sqlite_unordered,
482-
"unordered DISTINCT should not be compared row-by-row"
483-
);
484-
let mut dual_sorted = dual_unordered.clone();
485-
dual_sorted.sort_unstable();
486-
assert_eq!(
487-
dual_sorted, kite_sorted,
488-
"dual comparison should match KiteSQL distinct set"
489-
);
490-
491-
let kite_ordered = collect_i32(&backend.kite, DISTINCT_ORDER_BY_SQL, &params)?;
492-
let sqlite_ordered = collect_i32(&backend.sqlite, DISTINCT_ORDER_BY_SQL, &params)?;
493-
assert_eq!(
494-
kite_ordered, sqlite_ordered,
495-
"explicit ORDER BY should align results across engines"
496-
);
497-
498-
let params_delivery: [DbParam; 2] =
499-
[("?1", DataValue::Int16(1)), ("?2", DataValue::Int8(1))];
500-
let expected_delivery = vec![
501-
None,
502-
Some(parse_dt("2023-12-31 00:00:00")),
503-
Some(parse_dt("2024-01-01 00:00:00")),
504-
Some(parse_dt("2024-01-02 00:00:00")),
505-
];
506-
let kite_delivery =
507-
collect_datetime(&backend.kite, DISTINCT_DELIVERY_SQL, &params_delivery)?;
508-
let sqlite_delivery =
509-
collect_datetime(&backend.sqlite, DISTINCT_DELIVERY_SQL, &params_delivery)?;
510-
assert_eq!(kite_delivery, expected_delivery);
511-
assert_eq!(sqlite_delivery, expected_delivery);
512-
513-
Ok(())
514-
}
515-
}

tpcc/src/load.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -785,9 +785,9 @@ impl Load {
785785

786786
let ol_dist_info = generate_string(rng, 24, 24);
787787
let (ol_delivery_d, ol_amount) = if o_id > 2100 {
788-
("null".to_string(), "0.00".to_string())
788+
("null", "0.00".to_string())
789789
} else {
790-
(date.clone(), format!("{:.2}", rng.gen_range(0.1..100.0)))
790+
(date.as_str(), format!("{:.2}", rng.gen_range(0.1..100.0)))
791791
};
792792
exec.execute_batch(&format!(
793793
"insert into order_line values({}, {}, {}, {}, {}, {}, {}, {}, {}, '{}')",

tpcc/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ fn explain_tpcc() -> Result<(), DatabaseError> {
696696
use kite_sql::db::DataBaseBuilder;
697697
use kite_sql::types::tuple::create_table;
698698

699-
let database = DataBaseBuilder::path("./../kite_sql_tpcc").build()?;
699+
let database = DataBaseBuilder::path("./kite_sql_tpcc").build()?;
700700
let mut tx = database.new_transaction()?;
701701

702702
let customer_tuple = tx

0 commit comments

Comments
 (0)