diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index a872a6056767..d877ebb07cfb 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -8,7 +8,69 @@ from benchmark import Benchmark # Define schemas for TPC-H tables +# Parquet datasets use column0, column1, ... (or column00, column01 for lineitem) TABLE_COLUMNS = { + "region": { + "column0": "r_regionkey", + "column1": "r_name", + "column2": "r_comment", + "column3": "r_dummy", + }, + "nation": { + "column0": "n_nationkey", + "column1": "n_name", + "column2": "n_regionkey", + "column3": "n_comment", + }, + "supplier": { + "column0": "s_suppkey", + "column1": "s_name", + "column2": "s_address", + "column3": "s_nationkey", + "column4": "s_phone", + "column5": "s_acctbal", + "column6": "s_comment", + "column7": "s_dummy", + }, + "customer": { + "column0": "c_custkey", + "column1": "c_name", + "column2": "c_address", + "column3": "c_nationkey", + "column4": "c_phone", + "column5": "c_acctbal", + "column6": "c_mktsegment", + "column7": "c_comment", + }, + "orders": { + "column0": "o_orderkey", + "column1": "o_custkey", + "column2": "o_orderstatus", + "column3": "o_totalprice", + "column4": "o_orderdate", + "column5": "o_orderpriority", + "column6": "o_clerk", + "column7": "o_shippriority", + "column8": "o_comment", + }, + "part": { + "column0": "p_partkey", + "column1": "p_name", + "column2": "p_mfgr", + "column3": "p_brand", + "column4": "p_type", + "column5": "p_size", + "column6": "p_container", + "column7": "p_retailprice", + "column8": "p_comment", + }, + "partsupp": { + "column0": "ps_partkey", + "column1": "ps_suppkey", + "column2": "ps_availqty", + "column3": "ps_supplycost", + "column4": "ps_comment", + }, "lineitem": { "column00": "l_orderkey", "column01": "l_partkey", @@ -26,7 +88,7 @@ "column13": "l_shipinstruct", "column14": "l_shipmode", "column15": "l_comment", - } + }, } diff --git a/release/nightly_tests/dataset/tpch/tpch_q5.py b/release/nightly_tests/dataset/tpch/tpch_q5.py new file mode 100644 index 000000000000..a9f4eb1b8e71 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q5.py @@ -0,0 +1,99 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + region = load_table("region", args.sf) + nation = load_table("nation", args.sf) + supplier = load_table("supplier", args.sf) + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + + # Q5 parameters + date = datetime(1994, 1, 1) + + # Filter region by name + region = region.filter(expr=col("r_name") == "ASIA") + + # Join region with nation + ds = region.join( + nation, + join_type="inner", + num_partitions=100, + on=("r_regionkey",), + right_on=("n_regionkey",), + ) + + # Join with supplier + ds = ds.join( + supplier, + join_type="inner", + num_partitions=100, + on=("n_nationkey",), + right_on=("s_nationkey",), + ) + + # Join with customer + ds = ds.join( + customer, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("c_nationkey",), + ) + + # Join with orders and filter by date + orders_filtered = orders.filter( + expr=( + (col("o_orderdate") >= date) + & (col("o_orderdate") < datetime(date.year + 1, date.month, date.day)) + ) + ) + ds = ds.join( + orders_filtered, + join_type="inner", + num_partitions=100, + on=("c_custkey",), + right_on=("o_custkey",), + ) + + # Join with lineitem on order key and supplier key + ds = ds.join( + lineitem, + join_type="inner", + num_partitions=100, + on=("o_orderkey", "s_suppkey"), + right_on=("l_orderkey", "l_suppkey"), + ) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Aggregate by nation name + _ = ( + ds.groupby("n_name") + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key="revenue", descending=True) + .materialize() + ) + + # Report arguments for the benchmark. + return vars(args) + + run_tpch_benchmark("tpch_q5", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q7.py b/release/nightly_tests/dataset/tpch/tpch_q7.py new file mode 100644 index 000000000000..9384b9edade6 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q7.py @@ -0,0 +1,114 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + supplier = load_table("supplier", args.sf) + lineitem = load_table("lineitem", args.sf) + orders = load_table("orders", args.sf) + customer = load_table("customer", args.sf) + nation = load_table("nation", args.sf) + + # Q7 parameters + date1 = datetime(1995, 1, 1) + date2 = datetime(1997, 1, 1) + nation1 = "FRANCE" + nation2 = "GERMANY" + + # Filter nations of interest (both directions: nation1 <-> nation2) + nations_of_interest = nation.filter( + expr=(col("n_name") == nation1) | (col("n_name") == nation2) + ) + + # Join supplier with nation (use suffix to distinguish) + supplier_nation = supplier.join( + nations_of_interest, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_supp", + ) + + # Join customer with nation (use suffix to distinguish) + customer_nation = customer.join( + nations_of_interest, + join_type="inner", + num_partitions=100, + on=("c_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_cust", + ) + + # Join orders with customer + orders_customer = orders.join( + customer_nation, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join lineitem with orders and filter by date + lineitem_filtered = lineitem.filter( + expr=((col("l_shipdate") >= date1) & (col("l_shipdate") < date2)) + ) + lineitem_orders = lineitem_filtered.join( + orders_customer, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with supplier + ds = lineitem_orders.join( + supplier_nation, + join_type="inner", + num_partitions=100, + on=("l_suppkey",), + right_on=("s_suppkey",), + ) + + # Filter to ensure we only include shipments between the two nations + # (exclude shipments within the same nation) + ds = ds.filter(expr=(col("n_name_supp") != col("n_name_cust"))) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Extract year from shipdate + ds = ds.with_column( + "l_year", + col("l_shipdate").dt.year(), + ) + + # Aggregate by supplier nation, customer nation, and year + _ = ( + ds.groupby(["n_name_supp", "n_name_cust", "l_year"]) + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key=["n_name_supp", "n_name_cust", "l_year"]) + .materialize() + ) + + # Report arguments for the benchmark. + return vars(args) + + run_tpch_benchmark("tpch_q7", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q8.py b/release/nightly_tests/dataset/tpch/tpch_q8.py new file mode 100644 index 000000000000..7d464b81423d --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q8.py @@ -0,0 +1,169 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + region = load_table("region", args.sf) + nation = load_table("nation", args.sf) + supplier = load_table("supplier", args.sf) + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + part = load_table("part", args.sf) + + # Q8 parameters + date1 = datetime(1995, 1, 1) + date2 = datetime(1997, 1, 1) + region_name = "AMERICA" + part_type = "ECONOMY ANODIZED STEEL" + nation_name = "BRAZIL" # Specific nation for market share calculation + + # Filter region + region_filtered = region.filter(expr=col("r_name") == region_name) + + # Join region with nation + nation_region = region_filtered.join( + nation, + join_type="inner", + num_partitions=100, + on=("r_regionkey",), + right_on=("n_regionkey",), + ) + + # Join customer with nation + customer_nation = customer.join( + nation_region, + join_type="inner", + num_partitions=100, + on=("c_nationkey",), + right_on=("n_nationkey",), + ) + + # Join supplier with nation (for supplier nation) + # Note: For Q8, suppliers can be from ALL nations, not just the region + supplier_nation = supplier.join( + nation, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + left_suffix="", + right_suffix="_supp", + ) + + # Filter part by type + part_filtered = part.filter(expr=col("p_type") == part_type) + + # Join orders with customer and filter by date + orders_filtered = orders.filter( + expr=((col("o_orderdate") >= date1) & (col("o_orderdate") < date2)) + ) + orders_customer = orders_filtered.join( + customer_nation, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join lineitem with orders + lineitem_orders = lineitem.join( + orders_customer, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with part + lineitem_part = lineitem_orders.join( + part_filtered, + join_type="inner", + num_partitions=100, + on=("l_partkey",), + right_on=("p_partkey",), + ) + + # Join with supplier + ds = lineitem_part.join( + supplier_nation, + join_type="inner", + num_partitions=100, + on=("l_suppkey",), + right_on=("s_suppkey",), + ) + + # Calculate volume + ds = ds.with_column( + "volume", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Extract year from orderdate + ds = ds.with_column( + "o_year", + col("o_orderdate").dt.year(), + ) + + # For Q8, we need to calculate market share for a specific nation (e.g., BRAZIL) + # Market share = (volume from specific nation suppliers) / (total volume in region) + + # Create a conditional column for volume from the specific nation + # Use conditional expression: if n_name_supp == nation_name then volume else 0 + # Convert boolean to numeric: True -> 1, False -> 0, then multiply by volume + ds = ds.with_column( + "is_nation", + to_f64((col("n_name_supp") == nation_name)), + ) + ds = ds.with_column( + "nation_volume", + col("is_nation") * col("volume"), + ) + + # Calculate total volume per year (all suppliers, for customers in the region) + total_by_year = ds.groupby("o_year").aggregate( + Sum(on="volume", alias_name="total_volume") + ) + + # Calculate volume from specific nation per year (conditional sum) + nation_by_year = ds.groupby("o_year").aggregate( + Sum(on="nation_volume", alias_name="nation_volume") + ) + + # Join to get total volume for each year + result = nation_by_year.join( + total_by_year, + join_type="inner", + num_partitions=100, + on=("o_year",), + ) + + # Calculate market share for the specific nation + result = result.with_column( + "mkt_share", + col("nation_volume") / col("total_volume"), + ) + + # Select and sort by year + _ = ( + result.select_columns(["o_year", "mkt_share"]) + .sort(key="o_year") + .materialize() + ) + + # Report arguments for the benchmark. + return vars(args) + + run_tpch_benchmark("tpch_q8", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q9.py b/release/nightly_tests/dataset/tpch/tpch_q9.py new file mode 100644 index 000000000000..b7677195612c --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q9.py @@ -0,0 +1,100 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + # Load all required tables + part = load_table("part", args.sf) + supplier = load_table("supplier", args.sf) + partsupp = load_table("partsupp", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + nation = load_table("nation", args.sf) + + # Q9 parameters + part_name_pattern = "green" + + # Filter part by name pattern + part_filtered = part.filter(expr=col("p_name").str.contains(part_name_pattern)) + + # Join partsupp with supplier + partsupp_supplier = partsupp.join( + supplier, + join_type="inner", + num_partitions=100, + on=("ps_suppkey",), + right_on=("s_suppkey",), + ) + + # Join with part + partsupp_part = partsupp_supplier.join( + part_filtered, + join_type="inner", + num_partitions=100, + on=("ps_partkey",), + right_on=("p_partkey",), + ) + + # Join supplier with nation + partsupp_nation = partsupp_part.join( + nation, + join_type="inner", + num_partitions=100, + on=("s_nationkey",), + right_on=("n_nationkey",), + ) + + # Join orders with lineitem + # Note: TPC-H Q9 processes all orders and groups by year, no date filter + lineitem_orders = lineitem.join( + orders, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join lineitem with partsupp on part key and supplier key + # Using multi-key join is more efficient than join + filter + ds = lineitem_orders.join( + partsupp_nation, + join_type="inner", + num_partitions=100, + on=("l_partkey", "l_suppkey"), + right_on=("ps_partkey", "ps_suppkey"), + ) + + # Calculate profit + ds = ds.with_column( + "profit", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))) + - to_f64(col("ps_supplycost")) * to_f64(col("l_quantity")), + ) + + # Extract year from orderdate + ds = ds.with_column( + "o_year", + col("o_orderdate").dt.year(), + ) + + # Aggregate by nation and year + _ = ( + ds.groupby(["n_name", "o_year"]) + .aggregate(Sum(on="profit", alias_name="profit")) + .sort(key=["n_name", "o_year"], descending=[False, True]) + .materialize() + ) + + # Report arguments for the benchmark. + return vars(args) + + run_tpch_benchmark("tpch_q9", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index a930ba182bb6..12f4be1c9737 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -737,6 +737,19 @@ timeout: 5400 script: python tpch/tpch_q1.py --sf 100 +- name: "tpch_q5_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q5.py --sf 100 + - name: "tpch_q6_{{scaling}}" python: "3.10" matrix: @@ -750,6 +763,45 @@ timeout: 5400 script: python tpch/tpch_q6.py --sf 100 +- name: "tpch_q7_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q7.py --sf 100 + +- name: "tpch_q8_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q8.py --sf 100 + +- name: "tpch_q9_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q9.py --sf 100 + ################################################# # Cross-AZ RPC fault tolerance test #################################################