Skip to content

Adding columns from two dataframes fails to compute in some instances, and the fix for one instance seems to break the other #11395

Open
dask/dask-expr
#1143
@JimHBeam

Description

@JimHBeam

Describe the issue:
We are upgrading existing code to pandas 2. As part of our process we add some columns together. Some data seems to require a compute in the middle of the process, but that compute then causes other data to fail. This looks like a bug: the code below produces two errors, and the fix for one set of data breaks the process for the other.

Minimal Complete Verifiable Example:

import pandas as pd
import dask.dataframe as dd

# Pair 1: preds1 and ads1 carry exactly the same index values (1, 4).
preds1 = pd.DataFrame({
    "prediction_probability": [1.0] * 2,
    "prediction": [1, 1],
    "num_runs": [1, 1],
    "Idx": [1, 4],
})

preds1 = preds1.set_index("Idx")


ads1 = pd.DataFrame({
    "prediction_probability": [1.0] * 2,
    "prediction": [1, 1],
    "num_runs": [1, 1],
    "Idx": [1, 4],
})

ads1 = ads1.set_index("Idx")

# Pair 2: ads2 only covers part of preds2's index (1, 2 out of 1..4).
preds2 = pd.DataFrame({
    "prediction_probability": [1.0] * 4,
    "prediction": [1, 1, 1, 1],
    "num_runs": [1, 1, 1, 1],
    "Idx": [1, 2, 3, 4],
})

preds2 = preds2.set_index("Idx")


ads2 = pd.DataFrame({
    "prediction_probability": [1.0] * 2,
    "prediction": [1, 1],
    "num_runs": [1, 1],
    "Idx": [1, 2],
})

ads2 = ads2.set_index("Idx")

# NOTE: each "fails" case below raises, which stops the script at that point.
# Run the four cases separately (e.g. one notebook cell each) to see every
# outcome.

# --- Case A: keep both series lazy and compute only at the end ---

# Works: pair 1 (identical indexes).
preds_dd = dd.from_pandas(preds1)
ads_dd = dd.from_pandas(ads1)
preds_dd["prediction"] = preds_dd.prediction.add(
    ads_dd.prediction, fill_value=0
)
print(preds_dd.compute())

# Fails: pair 2 (ads2's index is a subset of preds2's).
preds_dd = dd.from_pandas(preds2)
ads_dd = dd.from_pandas(ads2)
preds_dd["prediction"] = preds_dd.prediction.add(
    ads_dd.prediction, fill_value=0
)
print(preds_dd.compute())


# --- Case B: materialise the right-hand series with .compute() first ---

# Fails: pair 1 raises the AssertionError shown in the stack trace.
preds_dd = dd.from_pandas(preds1)
ads_dd = dd.from_pandas(ads1)
preds_dd["prediction"] = preds_dd.prediction.add(
    ads_dd.prediction.compute(), fill_value=0
)

print(preds_dd.compute())

# Works: pair 2, the data that needs this mid-process compute. Using pair 1
# here would just repeat the failing case above.
preds_dd = dd.from_pandas(preds2)
ads_dd = dd.from_pandas(ads2)
preds_dd["prediction"] = preds_dd.prediction.add(
    ads_dd.prediction.compute(), fill_value=0
)

print(preds_dd.compute())

Anything else we need to know?:
Stack trace (from the "extra compute in the middle" case using preds1/ads1):

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In[6], line 6
      2 ads_dd = dd.from_pandas(ads1)
      3 preds_dd["prediction"] = preds_dd.prediction.add(
      4                     ads_dd.prediction.compute(), fill_value=0
      5 )
----> 6 print(preds_dd.compute())

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_collection.py:476, in FrameBase.compute(self, fuse, **kwargs)
    474 if not isinstance(out, Scalar):
    475     out = out.repartition(npartitions=1)
