Skip to content

Support rolling aggregations in in-memory cudf-polars execution #18681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/rolling/detail/rolling_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct is_supported_rolling_aggregation_impl {
/**
 * @brief Report whether aggregation `kind` can be used as a rolling aggregation on type `T`.
 *
 * LEAD, LAG, COLLECT_LIST and COLLECT_SET are accepted unconditionally; every
 * other kind defers to `corresponding_rolling_operator<T, kind>::type::is_supported()`.
 */
constexpr bool operator()() const noexcept
{
  // Each alternative must compare against `kind`: a bare enumerator such as
  // `aggregation::Kind::COLLECT_SET` converts to `true` and would make the
  // whole predicate accept every aggregation.
  return (kind == aggregation::Kind::LEAD || kind == aggregation::Kind::LAG ||
          kind == aggregation::Kind::COLLECT_LIST || kind == aggregation::Kind::COLLECT_SET) ||
         corresponding_rolling_operator<T, kind>::type::is_supported();
}
};
Expand Down
38 changes: 38 additions & 0 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,44 @@ def sorted_like(self, like: Column, /) -> Self:
null_order=like.null_order,
)

def check_sorted(
    self,
    *,
    order: plc.types.Order,
    null_order: plc.types.NullOrder,
) -> bool:
    """
    Check if the column is sorted.

    Parameters
    ----------
    order
        The requested sort order.
    null_order
        Where nulls sort to.

    Returns
    -------
    True if the column is sorted, false otherwise.

    Notes
    -----
    If the sortedness flag is not set, this launches a kernel to
    check sortedness.

    When that check succeeds the result is cached on the column, so
    this method may mutate ``self`` by setting ``is_sorted``,
    ``order`` and ``null_order``. Only these sortedness attributes
    are touched; the column data itself is not modified.
    """
    size = self.obj.size()
    null_count = self.obj.null_count()
    # Columns with at most one row, or consisting only of nulls, are
    # sorted for every choice of order and null order.
    if size <= 1 or size == null_count:
        return True
    if self.is_sorted == plc.types.Sorted.YES:
        # Trust the cached flag; the null order only matters if there
        # are nulls to place.
        return self.order == order and (
            null_count == 0 or self.null_order == null_order
        )
    if plc.sorting.is_sorted(plc.Table([self.obj]), [order], [null_order]):
        # Cache the result in the same attribute that is read above
        # so later calls can skip the kernel launch.
        self.is_sorted = plc.types.Sorted.YES
        self.order = order
        self.null_order = null_order
        return True
    return False

