
Handle empty aggregations in multi-partition cudf.polars group_by #18277


Open · wants to merge 20 commits into base: branch-25.06
31 changes: 22 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -283,15 +283,28 @@ def _(
inp = translator.translate_ir(n=None)
aggs = [translate_named_expr(translator, n=e) for e in node.aggs]
keys = [translate_named_expr(translator, n=e) for e in node.keys]
return ir.GroupBy(
schema,
keys,
aggs,
node.maintain_order,
node.options,
translator.config_options,
inp,
)
if len(aggs) == 0:
# A GroupBy with no aggregations is logically equivalent to a Distinct
# on the Keys. Handling the no-agg case here lets our downstream
# expression nodes assume there will always be at least one aggregation.
return ir.Distinct(
schema,
plc.stream_compaction.DuplicateKeepOption.KEEP_ANY,
None,
node.options.slice,
node.maintain_order,
ir.Select(schema, keys, True, inp), # noqa: FBT003
)
else:
return ir.GroupBy(
schema,
keys,
aggs,
node.maintain_order,
node.options,
translator.config_options,
inp,
)


@_translate_ir.register
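As a quick illustration of the equivalence the new branch relies on, here is a minimal sketch in plain polars (CPU execution, not part of this PR): a `group_by` with an empty aggregation list produces the same rows as a distinct over the key columns.

```python
import polars as pl

df = pl.LazyFrame({"y": [1, 1, 2, 2, 3]})

# A group_by with no aggregations yields one row per key...
no_agg = df.group_by("y").agg().collect()

# ...which is the same set of rows as a unique() over the key columns.
distinct = df.select("y").unique().collect()

assert sorted(no_agg["y"].to_list()) == sorted(distinct["y"].to_list())
```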
5 changes: 5 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/base.py
@@ -38,6 +38,11 @@ def keys(self, node: Node) -> Iterator[tuple[str, int]]:
name = get_key_name(node)
yield from ((name, i) for i in range(self.count))

def __repr__(self) -> str: # noqa: D105
return (
f"PartitionInfo(count={self.count}, partitioned_on={self.partitioned_on})"
)


def get_key_name(node: Node) -> str:
"""Generate the key name for a Node."""
32 changes: 1 addition & 31 deletions python/cudf_polars/cudf_polars/experimental/join.py
@@ -11,44 +11,14 @@
from cudf_polars.dsl.ir import Join
from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name
from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node
from cudf_polars.experimental.shuffle import Shuffle, _partition_dataframe
from cudf_polars.experimental.shuffle import _maybe_shuffle_frame, _partition_dataframe

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.parallel import LowerIRTransformer
from cudf_polars.utils.config import ConfigOptions


def _maybe_shuffle_frame(
frame: IR,
on: tuple[NamedExpr, ...],
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
output_count: int,
) -> IR:
# Shuffle `frame` if it isn't already shuffled.
if (
partition_info[frame].partitioned_on == on
and partition_info[frame].count == output_count
):
# Already shuffled
return frame
else:
# Insert new Shuffle node
frame = Shuffle(
frame.schema,
on,
config_options,
frame,
)
partition_info[frame] = PartitionInfo(
count=output_count,
partitioned_on=on,
)
return frame


def _make_hash_join(
68 changes: 61 additions & 7 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -9,18 +9,29 @@
from functools import reduce
from typing import TYPE_CHECKING, Any, ClassVar

import cudf_polars.experimental.groupby
import cudf_polars.experimental.io
import cudf_polars.experimental.join
import cudf_polars.experimental.select
import cudf_polars.experimental.shuffle # noqa: F401
Comment on lines -12 to -16

Member: I think removing these lines will break a lot of things, because we are registering dispatch functions in these modules.
from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union
from cudf_polars.dsl.expr import Col, NamedExpr
from cudf_polars.dsl.ir import (
IR,
Cache,
Distinct,
Filter,
HStack,
Projection,
Select,
Union,
)
from cudf_polars.dsl.traversal import CachingVisitor, traversal
from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name
from cudf_polars.experimental.base import (
PartitionInfo,
_concat,
get_key_name,
)
from cudf_polars.experimental.dispatch import (
generate_ir_tasks,
lower_ir_node,
)
from cudf_polars.experimental.shuffle import _maybe_shuffle_frame
from cudf_polars.utils.config import ConfigOptions

if TYPE_CHECKING:
from collections.abc import MutableMapping
@@ -302,3 +313,46 @@ def _generate_ir_tasks_pwise(
generate_ir_tasks.register(Filter, _generate_ir_tasks_pwise)
generate_ir_tasks.register(HStack, _generate_ir_tasks_pwise)
generate_ir_tasks.register(Select, _generate_ir_tasks_pwise)
generate_ir_tasks.register(Distinct, _generate_ir_tasks_pwise)


@lower_ir_node.register(Distinct)
def _(
ir: Distinct, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Lower a Distinct node.

Distinct is implemented as a set of per-partition Distinct operations,
followed by a Shuffle, followed by a final Distinct.
"""
child, partition_info = rec(ir.children[0])
subset = ir.subset or set(ir.schema)
partitioned_on = tuple(
NamedExpr(name, Col(dtype, name))
for name, dtype in ir.schema.items()
if name in subset
)

output_count = partition_info[child].count
if output_count == 1:
new_node = ir.reconstruct([child])
partition_info[new_node] = PartitionInfo(count=1, partitioned_on=partitioned_on)
return new_node, partition_info
else:
config_options = ConfigOptions({})
new_child = _maybe_shuffle_frame(
child,
partitioned_on,
partition_info,
config_options,
output_count,
)
Comment on lines +344 to +350

Contributor: question: For groupby we always do tree-reduce, here we're always doing shuffle. I guess both are fine, but maybe we want to be consistent?

if child != new_child:
ir = ir.reconstruct([new_child])

partition_info[ir] = PartitionInfo(
count=output_count, partitioned_on=partitioned_on
)

return ir, partition_info
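For intuition, here is a rough, library-free sketch of the lowering strategy described in the docstring (per-partition Distinct, then a hash Shuffle, then a final per-partition Distinct); the data and helper names below are made up for illustration:

```python
# Hypothetical partitions of one integer column; note the duplicate
# key 1 straddling the partition boundary.
partitions = [[0, 1, 0, 1], [1, 2, 3, 2]]


def local_distinct(rows: list[int]) -> list[int]:
    # Per-partition Distinct: removes local duplicates only.
    return list(dict.fromkeys(rows))


def hash_shuffle(parts: list[list[int]], count: int) -> list[list[int]]:
    # Hash shuffle: every copy of a key lands in the same partition.
    out: list[list[int]] = [[] for _ in range(count)]
    for part in parts:
        for row in part:
            out[hash(row) % count].append(row)
    return out


count = len(partitions)
staged = [local_distinct(p) for p in partitions]  # local dedupe
final = [local_distinct(p) for p in hash_shuffle(staged, count)]  # global dedupe
assert sorted(x for p in final for x in p) == [0, 1, 2, 3]
```

The tree-reduce alternative raised in the review question above would instead merge partition results pairwise; both approaches remove cross-partition duplicates, the shuffle simply leaves the result partitioned on the subset keys.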
31 changes: 31 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/shuffle.py
@@ -192,3 +192,34 @@ def _(
partition_info[ir.children[0]].count,
partition_info[ir].count,
)


def _maybe_shuffle_frame(
frame: IR,
on: tuple[NamedExpr, ...],
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
output_count: int,
) -> IR:
from cudf_polars.experimental.parallel import PartitionInfo

# Shuffle `frame` if it isn't already shuffled.
if (
partition_info[frame].partitioned_on == on
and partition_info[frame].count == output_count
):
# Already shuffled
return frame
else:
# Insert new Shuffle node
frame = Shuffle(
frame.schema,
on,
config_options,
frame,
)
partition_info[frame] = PartitionInfo(
count=output_count,
partitioned_on=on,
)
return frame
22 changes: 22 additions & 0 deletions python/cudf_polars/tests/experimental/conftest.py
@@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest

import polars as pl


@pytest.fixture(scope="module")
def engine():
"""
Fixture for a `GPUEngine` with the `dask-experimental` executor.

Sets the maximum number of rows per partition to 4.
"""
return pl.GPUEngine(
raise_on_fail=True,
executor="dask-experimental",
executor_options={"max_rows_per_partition": 4},
)
10 changes: 10 additions & 0 deletions python/cudf_polars/tests/experimental/test_base.py
@@ -0,0 +1,10 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from cudf_polars.experimental.base import PartitionInfo


def test_partition_info_repr():
partition_info = PartitionInfo(count=10, partitioned_on=("a", "b"))
assert repr(partition_info) == "PartitionInfo(count=10, partitioned_on=('a', 'b'))"
15 changes: 6 additions & 9 deletions python/cudf_polars/tests/experimental/test_groupby.py
@@ -10,15 +10,6 @@
from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.fixture(scope="module")
def engine():
return pl.GPUEngine(
raise_on_fail=True,
executor="dask-experimental",
executor_options={"max_rows_per_partition": 4},
)


@pytest.fixture(scope="module")
def df():
return pl.LazyFrame(
Expand Down Expand Up @@ -80,6 +71,12 @@ def test_groupby_agg_config_options(df, op, keys):
assert_gpu_result_equal(q, engine=engine, check_row_order=False)


def test_groupby_agg_empty(df: pl.LazyFrame, engine: pl.GPUEngine) -> None:
# https://github.com/rapidsai/cudf/issues/18276
q = df.group_by("y").agg()
assert_gpu_result_equal(q, engine=engine, check_row_order=False)


def test_groupby_raises(df, engine):
q = df.group_by("y").median()
with pytest.raises(
28 changes: 28 additions & 0 deletions python/cudf_polars/tests/experimental/test_parallel.py
@@ -13,6 +13,7 @@

from cudf_polars import Translator
from cudf_polars.dsl.traversal import traversal
from cudf_polars.testing.asserts import assert_gpu_result_equal


def test_evaluate_dask():
@@ -69,3 +70,30 @@ def test_pickle_conditional_join_args():
ir = Translator(q._ldf.visit(), GPUEngine()).translate_ir()
for node in traversal([ir]):
pickle.loads(pickle.dumps(node._non_child_args))


@pytest.mark.parametrize("subset", [None, ["a"], ["a", "b"]])
@pytest.mark.parametrize("maintain_order", [True, False])
@pytest.mark.parametrize("keep", ["first", "last", "any", "none"])
@pytest.mark.parametrize("n_repeat", [1, 2])
def test_unique(
engine: pl.GPUEngine,
subset: list[str] | None,
maintain_order: bool, # noqa: FBT001
keep: str,
n_repeat: int,
) -> None:
df = pl.LazyFrame(
{
"a": [0, 1] * n_repeat * 2,
"b": [0, 1, 2, 3] * n_repeat,
}
)
length = df.select(pl.len()).collect().item()
# check that we're on either side of the engine's max_rows_per_partition
if n_repeat == 1:
assert length <= engine.config["executor_options"]["max_rows_per_partition"]
else:
assert length > engine.config["executor_options"]["max_rows_per_partition"]
q = df.unique(subset=subset, maintain_order=maintain_order, keep=keep)
assert_gpu_result_equal(q, engine=engine, check_row_order=False)
11 changes: 1 addition & 10 deletions python/cudf_polars/tests/experimental/test_select.py
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations
@@ -10,15 +10,6 @@
from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.fixture(scope="module")
def engine():
return pl.GPUEngine(
raise_on_fail=True,
executor="dask-experimental",
executor_options={"max_rows_per_partition": 3},
)


@pytest.fixture(scope="module")
def df():
return pl.LazyFrame(
9 changes: 0 additions & 9 deletions python/cudf_polars/tests/experimental/test_shuffle.py
@@ -15,15 +15,6 @@
from cudf_polars.utils.config import ConfigOptions


@pytest.fixture(scope="module")
def engine():
return pl.GPUEngine(
raise_on_fail=True,
executor="dask-experimental",
executor_options={"max_rows_per_partition": 4},
)


@pytest.fixture(scope="module")
def df():
return pl.LazyFrame(
1 change: 1 addition & 0 deletions python/cudf_polars/tests/test_groupby.py
@@ -54,6 +54,7 @@ def keys(request):
[pl.col("float").sum().round(decimals=1)],
[pl.col("float").round(decimals=1).sum()],
[pl.col("int").first(), pl.col("float").last()],
[],
],
ids=lambda aggs: "-".join(map(str, aggs)),
)
Expand Down
Loading