fix: Fix over with partition_by when partition_by contains null values #3308

Conversation
narwhals/_arrow/expr.py (outdated)

if any(
    ca.null_count > 0
    for ca in tmp.simple_select(*partition_by).native.columns
):
It can be a follow-up, but maybe something similar to the workaround we already have in narwhals/narwhals/_arrow/group_by.py (line 179 at 41e123d):

def __iter__(self) -> Iterator[tuple[Any, ArrowDataFrame]]:

can help within this if block.
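For reference, a minimal sketch of that style of null-safe partitioning, assuming a single key column; it leans on dictionary_encode(null_encoding="encode") so the null group is kept rather than dropped. The iter_partitions name is hypothetical, and the real __iter__ at that line differs in its details:

import pyarrow as pa
import pyarrow.compute as pc

def iter_partitions(table: pa.Table, key: str):
    # "encode" gives nulls their own dictionary code, so the null
    # partition is emitted instead of being silently dropped
    encoded = table.column(key).dictionary_encode("encode").combine_chunks()
    indices = encoded.indices
    for code in range(len(encoded.dictionary)):
        yield table.filter(pc.equal(indices, pa.scalar(code, indices.type)))

table = pa.table({"a": [1, 1, None, 3], "b": [10, 20, 30, 40]})
for part in iter_partitions(table, "a"):
    print(part.to_pydict())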
@FBruzzesi It is true that you could reuse that solution.
But I think we should do some careful benchmarking before deciding which route is best here.
I've got a couple of variations of that in #2572
Show GroupBy.__iter__ and DataFrame.partition_by
narwhals/narwhals/_plan/arrow/group_by.py (lines 152 to 160 at 3a6c25a)

def __iter__(self) -> Iterator[tuple[Any, Frame]]:
    by = self.key_names
    from_native = self.compliant._with_native
    for partition in partition_by(self.compliant.native, by):
        t = from_native(partition)
        yield (
            t.select_names(*by).row(0),
            t.select_names(*self._column_names_original),
        )
narwhals/narwhals/_plan/arrow/group_by.py (lines 182 to 226 at 3a6c25a)

def partition_by(
    native: pa.Table, by: Sequence[str], *, include_key: bool = True
) -> Iterator[pa.Table]:
    if len(by) == 1:
        yield from _partition_by_one(native, by[0], include_key=include_key)
    else:
        yield from _partition_by_many(native, by, include_key=include_key)


def _partition_by_one(
    native: pa.Table, by: str, *, include_key: bool = True
) -> Iterator[pa.Table]:
    """Optimized path for single-column partition."""
    arr_dict: Incomplete = fn.array(native.column(by).dictionary_encode("encode"))
    indices: pa.Int32Array = arr_dict.indices
    if not include_key:
        native = native.remove_column(native.schema.get_field_index(by))
    for idx in range(len(arr_dict.dictionary)):
        # NOTE: Acero filter doesn't support `null_selection_behavior="emit_null"`
        # Is there any reasonable way to do this in Acero?
        yield native.filter(pc.equal(pa.scalar(idx), indices))


def _partition_by_many(
    native: pa.Table, by: Sequence[str], *, include_key: bool = True
) -> Iterator[pa.Table]:
    original_names = native.column_names
    temp_name = temp.column_name(original_names)
    key = acero.col(temp_name)
    composite_values = _composite_key(acero.select_names_table(native, by))
    # Need to iterate over the whole thing, so py_list first should be faster
    unique_py = composite_values.unique().to_pylist()
    re_keyed = native.add_column(0, temp_name, composite_values)
    source = acero.table_source(re_keyed)
    if include_key:
        keep = original_names
    else:
        ignore = {*by, temp_name}
        keep = [name for name in original_names if name not in ignore]
    select = acero.select_names(keep)
    for v in unique_py:
        # NOTE: May want to split the `Declaration` production iterator into its own function
        # E.g. to push down column selection to *before* collection
        # Not needed for this task though
        yield acero.collect(source, acero.filter(key == v), select)
But my intuition is that the solution I proposed on Discord might scale better.
Here, I assume we pay some cost for the dictionary_encode - but it might be offset by the fact that the group_by(...) is then working with integers?
It doesn't seem too unreasonable to try encoding each column with nulls?
Or allowing at most 1 null column - but permitting multiple columns if none of the others have nulls 🤔
Show alternative
import pyarrow as pa

data = {"a": [1, 1, None, 3, 3], "b": [1, 3, 4, 5, 6], "c": [1, 1, None, 3, 4]}
TEMP_NAME = "hey marco!"
PARTITION_BY = "a"
table = pa.table(data)
dictionary_array = table.column(PARTITION_BY).dictionary_encode("encode").combine_chunks()
table_encoded = table.append_column(TEMP_NAME, dictionary_array.indices)
windowed = (
    table_encoded.group_by(TEMP_NAME)
    .aggregate([("b", "hash_min"), ("b", "hash_max")])
    .rename_columns({"b_min": "bmin", "b_max": "bmax"})
)
with_columns = table_encoded.join(windowed, TEMP_NAME).drop([TEMP_NAME])
select = table_encoded.join(windowed, TEMP_NAME).select(["bmin", "bmax"])
print(f"with_columns:\n\n{with_columns!r}\n")
print(f"select:\n\n{select!r}")

On the other hand, the simplest option is to prioritize the single-column case and open an issue upstream 😄
Whoops! I did a commit literally one second ago and came here to comment exactly that: de3f02d
On the other hand, the simplest option is to prioritize the single-column case and open an issue upstream 😄

There is an issue tracking the join behaviour (apache/arrow#13408), but I would argue it's not really a bug, as pandas seems to be the odd one out. Rather, there is no native way to perform an "over" operation on a pyarrow table.
I've also had the idea of somehow doing this with lists on the tip of my tongue, but haven't worked it out yet 😄
There's an example in apache/arrow#48060 (comment), but it isn't a direct solution for us here.
This is a starting point, though, that I think you could get the min and max results from:
import pyarrow as pa

data = {"a": [1, 1, None, 3, 3], "b": [1, 3, 4, 5, 6], "c": [1, 1, None, 3, 4]}
pa.table(data).group_by(["a", "c"]).aggregate([("b", "hash_list")])

pyarrow.Table
a: int64
c: int64
b_list: list<item: int64>
  child 0, item: int64
----
a: [[1,null,3,3]]
c: [[1,null,3,4]]
b_list: [[[1,3],[4],[5],[6]]]
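One way to recover the min and max from that list column, as a rough sketch not taken from this thread: flatten the lists and re-group on the parent index.

import pyarrow as pa
import pyarrow.compute as pc

data = {"a": [1, 1, None, 3, 3], "b": [1, 3, 4, 5, 6], "c": [1, 1, None, 3, 4]}
grouped = pa.table(data).group_by(["a", "c"]).aggregate([("b", "hash_list")])

lists = grouped.column("b_list").combine_chunks()
flat = pa.table({
    # row index of the list each flattened value came from
    "group": pc.list_parent_indices(lists),
    "b": pc.list_flatten(lists),
})
# one row per original group, with the min/max of its list
stats = flat.group_by("group").aggregate([("b", "min"), ("b", "max")])
print(stats)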
On the other hand, the simplest option is to prioritize the single-column case and open an issue upstream 😄

Totally agree, I don't think we can use the __iter__ solution here.
I will revert the commit, sorry for the added entropy.
Update: Reverted in f3d35cf
thanks both!
have added the solution suggested by Dan (https://discord.com/channels/1235257048170762310/1438922034091659374/1438956207044952084), which works wonderfully. Thanks Dan, good one! I hadn't understood it at first, but now that I see it, it looks perfectly safe!
Should we try to revive this issue? The discussion makes it sound like the functionality is there on the C++ side.
Gonna use it towards the fix, like #3308.
Force-pushed from 565b0de to 49b3e89.
yeah, sure

thanks all for the reviews!
Related issues: closes #3300