From 7ea156e34f902039d69c5e4739514e64de5b6f20 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Mon, 2 Feb 2026 18:00:20 +0800 Subject: [PATCH 1/7] [Data] Add TPCH queries 5 to 9 for benchmarking Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/tpch_q5.py | 100 ++++++++++++++ release/nightly_tests/dataset/tpch/tpch_q7.py | 110 +++++++++++++++ release/nightly_tests/dataset/tpch/tpch_q8.py | 130 ++++++++++++++++++ release/nightly_tests/dataset/tpch/tpch_q9.py | 114 +++++++++++++++ release/release_data_tests.yaml | 52 +++++++ 5 files changed, 506 insertions(+) create mode 100644 release/nightly_tests/dataset/tpch/tpch_q5.py create mode 100644 release/nightly_tests/dataset/tpch/tpch_q7.py create mode 100644 release/nightly_tests/dataset/tpch/tpch_q8.py create mode 100644 release/nightly_tests/dataset/tpch/tpch_q9.py diff --git a/release/nightly_tests/dataset/tpch/tpch_q5.py b/release/nightly_tests/dataset/tpch/tpch_q5.py new file mode 100644 index 000000000000..bb36d37b0559 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q5.py @@ -0,0 +1,100 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + region = load_table("region", args.sf) + nation = load_table("nation", args.sf) + supplier = load_table("supplier", args.sf) + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + + # Q5 parameters + date = datetime(1994, 1, 1) + + # Filter region by name + region = region.filter(expr=col("r_name") == "ASIA") + + # Join region with nation + ds = region.join( + nation, + join_type="inner", + num_partitions=100, + on=("r_regionkey",), + right_on=("n_regionkey",), + ) + + # Join with supplier + ds = ds.join( + supplier, + join_type="inner", + num_partitions=100, + on=("n_nationkey",), + right_on=("s_nationkey",), + ) + + # Join with customer + ds = ds.join( + customer, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("c_nationkey",), + ) + + # Join with orders and filter by date + orders_filtered = orders.filter( + expr=( + (col("o_orderdate") >= date) + & (col("o_orderdate") < datetime(date.year + 1, date.month, date.day)) + ) + ) + ds = ds.join( + orders_filtered, + join_type="inner", + num_partitions=100, + on=("c_custkey",), + right_on=("o_custkey",), + ) + + # Join with lineitem + ds = ds.join( + lineitem, + join_type="inner", + num_partitions=100, + on=("o_orderkey",), + right_on=("l_orderkey",), + ) + + # Filter lineitem by supplier key + ds = ds.filter(expr=col("l_suppkey") == col("s_suppkey")) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Aggregate by nation name + result = ( + ds.groupby("n_name") + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key="revenue", descending=True) + ) + + return result + + run_tpch_benchmark("tpch_q5", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q7.py b/release/nightly_tests/dataset/tpch/tpch_q7.py new file mode 100644 index 000000000000..0b4dfbdf1b88 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q7.py @@ -0,0 +1,110 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import 
col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + supplier = load_table("supplier", args.sf) + lineitem = load_table("lineitem", args.sf) + orders = load_table("orders", args.sf) + customer = load_table("customer", args.sf) + nation = load_table("nation", args.sf) + + # Q7 parameters + date1 = datetime(1995, 1, 1) + date2 = datetime(1997, 1, 1) + nation1 = "FRANCE" + nation2 = "GERMANY" + + # Filter nations + nation_supp = nation.filter(expr=col("n_name") == nation1) + nation_cust = nation.filter(expr=col("n_name") == nation2) + + # Join supplier with nation (use suffix to distinguish) + supplier_nation = supplier.join( + nation_supp, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_supp", + ) + + # Join customer with nation (use suffix to distinguish) + customer_nation = customer.join( + nation_cust, + join_type="inner", + num_partitions=100, + on=("c_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_cust", + ) + + # Join orders with customer + orders_customer = orders.join( + customer_nation, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join lineitem with orders and filter by date + lineitem_filtered = lineitem.filter( + expr=( + (col("l_shipdate") >= date1) + & (col("l_shipdate") < date2) + ) + ) + lineitem_orders = lineitem_filtered.join( + orders_customer, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with supplier + ds = lineitem_orders.join( + supplier_nation, + join_type="inner", + num_partitions=100, + on=("l_suppkey",), + right_on=("s_suppkey",), + ) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Extract year from shipdate + ds = ds.with_column( + "l_year", + col("l_shipdate").dt.year(), + ) + + # Aggregate by supplier nation, customer nation, and year + result = ( + ds.groupby(["n_name_supp", "n_name_cust", "l_year"]) + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key=["n_name_supp", "n_name_cust", "l_year"]) + ) + + return result + + run_tpch_benchmark("tpch_q7", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q8.py b/release/nightly_tests/dataset/tpch/tpch_q8.py new file mode 100644 index 000000000000..d4dfb09f02f2 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q8.py @@ -0,0 +1,130 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + region = load_table("region", args.sf) + nation = load_table("nation", args.sf) + supplier = load_table("supplier", args.sf) + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + part = load_table("part", args.sf) + + # Q8 parameters + date1 = datetime(1995, 1, 1) + date2 = datetime(1997, 1, 1) + region_name = "AMERICA" + part_type = "ECONOMY ANODIZED STEEL" + + # Filter region + region_filtered = region.filter(expr=col("r_name") == region_name) + + # Join region with nation + 
nation_region = region_filtered.join( + nation, + join_type="inner", + num_partitions=100, + on=("r_regionkey",), + right_on=("n_regionkey",), + ) + + # Join customer with nation + customer_nation = customer.join( + nation_region, + join_type="inner", + num_partitions=100, + on=("c_nationkey",), + right_on=("n_nationkey",), + ) + + # Join supplier with nation (for supplier nation) + supplier_nation = supplier.join( + nation, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_supp", + ) + + # Filter part by type + part_filtered = part.filter(expr=col("p_type") == part_type) + + # Join orders with customer and filter by date + orders_filtered = orders.filter( + expr=( + (col("o_orderdate") >= date1) + & (col("o_orderdate") < date2) + ) + ) + orders_customer = orders_filtered.join( + customer_nation, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join lineitem with orders + lineitem_orders = lineitem.join( + orders_customer, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with part + lineitem_part = lineitem_orders.join( + part_filtered, + join_type="inner", + num_partitions=100, + on=("l_partkey",), + right_on=("p_partkey",), + ) + + # Join with supplier + ds = lineitem_part.join( + supplier_nation, + join_type="inner", + num_partitions=100, + on=("l_suppkey",), + right_on=("s_suppkey",), + ) + + # Calculate volume + ds = ds.with_column( + "volume", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Extract year from orderdate + ds = ds.with_column( + "o_year", + col("o_orderdate").dt.year(), + ) + + # Aggregate by year and supplier nation + result = ( + ds.groupby(["o_year", "n_name_supp"]) + .aggregate(Sum(on="volume", alias_name="volume")) + .sort(key=["o_year", "n_name_supp"]) + ) + + return result + + run_tpch_benchmark("tpch_q8", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q9.py b/release/nightly_tests/dataset/tpch/tpch_q9.py new file mode 100644 index 000000000000..3e4609416e7b --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q9.py @@ -0,0 +1,114 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + part = load_table("part", args.sf) + supplier = load_table("supplier", args.sf) + partsupp = load_table("partsupp", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + nation = load_table("nation", args.sf) + + # Q9 parameters + date1 = datetime(1994, 1, 1) + date2 = datetime(1995, 1, 1) + part_name_pattern = "GREEN" + + # Filter part by name pattern + part_filtered = part.filter( + expr=col("p_name").str.contains(part_name_pattern) + ) + + # Join partsupp with supplier + partsupp_supplier = partsupp.join( + supplier, + join_type="inner", + num_partitions=100, + on=("ps_suppkey",), + right_on=("s_suppkey",), + ) + + # Join with part + partsupp_part = partsupp_supplier.join( + part_filtered, + join_type="inner", + num_partitions=100, + on=("ps_partkey",), + right_on=("p_partkey",), + ) + + # Join supplier with nation + partsupp_nation = partsupp_part.join( + nation, + join_type="inner", + 
num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + ) + + # Join orders with lineitem and filter by date + orders_filtered = orders.filter( + expr=( + (col("o_orderdate") >= date1) + & (col("o_orderdate") < date2) + ) + ) + lineitem_orders = lineitem.join( + orders_filtered, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join lineitem with partsupp + # Filter to ensure l_partkey matches ps_partkey and l_suppkey matches ps_suppkey + ds = lineitem_orders.join( + partsupp_nation, + join_type="inner", + num_partitions=100, + on=("l_partkey",), + right_on=("ps_partkey",), + ) + + # Filter by supplier key match + ds = ds.filter( + expr=(col("l_suppkey") == col("ps_suppkey")) + ) + + # Calculate profit + ds = ds.with_column( + "profit", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))) + - to_f64(col("ps_supplycost")) * to_f64(col("l_quantity")), + ) + + # Extract year from orderdate + ds = ds.with_column( + "o_year", + col("o_orderdate").dt.year(), + ) + + # Aggregate by nation and year + result = ( + ds.groupby(["n_name", "o_year"]) + .aggregate(Sum(on="profit", alias_name="profit")) + .sort(key=["n_name", "o_year"], descending=[False, True]) + ) + + return result + + run_tpch_benchmark("tpch_q9", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index a930ba182bb6..12f4be1c9737 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -737,6 +737,19 @@ timeout: 5400 script: python tpch/tpch_q1.py --sf 100 +- name: "tpch_q5_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q5.py --sf 100 + - name: "tpch_q6_{{scaling}}" python: "3.10" matrix: @@ -750,6 +763,45 @@ timeout: 5400 script: python tpch/tpch_q6.py --sf 100 +- name: "tpch_q7_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q7.py --sf 100 + +- name: "tpch_q8_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q8.py --sf 100 + +- name: "tpch_q9_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q9.py --sf 100 + ################################################# # Cross-AZ RPC fault tolerance test ################################################# From 03fe459e9d8b8d2c3639275ef9f44780c51b12a2 Mon Sep 17 00:00:00 2001 From: ZTE Ray Date: Mon, 2 Feb 2026 18:14:24 +0800 Subject: [PATCH 2/7] Update release/nightly_tests/dataset/tpch/tpch_q5.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: ZTE Ray --- release/nightly_tests/dataset/tpch/tpch_q5.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/release/nightly_tests/dataset/tpch/tpch_q5.py b/release/nightly_tests/dataset/tpch/tpch_q5.py index bb36d37b0559..0f36ee460a6b 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q5.py +++ 
b/release/nightly_tests/dataset/tpch/tpch_q5.py @@ -64,18 +64,15 @@ def benchmark_fn(): right_on=("o_custkey",), ) - # Join with lineitem + # Join with lineitem on order key and supplier key ds = ds.join( lineitem, join_type="inner", num_partitions=100, - on=("o_orderkey",), - right_on=("l_orderkey",), + on=["o_orderkey", "s_suppkey"], + right_on=["l_orderkey", "l_suppkey"], ) - # Filter lineitem by supplier key - ds = ds.filter(expr=col("l_suppkey") == col("s_suppkey")) - # Calculate revenue ds = ds.with_column( "revenue", From 4d09ca539a4c681f249cd3648f5653c20f99577c Mon Sep 17 00:00:00 2001 From: daiping8 Date: Mon, 2 Feb 2026 18:37:05 +0800 Subject: [PATCH 3/7] gemini-code-assist suggestion Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/tpch_q7.py | 20 ++++---- release/nightly_tests/dataset/tpch/tpch_q8.py | 47 +++++++++++++++---- release/nightly_tests/dataset/tpch/tpch_q9.py | 32 ++++--------- 3 files changed, 57 insertions(+), 42 deletions(-) diff --git a/release/nightly_tests/dataset/tpch/tpch_q7.py b/release/nightly_tests/dataset/tpch/tpch_q7.py index 0b4dfbdf1b88..a9e09cdc1dfd 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q7.py +++ b/release/nightly_tests/dataset/tpch/tpch_q7.py @@ -21,13 +21,14 @@ def benchmark_fn(): nation1 = "FRANCE" nation2 = "GERMANY" - # Filter nations - nation_supp = nation.filter(expr=col("n_name") == nation1) - nation_cust = nation.filter(expr=col("n_name") == nation2) + # Filter nations of interest (both directions: nation1 <-> nation2) + nations_of_interest = nation.filter( + expr=(col("n_name") == nation1) | (col("n_name") == nation2) + ) # Join supplier with nation (use suffix to distinguish) supplier_nation = supplier.join( - nation_supp, + nations_of_interest, join_type="inner", num_partitions=100, on=("s_nationkey",), @@ -38,7 +39,7 @@ def benchmark_fn(): # Join customer with nation (use suffix to distinguish) customer_nation = customer.join( - nation_cust, + nations_of_interest, join_type="inner", num_partitions=100, on=("c_nationkey",), @@ -58,10 +59,7 @@ def benchmark_fn(): # Join lineitem with orders and filter by date lineitem_filtered = lineitem.filter( - expr=( - (col("l_shipdate") >= date1) - & (col("l_shipdate") < date2) - ) + expr=((col("l_shipdate") >= date1) & (col("l_shipdate") < date2)) ) lineitem_orders = lineitem_filtered.join( orders_customer, @@ -80,6 +78,10 @@ def benchmark_fn(): right_on=("s_suppkey",), ) + # Filter to ensure we only include shipments between the two nations + # (exclude shipments within the same nation) + ds = ds.filter(expr=(col("n_name_supp") != col("n_name_cust"))) + # Calculate revenue ds = ds.with_column( "revenue", diff --git a/release/nightly_tests/dataset/tpch/tpch_q8.py b/release/nightly_tests/dataset/tpch/tpch_q8.py index d4dfb09f02f2..01decce08025 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q8.py +++ b/release/nightly_tests/dataset/tpch/tpch_q8.py @@ -45,8 +45,9 @@ def benchmark_fn(): ) # Join supplier with nation (for supplier nation) + # Note: For Q8, suppliers should also be in the same region supplier_nation = supplier.join( - nation, + nation_region, join_type="inner", num_partitions=100, on=("s_nationkey",), @@ -60,10 +61,7 @@ def benchmark_fn(): # Join orders with customer and filter by date orders_filtered = orders.filter( - expr=( - (col("o_orderdate") >= date1) - & (col("o_orderdate") < date2) - ) + expr=((col("o_orderdate") >= date1) & (col("o_orderdate") < date2)) ) orders_customer = orders_filtered.join( customer_nation, @@ -112,11 +110,42 @@ def 
benchmark_fn(): col("o_orderdate").dt.year(), ) - # Aggregate by year and supplier nation + # For Q8, we need to calculate market share per supplier nation per year + # Market share = (volume for a specific supplier nation) / (total volume in region) + + # First, calculate total volume per year (all suppliers in the region) + total_by_year = ds.groupby("o_year").aggregate( + Sum(on="volume", alias_name="total_volume") + ) + + # Calculate volume per supplier nation per year + supplier_by_year = ds.groupby(["o_year", "n_name_supp"]).aggregate( + Sum(on="volume", alias_name="supplier_volume") + ) + + # Join to get total volume for each year + result = supplier_by_year.join( + total_by_year, + join_type="inner", + num_partitions=100, + on=("o_year",), + ) + + # Calculate market share for each supplier nation + result = result.with_column( + "mkt_share", + col("supplier_volume") / col("total_volume"), + ) + + # For Q8, we aggregate market share by year (sum of all supplier nations' market shares per year) + # Or we might need to select a specific supplier nation - let's aggregate by year + # Actually, Q8 typically outputs one market share value per year for the region + # So we sum the market shares (which should equal 1.0) or we might need different logic + # Based on the comment, output should be (o_year, mkt_share), so let's aggregate result = ( - ds.groupby(["o_year", "n_name_supp"]) - .aggregate(Sum(on="volume", alias_name="volume")) - .sort(key=["o_year", "n_name_supp"]) + result.groupby("o_year") + .aggregate(Sum(on="mkt_share", alias_name="mkt_share")) + .sort(key="o_year") ) return result diff --git a/release/nightly_tests/dataset/tpch/tpch_q9.py b/release/nightly_tests/dataset/tpch/tpch_q9.py index 3e4609416e7b..68865ec70f13 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q9.py +++ b/release/nightly_tests/dataset/tpch/tpch_q9.py @@ -6,8 +6,6 @@ def main(args): def benchmark_fn(): - from datetime import datetime - # Load all required tables part = load_table("part", args.sf) supplier = load_table("supplier", args.sf) @@ -17,14 +15,10 @@ def benchmark_fn(): nation = load_table("nation", args.sf) # Q9 parameters - date1 = datetime(1994, 1, 1) - date2 = datetime(1995, 1, 1) part_name_pattern = "GREEN" # Filter part by name pattern - part_filtered = part.filter( - expr=col("p_name").str.contains(part_name_pattern) - ) + part_filtered = part.filter(expr=col("p_name").str.contains(part_name_pattern)) # Join partsupp with supplier partsupp_supplier = partsupp.join( @@ -53,34 +47,24 @@ def benchmark_fn(): right_on=("n_nationkey",), ) - # Join orders with lineitem and filter by date - orders_filtered = orders.filter( - expr=( - (col("o_orderdate") >= date1) - & (col("o_orderdate") < date2) - ) - ) + # Join orders with lineitem + # Note: TPC-H Q9 processes all orders and groups by year, no date filter lineitem_orders = lineitem.join( - orders_filtered, + orders, join_type="inner", num_partitions=100, on=("l_orderkey",), right_on=("o_orderkey",), ) - # Join lineitem with partsupp - # Filter to ensure l_partkey matches ps_partkey and l_suppkey matches ps_suppkey + # Join lineitem with partsupp on part key and supplier key + # Using multi-key join is more efficient than join + filter ds = lineitem_orders.join( partsupp_nation, join_type="inner", num_partitions=100, - on=("l_partkey",), - right_on=("ps_partkey",), - ) - - # Filter by supplier key match - ds = ds.filter( - expr=(col("l_suppkey") == col("ps_suppkey")) + on=("l_partkey", "l_suppkey"), + right_on=("ps_partkey", "ps_suppkey"), ) # 
Calculate profit From 036e722833dc2cb1eb19cbe16d0996d8d7e5f3db Mon Sep 17 00:00:00 2001 From: daiping8 Date: Mon, 2 Feb 2026 18:56:07 +0800 Subject: [PATCH 4/7] cursor suggestion Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/tpch_q5.py | 6 ++- release/nightly_tests/dataset/tpch/tpch_q7.py | 6 ++- release/nightly_tests/dataset/tpch/tpch_q8.py | 50 +++++++++++-------- release/nightly_tests/dataset/tpch/tpch_q9.py | 6 ++- 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/release/nightly_tests/dataset/tpch/tpch_q5.py b/release/nightly_tests/dataset/tpch/tpch_q5.py index 0f36ee460a6b..8f567ebae92d 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q5.py +++ b/release/nightly_tests/dataset/tpch/tpch_q5.py @@ -80,13 +80,15 @@ def benchmark_fn(): ) # Aggregate by nation name - result = ( + _ = ( ds.groupby("n_name") .aggregate(Sum(on="revenue", alias_name="revenue")) .sort(key="revenue", descending=True) + .materialize() ) - return result + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q5", benchmark_fn) diff --git a/release/nightly_tests/dataset/tpch/tpch_q7.py b/release/nightly_tests/dataset/tpch/tpch_q7.py index a9e09cdc1dfd..9384b9edade6 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q7.py +++ b/release/nightly_tests/dataset/tpch/tpch_q7.py @@ -95,13 +95,15 @@ def benchmark_fn(): ) # Aggregate by supplier nation, customer nation, and year - result = ( + _ = ( ds.groupby(["n_name_supp", "n_name_cust", "l_year"]) .aggregate(Sum(on="revenue", alias_name="revenue")) .sort(key=["n_name_supp", "n_name_cust", "l_year"]) + .materialize() ) - return result + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q7", benchmark_fn) diff --git a/release/nightly_tests/dataset/tpch/tpch_q8.py b/release/nightly_tests/dataset/tpch/tpch_q8.py index 01decce08025..7d464b81423d 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q8.py +++ b/release/nightly_tests/dataset/tpch/tpch_q8.py @@ -22,6 +22,7 @@ def benchmark_fn(): date2 = datetime(1997, 1, 1) region_name = "AMERICA" part_type = "ECONOMY ANODIZED STEEL" + nation_name = "BRAZIL" # Specific nation for market share calculation # Filter region region_filtered = region.filter(expr=col("r_name") == region_name) @@ -45,9 +46,9 @@ def benchmark_fn(): ) # Join supplier with nation (for supplier nation) - # Note: For Q8, suppliers should also be in the same region + # Note: For Q8, suppliers can be from ALL nations, not just the region supplier_nation = supplier.join( - nation_region, + nation, join_type="inner", num_partitions=100, on=("s_nationkey",), @@ -110,45 +111,54 @@ def benchmark_fn(): col("o_orderdate").dt.year(), ) - # For Q8, we need to calculate market share per supplier nation per year - # Market share = (volume for a specific supplier nation) / (total volume in region) + # For Q8, we need to calculate market share for a specific nation (e.g., BRAZIL) + # Market share = (volume from specific nation suppliers) / (total volume in region) + + # Create a conditional column for volume from the specific nation + # Use conditional expression: if n_name_supp == nation_name then volume else 0 + # Convert boolean to numeric: True -> 1, False -> 0, then multiply by volume + ds = ds.with_column( + "is_nation", + to_f64((col("n_name_supp") == nation_name)), + ) + ds = ds.with_column( + "nation_volume", + col("is_nation") * col("volume"), + ) - # First, calculate total volume per year (all suppliers in the region) + # Calculate total volume per year (all 
suppliers, for customers in the region) total_by_year = ds.groupby("o_year").aggregate( Sum(on="volume", alias_name="total_volume") ) - # Calculate volume per supplier nation per year - supplier_by_year = ds.groupby(["o_year", "n_name_supp"]).aggregate( - Sum(on="volume", alias_name="supplier_volume") + # Calculate volume from specific nation per year (conditional sum) + nation_by_year = ds.groupby("o_year").aggregate( + Sum(on="nation_volume", alias_name="nation_volume") ) # Join to get total volume for each year - result = supplier_by_year.join( + result = nation_by_year.join( total_by_year, join_type="inner", num_partitions=100, on=("o_year",), ) - # Calculate market share for each supplier nation + # Calculate market share for the specific nation result = result.with_column( "mkt_share", - col("supplier_volume") / col("total_volume"), + col("nation_volume") / col("total_volume"), ) - # For Q8, we aggregate market share by year (sum of all supplier nations' market shares per year) - # Or we might need to select a specific supplier nation - let's aggregate by year - # Actually, Q8 typically outputs one market share value per year for the region - # So we sum the market shares (which should equal 1.0) or we might need different logic - # Based on the comment, output should be (o_year, mkt_share), so let's aggregate - result = ( - result.groupby("o_year") - .aggregate(Sum(on="mkt_share", alias_name="mkt_share")) + # Select and sort by year + _ = ( + result.select_columns(["o_year", "mkt_share"]) .sort(key="o_year") + .materialize() ) - return result + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q8", benchmark_fn) diff --git a/release/nightly_tests/dataset/tpch/tpch_q9.py b/release/nightly_tests/dataset/tpch/tpch_q9.py index 68865ec70f13..013a398eb959 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q9.py +++ b/release/nightly_tests/dataset/tpch/tpch_q9.py @@ -81,13 +81,15 @@ def benchmark_fn(): ) # Aggregate by nation and year - result = ( + _ = ( ds.groupby(["n_name", "o_year"]) .aggregate(Sum(on="profit", alias_name="profit")) .sort(key=["n_name", "o_year"], descending=[False, True]) + .materialize() ) - return result + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q9", benchmark_fn) From 37da62978da3565c3c34a66d8c8bacb7798962ed Mon Sep 17 00:00:00 2001 From: daiping8 Date: Mon, 2 Feb 2026 19:58:52 +0800 Subject: [PATCH 5/7] Refactor join parameters in tpch_q5.py to use tuples instead of lists for improved clarity and consistency. 
Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/tpch_q5.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/release/nightly_tests/dataset/tpch/tpch_q5.py b/release/nightly_tests/dataset/tpch/tpch_q5.py index 8f567ebae92d..a9f4eb1b8e71 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q5.py +++ b/release/nightly_tests/dataset/tpch/tpch_q5.py @@ -69,8 +69,8 @@ def benchmark_fn(): lineitem, join_type="inner", num_partitions=100, - on=["o_orderkey", "s_suppkey"], - right_on=["l_orderkey", "l_suppkey"], + on=("o_orderkey", "s_suppkey"), + right_on=("l_orderkey", "l_suppkey"), ) # Calculate revenue From 9d4addd47146029c8923ad8b4bbf96c533aab2db Mon Sep 17 00:00:00 2001 From: daiping8 Date: Mon, 2 Feb 2026 20:27:24 +0800 Subject: [PATCH 6/7] 1 Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/tpch_q9.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/dataset/tpch/tpch_q9.py b/release/nightly_tests/dataset/tpch/tpch_q9.py index 013a398eb959..b7677195612c 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q9.py +++ b/release/nightly_tests/dataset/tpch/tpch_q9.py @@ -15,7 +15,7 @@ def benchmark_fn(): nation = load_table("nation", args.sf) # Q9 parameters - part_name_pattern = "GREEN" + part_name_pattern = "green" # Filter part by name pattern part_filtered = part.filter(expr=col("p_name").str.contains(part_name_pattern)) From 66560ab257b441151903e41c1f8ebc78967c62f6 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Sat, 7 Feb 2026 13:51:57 +0800 Subject: [PATCH 7/7] [tpch] Extend TABLE_COLUMNS with additional TPC-H schemas for region, nation, supplier, customer, orders, part, and partsupp Signed-off-by: daiping8 --- release/nightly_tests/dataset/tpch/common.py | 64 +++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index a872a6056767..d877ebb07cfb 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -8,7 +8,69 @@ from benchmark import Benchmark # Define schemas for TPC-H tables +# Parquet datasets use column0, column1, ... 
(or column00, column01 for lineitem) TABLE_COLUMNS = { + "region": { + "column0": "r_regionkey", + "column1": "r_name", + "column2": "r_comment", + "column3": "r_dummy", + }, + "nation": { + "column0": "n_nationkey", + "column1": "n_name", + "column2": "n_regionkey", + "column3": "n_comment", + }, + "supplier": { + "column0": "s_suppkey", + "column1": "s_name", + "column2": "s_address", + "column3": "s_nationkey", + "column4": "s_phone", + "column5": "s_acctbal", + "column6": "s_comment", + "column7": "s_dummy", + }, + "customer": { + "column0": "c_custkey", + "column1": "c_name", + "column2": "c_address", + "column3": "c_nationkey", + "column4": "c_phone", + "column5": "c_acctbal", + "column6": "c_mktsegment", + "column7": "c_comment", + }, + "orders": { + "column0": "o_orderkey", + "column1": "o_custkey", + "column2": "o_orderstatus", + "column3": "o_totalprice", + "column4": "o_orderdate", + "column5": "o_orderpriority", + "column6": "o_clerk", + "column7": "o_shippriority", + "column8": "o_comment", + }, + "part": { + "column0": "p_partkey", + "column1": "p_name", + "column2": "p_mfgr", + "column3": "p_brand", + "column4": "p_type", + "column5": "p_size", + "column6": "p_container", + "column7": "p_retailprice", + "column8": "p_comment", + }, + "partsupp": { + "column0": "ps_partkey", + "column1": "ps_suppkey", + "column2": "ps_availqty", + "column3": "ps_supplycost", + "column4": "ps_comment", + }, "lineitem": { "column00": "l_orderkey", "column01": "l_partkey", @@ -26,7 +88,7 @@ "column13": "l_shipinstruct", "column14": "l_shipmode", "column15": "l_comment", - } + }, }
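
For context on how these schemas are consumed: the query scripts above obtain their inputs via load_table(name, args.sf) from common.py, and the comment in this hunk implies the raw Parquet files expose positional names (column0, column1, ... or column00, column01 for lineitem) that are mapped to TPC-H column names through TABLE_COLUMNS. A minimal sketch of that rename step, assuming Ray Data's Dataset.rename_columns accepts such a mapping; the path argument and function name here are illustrative only, since the real load_table takes a scale factor rather than a path:

    import ray
    from common import TABLE_COLUMNS  # mapping added in this patch

    def load_table_sketch(table: str, path: str) -> ray.data.Dataset:
        # Raw TPC-H Parquet files carry positional column names.
        ds = ray.data.read_parquet(path)
        # Rename column0/column00-style names to the TPC-H schema
        # so the queries can reference l_orderkey, o_orderdate, etc.
        return ds.rename_columns(TABLE_COLUMNS[table])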