diff --git a/release/nightly_tests/dataset/tpch/common.py b/release/nightly_tests/dataset/tpch/common.py index a872a6056767..499c5dc349fe 100644 --- a/release/nightly_tests/dataset/tpch/common.py +++ b/release/nightly_tests/dataset/tpch/common.py @@ -26,7 +26,38 @@ "column13": "l_shipinstruct", "column14": "l_shipmode", "column15": "l_comment", - } + }, + # customer, orders, nation use column0, column1, ... (no zero-padding) + "customer": { + "column0": "c_custkey", + "column1": "c_name", + "column2": "c_address", + "column3": "c_nationkey", + "column4": "c_phone", + "column5": "c_acctbal", + "column6": "c_mktsegment", + "column7": "c_comment", + "column8": "c_comment_ext", + }, + "orders": { + "column0": "o_orderkey", + "column1": "o_custkey", + "column2": "o_orderstatus", + "column3": "o_totalprice", + "column4": "o_orderdate", + "column5": "o_orderpriority", + "column6": "o_clerk", + "column7": "o_shippriority", + "column8": "o_comment", + "column9": "o_comment_ext", + }, + "nation": { + "column0": "n_nationkey", + "column1": "n_name", + "column2": "n_regionkey", + "column3": "n_comment", + "column4": "n_comment_ext", + }, } diff --git a/release/nightly_tests/dataset/tpch/tpch_q10.py b/release/nightly_tests/dataset/tpch/tpch_q10.py new file mode 100644 index 000000000000..77fb710ab5dd --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q10.py @@ -0,0 +1,93 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + nation = load_table("nation", args.sf) + + # Q10 parameters + date = datetime(1993, 10, 1) + # Calculate end date (3 months later) + if date.month <= 9: + end_date = datetime(date.year, date.month + 3, 
date.day) + else: + end_date = datetime(date.year + 1, date.month + 3 - 12, date.day) + + # Filter orders by date (3 months range) + orders_filtered = orders.filter( + expr=((col("o_orderdate") >= date) & (col("o_orderdate") < end_date)) + ) + + # Filter lineitem by return flag + lineitem_filtered = lineitem.filter(expr=col("l_returnflag") == "R") + + # Join lineitem with orders + lineitem_orders = lineitem_filtered.join( + orders_filtered, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with customer + ds = lineitem_orders.join( + customer, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join with nation + ds = ds.join( + nation, + join_type="inner", + num_partitions=100, + on=("c_nationkey",), + right_on=("n_nationkey",), + ) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Aggregate by customer key, name, account balance, nation, address, phone, and comment + _ = ( + ds.groupby( + [ + "c_custkey", + "c_name", + "c_acctbal", + "n_name", + "c_address", + "c_phone", + "c_comment", + ] + ) + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key="revenue", descending=True) + .materialize() + ) + + # Report arguments for the benchmark. 
+ return vars(args) + + run_tpch_benchmark("tpch_q10", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q18.py b/release/nightly_tests/dataset/tpch/tpch_q18.py new file mode 100644 index 000000000000..ebb24ce74d92 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q18.py @@ -0,0 +1,71 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + + # Load all required tables + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + + # Q18 parameters (spec: [312..315]) + quantity = 312 + + # Calculate total quantity per order + lineitem_quantity = lineitem.groupby("l_orderkey").aggregate( + Sum(on="l_quantity", alias_name="total_quantity") + ) + + # Filter orders with total quantity > threshold + large_orders = lineitem_quantity.filter(expr=col("total_quantity") > quantity) + + # Join lineitem with large orders + lineitem_large = lineitem.join( + large_orders, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + ) + + # Join with orders + lineitem_orders = lineitem_large.join( + orders, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Join with customer + ds = lineitem_orders.join( + customer, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Aggregate by customer name, customer key, order key, order date, and total price + _ = ( + ds.groupby( + ["c_name", "c_custkey", "o_orderkey", "o_orderdate", "o_totalprice"] + ) + .aggregate(Sum(on="l_quantity", alias_name="sum_quantity")) + .sort(key=["o_totalprice", "o_orderdate"], descending=[True, False]) + .materialize() + ) + + # Report arguments for the benchmark. 
+ return vars(args) + + run_tpch_benchmark("tpch_q18", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q3.py b/release/nightly_tests/dataset/tpch/tpch_q3.py new file mode 100644 index 000000000000..dfe1b6869077 --- /dev/null +++ b/release/nightly_tests/dataset/tpch/tpch_q3.py @@ -0,0 +1,68 @@ +import ray +from ray.data.aggregate import Sum +from ray.data.expressions import col +from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark + + +def main(args): + def benchmark_fn(): + from datetime import datetime + + # Load all required tables + customer = load_table("customer", args.sf) + orders = load_table("orders", args.sf) + lineitem = load_table("lineitem", args.sf) + + # Q3 parameters + date = datetime(1995, 3, 15) + segment = "BUILDING" + + # Filter customer by segment + customer_filtered = customer.filter(expr=col("c_mktsegment") == segment) + + # Filter orders by date + orders_filtered = orders.filter(expr=col("o_orderdate") < date) + + # Join orders with customer + orders_customer = orders_filtered.join( + customer_filtered, + join_type="inner", + num_partitions=100, + on=("o_custkey",), + right_on=("c_custkey",), + ) + + # Join with lineitem and filter by ship date + lineitem_filtered = lineitem.filter(expr=col("l_shipdate") > date) + ds = lineitem_filtered.join( + orders_customer, + join_type="inner", + num_partitions=100, + on=("l_orderkey",), + right_on=("o_orderkey",), + ) + + # Calculate revenue + ds = ds.with_column( + "revenue", + to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))), + ) + + # Aggregate by order key, order date, and ship priority + _ = ( + ds.groupby(["l_orderkey", "o_orderdate", "o_shippriority"]) + .aggregate(Sum(on="revenue", alias_name="revenue")) + .sort(key=["revenue", "o_orderdate"], descending=[True, False]) + .materialize() + ) + + # Report arguments for the benchmark. 
+ return vars(args) + + run_tpch_benchmark("tpch_q3", benchmark_fn) + + +if __name__ == "__main__": + ray.init() + args = parse_tpch_args() + main(args) diff --git a/release/nightly_tests/dataset/tpch/tpch_q6.py b/release/nightly_tests/dataset/tpch/tpch_q6.py index 51b2244906f5..e8305a7ab1db 100644 --- a/release/nightly_tests/dataset/tpch/tpch_q6.py +++ b/release/nightly_tests/dataset/tpch/tpch_q6.py @@ -32,8 +32,10 @@ def benchmark_fn(): ) # Aggregate - result = ds.aggregate(Sum(on="revenue", alias_name="revenue")) - return result + _ = ds.aggregate(Sum(on="revenue", alias_name="revenue")) + + # Report arguments for the benchmark. + return vars(args) run_tpch_benchmark("tpch_q6", benchmark_fn) diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index a930ba182bb6..47a863fcd6aa 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -737,6 +737,19 @@ timeout: 5400 script: python tpch/tpch_q1.py --sf 100 +- name: "tpch_q3_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q3.py --sf 100 + - name: "tpch_q6_{{scaling}}" python: "3.10" matrix: @@ -750,6 +763,32 @@ timeout: 5400 script: python tpch/tpch_q6.py --sf 100 +- name: "tpch_q10_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q10.py --sf 100 + +- name: "tpch_q18_{{scaling}}" + python: "3.10" + matrix: + setup: + scaling: [fixed_size, autoscaling] + + cluster: + cluster_compute: "{{scaling}}_all_to_all_compute.yaml" + + run: + timeout: 5400 + script: python tpch/tpch_q18.py --sf 100 + ################################################# # Cross-AZ RPC fault tolerance test #################################################