Skip to content

Commit 491fe7f

Browse files
authored
Expose details about worker start timeout in the exception message (#9092)
so that calling code can have more precise logic about how to handle the error
1 parent 257435b commit 491fe7f

File tree

3 files changed

+36
-7
lines changed

3 files changed

+36
-7
lines changed

distributed/client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
WorkerPlugin,
8787
_get_plugin_name,
8888
)
89+
from distributed.exceptions import WorkerStartTimeoutError
8990
from distributed.metrics import time
9091
from distributed.objects import HasWhat, SchedulerInfo, WhoHas
9192
from distributed.protocol import serialize, to_serialize
@@ -1631,10 +1632,8 @@ def running_workers(info):
16311632

16321633
while running_workers(info) < n_workers:
16331634
if deadline and time() > deadline:
1634-
raise TimeoutError(
1635-
"Only %d/%d workers arrived after %s"
1636-
% (running_workers(info), n_workers, timeout)
1637-
)
1635+
assert timeout is not None
1636+
raise WorkerStartTimeoutError(running_workers(info), n_workers, timeout)
16381637
await asyncio.sleep(0.1)
16391638
info = await self.scheduler.identity(n_workers=-1)
16401639
self._scheduler_identity = SchedulerInfo(info)

distributed/deploy/cluster.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from distributed.compatibility import PeriodicCallback
1919
from distributed.core import Status
2020
from distributed.deploy.adaptive import Adaptive
21+
from distributed.exceptions import WorkerStartTimeoutError
2122
from distributed.metrics import time
2223
from distributed.objects import SchedulerInfo
2324
from distributed.utils import (
@@ -610,9 +611,8 @@ def running_workers(info):
610611

611612
while n_workers and running_workers(self.scheduler_info) < n_workers:
612613
if deadline and time() > deadline:
613-
raise TimeoutError(
614-
"Only %d/%d workers arrived after %s"
615-
% (running_workers(self.scheduler_info), n_workers, timeout)
614+
raise WorkerStartTimeoutError(
615+
running_workers(self.scheduler_info), n_workers, timeout
616616
)
617617
await asyncio.sleep(0.1)
618618

distributed/exceptions.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
from asyncio import TimeoutError
4+
35

46
class Reschedule(Exception):
57
"""Reschedule this task
@@ -13,3 +15,31 @@ class Reschedule(Exception):
1315
load across the cluster has significantly changed since first scheduling
1416
the task.
1517
"""
18+
19+
20+
class WorkerStartTimeoutError(TimeoutError):
21+
"""Raised when the expected number of workers to not start within the timeout period."""
22+
23+
#: Number of workers that are available.
24+
available_workers: int
25+
26+
#: Number of workers that were expected to be available.
27+
expected_workers: int
28+
29+
#: Timeout period in seconds.
30+
timeout: float
31+
32+
def __init__(
33+
self, available_workers: int, expected_workers: int, timeout: float
34+
) -> None:
35+
self.available_workers = available_workers
36+
self.expected_workers = expected_workers
37+
self.timeout = timeout
38+
super().__init__(available_workers, expected_workers, timeout)
39+
40+
def __str__(self) -> str:
41+
return "Only %d/%d workers arrived after %s" % (
42+
self.available_workers,
43+
self.expected_workers,
44+
self.timeout,
45+
)

0 commit comments

Comments
 (0)