Skip to content

Commit b91705c

Browse files
authored
Use 2 partitions in planner tests (#17)
* test with 2 partitions * new plans * regenerate with single partition of input data
1 parent f2500d1 commit b91705c

19 files changed

+1670
-703
lines changed

README.md

-20
Original file line numberDiff line numberDiff line change
@@ -145,26 +145,6 @@ export TPCH_DATA_PATH=`pwd`/data
145145
cargo test
146146
```
147147

148-
Tests compare plans with expected plans, which unfortunately contain the
149-
path to the parquet tables. The path committed under version control is
150-
that of a GitHub runner, and won't work locally. You can fix it by
151-
running the following command:
152-
153-
```bash
154-
./scripts/replace-expected-plan-paths.sh local-dev
155-
````
156-
157-
When instead you need to regenerate the plans, which you can do by
158-
re-running the planner tests after removing all the content of
159-
`testdata/expected-plans`, they will now contain your local paths. You can
160-
fix this before committing the plans by running
161-
162-
```bash
163-
164-
./scripts/replace-expected-plan-paths.sh pre-ci
165-
166-
```
167-
168148
## Benchmarking
169149

170150
Create a release build when running benchmarks, then use pip to install the wheel.

src/planner.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -394,8 +394,8 @@ mod test {
394394

395395
let file = format!("testdata/queries/q{n}.sql");
396396
let sql = fs::read_to_string(&file)?;
397-
let config = SessionConfig::new().with_target_partitions(1);
398-
let ctx = SessionContext::with_config(config);
397+
let config = SessionConfig::new().with_target_partitions(2);
398+
let ctx = SessionContext::new_with_config(config);
399399
let tables = &[
400400
"customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
401401
];
@@ -423,7 +423,7 @@ mod test {
423423
displayable(plan.as_ref()).indent(false)
424424
));
425425

426-
output.push_str("RaySQL Plan\n===========\n\n");
426+
output.push_str("DataFusion Ray Distributed Plan\n===========\n\n");
427427
let graph = make_execution_graph(plan, false)?;
428428
for id in 0..=graph.get_final_query_stage().id {
429429
let query_stage = graph.query_stages.get(&id).unwrap();

testdata/expected-plans/q1.txt

+29-14
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,38 @@ Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
1111
DataFusion Physical Plan
1212
========================
1313

14-
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[false]
15-
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
16-
AggregateExec: mode=Single, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
17-
ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_2, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
14+
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
15+
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
16+
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
17+
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
1818
CoalesceBatchesExec: target_batch_size=8192
19-
FilterExec: l_shipdate@6 <= 1998-09-24
20-
ParquetExec: file_groups={ ... }, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], predicate=l_shipdate@10 <= 1998-09-24, pruning_predicate=CASE WHEN l_shipdate_null_count@1 = l_shipdate_row_count@2 THEN false ELSE l_shipdate_min@0 <= 1998-09-24 END, required_guarantees=[]
19+
RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 2), input_partitions=2
20+
AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
21+
ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_2, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
22+
CoalesceBatchesExec: target_batch_size=8192
23+
FilterExec: l_shipdate@6 <= 1998-09-24
24+
ParquetExec: file_groups={ ... }, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], predicate=l_shipdate@10 <= 1998-09-24, pruning_predicate=CASE WHEN l_shipdate_null_count@1 = l_shipdate_row_count@2 THEN false ELSE l_shipdate_min@0 <= 1998-09-24 END, required_guarantees=[]
2125

22-
RaySQL Plan
26+
DataFusion Ray Distributed Plan
2327
===========
2428

25-
Query Stage #0 (1 -> 1):
26-
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[false]
27-
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
28-
AggregateExec: mode=Single, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
29-
ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_2, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
29+
Query Stage #0 (2 -> 2):
30+
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
31+
AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
32+
ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_2, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
33+
CoalesceBatchesExec: target_batch_size=8192
34+
FilterExec: l_shipdate@6 <= 1998-09-24
35+
ParquetExec: file_groups={ ... }, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], predicate=l_shipdate@10 <= 1998-09-24, pruning_predicate=CASE WHEN l_shipdate_null_count@1 = l_shipdate_row_count@2 THEN false ELSE l_shipdate_min@0 <= 1998-09-24 END, required_guarantees=[]
36+
37+
Query Stage #1 (2 -> 2):
38+
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
39+
SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
40+
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order]
41+
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
3042
CoalesceBatchesExec: target_batch_size=8192
31-
FilterExec: l_shipdate@6 <= 1998-09-24
32-
ParquetExec: file_groups={ ... }, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], predicate=l_shipdate@10 <= 1998-09-24, pruning_predicate=CASE WHEN l_shipdate_null_count@1 = l_shipdate_row_count@2 THEN false ELSE l_shipdate_min@0 <= 1998-09-24 END, required_guarantees=[]
43+
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
44+
45+
Query Stage #2 (2 -> 1):
46+
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
47+
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
3348

0 commit comments

Comments
 (0)