Skip to content

Commit e1ae156

Browse files
authored
Add config option for direct-to-workers (#9097)
1 parent 3d2685c commit e1ae156

File tree

4 files changed

+18
-0
lines changed

4 files changed

+18
-0
lines changed

distributed/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,8 @@ def __init__(
10841084
if deserializers is None:
10851085
deserializers = serializers
10861086
self._deserializers = deserializers
1087+
if direct_to_workers is None:
1088+
direct_to_workers = dask.config.get("distributed.client.direct-to-workers")
10871089
self.direct_to_workers = direct_to_workers
10881090
self._previous_as_current = None
10891091

distributed/distributed-schema.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,11 @@ properties:
678678
Configuration settings for Dask Clients
679679
680680
properties:
681+
direct-to-workers:
682+
type: [boolean, 'null']
683+
description:
684+
Whether to connect directly to workers for gather / scatter.
685+
681686
heartbeat:
682687
type: string
683688
description:

distributed/distributed.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ distributed:
201201
OPENBLAS_NUM_THREADS: 1
202202

203203
client:
204+
direct-to-workers: null # Whether to connect directly to workers for gather / scatter
204205
heartbeat: 5s # Interval between client heartbeats
205206
scheduler-info-interval: 2s # Interval between scheduler-info updates
206207
security-loader: null # A callable to load security credentials if none are provided explicitl

distributed/tests/test_client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6002,12 +6002,22 @@ def test_no_threads_lingering():
60026002

60036003
@gen_cluster()
60046004
async def test_direct_async(s, a, b):
6005+
# Keyword option
60056006
async with Client(s.address, asynchronous=True, direct_to_workers=True) as c:
60066007
assert c.direct_to_workers
60076008

60086009
async with Client(s.address, asynchronous=True, direct_to_workers=False) as c:
60096010
assert not c.direct_to_workers
60106011

6012+
# Config option
6013+
with dask.config.set({"distributed.client.direct-to-workers": True}):
6014+
async with Client(s.address, asynchronous=True) as c:
6015+
assert c.direct_to_workers
6016+
6017+
with dask.config.set({"distributed.client.direct-to-workers": False}):
6018+
async with Client(s.address, asynchronous=True) as c:
6019+
assert not c.direct_to_workers
6020+
60116021

60126022
def test_direct_sync(c):
60136023
assert not c.direct_to_workers

0 commit comments

Comments
 (0)