Skip to content

Commit 45b6c23

Browse files
committed
fix(sql): preserve only probe-side cluster keys
1 parent 9f193b7 commit 45b6c23

3 files changed

Lines changed: 191 additions & 5 deletions

File tree

src/query/sql/src/planner/plans/join.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,10 @@ impl Join {
510510
let inner_join_cardinality = join_estimation.join_card();
511511
let cardinality =
512512
self.join_cardinality(left_cardinality, right_cardinality, inner_join_cardinality);
513-
let mut cluster_keys = left_statistics.cluster_keys.clone();
514-
cluster_keys.extend(right_statistics.cluster_keys.clone());
513+
// Hash join output follows the probe side. Build-side clustering is not
514+
// preserved by hash table lookups, even though build-side columns remain
515+
// available in the joined rows.
516+
let cluster_keys = left_statistics.cluster_keys.clone();
515517
if let Some(columns) = join_estimation.updated_columns() {
516518
match self.join_type {
517519
JoinType::LeftSemi => {

src/query/sql/tests/it/optimizer/cluster_key_join_order.rs

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,16 @@ use crate::framework::golden::write_case_title;
4343
/// One golden-test scenario: the schema and statistics to register for the
/// tables `a`, `b` and `c`, plus the query to run against them.
struct JoinMemoCase<'a> {
4444
/// Case identifier; appears in the golden file as `=== <name> ===`.
name: &'a str,
4545
/// One-line explanation of the case; appears in the golden file as `description: ...`.
description: &'a str,
46+
/// Parenthesised column list appended to `CREATE TABLE <table>` for every table.
table_columns: &'a str,
4647
/// `CLUSTER BY ...` clause added to table `a` only; `b` and `c` are created without it.
cluster_by: &'a str,
4748
/// Query text for the case; appears in the golden file as `sql: ...`.
sql: &'a str,
49+
/// Builds the per-column statistics for a table from its row count; called
/// once per registered table.
column_statistics: fn(u64) -> HashMap<String, BasicColumnStatistics>,
4850
}
4951

52+
// Column list for the cases that join only on `k1` / `k2`.
const KEY_TABLE_COLUMNS: &str = "(k1 BIGINT, k2 BIGINT, v BIGINT)";
53+
// Column list for the expression cluster key case: the key columns plus the
// `start_time`, `start_day` and `trace_id` columns referenced by that case's
// CLUSTER BY clause and join conditions.
const TRACE_TABLE_COLUMNS: &str = "\
54+
(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING)";
55+
5056
fn table_statistics(rows: u64) -> TableStatistics {
5157
TableStatistics {
5258
num_rows: Some(rows),
@@ -78,6 +84,29 @@ fn column_statistics(rows: u64) -> HashMap<String, BasicColumnStatistics> {
7884
.collect()
7985
}
8086

87+
/// Column statistics for tables created with `TRACE_TABLE_COLUMNS`.
///
/// Starts from the map returned by `column_statistics(rows)` and inserts
/// entries for `start_day` and `trace_id`. No entry is inserted here for
/// `start_time`.
fn trace_column_statistics(rows: u64) -> HashMap<String, BasicColumnStatistics> {
88+
let mut stats = column_statistics(rows);
89+
// `start_day`: bounds 20240101..=20241231 with 365 distinct values and no nulls.
stats.insert("start_day".to_string(), BasicColumnStatistics {
90+
min: Some(Datum::UInt(20240101)),
91+
max: Some(Datum::UInt(20241231)),
92+
ndv: Some(NdvEstimate::exact(365.0)),
93+
null_count: 0,
94+
// 4 bytes per row; saturates instead of overflowing for huge row counts.
in_memory_size: rows.saturating_mul(4),
95+
});
96+
// `trace_id`: all-'0' to all-'f' byte-string bounds, one distinct value per row.
stats.insert("trace_id".to_string(), BasicColumnStatistics {
97+
min: Some(Datum::Bytes(
98+
b"0000000000000000000000000000000000000000".to_vec(),
99+
)),
100+
max: Some(Datum::Bytes(
101+
b"ffffffffffffffffffffffffffffffffffffffff".to_vec(),
102+
)),
103+
ndv: Some(NdvEstimate::exact(rows as f64)),
104+
null_count: 0,
105+
// 40 bytes per row; saturates instead of overflowing for huge row counts.
in_memory_size: rows.saturating_mul(40),
106+
});
107+
stats
108+
}
109+
81110
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
82111
async fn test_cluster_key_order_join_memo_golden() -> Result<()> {
83112
let mut file = open_golden_file("optimizer", "cluster_key_join_order.txt")?;
@@ -86,46 +115,85 @@ async fn test_cluster_key_order_join_memo_golden() -> Result<()> {
86115
JoinMemoCase {
87116
name: "k1_k2_prefix",
88117
description: "Full memo output when the clustered probe can first match a.k1.",
118+
table_columns: KEY_TABLE_COLUMNS,
89119
cluster_by: "CLUSTER BY (k1, k2)",
90120
sql: "
91121
SELECT *
92122
FROM a
93123
JOIN b ON a.k1 = b.k1
94124
JOIN c ON a.k2 = c.k2
95125
",
126+
column_statistics,
96127
},
97128
JoinMemoCase {
98129
name: "k2_k1_prefix",
99130
description: "Full memo output when the clustered probe can first match a.k2.",
131+
table_columns: KEY_TABLE_COLUMNS,
100132
cluster_by: "CLUSTER BY (k2, k1)",
101133
sql: "
102134
SELECT *
103135
FROM a
104136
JOIN b ON a.k1 = b.k1
105137
JOIN c ON a.k2 = c.k2
106138
",
139+
column_statistics,
107140
},
108141
JoinMemoCase {
109142
name: "filter_preserves_cluster_keys",
110143
description: "Cluster keys still affect join order after a filter on the clustered table.",
144+
table_columns: KEY_TABLE_COLUMNS,
111145
cluster_by: "CLUSTER BY (k1, k2)",
112146
sql: "
113147
SELECT *
114148
FROM (SELECT * FROM a WHERE v >= 0) a
115149
JOIN b ON a.k1 = b.k1
116150
JOIN c ON a.k2 = c.k2
117151
",
152+
column_statistics,
118153
},
119154
JoinMemoCase {
120155
name: "limit_and_join_preserve_cluster_keys",
121156
description: "Cluster keys still affect join order after a limit subquery and a partial join.",
157+
table_columns: KEY_TABLE_COLUMNS,
122158
cluster_by: "CLUSTER BY (k1, k2)",
123159
sql: "
124160
SELECT *
125161
FROM (SELECT * FROM a LIMIT 1000) a
126162
JOIN b ON a.k1 = b.k1
127163
JOIN c ON a.k2 = c.k2
128164
",
165+
column_statistics,
166+
},
167+
JoinMemoCase {
168+
name: "build_side_cluster_keys_do_not_propagate",
169+
description: "Cluster keys from a build-side clustered table do not affect later join costs.",
170+
table_columns: KEY_TABLE_COLUMNS,
171+
cluster_by: "CLUSTER BY (k1, k2)",
172+
sql: "
173+
SELECT *
174+
FROM b
175+
JOIN (SELECT * FROM a LIMIT 100) a ON b.k1 = a.k1
176+
JOIN (SELECT * FROM c LIMIT 10) c ON a.k2 = c.k2
177+
",
178+
column_statistics,
179+
},
180+
JoinMemoCase {
181+
name: "linear_expression_cluster_key",
182+
description: "A LINEAR cluster key with to_yyyymmdd and substring expressions affects join costs.",
183+
table_columns: TRACE_TABLE_COLUMNS,
184+
cluster_by: "CLUSTER BY linear (
185+
to_yyyymmdd(start_time),
186+
SUBSTRING(trace_id FROM 1 FOR 40)
187+
)",
188+
sql: "
189+
SELECT *
190+
FROM a
191+
JOIN b
192+
ON to_yyyymmdd(a.start_time) = b.start_day
193+
AND SUBSTRING(a.trace_id FROM 1 FOR 40) = b.trace_id
194+
JOIN c ON a.k2 = c.k2
195+
",
196+
column_statistics: trace_column_statistics,
129197
},
130198
] {
131199
write_cluster_key_join_order_memo(&mut file, case).await?;
@@ -146,16 +214,19 @@ async fn write_cluster_key_join_order_memo(
146214
for table in ["a", "b", "c"] {
147215
let table_cluster_by = if table == "a" { case.cluster_by } else { "" };
148216
let setup_sql = match table_cluster_by {
149-
"" => format!("CREATE TABLE {table}(k1 BIGINT, k2 BIGINT, v BIGINT)"),
217+
"" => format!("CREATE TABLE {table}{}", case.table_columns),
150218
_ => {
151-
format!("CREATE TABLE {table}(k1 BIGINT, k2 BIGINT, v BIGINT) {table_cluster_by}")
219+
format!(
220+
"CREATE TABLE {table}{} {table_cluster_by}",
221+
case.table_columns
222+
)
152223
}
153224
};
154225
writeln!(file, "setup: {setup_sql}")?;
155226
ctx.register_table_sql_with_stats(
156227
&setup_sql,
157228
Some(table_statistics(1000)),
158-
column_statistics(1000),
229+
(case.column_statistics)(1000),
159230
)
160231
.await?;
161232
}

src/query/sql/tests/it/optimizer/cluster_key_join_order.txt

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,116 @@ Memo
194194
└── #0 EvalScalar [#6]
195195

196196

197+
=== build_side_cluster_keys_do_not_propagate ===
198+
description: Cluster keys from a build-side clustered table do not affect later join costs.
199+
setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2)
200+
setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT)
201+
setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT)
202+
sql: SELECT *
203+
FROM b
204+
JOIN (SELECT * FROM a LIMIT 100) a ON b.k1 = a.k1
205+
JOIN (SELECT * FROM c LIMIT 10) c ON a.k2 = c.k2
206+
memo:
207+
DPhpyOptimizer:
208+
join_order_candidate:
209+
- parent: [a, c], left: [a], right: [c], cost: 1000.000, previous best: -, probe factor: 1.000, selected: true
210+
- parent: [a, b], left: [b], right: [a], cost: 100000.000, previous best: -, probe factor: 1.000, selected: true
211+
- parent: [a, b, c], left: [b], right: [a, c], cost: 1001000.000, previous best: -, probe factor: 1.000, selected: true
212+
- parent: [a, b, c], left: [a, b], right: [c], cost: 1100000.000, previous best: 1001000.000, probe factor: 1.000, selected: false
213+
214+
Memo
215+
├── root group: #9
216+
├── estimated memory: 6.25 KiB
217+
├── Group #0
218+
│ ├── Best properties
219+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
220+
│ └── #0 Scan []
221+
├── Group #1
222+
│ ├── Best properties
223+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
224+
│ └── #0 Scan []
225+
├── Group #2
226+
│ ├── Best properties
227+
│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }]
228+
│ └── #0 Limit [#1]
229+
├── Group #3
230+
│ ├── Best properties
231+
│ │ └── { dist: Any }: expr: #0, cost: 2100.000, children: [{ dist: Any }]
232+
│ └── #0 EvalScalar [#2]
233+
├── Group #4
234+
│ ├── Best properties
235+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
236+
│ └── #0 Scan []
237+
├── Group #5
238+
│ ├── Best properties
239+
│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }]
240+
│ └── #0 Limit [#4]
241+
├── Group #6
242+
│ ├── Best properties
243+
│ │ └── { dist: Any }: expr: #0, cost: 2010.000, children: [{ dist: Any }]
244+
│ └── #0 EvalScalar [#5]
245+
├── Group #7
246+
│ ├── Best properties
247+
│ │ └── { dist: Any }: expr: #0, cost: 4310.000, children: [{ dist: Any }, { dist: Any }]
248+
│ └── #0 Join [#3, #6]
249+
├── Group #8
250+
│ ├── Best properties
251+
│ │ └── { dist: Any }: expr: #0, cost: 16310.000, children: [{ dist: Any }, { dist: Any }]
252+
│ └── #0 Join [#0, #7]
253+
└── Group #9
254+
├── Best properties
255+
│ └── { dist: Any }: expr: #0, cost: 1016310.000, children: [{ dist: Any }]
256+
└── #0 EvalScalar [#8]
257+
258+
259+
=== linear_expression_cluster_key ===
260+
description: A LINEAR cluster key with to_yyyymmdd and substring expressions affects join costs.
261+
setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING) CLUSTER BY linear (
262+
to_yyyymmdd(start_time),
263+
SUBSTRING(trace_id FROM 1 FOR 40)
264+
)
265+
setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING)
266+
setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING)
267+
sql: SELECT *
268+
FROM a
269+
JOIN b
270+
ON to_yyyymmdd(a.start_time) = b.start_day
271+
AND SUBSTRING(a.trace_id FROM 1 FOR 40) = b.trace_id
272+
JOIN c ON a.k2 = c.k2
273+
memo:
274+
DPhpyOptimizer:
275+
join_order_candidate:
276+
- parent: [a, c], left: [a], right: [c], cost: 1000.000, previous best: -, probe factor: 1.000, selected: true
277+
- parent: [a, b], left: [a], right: [b], cost: 902500.000, previous best: -, probe factor: 0.902, selected: true
278+
- parent: [a, b, c], left: [a, b], right: [c], cost: 903500.000, previous best: -, probe factor: 1.000, selected: true
279+
- parent: [a, b, c], left: [a, c], right: [b], cost: 1001000.000, previous best: 903500.000, probe factor: 1.000, selected: false
280+
281+
Memo
282+
├── root group: #5
283+
├── estimated memory: 3.75 KiB
284+
├── Group #0
285+
│ ├── Best properties
286+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
287+
│ └── #0 Scan []
288+
├── Group #1
289+
│ ├── Best properties
290+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
291+
│ └── #0 Scan []
292+
├── Group #2
293+
│ ├── Best properties
294+
│ │ └── { dist: Any }: expr: #0, cost: 13000.000, children: [{ dist: Any }, { dist: Any }]
295+
│ └── #0 Join [#0, #1]
296+
├── Group #3
297+
│ ├── Best properties
298+
│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: []
299+
│ └── #0 Scan []
300+
├── Group #4
301+
│ ├── Best properties
302+
│ │ └── { dist: Any }: expr: #0, cost: 1024000.000, children: [{ dist: Any }, { dist: Any }]
303+
│ └── #0 Join [#2, #3]
304+
└── Group #5
305+
├── Best properties
306+
│ └── { dist: Any }: expr: #0, cost: 1025000.000, children: [{ dist: Any }]
307+
└── #0 EvalScalar [#4]
308+
309+

0 commit comments

Comments
 (0)