Fix #9175: Remove reference to non-existent Scheduler.cleanup #16817

GitHub Actions / Unit Test Results failed on Jan 25, 2026

1 error, 258 failed, 104 skipped, 3 750 passed in 11h 25m 25s

31 files (+30)      31 suites (+30)      11h 25m 25s ⏱️ (+11h 24m 50s)

 4 113 tests (+4 070):   3 750 ✅ passed (+3 741)    104 💤 skipped (+70)     258 ❌ failed (+258)   1 🔥 errored (+1)
59 636 runs  (+59 593): 56 650 ✅ passed (+56 641)  2 465 💤 skipped (+2 431)  519 ❌ failed (+519)   2 🔥 errored (+2)

Results for commit 86daefb. Comparison against earlier commit aaaf89d.

Annotations

Check warning on line 0 in distributed.deploy.tests.test_local

1 out of 15 runs failed: test_local_cluster_redundant_kwarg[True] (distributed.deploy.tests.test_local)

artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 4s]
Raw output
RuntimeError: Nanny failed to start worker process
nanny = True

    @pytest.mark.parametrize("nanny", [True, False])
    @gen_test()
    async def test_local_cluster_redundant_kwarg(nanny):
        cluster = LocalCluster(
            typo_kwarg="foo",
            processes=nanny,
            n_workers=1,
            dashboard_address=":0",
            asynchronous=True,
        )
        if nanny:
            ctx = raises_with_cause(
                RuntimeError, None, TypeError, "unexpected keyword argument"
            )
        else:
            ctx = pytest.raises(TypeError, match="unexpected keyword argument")
        with ctx:
            # Extra arguments are forwarded to the worker class. Depending on
            # whether we use the nanny or not, the error treatment is quite
            # different and we should assert that an exception is raised
>           async with cluster:

distributed/deploy/tests/test_local.py:1170: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/spec.py:480: in __aenter__
    await self
distributed/deploy/spec.py:426: in _
    await self._correct_state()
distributed/deploy/spec.py:396: in _correct_state_internal
    await asyncio.gather(*worker_futs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:650: in _wrap_awaitable
    return (yield from awaitable.__await__())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Nanny: None, threads: 4>

    @final
    async def start(self):
        async with self._startup_lock:
            if self.status == Status.failed:
                assert self.__startup_exc is not None
                raise self.__startup_exc
            elif self.status != Status.init:
                return self
            timeout = getattr(self, "death_timeout", None)
    
            async def _close_on_failure(exc: Exception) -> None:
                await self.close(reason=f"failure-to-start-{type(exc)}")
                self.status = Status.failed
                self.__startup_exc = exc
    
            try:
                await wait_for(self.start_unsafe(), timeout=timeout)
            except asyncio.TimeoutError as exc:
                await _close_on_failure(exc)
                raise asyncio.TimeoutError(
                    f"{type(self).__name__} start timed out after {timeout}s."
                ) from exc
            except Exception as exc:
                await _close_on_failure(exc)
>               raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
E               RuntimeError: Nanny failed to start.

distributed/core.py:536: RuntimeError

During handling of the above exception, another exception occurred:

nanny = True

    @pytest.mark.parametrize("nanny", [True, False])
    @gen_test()
    async def test_local_cluster_redundant_kwarg(nanny):
        cluster = LocalCluster(
            typo_kwarg="foo",
            processes=nanny,
            n_workers=1,
            dashboard_address=":0",
            asynchronous=True,
        )
        if nanny:
            ctx = raises_with_cause(
                RuntimeError, None, TypeError, "unexpected keyword argument"
            )
        else:
            ctx = pytest.raises(TypeError, match="unexpected keyword argument")
>       with ctx:

distributed/deploy/tests/test_local.py:1166: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
distributed/utils_test.py:2112: in raises_with_cause
    raise exc
distributed/core.py:528: in start
    await wait_for(self.start_unsafe(), timeout=timeout)
distributed/utils.py:1933: in wait_for
    return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:408: in wait_for
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Nanny: None, threads: 4>

    async def start_unsafe(self):
        """Start nanny, start local process, start watching"""
    
        await super().start_unsafe()
    
        ports = parse_ports(self._start_port)
        for port in ports:
            start_address = address_from_user_args(
                host=self._start_host,
                port=port,
                interface=self._interface,
                protocol=self._protocol,
                security=self.security,
            )
            try:
                await self.listen(
                    start_address, **self.security.get_listen_args("worker")
                )
            except OSError as e:
                if len(ports) > 1 and e.errno == errno.EADDRINUSE:
                    continue
                else:
                    raise
            else:
                self._start_address = start_address
                break
        else:
            raise ValueError(
                f"Could not start Nanny on host {self._start_host} "
                f"with port {self._start_port}"
            )
    
        self.ip = get_address_host(self.address)
    
        await self.preloads.start()
        saddr = self.scheduler.addr
        comm = await self.rpc.connect(saddr)
        comm.name = "Nanny->Scheduler (registration)"
    
        try:
            await comm.write({"op": "register_nanny", "address": self.address})
            msg = await comm.read()
            try:
                for name, plugin in msg["nanny-plugins"].items():
                    await self.plugin_add(plugin=plugin, name=name)
    
                logger.info("        Start Nanny at: %r", self.address)
                response = await self.instantiate()
    
                if response != Status.running:
>                   raise RuntimeError("Nanny failed to start worker process")
E                   RuntimeError: Nanny failed to start worker process

distributed/nanny.py:372: RuntimeError
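
The TypeError behind this RuntimeError comes from LocalCluster forwarding the unknown typo_kwarg to the worker class; with a nanny in between, the worker subprocess dies during startup and the failure is re-raised as "Nanny failed to start worker process". A minimal sketch of the same failure mode without the nanny indirection (hypothetical snippet, not part of the test suite):

    import asyncio

    from distributed import LocalCluster

    async def main():
        # typo_kwarg is not a valid Worker argument, so starting the cluster
        # in-process (processes=False) raises TypeError directly instead of
        # the wrapped RuntimeError seen above.
        try:
            async with LocalCluster(
                typo_kwarg="foo",
                processes=False,
                n_workers=1,
                dashboard_address=":0",
                asynchronous=True,
            ):
                pass
        except TypeError as exc:
            print(exc)  # ... got an unexpected keyword argument 'typo_kwarg'

    asyncio.run(main())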

Check warning on line 0 in distributed.http.scheduler.tests.test_scheduler_http

1 out of 15 runs failed: test_prometheus_collect_worker_states (distributed.http.scheduler.tests.test_scheduler_http)

artifacts/windows-latest-3.14-default-notci1/pytest.xml [took 3s]
Raw output
TimeoutError: Test timeout (3) hit after 3.004400799999985s.
========== Test stack trace starts here ==========
Stack for <Task pending name='Task-43842' coro=<test_prometheus_collect_worker_states() running at D:\a\distributed\distributed\distributed\http\scheduler\tests\test_scheduler_http.py:523> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[shield.<locals>._clear_awaited_by_callback() at C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\tasks.py:975, _log_on_exception() at C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\tasks.py:911]> (most recent call last):
  File "D:\a\distributed\distributed\distributed\http\scheduler\tests\test_scheduler_http.py", line 523, in test_prometheus_collect_worker_states
    assert await fetch_metrics() == {
args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)
                   ^^^^^^^^^^^^^^^^^^^

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\contextlib.py:85: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\contextlib.py:85: in inner
    return func(*args, **kwds)
           ^^^^^^^^^^^^^^^^^^^
distributed\utils_test.py:1093: in test_func
    return _run_and_close_tornado(async_fn_outer)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
distributed\utils_test.py:381: in _run_and_close_tornado
    return asyncio_run(inner_fn(), loop_factory=get_loop_factory())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\runners.py:204: in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\runners.py:127: in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\base_events.py:719: in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
distributed\utils_test.py:378: in inner_fn
    return await async_fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
distributed\utils_test.py:1090: in async_fn_outer
    return await utils_wait_for(async_fn(), timeout=timeout * 2)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
distributed\utils.py:1928: in wait_for
    return await fut
           ^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    async def async_fn():
        result = None
        with dask.config.set(config):
            async with (
                _cluster_factory() as (s, workers),
                _client_factory(s) as c,
            ):
                args = [s] + workers
                if c is not None:
                    args = [c] + args
                try:
                    coro = func(*args, *outer_args, **kwargs)
                    task = asyncio.create_task(coro)
                    coro2 = utils_wait_for(
                        asyncio.shield(task), timeout=deadline.remaining
                    )
                    result = await coro2
                    validate_state(s, *workers)
    
                except asyncio.TimeoutError:
                    assert task
                    elapsed = deadline.elapsed
                    buffer = io.StringIO()
                    # This stack indicates where the coro/test is suspended
                    task.print_stack(file=buffer)
    
                    task.cancel()
                    while not task.cancelled():
                        await asyncio.sleep(0.01)
    
                    # Hopefully, the hang has been caused by inconsistent
                    # state, which should be much more meaningful than the
                    # timeout
                    validate_state(s, *workers)
    
                    # Remove as much of the traceback as possible; it's
                    # uninteresting boilerplate from utils_test and asyncio
                    # and not from the code being tested.
>                   raise asyncio.TimeoutError(
                        f"Test timeout ({timeout}) hit after {elapsed}s.\n"
                        "========== Test stack trace starts here ==========\n"
                        f"{buffer.getvalue()}"
                    ) from None
E                   TimeoutError: Test timeout (3) hit after 3.004400799999985s.
E                   ========== Test stack trace starts here ==========
E                   Stack for <Task pending name='Task-43842' coro=<test_prometheus_collect_worker_states() running at D:\a\distributed\distributed\distributed\http\scheduler\tests\test_scheduler_http.py:523> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[shield.<locals>._clear_awaited_by_callback() at C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\tasks.py:975, _log_on_exception() at C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\tasks.py:911]> (most recent call last):
E                     File "D:\a\distributed\distributed\distributed\http\scheduler\tests\test_scheduler_http.py", line 523, in test_prometheus_collect_worker_states
E                       assert await fetch_metrics() == {

distributed\utils_test.py:1044: TimeoutError
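
The pending stack shows the test stuck awaiting fetch_metrics(), i.e. the HTTP request to the scheduler's /metrics endpoint never completed on this Windows / Python 3.14 run before the 3 s gen_cluster deadline. A hedged debugging sketch (hypothetical, not part of the test suite) that hits the same endpoint with an explicit client-side timeout so a hung route fails fast; it assumes prometheus_client is installed so the /metrics route is registered:

    import asyncio

    from tornado.httpclient import AsyncHTTPClient

    from distributed import Scheduler

    async def dump_metrics():
        # Start a throwaway scheduler on a random port and fetch its Prometheus
        # metrics with a 5 s request timeout instead of waiting on the
        # test-harness deadline.
        async with Scheduler(port=0, dashboard_address=":0") as s:
            url = f"http://localhost:{s.http_server.port}/metrics"
            resp = await AsyncHTTPClient().fetch(url, request_timeout=5)
            print(resp.body.decode()[:500])

    asyncio.run(dump_metrics())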

Check warning on line 0 in distributed.protocol.tests.test_highlevelgraph

2 out of 13 runs failed: test_combo_of_layer_types (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 7e54d06afbe834e5d4b7981fbded9282 failed during barrier phase
>   return get_worker_plugin().barrier(id, run_ids)
    ^^^^^^^^^^^^^^^^^

distributed/shuffle/_core.py:575: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_worker_plugin.py:390: in barrier
    result = sync(self.worker.loop, self._barrier, shuffle_id, run_ids)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:361: in _barrier
    shuffle_run = await self.shuffle_runs.get_most_recent(shuffle_id, run_ids)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:176: in get_most_recent
    return await self.get_with_run_id(shuffle_id=shuffle_id, run_id=max(run_ids))
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34871', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:37983', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45925', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_combo_of_layer_types(c, s, a, b):
        """Check pack/unpack of a HLG that has everything!"""
    
        def add(x, y, z, extra_arg):
            return x + y + z + extra_arg
    
        y = c.submit(lambda x: x, 2)
        z = c.submit(lambda x: x, 3)
        xx = await c.submit(lambda x: x + 1, y)
        x = da.blockwise(
            add,
            "x",
            da.zeros((3,), chunks=(1,)),
            "x",
            da.ones((3,), chunks=(1,)),
            "x",
            y,
            None,
            concatenate=False,
            dtype=int,
            extra_arg=z,
        )
    
        df = dd.from_pandas(pd.DataFrame({"a": np.arange(3)}), npartitions=3)
        df = df.shuffle("a")
        df = df["a"].to_dask_array()
    
        res = x.sum() + df.sum()
>       res = await c.compute(res, optimize_graph=False)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/protocol/tests/test_highlevelgraph.py:47: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during barrier phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 7e54d06afbe834e5d4b7981fbded9282 failed during barrier phase

distributed/shuffle/_core.py:583: RuntimeError
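
This failure, and the remaining P2P shuffle failures below, share one root cause: pyarrow's Table.to_pandas() still calls the deprecated pandas make_block helper, and with warnings escalated to errors in CI the resulting Pandas4Warning aborts the shuffle run. A hedged local workaround sketch (hypothetical helper, not distributed's actual fix) that silences only this message around the offending conversion:

    import warnings

    import pyarrow as pa

    def to_pandas_quiet(table: pa.Table):
        # Ignore only the make_block deprecation raised from inside pyarrow's
        # pandas conversion; any other warning still propagates (or errors).
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="make_block is deprecated")
            return table.to_pandas()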

Check warning on line 0 in distributed.protocol.tests.test_highlevelgraph

2 out of 13 runs failed: test_shuffle (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P f4cb5f8c84e2eced853a9afe0e1088b7 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45549', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45821', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33983', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_shuffle(c, s, a, b):
        """Check pack/unpack of a shuffled dataframe"""
    
        df = dd.from_pandas(
            pd.DataFrame(
                {"a": np.arange(10, dtype=int), "b": np.arange(10, 0, -1, dtype=float)}
            ),
            npartitions=5,
        )
        df = df.shuffle("a", max_branch=2)
        df = df["a"] + df["b"]
>       res = await c.compute(df, optimize_graph=False)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/protocol/tests/test_highlevelgraph.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_shuffle.py:548: in _shuffle_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P f4cb5f8c84e2eced853a9afe0e1088b7 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.protocol.tests.test_highlevelgraph

2 out of 13 runs failed: test_dataframe_annotations (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P f4cb5f8c84e2eced853a9afe0e1088b7 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42791', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40031', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45563', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_dataframe_annotations(c, s, a, b):
        retries = 5
        plugin = ExampleAnnotationPlugin(retries=retries)
        s.add_plugin(plugin)
    
        assert plugin in s.plugins.values()
    
        df = dd.from_pandas(
            pd.DataFrame(
                {"a": np.arange(10, dtype=int), "b": np.arange(10, 0, -1, dtype=float)}
            ),
            npartitions=5,
        )
        df = df.shuffle("a", max_branch=2)
        acol = df["a"]
        bcol = df["b"]
    
        ctx = pytest.warns(
            UserWarning, match="Annotations will be ignored when using query-planning"
        )
    
        with dask.annotate(retries=retries), ctx:
            df = acol + bcol
    
        with dask.config.set(optimization__fuse__active=False):
>           rdf = await c.compute(df)
                  ^^^^^^^^^^^^^^^^^^^

distributed/protocol/tests/test_highlevelgraph.py:184: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_shuffle.py:548: in _shuffle_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P f4cb5f8c84e2eced853a9afe0e1088b7 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_graph

2 out of 13 runs failed: test_basic_state (distributed.shuffle.tests.test_graph)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 1s]
Raw output
RuntimeError: P2P 3444f076d5f5a6e0f34234b48a4616f8 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35359', workers: 0, cores: 0, tasks: 0>
workers = (<Worker 'tcp://127.0.0.1:33711', name: 0, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>, <W... 0>, <Worker 'tcp://127.0.0.1:35317', name: 3, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>)
df = Dask DataFrame Structure:
                  name     id        x        y
npartitions=12                              ...      ...      ...
Dask Name: assign, 5 expressions
Expr=Assign(frame=ArrowStringConversion(frame=Timeseries(4f864de)))
shuffled = Dask DataFrame Structure:
                  name     id        x        y
npartitions=12                              ...  ...
                   ...    ...      ...      ...
Dask Name: rearrangebycolumn, 6 expressions
Expr=Shuffle(67c034a)
exts = [<ShuffleWorkerPlugin, worker='tcp://127.0.0.1:33711', closed=True>, <ShuffleWorkerPlugin, worker='tcp://127.0.0.1:405...ugin, worker='tcp://127.0.0.1:40437', closed=True>, <ShuffleWorkerPlugin, worker='tcp://127.0.0.1:35317', closed=True>]

    @gen_cluster([("", 2)] * 4, client=True)
    async def test_basic_state(c, s, *workers):
        df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
        df["name"] = df["name"].astype("string[python]")
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            shuffled = df.shuffle("id")
    
        exts = [w.extensions["shuffle"] for w in workers]
        for ext in exts:
            assert not ext.shuffle_runs._active_runs
    
        f = c.compute(shuffled)
        # TODO this is a bad/pointless test. the `f.done()` is necessary in case the shuffle is really fast.
        # To test state more thoroughly, we'd need a way to 'stop the world' at various stages. Like have the
        # scheduler pause everything when the barrier is reached. Not sure yet how to implement that.
        while (
            not all(len(ext.shuffle_runs._active_runs) == 1 for ext in exts)
            and not f.done()
        ):
            await asyncio.sleep(0.1)
    
>       await f

distributed/shuffle/tests/test_graph.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_shuffle.py:548: in _shuffle_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 3444f076d5f5a6e0f34234b48a4616f8 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[idx-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 206f0eca68ca2cfa15567ae91cd0f4bc failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33705', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44051', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45357', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = 'idx', how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 206f0eca68ca2cfa15567ae91cd0f4bc failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[idx-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P dcf9bafa7a58f97b088b04595cd3ccfd failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45847', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36389', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33725', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = 'idx', how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P dcf9bafa7a58f97b088b04595cd3ccfd failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[idx-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 22b6257ef6e3796997899bf59c7df879 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39855', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40901', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41891', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = 'idx', how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 22b6257ef6e3796997899bf59c7df879 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
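
Editor's note: for context on the failing line itself (split_by_partition in distributed/shuffle/_shuffle.py), the pandas round trip exists only to collect the unique partition values of one column. A hypothetical alternative that stays in Arrow, and therefore never reaches pandas' deprecated make_block path, is sketched below; whether this matches the maintainers' intended fix is not something this log shows.

    # Hedged sketch only: compute distinct partition values with an Arrow
    # compute kernel instead of round-tripping the column through pandas.
    import pyarrow as pa
    import pyarrow.compute as pc

    def unique_partitions(t: pa.Table, column: str) -> list:
        # pc.unique returns an Arrow array of the distinct values in the column.
        return pc.unique(t.column(column)).to_pylist()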

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[idx-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 9e7d2b78fa376ed71c9af62007dddfda failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44635', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38431', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34933', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = 'idx', how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 9e7d2b78fa376ed71c9af62007dddfda failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[on1-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P d31b810f306228c2ca222b969a6b92df failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35621', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:33903', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43723', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P d31b810f306228c2ca222b969a6b92df failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[on1-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 501760df587737640f5a1eec72863028 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35763', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35415', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35653', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 501760df587737640f5a1eec72863028 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[on1-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 6bee7323d8fc9c25c63911fb4dc9fc8b failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39325', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36313', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:44699', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 6bee7323d8fc9c25c63911fb4dc9fc8b failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[on1-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 35b0364d85fe95bc2736092d8dd37d93 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45131', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34813', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45259', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 35b0364d85fe95bc2736092d8dd37d93 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

2 out of 13 runs failed: test_merge_known_to_unknown[on2-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P d3a8bf446deea7972fa406832880e73a failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34945', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45215', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:44461', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx', 'k'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P d3a8bf446deea7972fa406832880e73a failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
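
All of the merge failures above share one root cause: during the P2P repartition step, split_by_partition converts the partition column with Table.to_pandas(), pyarrow rebuilds the DataFrame blocks through the deprecated pandas make_block helper, and the warning filter in this environment (warnings promoted to errors) turns the resulting Pandas4Warning into an exception, which the shuffle machinery then reports as "P2P ... failed during transfer phase". A minimal reproduction sketch, assuming a pandas build that exposes pandas.errors.Pandas4Warning (GH#56815) and a pyarrow version whose to_pandas() still routes through make_block:

# Minimal reproduction sketch (assumptions: this pandas build ships
# pandas.errors.Pandas4Warning, and this pyarrow still rebuilds DataFrame
# blocks through the deprecated make_block helper).
import warnings

import pandas as pd
import pyarrow as pa

# Mirror a strict test environment: promote the deprecation to an error.
warnings.filterwarnings("error", category=pd.errors.Pandas4Warning)

table = pa.table({"_partitions": [0, 1, 1, 2], "v1": [10, 11, 12, 13]})

try:
    # to_pandas() may call pandas.core.internals.api.make_block internally,
    # in which case the warning surfaces here as an exception.
    df = table.to_pandas()
except pd.errors.Pandas4Warning as exc:
    print("promoted to error:", exc)
else:
    print("conversion succeeded:", len(df), "rows")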

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on2-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 6c90f5c14ba69612a93b1ebba37d3a58 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41295', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38663', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45123', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx', 'k'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 6c90f5c14ba69612a93b1ebba37d3a58 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
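
The RuntimeError that every one of these tests ultimately reports is only a wrapper: the transfer step runs inside a context manager that chains whatever was raised into a RuntimeError carrying the shuffle id (the frames at distributed/shuffle/_core.py:524 and :532). A sketch of that wrapping pattern with illustrative names, not the actual distributed.shuffle._core code:

# Sketch of the wrapping seen at distributed/shuffle/_core.py:524/532;
# illustrative names only, not the actual implementation.
from contextlib import contextmanager

@contextmanager
def handle_transfer_errors(shuffle_id):
    try:
        yield
    except Exception as e:
        # Chain the original failure so it remains visible as __cause__,
        # which is why the Pandas4Warning appears above the RuntimeError here.
        raise RuntimeError(f"P2P {shuffle_id} failed during transfer phase") from e

try:
    with handle_transfer_errors("d3a8bf446deea7972fa406832880e73a"):
        raise ValueError("stand-in for the deprecation raised inside to_pandas()")
except RuntimeError as exc:
    print(exc, "| caused by:", repr(exc.__cause__))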

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on2-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 31f3f16f1eb0c4f6088b68741b9fa771 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35121', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39047', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46787', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx', 'k'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 31f3f16f1eb0c4f6088b68741b9fa771 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
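
Note also that the repartitioning does not run on the worker's event loop: _receive offloads _repartition_buffers to a thread pool via run_in_executor_with_context, which is why the failing frames cross a concurrent.futures boundary before reaching split_by_partition. A simplified stand-in for that helper, assuming only what the frames at distributed/utils.py:1525-1526 show (an executor call that re-enters the caller's contextvars):

# Simplified stand-in for the offload helper seen in the tracebacks; the real
# distributed.utils.run_in_executor_with_context may differ in details.
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

async def run_in_executor_with_context(executor, func, *args, **kwargs):
    loop = asyncio.get_running_loop()
    context = contextvars.copy_context()
    # Run the blocking function in a worker thread while preserving the
    # caller's context variables, as at distributed/utils.py:1526.
    return await loop.run_in_executor(
        executor, lambda: context.run(func, *args, **kwargs)
    )

async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        result = await run_in_executor_with_context(pool, sum, [1, 2, 3])
        print(result)

asyncio.run(main())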

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on2-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 19ccd87539722d215f1f8d017d14cfda failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43909', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44337', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38109', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['idx', 'k'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 19ccd87539722d215f1f8d017d14cfda failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
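
The frame that actually trips the deprecation is split_by_partition's partitions = t.select([column]).to_pandas()[column].unique(), which only needs the distinct values of the partition column. Purely as a hedged illustration (not a proposed patch to distributed), the same values can be obtained with pyarrow's compute kernels without materialising pandas blocks at all:

# Hedged illustration: compute the unique partition values with pyarrow only,
# avoiding the to_pandas() path that emits the make_block deprecation.
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({"_partitions": [2, 0, 1, 1, 2], "v1": [5, 6, 7, 8, 9]})
column = "_partitions"

partitions = pc.unique(table.column(column)).to_pylist()
print(sorted(partitions))  # [0, 1, 2]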

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on3-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P e43b6599f063ce57cac9de024f81dcbd failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39133', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45409', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46405', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['k', 'idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P e43b6599f063ce57cac9de024f81dcbd failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
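
Several of the parametrizations fail on the sending side instead, via distributed/shuffle/_buffer.py:195: the comm buffer caches the first exception seen by its background flusher and re-raises it on later writes, so the Pandas4Warning that originated on the receiving worker resurfaces inside add_partition. A minimal sketch of that cache-and-reraise pattern, with illustrative names rather than the distributed.shuffle._buffer implementation:

# Minimal sketch of a buffer that records the first failure from its background
# processing and re-raises it on later writes; illustrative names only.
class ShardBuffer:
    def __init__(self):
        self._exception = None

    def _process(self, shards):
        # Stand-in for the network send that failed in the real run.
        raise RuntimeError("receiver rejected shards")

    def write(self, shards):
        if self._exception is not None:
            # Later writers see the cached failure, as at _buffer.py:195.
            raise self._exception
        try:
            self._process(shards)
        except Exception as exc:
            self._exception = exc
            raise

buf = ShardBuffer()
for attempt in range(2):
    try:
        buf.write([b"shard"])
    except RuntimeError as exc:
        print(f"attempt {attempt}: {exc}")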

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on3-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 6e493ed888b674f8eec951432024ad0f failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45679', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43219', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36013', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['k', 'idx'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 6e493ed888b674f8eec951432024ad0f failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
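
Another recurring frame is distributed/utils.py:457: the worker thread calls sync() to run a coroutine on the event loop and block until it finishes, and whatever that coroutine raised is re-raised in the calling thread; that is how the receive-side failure ends up in the synchronous add_partition path. A rough stand-in for that pattern, using only standard asyncio and not the real distributed.utils.sync (which adds timeout and cancellation handling):

# Rough stand-in for the sync()-from-a-worker-thread pattern; not the real
# distributed.utils.sync.
import asyncio
import threading

def sync(loop, coro_func, *args, **kwargs):
    # Schedule the coroutine on a loop running in another thread and block
    # until it completes, re-raising its exception in this thread.
    fut = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop)
    return fut.result()

async def failing_receive():
    raise RuntimeError("stand-in for the error raised during _receive")

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
try:
    sync(loop, failing_receive)
except RuntimeError as exc:
    print("re-raised in the calling thread:", exc)
finally:
    loop.call_soon_threadsafe(loop.stop)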

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on3-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P fcbf618b4697ff0cc1ec801d849dc770 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43335', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45245', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34803', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['k', 'idx'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P fcbf618b4697ff0cc1ec801d849dc770 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_known_to_unknown[on3-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 7d27ee2541fed690a9a3afcd21af7129 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44197', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43985', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38099', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
on = ['k', 'idx'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 7d27ee2541fed690a9a3afcd21af7129 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
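
Note: the test_merge_* failures collected below all share the root cause visible in the traceback above. pyarrow's pandas_compat still rebuilds DataFrames through pandas' internal make_block during Table.to_pandas(); the installed pandas pre-release flags that call with Pandas4Warning, and because the test run escalates warnings to errors, the P2P shuffle fails during its transfer phase. A minimal sketch of silencing the warning at the filter level follows; where distributed actually configures its warning filters is an assumption, and only the warning text is taken from the log.

    import warnings

    # Sketch only: match on the message rather than the Pandas4Warning class,
    # since that class exists only in pandas pre-releases and importing it
    # would fail on older pandas versions.
    warnings.filterwarnings(
        "ignore",
        message="make_block is deprecated",
    )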

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index


github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_unknown_to_known[idx-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P a5f5c993fd03311925e5fb847962d450 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:32965', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:33947', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41765', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
on = 'idx', how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P a5f5c993fd03311925e5fb847962d450 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index


github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_unknown_to_known[idx-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 615691226594e8886e7381edd1ade209 failed during transfer phase
>   yield

distributed/shuffle/_core.py:524: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:348: in add_partition
    return shuffle_run.add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:369: in add_partition
    sync(self._loop, self._write_to_comm, shards)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:273: in _write_to_comm
    await self._comm_buffer.write(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_comms.py:70: in _process
    response = await self.send(address, shards)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:244: in send
    return await retry(
    ^^^^^^^^^^^^^^^^^
distributed/utils_comm.py:385: in retry
    return await coro()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:222: in _send
    return await self.rpc(address).shuffle_receive(
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1259: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:1043: in send_recv
    raise exc.with_traceback(tb)
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33735', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38837', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40267', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
on = 'idx', how = 'left'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 615691226594e8886e7381edd1ade209 failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index


github-actions / Unit Test Results

2 out of 13 runs failed: test_merge_unknown_to_known[idx-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P 358840fb74c826c9c1368faa4963e94a failed during transfer phase
>   result = await result
    ^^^^^^^^^^^^^^^^^

distributed/core.py:834: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_worker_plugin.py:316: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:371: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:524: in handle_transfer_errors
    yield
distributed/shuffle/_shuffle.py:56: in shuffle_transfer
    return get_worker_plugin().add_partition(
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:347: in add_partition
    shuffle_run = self.get_or_create_shuffle(id)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:410: in get_or_create_shuffle
    return sync(
distributed/utils.py:457: in sync
    raise error
distributed/utils.py:431: in f
    result = yield future
    ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/gen.py:783: in run
    value = future.result()
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:152: in get_or_create
    raise shuffle_run._exception
    ^^^^^^^^^^^^^^^^^
distributed/core.py:834: in _handle_comm
    result = await result
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_worker_plugin.py:317: in shuffle_receive
    return await shuffle_run.receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:330: in receive
    await self._receive(data)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:263: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_core.py:256: in offload
    return await run_in_executor_with_context(
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1525: in run_in_executor_with_context
    return await loop.run_in_executor(
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/utils.py:1526: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:274: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:135: in split_by_partition
    partitions = t.select([column]).to_pandas()[column].unique()
      ^^^^^^^^^^^^^^^^^
pyarrow/array.pxi:884: in pyarrow.lib._PandasConvertible.to_pandas
    ???
pyarrow/table.pxi:4192: in pyarrow.lib.Table._to_pandas
    ???
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:776: in table_to_dataframe
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in _table_to_blocks
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:1131: in <listcomp>
    return [_reconstruct_block(item, columns, extension_columns)
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pyarrow/pandas_compat.py:739: in _reconstruct_block
    block = _int.make_block(block_arr, placement=placement)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   warnings.warn(
      ^^^^^^^^^^^^^
        # GH#56815
        "make_block is deprecated and will be removed in a future version. "
        "Use pd.api.internals.create_dataframe_from_blocks or "
        "(recommended) higher-level public APIs instead.",
        Pandas4Warning,
        stacklevel=2,
    )
E   pandas.errors.Pandas4Warning: make_block is deprecated and will be removed in a future version. Use pd.api.internals.create_dataframe_from_blocks or (recommended) higher-level public APIs instead.

../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/pandas/core/internals/api.py:101: Pandas4Warning

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34899', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43377', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42335', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...           ...    ...
                  ...    ...
Dask Name: return_input, 2 expressions
Expr=ClearDivisions(frame=df)
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...     ...    ...
10                ...    ...
11                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
on = 'idx', how = 'right'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:414: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.11/site-packages/dask/dataframe/dask_expr/_merge.py:823: in assign_index_merge_transfer
    return merge_transfer(df, id, input_partition)
    ^^^^^^^^^^^^^^^^^
distributed/shuffle/_merge.py:26: in merge_transfer
    return shuffle_transfer(
      ^^^^^^^^^^^^^^^^^
distributed/shuffle/_shuffle.py:55: in shuffle_transfer
    with handle_transfer_errors(id):
      ^^^^^^^^^^^^^^^^^
../../../miniconda3/envs/dask-distributed/lib/python3.11/contextlib.py:158: in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P {id} failed during transfer phase") from e
    ^^^^^^^^^^^^^^^^^
E   RuntimeError: P2P 358840fb74c826c9c1368faa4963e94a failed during transfer phase

distributed/shuffle/_core.py:532: RuntimeError
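
Note: since every traceback above ends in pyarrow.lib.Table.to_pandas(), the warning can probably be reproduced outside the shuffle code path with a plain pyarrow round-trip. The sketch below assumes the same pandas pre-release and pyarrow build as the failing CI environment; the table contents are arbitrary.

    import warnings

    import pyarrow as pa

    # Escalate all warnings to errors, mirroring the CI filter that turned the
    # deprecation warning into a test failure.
    warnings.simplefilter("error")

    table = pa.table({"k": [0, 1, 2], "v1": [3, 4, 5]})
    # With the versions above, this is expected to raise the same
    # "make_block is deprecated" warning-as-error seen in the failures.
    table.to_pandas()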