Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion release/nightly_tests/dataset/tpch/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,69 @@
from benchmark import Benchmark

# Define schemas for TPC-H tables
# Parquet datasets use column0, column1, ... (or column00, column01 for lineitem)
TABLE_COLUMNS = {
"region": {
"column0": "r_regionkey",
"column1": "r_name",
"column2": "r_comment",
"column3": "r_dummy",
},
"nation": {
"column0": "n_nationkey",
"column1": "n_name",
"column2": "n_regionkey",
"column3": "n_comment",
},
"supplier": {
"column0": "s_suppkey",
"column1": "s_name",
"column2": "s_address",
"column3": "s_nationkey",
"column4": "s_phone",
"column5": "s_acctbal",
"column6": "s_comment",
"column7": "s_dummy",
},
"customer": {
"column0": "c_custkey",
"column1": "c_name",
"column2": "c_address",
"column3": "c_nationkey",
"column4": "c_phone",
"column5": "c_acctbal",
"column6": "c_mktsegment",
"column7": "c_comment",
},
"orders": {
"column0": "o_orderkey",
"column1": "o_custkey",
"column2": "o_orderstatus",
"column3": "o_totalprice",
"column4": "o_orderdate",
"column5": "o_orderpriority",
"column6": "o_clerk",
"column7": "o_shippriority",
"column8": "o_comment",
},
"part": {
"column0": "p_partkey",
"column1": "p_name",
"column2": "p_mfgr",
"column3": "p_brand",
"column4": "p_type",
"column5": "p_size",
"column6": "p_container",
"column7": "p_retailprice",
"column8": "p_comment",
},
"partsupp": {
"column0": "ps_partkey",
"column1": "ps_suppkey",
"column2": "ps_availqty",
"column3": "ps_supplycost",
"column4": "ps_comment",
},
"lineitem": {
"column00": "l_orderkey",
"column01": "l_partkey",
Expand All @@ -26,7 +88,7 @@
"column13": "l_shipinstruct",
"column14": "l_shipmode",
"column15": "l_comment",
}
},
}


Expand Down
99 changes: 99 additions & 0 deletions release/nightly_tests/dataset/tpch/tpch_q5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
def benchmark_fn():
from datetime import datetime

# Load all required tables
region = load_table("region", args.sf)
nation = load_table("nation", args.sf)
supplier = load_table("supplier", args.sf)
customer = load_table("customer", args.sf)
orders = load_table("orders", args.sf)
lineitem = load_table("lineitem", args.sf)

# Q5 parameters
date = datetime(1994, 1, 1)

# Filter region by name
region = region.filter(expr=col("r_name") == "ASIA")

# Join region with nation
ds = region.join(
nation,
join_type="inner",
num_partitions=100,
on=("r_regionkey",),
right_on=("n_regionkey",),
)

# Join with supplier
ds = ds.join(
supplier,
join_type="inner",
num_partitions=100,
on=("n_nationkey",),
right_on=("s_nationkey",),
)

# Join with customer
ds = ds.join(
customer,
join_type="inner",
num_partitions=100,
on=("s_nationkey",),
right_on=("c_nationkey",),
)

# Join with orders and filter by date
orders_filtered = orders.filter(
expr=(
(col("o_orderdate") >= date)
& (col("o_orderdate") < datetime(date.year + 1, date.month, date.day))
)
)
ds = ds.join(
orders_filtered,
join_type="inner",
num_partitions=100,
on=("c_custkey",),
right_on=("o_custkey",),
)

# Join with lineitem on order key and supplier key
ds = ds.join(
lineitem,
join_type="inner",
num_partitions=100,
on=("o_orderkey", "s_suppkey"),
right_on=("l_orderkey", "l_suppkey"),
)

# Calculate revenue
ds = ds.with_column(
"revenue",
to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
)

# Aggregate by nation name
_ = (
ds.groupby("n_name")
.aggregate(Sum(on="revenue", alias_name="revenue"))
.sort(key="revenue", descending=True)
.materialize()
)

# Report arguments for the benchmark.
return vars(args)

run_tpch_benchmark("tpch_q5", benchmark_fn)


if __name__ == "__main__":
ray.init()
args = parse_tpch_args()
main(args)
114 changes: 114 additions & 0 deletions release/nightly_tests/dataset/tpch/tpch_q7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
def benchmark_fn():
from datetime import datetime

# Load all required tables
supplier = load_table("supplier", args.sf)
lineitem = load_table("lineitem", args.sf)
orders = load_table("orders", args.sf)
customer = load_table("customer", args.sf)
nation = load_table("nation", args.sf)

# Q7 parameters
date1 = datetime(1995, 1, 1)
date2 = datetime(1997, 1, 1)
nation1 = "FRANCE"
nation2 = "GERMANY"

# Filter nations of interest (both directions: nation1 <-> nation2)
nations_of_interest = nation.filter(
expr=(col("n_name") == nation1) | (col("n_name") == nation2)
)

# Join supplier with nation (use suffix to distinguish)
supplier_nation = supplier.join(
nations_of_interest,
join_type="inner",
num_partitions=100,
on=("s_nationkey",),
right_on=("n_nationkey",),
left_suffix="",
right_suffix="_supp",
)

# Join customer with nation (use suffix to distinguish)
customer_nation = customer.join(
nations_of_interest,
join_type="inner",
num_partitions=100,
on=("c_nationkey",),
right_on=("n_nationkey",),
left_suffix="",
right_suffix="_cust",
)

# Join orders with customer
orders_customer = orders.join(
customer_nation,
join_type="inner",
num_partitions=100,
on=("o_custkey",),
right_on=("c_custkey",),
)

# Join lineitem with orders and filter by date
lineitem_filtered = lineitem.filter(
expr=((col("l_shipdate") >= date1) & (col("l_shipdate") < date2))
)
lineitem_orders = lineitem_filtered.join(
orders_customer,
join_type="inner",
num_partitions=100,
on=("l_orderkey",),
right_on=("o_orderkey",),
)

# Join with supplier
ds = lineitem_orders.join(
supplier_nation,
join_type="inner",
num_partitions=100,
on=("l_suppkey",),
right_on=("s_suppkey",),
)

# Filter to ensure we only include shipments between the two nations
# (exclude shipments within the same nation)
ds = ds.filter(expr=(col("n_name_supp") != col("n_name_cust")))

# Calculate revenue
ds = ds.with_column(
"revenue",
to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
)

# Extract year from shipdate
ds = ds.with_column(
"l_year",
col("l_shipdate").dt.year(),
)

# Aggregate by supplier nation, customer nation, and year
_ = (
ds.groupby(["n_name_supp", "n_name_cust", "l_year"])
.aggregate(Sum(on="revenue", alias_name="revenue"))
.sort(key=["n_name_supp", "n_name_cust", "l_year"])
.materialize()
)

# Report arguments for the benchmark.
return vars(args)

run_tpch_benchmark("tpch_q7", benchmark_fn)


if __name__ == "__main__":
ray.init()
args = parse_tpch_args()
main(args)
Loading