Skip to content

Commit cf508b9

Browse files
authored
Clean up minimum pyarrow version checks (#9260)
1 parent 89c54b1 commit cf508b9

5 files changed

Lines changed: 6 additions & 35 deletions

File tree

.github/workflows/tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ jobs:
155155
# Increase this value to reset cache if
156156
# continuous_integration/environment-${{ matrix.environment }}.yaml has not
157157
# changed. See also same variable in .pre-commit-config.yaml
158-
CACHE_NUMBER: 0
158+
CACHE_NUMBER: 2
159159
id: cache
160160

161161
- name: Update environment

distributed/protocol/arrow.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@
44

55
from distributed.protocol.serialize import dask_deserialize, dask_serialize
66

7-
if int(pyarrow.__version__.split(".")[0]) < 16:
8-
raise ImportError(
9-
"Need pyarrow >=16.0. See https://arrow.apache.org/docs/python/install.html"
10-
)
11-
127

138
@dask_serialize.register(pyarrow.RecordBatch)
149
def serialize_batch(batch):

distributed/shuffle/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
from __future__ import annotations
22

3-
from distributed.shuffle._arrow import check_minimal_arrow_version
43
from distributed.shuffle._rechunk import rechunk_p2p
54
from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
65
from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
76

87
__all__ = [
9-
"check_minimal_arrow_version",
108
"rechunk_p2p",
119
"ShuffleSchedulerPlugin",
1210
"ShuffleWorkerPlugin",

distributed/shuffle/_arrow.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
from pathlib import Path
55
from typing import TYPE_CHECKING
66

7-
from packaging.version import parse
8-
97
from dask.utils import parse_bytes
108

119
if TYPE_CHECKING:
@@ -31,24 +29,6 @@ def check_dtype_support(meta_input: pd.DataFrame) -> None:
3129
raise TypeError("p2p does not support sparse data found in column '{name}'")
3230

3331

34-
def check_minimal_arrow_version() -> None:
35-
"""Verify that the the correct version of pyarrow is installed to support
36-
the P2P extension.
37-
38-
Raises a ModuleNotFoundError if pyarrow is not installed or an
39-
ImportError if the installed version is not recent enough.
40-
"""
41-
minversion = "14.0.1"
42-
try:
43-
import pyarrow as pa
44-
except ModuleNotFoundError:
45-
raise ModuleNotFoundError(f"P2P shuffling requires pyarrow>={minversion}")
46-
if parse(pa.__version__) < parse(minversion):
47-
raise ImportError(
48-
f"P2P shuffling requires pyarrow>={minversion} but only found {pa.__version__}"
49-
)
50-
51-
5232
def concat_tables(tables: Iterable[pa.Table]) -> pa.Table:
5333
import pyarrow as pa
5434

distributed/tests/test_client.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
from distributed.diagnostics.plugin import UploadDirectory, WorkerPlugin
8787
from distributed.metrics import time
8888
from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
89-
from distributed.shuffle import check_minimal_arrow_version
9089
from distributed.sizeof import sizeof
9190
from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text
9291
from distributed.utils_test import (
@@ -3417,17 +3416,16 @@ async def test_cancel_clears_processing(c, s, *workers):
34173416

34183417

34193418
def test_default_get(loop_in_thread):
3420-
has_pyarrow = False
34213419
try:
3422-
check_minimal_arrow_version()
3423-
has_pyarrow = True
3424-
except ImportError:
3425-
pass
3420+
from dask.dataframe._compat import HAS_PYARROW
3421+
except ImportError: # No pandas
3422+
HAS_PYARROW = False
3423+
34263424
loop = loop_in_thread
34273425
with cluster() as (s, [a, b]):
34283426
pre_get = dask.base.get_scheduler()
34293427
# These may change in the future but the selection below should not
3430-
distributed_default = "p2p" if has_pyarrow else "tasks"
3428+
distributed_default = "p2p" if HAS_PYARROW else "tasks"
34313429
local_default = "disk"
34323430
assert get_default_shuffle_method() == local_default
34333431
with Client(s["address"], set_as_default=True, loop=loop) as c:

0 commit comments

Comments
 (0)