def astype(self, dtype: plc.DataType) -> Column:
"""
Cast the column to the requested dtype.
Expand Down
119 changes: 110 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/expressions/rolling.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
# TODO: remove need for this
# ruff: noqa: D101
Expand All @@ -8,24 +8,125 @@

from typing import TYPE_CHECKING, Any

from cudf_polars.dsl.expressions.base import Expr
import pylibcudf as plc

from cudf_polars.containers import Column
from cudf_polars.dsl import expr
from cudf_polars.dsl.expressions.base import ExecutionContext, Expr
from cudf_polars.dsl.utils.windows import range_window_bounds

if TYPE_CHECKING:
import pylibcudf as plc
import pyarrow as pa

from cudf_polars.containers import DataFrame
from cudf_polars.typing import ClosedInterval

__all__ = ["GroupedRollingWindow", "RollingWindow", "to_request"]


def to_request(
    value: expr.Expr, orderby: Column, df: DataFrame
) -> plc.rolling.RollingRequest:
    """
    Build the pylibcudf rolling request for a single aggregation.

    Parameters
    ----------
    value
        Aggregation expression to apply over each window.
    orderby
        Column the windows are ordered by; it stands in as the input
        column when the aggregation is a ``Len`` (count).
    df
        DataFrame against which the input of the aggregation is
        evaluated.

    Returns
    -------
    Rolling request combining the input column, the minimum window
    size, and the aggregation request of ``value``.
    """
    min_window = 1
    if isinstance(value, expr.Len):
        # Counting needs some column to count over: reuse the orderby column.
        source = orderby
    elif isinstance(value, expr.Agg):
        source = value.children[0].evaluate(df, context=ExecutionContext.ROLLING)
        if value.name == "var":
            # Polars yields null for a variance when nvalues <= ddof,
            # whereas libcudf yields NaN. Requiring at least ddof + 1
            # rows per window reproduces the polars result.
            min_window = value.options + 1
    else:
        source = value.evaluate(
            df, context=ExecutionContext.ROLLING
        )  # pragma: no cover; raise before we get here because we
        # don't do correct handling of empty groups
    return plc.rolling.RollingRequest(source.obj, min_window, value.agg_request)


class RollingWindow(Expr):
    """
    Rolling aggregation over range windows defined on an ordering column.

    Parameters
    ----------
    dtype
        Data type of the expression.
    preceding
        Preceding window offset, converted to a libcudf window bound
        by ``range_window_bounds``.
    following
        Following window offset, converted to a libcudf window bound
        by ``range_window_bounds``.
    closed_window
        Which ends of the window interval are closed; passed to
        ``range_window_bounds`` together with the offsets.
    orderby
        Name of the column in the evaluated DataFrame that defines
        the windows.
    agg
        Aggregation expression to evaluate over each window.

    Raises
    ------
    NotImplementedError
        On construction, if ``agg`` is a list collection or is not a
        valid rolling aggregation for its dtype.
    RuntimeError
        On evaluation, if the orderby column contains nulls or is not
        sorted ascending.
    """

    __slots__ = ("closed_window", "following", "orderby", "preceding")
    _non_child = ("dtype", "preceding", "following", "closed_window", "orderby")

    def __init__(
        self,
        dtype: plc.DataType,
        preceding: pa.Scalar,
        following: pa.Scalar,
        closed_window: ClosedInterval,
        orderby: str,
        agg: Expr,
    ) -> None:
        self.dtype = dtype
        self.preceding = preceding
        self.following = following
        self.closed_window = closed_window
        self.orderby = orderby
        self.children = (agg,)
        self.is_pointwise = False
        # Reject unsupported aggregations up front rather than at
        # evaluation time.
        if agg.agg_request.kind() == plc.aggregation.Kind.COLLECT_LIST:
            raise NotImplementedError(
                "Incorrect handling of empty groups for list collection"
            )
        if not plc.rolling.is_valid_rolling_aggregation(agg.dtype, agg.agg_request):
            raise NotImplementedError(f"Unsupported rolling aggregation {agg}")

    def do_evaluate(  # noqa: D102
        self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
    ) -> Column:
        if context != ExecutionContext.FRAME:
            raise RuntimeError(
                "Rolling aggregation inside groupby/over/rolling"
            )  # pragma: no cover; translation raises first
        (agg,) = self.children
        orderby = df.column_map[self.orderby]
        # Polars casts integral orderby to int64, but only for calculating window bounds
        if (
            plc.traits.is_integral(orderby.obj.type())
            and orderby.obj.type().id() != plc.TypeId.INT64
        ):
            orderby_obj = plc.unary.cast(orderby.obj, plc.DataType(plc.TypeId.INT64))
        else:
            orderby_obj = orderby.obj
        preceding, following = range_window_bounds(
            self.preceding, self.following, self.closed_window
        )
        if orderby.obj.null_count() != 0:
            raise RuntimeError(
                f"Index column '{self.orderby}' in rolling may not contain nulls"
            )
        # The windows are computed assuming ascending order, so verify
        # it on the original (uncast) column.
        if not orderby.check_sorted(
            order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.BEFORE
        ):
            raise RuntimeError(
                f"Index column '{self.orderby}' in rolling is not sorted, please sort first"
            )
        # No grouping keys here: the grouped API is called with an
        # empty key table.
        (result,) = plc.rolling.grouped_range_rolling_window(
            plc.Table([]),
            orderby_obj,
            plc.types.Order.ASCENDING,
            plc.types.NullOrder.BEFORE,
            preceding,
            following,
            [to_request(agg, orderby, df)],
        ).columns()
        return Column(result)


class GroupedRollingWindow(Expr):
Expand Down
Loading
Loading