Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions release/nightly_tests/dataset/tpch/tpch_q5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
    """Run the TPC-H Q5 benchmark.

    Builds the Q5 pipeline (revenue per nation for one region and one order
    year) inside ``benchmark_fn`` and hands that function to
    ``run_tpch_benchmark`` under the name ``"tpch_q5"``.

    Args:
        args: Parsed benchmark arguments. ``args.sf`` is forwarded to
            ``load_table`` for every table, and ``vars(args)`` is returned
            from the benchmark function for reporting.
    """
    # Partition count shared by every join in this query.
    num_partitions = 100

    def benchmark_fn():
        from datetime import datetime

        # Load all required tables
        region = load_table("region", args.sf)
        nation = load_table("nation", args.sf)
        supplier = load_table("supplier", args.sf)
        customer = load_table("customer", args.sf)
        orders = load_table("orders", args.sf)
        lineitem = load_table("lineitem", args.sf)

        # Q5 parameters
        region_name = "ASIA"
        date = datetime(1994, 1, 1)
        date_end = datetime(date.year + 1, date.month, date.day)

        # Nations that belong to the requested region
        ds = region.filter(expr=col("r_name") == region_name).join(
            nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("r_regionkey",),
            right_on=("n_regionkey",),
        )

        # Customers located in those nations. The nation key is carried
        # forward as the left-side key column ``n_nationkey``.
        ds = ds.join(
            customer,
            join_type="inner",
            num_partitions=num_partitions,
            on=("n_nationkey",),
            right_on=("c_nationkey",),
        )

        # Orders placed by those customers inside the one-year window
        orders_filtered = orders.filter(
            expr=((col("o_orderdate") >= date) & (col("o_orderdate") < date_end))
        )
        ds = ds.join(
            orders_filtered,
            join_type="inner",
            num_partitions=num_partitions,
            on=("c_custkey",),
            right_on=("o_custkey",),
        )

        # Line items of those orders
        ds = ds.join(
            lineitem,
            join_type="inner",
            num_partitions=num_partitions,
            on=("o_orderkey",),
            right_on=("l_orderkey",),
        )

        # Suppliers are attached last, matched on the supplier key AND the
        # customer's nation (Q5 requires c_nationkey = s_nationkey). Joining
        # supplier to customer on the nation key alone would pair every
        # supplier with every customer of the same nation before orders and
        # lineitem could restrict the result. Only left-side key columns are
        # referenced as join keys here, so the plan does not depend on
        # whether a join keeps the right-side key columns.
        ds = ds.join(
            supplier,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_suppkey", "n_nationkey"),
            right_on=("s_suppkey", "s_nationkey"),
        )

        # Calculate revenue
        ds = ds.with_column(
            "revenue",
            to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
        )

        # Aggregate by nation name, highest revenue first
        _ = (
            ds.groupby("n_name")
            .aggregate(Sum(on="revenue", alias_name="revenue"))
            .sort(key="revenue", descending=True)
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q5", benchmark_fn)


if __name__ == "__main__":
    # Initialise Ray first, then parse the CLI arguments and run the query.
    ray.init()
    main(parse_tpch_args())
114 changes: 114 additions & 0 deletions release/nightly_tests/dataset/tpch/tpch_q7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
    """Run the TPC-H Q7 benchmark.

    Builds the Q7 pipeline (revenue of shipments between two nations, per
    supplier nation, customer nation and ship year) inside ``benchmark_fn``
    and hands that function to ``run_tpch_benchmark`` under the name
    ``"tpch_q7"``.

    Args:
        args: Parsed benchmark arguments. ``args.sf`` is forwarded to
            ``load_table`` for every table, and ``vars(args)`` is returned
            from the benchmark function for reporting.
    """
    # Partition count shared by every join in this query.
    num_partitions = 100

    def benchmark_fn():
        from datetime import datetime

        # Load all required tables
        supplier = load_table("supplier", args.sf)
        lineitem = load_table("lineitem", args.sf)
        orders = load_table("orders", args.sf)
        customer = load_table("customer", args.sf)
        nation = load_table("nation", args.sf)

        # Q7 parameters
        date1 = datetime(1995, 1, 1)
        date2 = datetime(1997, 1, 1)
        nation1 = "FRANCE"
        nation2 = "GERMANY"

        # Filter nations of interest (both directions: nation1 <-> nation2)
        # and keep only the columns the query needs.
        nations_of_interest = nation.filter(
            expr=(col("n_name") == nation1) | (col("n_name") == nation2)
        ).select_columns(["n_nationkey", "n_name"])

        # The nation table is joined twice (supplier side and customer side).
        # Rename its columns explicitly for each role instead of relying on
        # join suffixes, so that the ``n_name_supp`` / ``n_name_cust`` columns
        # used below exist regardless of whether the join applies suffixes to
        # columns whose names do not collide.
        supplier_side_nations = nations_of_interest.rename_columns(
            {"n_nationkey": "n_nationkey_supp", "n_name": "n_name_supp"}
        )
        customer_side_nations = nations_of_interest.rename_columns(
            {"n_nationkey": "n_nationkey_cust", "n_name": "n_name_cust"}
        )

        # Join supplier with its nation
        supplier_nation = supplier.join(
            supplier_side_nations,
            join_type="inner",
            num_partitions=num_partitions,
            on=("s_nationkey",),
            right_on=("n_nationkey_supp",),
        )

        # Join customer with its nation
        customer_nation = customer.join(
            customer_side_nations,
            join_type="inner",
            num_partitions=num_partitions,
            on=("c_nationkey",),
            right_on=("n_nationkey_cust",),
        )

        # Join orders with customer
        orders_customer = orders.join(
            customer_nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("o_custkey",),
            right_on=("c_custkey",),
        )

        # Join lineitem with orders and filter by ship date
        lineitem_filtered = lineitem.filter(
            expr=((col("l_shipdate") >= date1) & (col("l_shipdate") < date2))
        )
        lineitem_orders = lineitem_filtered.join(
            orders_customer,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_orderkey",),
            right_on=("o_orderkey",),
        )

        # Join with supplier
        ds = lineitem_orders.join(
            supplier_nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_suppkey",),
            right_on=("s_suppkey",),
        )

        # Filter to ensure we only include shipments between the two nations
        # (exclude shipments within the same nation)
        ds = ds.filter(expr=(col("n_name_supp") != col("n_name_cust")))

        # Calculate revenue
        ds = ds.with_column(
            "revenue",
            to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
        )

        # Extract year from shipdate
        ds = ds.with_column(
            "l_year",
            col("l_shipdate").dt.year(),
        )

        # Aggregate by supplier nation, customer nation, and year
        group_keys = ["n_name_supp", "n_name_cust", "l_year"]
        _ = (
            ds.groupby(group_keys)
            .aggregate(Sum(on="revenue", alias_name="revenue"))
            .sort(key=group_keys)
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q7", benchmark_fn)


if __name__ == "__main__":
    # Initialise Ray first, then parse the CLI arguments and run the query.
    ray.init()
    main(parse_tpch_args())
169 changes: 169 additions & 0 deletions release/nightly_tests/dataset/tpch/tpch_q8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import ray
from ray.data.aggregate import Sum
from ray.data.expressions import col
from common import parse_tpch_args, load_table, to_f64, run_tpch_benchmark


def main(args):
    """Run the TPC-H Q8 benchmark.

    Builds the Q8 pipeline (per-year market share of one supplier nation for
    one part type, among customers of one region) inside ``benchmark_fn`` and
    hands that function to ``run_tpch_benchmark`` under the name
    ``"tpch_q8"``.

    Args:
        args: Parsed benchmark arguments. ``args.sf`` is forwarded to
            ``load_table`` for every table, and ``vars(args)`` is returned
            from the benchmark function for reporting.
    """
    # Partition count shared by every join in this query.
    num_partitions = 100

    def benchmark_fn():
        from datetime import datetime

        # Load all required tables
        region = load_table("region", args.sf)
        nation = load_table("nation", args.sf)
        supplier = load_table("supplier", args.sf)
        customer = load_table("customer", args.sf)
        orders = load_table("orders", args.sf)
        lineitem = load_table("lineitem", args.sf)
        part = load_table("part", args.sf)

        # Q8 parameters
        date1 = datetime(1995, 1, 1)
        date2 = datetime(1997, 1, 1)
        region_name = "AMERICA"
        part_type = "ECONOMY ANODIZED STEEL"
        nation_name = "BRAZIL"  # Specific nation for market share calculation

        # Filter region
        region_filtered = region.filter(expr=col("r_name") == region_name)

        # Join region with nation
        nation_region = region_filtered.join(
            nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("r_regionkey",),
            right_on=("n_regionkey",),
        )

        # Join customer with nation (customers restricted to the region)
        customer_nation = customer.join(
            nation_region,
            join_type="inner",
            num_partitions=num_partitions,
            on=("c_nationkey",),
            right_on=("n_nationkey",),
        )

        # Join supplier with nation (for supplier nation)
        # Note: For Q8, suppliers can be from ALL nations, not just the region.
        # The nation table is already present on the customer side, so its
        # columns are projected and renamed explicitly for the supplier role.
        # This keeps ``n_name_supp`` well defined without depending on how the
        # join applies suffixes, and avoids name clashes in the final join.
        supplier_side_nations = nation.select_columns(
            ["n_nationkey", "n_name"]
        ).rename_columns(
            {"n_nationkey": "n_nationkey_supp", "n_name": "n_name_supp"}
        )
        supplier_nation = supplier.join(
            supplier_side_nations,
            join_type="inner",
            num_partitions=num_partitions,
            on=("s_nationkey",),
            right_on=("n_nationkey_supp",),
        )

        # Filter part by type
        part_filtered = part.filter(expr=col("p_type") == part_type)

        # Join orders with customer and filter by date
        orders_filtered = orders.filter(
            expr=((col("o_orderdate") >= date1) & (col("o_orderdate") < date2))
        )
        orders_customer = orders_filtered.join(
            customer_nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("o_custkey",),
            right_on=("c_custkey",),
        )

        # Join lineitem with orders
        lineitem_orders = lineitem.join(
            orders_customer,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_orderkey",),
            right_on=("o_orderkey",),
        )

        # Join with part
        lineitem_part = lineitem_orders.join(
            part_filtered,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_partkey",),
            right_on=("p_partkey",),
        )

        # Join with supplier
        ds = lineitem_part.join(
            supplier_nation,
            join_type="inner",
            num_partitions=num_partitions,
            on=("l_suppkey",),
            right_on=("s_suppkey",),
        )

        # Calculate volume
        ds = ds.with_column(
            "volume",
            to_f64(col("l_extendedprice")) * (1 - to_f64(col("l_discount"))),
        )

        # Extract year from orderdate
        ds = ds.with_column(
            "o_year",
            col("o_orderdate").dt.year(),
        )

        # Market share = (volume from the specific nation's suppliers)
        #                / (total volume for customers in the region)
        # Conditional volume: the nation match is converted to a number with
        # to_f64 and multiplied by volume, so non-matching rows contribute 0.
        ds = ds.with_column(
            "is_nation",
            to_f64((col("n_name_supp") == nation_name)),
        )
        ds = ds.with_column(
            "nation_volume",
            col("is_nation") * col("volume"),
        )

        # Compute both yearly sums in a single group-by. Grouping the same
        # lazy dataset twice and joining the results back on ``o_year`` would
        # plan the whole upstream join pipeline twice and add another join.
        result = ds.groupby("o_year").aggregate(
            Sum(on="nation_volume", alias_name="nation_volume"),
            Sum(on="volume", alias_name="total_volume"),
        )

        # Calculate market share for the specific nation
        result = result.with_column(
            "mkt_share",
            col("nation_volume") / col("total_volume"),
        )

        # Select and sort by year
        _ = (
            result.select_columns(["o_year", "mkt_share"])
            .sort(key="o_year")
            .materialize()
        )

        # Report arguments for the benchmark.
        return vars(args)

    run_tpch_benchmark("tpch_q8", benchmark_fn)


if __name__ == "__main__":
    # Initialise Ray first, then parse the CLI arguments and run the query.
    ray.init()
    main(parse_tpch_args())
Loading