import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
    """Run TPC-H Q10 (returned-item reporting) as a Ray Data release benchmark."""

    def benchmark_fn():
        from datetime import datetime

        # Load all required tables.
        customer = load_table("customer", args.sf)
        orders = load_table("orders", args.sf)
        lineitem = load_table("lineitem", args.sf)
        nation = load_table("nation", args.sf)

        # Q10 parameter: start of the order-date window.
        date = datetime(1993, 10, 1)
        # End of the 3-month window; roll over the year boundary when the
        # start month is October or later.
        if date.month <= 9:
            end_date = datetime(date.year, date.month + 3, date.day)
        else:
            end_date = datetime(date.year + 1, date.month + 3 - 12, date.day)

        # Orders placed inside the 3-month window.
        orders_filtered = orders.filter(
            expr=((col("o_orderdate") >= date) & (col("o_orderdate") < end_date))
        )

        # Only returned line items qualify.
        lineitem_filtered = lineitem.filter(expr=col("l_returnflag") == "R")

        # Join lineitem with orders.
        lineitem_orders = lineitem_filtered.join(
            orders_filtered,
            join_type="inner",
            num_partitions=100,
            on=("l_orderkey",),
            right_on=("o_orderkey",),
        )

        # Join with customer.
        ds = lineitem_orders.join(
            customer,
            join_type="inner",
            num_partitions=100,
            on=("o_custkey",),
            right_on=("c_custkey",),
        )

        # Join with nation.
        ds = ds.join(
            nation,
            join_type="inner",
            num_partitions=100,
            on=("c_nationkey",),
            right_on=("n_nationkey",),
        )

        # Per-item revenue: l_extendedprice * (1 - l_discount).
        ds = ds.with_column(
            "revenue",
            to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
        )

        # Group by customer key, name, account balance, nation, address,
        # phone, and comment (the Q10 grouping keys -- there is no region
        # key), then rank customers by lost revenue.
        _ = (
            ds.groupby(
                [
                    "c_custkey",
                    "c_name",
                    "c_acctbal",
                    "n_name",
                    "c_address",
                    "c_phone",
                    "c_comment",
                ]
            )
            .aggregate(Sum(on="revenue", alias_name="revenue"))
            .sort(key="revenue", descending=True)
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q10", benchmark_fn)


if __name__ == "__main__":
    ray.init()
    args = parse_tpch_args()
    main(args)
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, run_tpch_benchmark


def main(args):
    """Run TPC-H Q18 (large volume customer) as a Ray Data release benchmark."""

    def benchmark_fn():
        # Load all required tables.
        customer = load_table("customer", args.sf)
        orders = load_table("orders", args.sf)
        lineitem = load_table("lineitem", args.sf)

        # Q18 parameter: per the TPC-H spec, QUANTITY is drawn from [312..315].
        quantity = 312

        # Total quantity ordered, per order key.
        lineitem_quantity = lineitem.groupby("l_orderkey").aggregate(
            Sum(on="l_quantity", alias_name="total_quantity")
        )

        # Orders whose total quantity exceeds the threshold.
        large_orders = lineitem_quantity.filter(
            expr=col("total_quantity") > quantity
        )

        # Restrict lineitem to the large orders. `on` is a tuple for
        # consistency with the join-key style of the joins below.
        lineitem_large = lineitem.join(
            large_orders,
            join_type="inner",
            num_partitions=100,
            on=("l_orderkey",),
        )

        # Join with orders.
        lineitem_orders = lineitem_large.join(
            orders,
            join_type="inner",
            num_partitions=100,
            on=("l_orderkey",),
            right_on=("o_orderkey",),
        )

        # Join with customer.
        ds = lineitem_orders.join(
            customer,
            join_type="inner",
            num_partitions=100,
            on=("o_custkey",),
            right_on=("c_custkey",),
        )

        # Group by customer name/key, order key, order date, and total price,
        # then order by total price (desc) and order date (asc).
        _ = (
            ds.groupby(
                ["c_name", "c_custkey", "o_orderkey", "o_orderdate", "o_totalprice"]
            )
            .aggregate(Sum(on="l_quantity", alias_name="sum_quantity"))
            .sort(key=["o_totalprice", "o_orderdate"], descending=[True, False])
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q18", benchmark_fn)


if __name__ == "__main__":
    ray.init()
    args = parse_tpch_args()
    main(args)
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
    """Run TPC-H Q3 (shipping priority) as a Ray Data release benchmark."""

    def benchmark_fn():
        from datetime import datetime

        # Input tables.
        customer = load_table("customer", args.sf)
        orders = load_table("orders", args.sf)
        lineitem = load_table("lineitem", args.sf)

        # Q3 parameters.
        cutoff = datetime(1995, 3, 15)
        segment = "BUILDING"

        # Restrict each side before joining: customers in the target market
        # segment, orders placed before the cutoff, line items shipped after it.
        building_customers = customer.filter(expr=col("c_mktsegment") == segment)
        open_orders = orders.filter(expr=col("o_orderdate") < cutoff)
        shipped_items = lineitem.filter(expr=col("l_shipdate") > cutoff)

        # Orders joined with the qualifying customers.
        order_cust = open_orders.join(
            building_customers,
            join_type="inner",
            num_partitions=100,
            on=("o_custkey",),
            right_on=("c_custkey",),
        )

        # Shipped line items joined against those orders.
        ds = shipped_items.join(
            order_cust,
            join_type="inner",
            num_partitions=100,
            on=("l_orderkey",),
            right_on=("o_orderkey",),
        )

        # Per-item revenue: l_extendedprice * (1 - l_discount).
        ds = ds.with_column(
            "revenue",
            to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
        )

        # Group by order key, order date, and ship priority; rank by revenue
        # (descending) then order date (ascending).
        _ = (
            ds.groupby(["l_orderkey", "o_orderdate", "o_shippriority"])
            .aggregate(Sum(on="revenue", alias_name="revenue"))
            .sort(key=["revenue", "o_orderdate"], descending=[True, False])
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q3", benchmark_fn)


if __name__ == "__main__":
    ray.init()
    args = parse_tpch_args()
    main(args)
setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q18.py --sf 100 + ################################################# # Cross-AZ RPC fault tolerance test ################################################# From 116c9971683cc4aacd938f3734fb8d71fd387de0 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Tue, 3 Feb 2026 09:44:32 +0800 Subject: [PATCH 2/4] [tpch] Add customer, orders, and nation table definitions to common.py Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/common.py | 29 +++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index a872a6056767..2fdd15b6f4b2 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -26,7 +26,34 @@ "column13": "l_shipinstruct", "column14": "l_shipmode", "column15": "l_comment", - } + }, + "customer": { + "column0": "c_custkey", + "column1": "c_name", + "column2": "c_address", + "column3": "c_nationkey", + "column4": "c_phone", + "column5": "c_acctbal", + "column6": "c_mktsegment", + "column7": "c_comment", + }, + "orders": { + "column0": "o_orderkey", + "column1": "o_custkey", + "column2": "o_orderstatus", + "column3": "o_totalprice", + "column4": "o_orderdate", + "column5": "o_orderpriority", + "column6": "o_clerk", + "column7": "o_shippriority", + "column8": "o_comment", + }, + "nation": { + "column0": "n_nationkey", + "column1": "n_name", + "column2": "n_regionkey", + "column3": "n_comment", + }, } From 82f3d7a25a2aa16552cb9cd7db5870c89a157698 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Tue, 3 Feb 2026 11:43:22 +0800 Subject: [PATCH 3/4] [tpch] Update column keys in common.py for customer, orders, and nation tables Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/common.py | 42 ++++++++++---------- 1 file 
changed, 21 insertions(+), 21 deletions(-) diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index 2fdd15b6f4b2..58b3fde8ca9b 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -28,31 +28,31 @@ "column15": "l_comment", }, "customer": { - "column0": "c_custkey", - "column1": "c_name", - "column2": "c_address", - "column3": "c_nationkey", - "column4": "c_phone", - "column5": "c_acctbal", - "column6": "c_mktsegment", - "column7": "c_comment", + "column00": "c_custkey", + "column01": "c_name", + "column02": "c_address", + "column03": "c_nationkey", + "column04": "c_phone", + "column05": "c_acctbal", + "column06": "c_mktsegment", + "column07": "c_comment", }, "orders": { - "column0": "o_orderkey", - "column1": "o_custkey", - "column2": "o_orderstatus", - "column3": "o_totalprice", - "column4": "o_orderdate", - "column5": "o_orderpriority", - "column6": "o_clerk", - "column7": "o_shippriority", - "column8": "o_comment", + "column00": "o_orderkey", + "column01": "o_custkey", + "column02": "o_orderstatus", + "column03": "o_totalprice", + "column04": "o_orderdate", + "column05": "o_orderpriority", + "column06": "o_clerk", + "column07": "o_shippriority", + "column08": "o_comment", }, "nation": { - "column0": "n_nationkey", - "column1": "n_name", - "column2": "n_regionkey", - "column3": "n_comment", + "column00": "n_nationkey", + "column01": "n_name", + "column02": "n_regionkey", + "column03": "n_comment", }, } From 9f4cbd5c2b2851f773f62f210eb343409bdf9712 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Sat, 7 Feb 2026 14:21:18 +0800 Subject: [PATCH 4/4] [tpch] Revert zero-padded column keys, add trailing comment columns, and fix Q18 parameters and Q6 benchmark return Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/common.py | 46 ++++++++++--------- .../nightly_tests/dataset/tpch/tpch_q18.py | 6 +-- release/nightly_tests/dataset/tpch/tpch_q6.py | 6 ++- 3 files changed, 32 insertions(+), 26
deletions(-) diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index 58b3fde8ca9b..499c5dc349fe 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -27,32 +27,36 @@ "column14": "l_shipmode", "column15": "l_comment", }, + # customer, orders, nation use column0, column1, ... (no zero-padding) "customer": { - "column00": "c_custkey", - "column01": "c_name", - "column02": "c_address", - "column03": "c_nationkey", - "column04": "c_phone", - "column05": "c_acctbal", - "column06": "c_mktsegment", - "column07": "c_comment", + "column0": "c_custkey", + "column1": "c_name", + "column2": "c_address", + "column3": "c_nationkey", + "column4": "c_phone", + "column5": "c_acctbal", + "column6": "c_mktsegment", + "column7": "c_comment", + "column8": "c_comment_ext", }, "orders": { - "column00": "o_orderkey", - "column01": "o_custkey", - "column02": "o_orderstatus", - "column03": "o_totalprice", - "column04": "o_orderdate", - "column05": "o_orderpriority", - "column06": "o_clerk", - "column07": "o_shippriority", - "column08": "o_comment", + "column0": "o_orderkey", + "column1": "o_custkey", + "column2": "o_orderstatus", + "column3": "o_totalprice", + "column4": "o_orderdate", + "column5": "o_orderpriority", + "column6": "o_clerk", + "column7": "o_shippriority", + "column8": "o_comment", + "column9": "o_comment_ext", }, "nation": { - "column00": "n_nationkey", - "column01": "n_name", - "column02": "n_regionkey", - "column03": "n_comment", + "column0": "n_nationkey", + "column1": "n_name", + "column2": "n_regionkey", + "column3": "n_comment", + "column4": "n_comment_ext", }, } diff --git a/release/nightly_tests/dataset/tpch/tpch_q18.py b/release/nightly_tests/dataset/tpch/tpch_q18.py index eb5be680aa98..ebb24ce74d92 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q18.py +++ b/release/nightly_tests/dataset/tpch/tpch_q18.py @@ -12,8 +12,8 @@ def benchmark_fn(): orders 
= load_table("orders", args.sf) lineitem = load_table("lineitem", args.sf) - # Q18 parameters - quantity = 300 + # Q18 parameters (spec: [312..315]) + quantity = 312 # Calculate total quantity per order lineitem_quantity = lineitem.groupby("l_orderkey").aggregate( @@ -28,7 +28,7 @@ def benchmark_fn(): large_orders, join_type="inner", num_partitions=100, - on="l_orderkey", + on=("l_orderkey",), ) # Join with orders diff --git a/release/nightly_tests/dataset/tpch/tpch_q6.py b/release/nightly_tests/dataset/tpch/tpch_q6.py index 51b2244906f5..e8305a7ab1db 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q6.py +++ b/release/nightly_tests/dataset/tpch/tpch_q6.py @@ -32,8 +32,10 @@ def benchmark_fn(): ) # Aggregate - result = ds.aggregate(Sum(on="revenue", alias_name="revenue")) - return result + _ = ds.aggregate(Sum(on="revenue", alias_name="revenue")) + + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q6", benchmark_fn)