Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flox for grouped first, last. #10148

Merged
merged 9 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ New Features
- Support reading to `GPU memory with Zarr <https://zarr.readthedocs.io/en/stable/user-guide/gpu.html>`_ (:pull:`10078`).
By `Deepak Cherian <https://github.com/dcherian>`_.

Performance
~~~~~~~~~~~
- :py:meth:`DatasetGroupBy.first` and :py:meth:`DatasetGroupBy.last` can now use ``flox`` if available. (:issue:`9647`)
By `Deepak Cherian <https://github.com/dcherian>`_.

Breaking changes
~~~~~~~~~~~~~~~~
- Rolled back code that would attempt to catch integer overflow when encoding
Expand Down Expand Up @@ -172,8 +177,6 @@ New Features
:py:class:`pandas.DatetimeIndex` (:pull:`9965`). By `Spencer Clark
<https://github.com/spencerkclark>`_ and `Kai Mühlbauer
<https://github.com/kmuehlbauer>`_.
- :py:meth:`DatasetGroupBy.first` and :py:meth:`DatasetGroupBy.last` can now use ``flox`` if available. (:issue:`9647`)
By `Deepak Cherian <https://github.com/dcherian>`_.

Breaking changes
~~~~~~~~~~~~~~~~
Expand Down
77 changes: 66 additions & 11 deletions xarray/core/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ def _flox_reduce(
dim: Dims,
keep_attrs: bool | None = None,
**kwargs: Any,
):
) -> T_Xarray:
"""Adaptor function that translates our groupby API to that of flox."""
import flox
from flox.xarray import xarray_reduce
Expand Down Expand Up @@ -1116,6 +1116,8 @@ def _flox_reduce(
# flox always assigns an index so we must drop it here if we don't need it.
to_drop.append(grouper.name)
continue
# TODO: We can't simply use `self.encoded.coords` here because it corresponds to `unique_coord`,
# NOT `full_index`. We would need to construct a new Coordinates object, that corresponds to `full_index`.
new_coords.append(
# Using IndexVariable here ensures we reconstruct PandasMultiIndex with
# all associated levels properly.
Expand Down Expand Up @@ -1361,7 +1363,12 @@ def where(self, cond, other=dtypes.NA) -> T_Xarray:
"""
return ops.where_method(self, cond, other)

def _first_or_last(self, op, skipna, keep_attrs):
def _first_or_last(
self,
op: Literal["first", "last"],
skipna: bool | None,
keep_attrs: bool | None,
):
if all(
isinstance(maybe_slice, slice)
and (maybe_slice.stop == maybe_slice.start + 1)
Expand All @@ -1372,17 +1379,65 @@ def _first_or_last(self, op, skipna, keep_attrs):
return self._obj
if keep_attrs is None:
keep_attrs = _get_keep_attrs(default=True)
return self.reduce(
op, dim=[self._group_dim], skipna=skipna, keep_attrs=keep_attrs
)
if (
module_available("flox", minversion="0.10.0")
and OPTIONS["use_flox"]
and contains_only_chunked_or_numpy(self._obj)
):
result = self._flox_reduce(
dim=None, func=op, skipna=skipna, keep_attrs=keep_attrs
)
else:
result = self.reduce(
getattr(duck_array_ops, op),
dim=[self._group_dim],
skipna=skipna,
keep_attrs=keep_attrs,
)
return result

def first(
    self, skipna: bool | None = None, keep_attrs: bool | None = None
) -> T_Xarray:
    """
    Return the first element of each group along the group dimension

    Parameters
    ----------
    skipna : bool or None, optional
        If True, skip missing values (as marked by NaN). By default, only
        skips missing values for float dtypes; other dtypes either do not
        have a sentinel missing value (int) or ``skipna=True`` has not been
        implemented (object, datetime64 or timedelta64).
    keep_attrs : bool or None, optional
        If True, ``attrs`` will be copied from the original
        object to the new one. If False, the new object will be
        returned without attributes.

    Returns
    -------
    DataArray or Dataset
        Same type as the grouped object, with the first element of each
        group along the group dimension.
    """
    # Dispatch by name so the shared helper can choose between the flox
    # fast path and the generic reduce implementation.
    return self._first_or_last("first", skipna, keep_attrs)

def last(
    self, skipna: bool | None = None, keep_attrs: bool | None = None
) -> T_Xarray:
    """
    Return the last element of each group along the group dimension

    Parameters
    ----------
    skipna : bool or None, optional
        If True, skip missing values (as marked by NaN). By default, only
        skips missing values for float dtypes; other dtypes either do not
        have a sentinel missing value (int) or ``skipna=True`` has not been
        implemented (object, datetime64 or timedelta64).
    keep_attrs : bool or None, optional
        If True, ``attrs`` will be copied from the original
        object to the new one. If False, the new object will be
        returned without attributes.

    Returns
    -------
    DataArray or Dataset
        Same type as the grouped object, with the last element of each
        group along the group dimension.
    """
    # Dispatch by name so the shared helper can choose between the flox
    # fast path and the generic reduce implementation.
    return self._first_or_last("last", skipna, keep_attrs)

def assign_coords(self, coords=None, **coords_kwargs):
"""Assign coordinates by group.
Expand Down
24 changes: 21 additions & 3 deletions xarray/core/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import warnings
from collections.abc import Callable, Hashable, Iterable, Sequence
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

from xarray.core._aggregations import (
DataArrayResampleAggregations,
Expand Down Expand Up @@ -55,8 +55,11 @@ def _flox_reduce(
keep_attrs: bool | None = None,
**kwargs,
) -> T_Xarray:
result = super()._flox_reduce(dim=dim, keep_attrs=keep_attrs, **kwargs)
result = result.rename({RESAMPLE_DIM: self._group_dim})
result: T_Xarray = (
super()
._flox_reduce(dim=dim, keep_attrs=keep_attrs, **kwargs)
.rename({RESAMPLE_DIM: self._group_dim}) # type: ignore[assignment]
)
return result

def shuffle_to_chunks(self, chunks: T_Chunks = None):
Expand Down Expand Up @@ -103,6 +106,21 @@ def shuffle_to_chunks(self, chunks: T_Chunks = None):
(grouper,) = self.groupers
return self._shuffle_obj(chunks).drop_vars(RESAMPLE_DIM)

def _first_or_last(
    self, op: Literal["first", "last"], skipna: bool | None, keep_attrs: bool | None
) -> T_Xarray:
    """Apply first/last, then restore each data variable's original dim order."""
    from xarray.core.dataset import Dataset

    result = super()._first_or_last(op=op, skipna=skipna, keep_attrs=keep_attrs)
    if not isinstance(result, Dataset):
        return result
    # Can't transpose in the base class because there the group dim is
    # RESAMPLE_DIM, which is not present in the original object.
    source_vars = self._obj._variables
    for name in result.data_vars:
        result._variables[name] = result._variables[name].transpose(
            *source_vars[name].dims
        )
    return result

def _drop_coords(self) -> T_Xarray:
"""Drop non-dimension coordinates along the resampled dimension."""
obj = self._obj
Expand Down
6 changes: 5 additions & 1 deletion xarray/groupers.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,11 @@ def _factorize_unique(self) -> EncodedGroups:
unique_coord = Variable(
dims=codes.name, data=unique_values, attrs=self.group.attrs
)
full_index = pd.Index(unique_values)
full_index = (
unique_values
if isinstance(unique_values, pd.MultiIndex)
else pd.Index(unique_values)
)

return EncodedGroups(
codes=codes,
Expand Down
8 changes: 5 additions & 3 deletions xarray/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,8 @@ def test_groupby_first_and_last(self) -> None:
expected = array # should be a no-op
assert_identical(expected, actual)

# TODO: groupby_bins too

def make_groupby_multidim_example_array(self) -> DataArray:
return DataArray(
[[[0, 1], [2, 3]], [[5, 10], [15, 20]]],
Expand Down Expand Up @@ -2374,13 +2376,13 @@ def test_resample_and_first(self) -> None:
# upsampling
expected_time = pd.date_range("2000-01-01", freq="3h", periods=19)
expected = ds.reindex(time=expected_time)
actual = ds.resample(time="3h")
rs = ds.resample(time="3h")
for how in ["mean", "sum", "first", "last"]:
method = getattr(actual, how)
method = getattr(rs, how)
result = method()
assert_equal(expected, result)
for method in [np.mean]:
result = actual.reduce(method)
result = rs.reduce(method)
assert_equal(expected, result)

def test_resample_min_count(self) -> None:
Expand Down
Loading