--> 476 out = out.optimize(fuse=fuse)
    477 return DaskMethodsMixin.compute(out, **kwargs)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_collection.py:591, in FrameBase.optimize(self, fuse)
    573 def optimize(self, fuse: bool = True):
    574     """Optimizes the DataFrame.
    575 
    576     Runs the optimizer with all steps over the DataFrame and wraps the result in a
   (...)
    589         The optimized Dask Dataframe
    590     """
--> 591     return new_collection(self.expr.optimize(fuse=fuse))

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:94, in Expr.optimize(self, **kwargs)
     93 def optimize(self, **kwargs):
---> 94     return optimize(self, **kwargs)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3070, in optimize(expr, fuse)
   3049 """High level query optimization
   3050 
   3051 This leverages three optimization passes:
   (...)
   3066 optimize_blockwise_fusion
   3067 """
   3068 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 3070 return optimize_until(expr, stage)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3031, in optimize_until(expr, stage)
   3028     return expr
   3030 # Lower
-> 3031 expr = expr.lower_completely()
   3032 if stage == "physical":
   3033     return expr

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_core.py:447, in Expr.lower_completely(self)
    445 lowered = {}
    446 while True:
--> 447     new = expr.lower_once(lowered)
    448     if new._name == expr._name:
    449         break

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_core.py:402, in Expr.lower_once(self, lowered)
    399 expr = self
    401 # Lower this node
--> 402 out = expr._lower()
    403 if out is None:
    404     out = expr

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3435, in MaybeAlignPartitions._lower(self)
   3432 def _lower(self):
   3433     # This can be expensive when something that has expensive division
   3434     # calculation is in the Expression
-> 3435     dfs = self.args
   3436     if (
   3437         len(dfs) == 1
   3438         or all(
   (...)
   3441         or len(self.divisions) == 2
   3442     ):
   3443         return self._expr_cls(*self.operands)

File [~/.pyenv/versions/3.10.14/lib/python3.10/functools.py:981](http://localhost:8888/home/honej/.pyenv/versions/3.10.14/lib/python3.10/functools.py#line=980), in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3430, in MaybeAlignPartitions.args(self)
   3427 @functools.cached_property
   3428 def args(self):
   3429     dfs = [op for op in self.operands if isinstance(op, Expr)]
-> 3430     return [op for op in dfs if not is_broadcastable(dfs, op)]

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3430, in <listcomp>(.0)
   3427 @functools.cached_property
   3428 def args(self):
   3429     dfs = [op for op in self.operands if isinstance(op, Expr)]
-> 3430     return [op for op in dfs if not is_broadcastable(dfs, op)]

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3086, in is_broadcastable(dfs, s)
   3081     except (TypeError, ValueError):
   3082         return False
   3084 return (
   3085     s.ndim == 1
-> 3086     and s.npartitions == 1
   3087     and s.known_divisions
   3088     and any(compare(s, df) for df in dfs if df.ndim == 2)
   3089     or s.ndim == 0
   3090 )

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:398, in Expr.npartitions(self)
    396     return self.operands[idx]
    397 else:
--> 398     return len(self.divisions) - 1

File [~/.pyenv/versions/3.10.14/lib/python3.10/functools.py:981](http://localhost:8888/home/honej/.pyenv/versions/3.10.14/lib/python3.10/functools.py#line=980), in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:382, in Expr.divisions(self)
    380 @functools.cached_property
    381 def divisions(self):
--> 382     return tuple(self._divisions())

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:2633, in Binop._divisions(self)
   2631     return tuple(self.operation(left_divisions, right_divisions))
   2632 else:
-> 2633     return super()._divisions()

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:530, in Blockwise._divisions(self)
    528 for arg in dependencies:
    529     if not self._broadcast_dep(arg):
--> 530         assert arg.divisions == dependencies[0].divisions
    531 return dependencies[0].divisions

AssertionError:

Environment:

  • Dask version: 2024.8.2
  • Python version: 3.10.14
  • Operating System: Ubuntu
  • Install method (conda, pip, source): Poetry

Metadata

Metadata

Assignees

No one assigned

    Labels

    dask-expr, needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer.)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions