Skip to content

Commit cbbfd72

Browse files
authored
Dynamically scale heartbeat and scheduler_info intervals (#9046)
1 parent 0503225 commit cbbfd72

File tree

3 files changed, with 38 additions and 0 deletions.

distributed/client.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1164,6 +1164,7 @@ def __init__(
11641164
"restart": self._handle_restart,
11651165
"error": self._handle_error,
11661166
"event": self._handle_event,
1167+
"adjust-heartbeat-interval": self._adjust_heartbeat_intervals,
11671168
}
11681169

11691170
self._state_handlers = {
@@ -1645,6 +1646,11 @@ def wait_for_workers(self, n_workers: int, timeout: float | None = None) -> None
16451646

16461647
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
16471648

1649+
def _adjust_heartbeat_intervals(self, interval: float) -> None:
    """Retune this client's ``heartbeat`` and ``scheduler-info`` callbacks.

    This is the handler mapped to the ``"adjust-heartbeat-interval"`` op in
    the handler table built in ``__init__``.

    Parameters
    ----------
    interval
        New period. It is multiplied by 1000 before being stored as each
        callback's ``callback_time`` (presumably seconds -> milliseconds;
        confirm against the periodic-callback implementation in use).
    """
    # Both periodic callbacks are deliberately given the same period, so
    # compute it once and apply it to each entry in turn.
    callback_time = interval * 1000
    for name in ("heartbeat", "scheduler-info"):
        self._periodic_callbacks[name].callback_time = callback_time
1653+
16481654
def _heartbeat(self):
16491655
# Don't send heartbeat if scheduler comm or cluster are already closed
16501656
if self.scheduler_comm is not None and not (

distributed/scheduler.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5701,6 +5701,14 @@ def client_heartbeat(self, client: str) -> None:
57015701
"""Handle heartbeats from Client"""
57025702
cs = self.clients[client]
57035703
cs.last_seen = time()
5704+
self.client_comms[client].send(
5705+
{
5706+
"op": "adjust-heartbeat-interval",
5707+
# heartbeat_interval is used for workers
5708+
# We don't require the clients to heartbeat this often
5709+
"interval": heartbeat_interval(len(self.clients)) * 10,
5710+
}
5711+
)
57045712

57055713
###################
57065714
# Task Validation #

distributed/tests/test_client.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8257,3 +8257,27 @@ def f(x, ev):
82578257
ev.set()
82588258
with pytest.raises(FutureCancelledError):
82598259
future.result()
8260+
8261+
8262+
@gen_cluster(client=True, nthreads=[])
async def test_adjust_heartbeat(c, s):
    """Client callback periods follow the number of connected clients.

    The ``heartbeat`` period must grow once 20 extra clients connect and
    return to its starting value after they close; the ``scheduler-info``
    period must match it after each adjustment.
    """
    assert "heartbeat" in c._periodic_callbacks
    heartbeat_pc = c._periodic_callbacks["heartbeat"]
    scheduler_info_pc = c._periodic_callbacks["scheduler-info"]
    assert 0 < (initial_value := heartbeat_pc.callback_time) < 10_000

    # Connect enough additional clients for the scheduler to ask for a
    # longer heartbeat period.
    clients = [Client(s.address, asynchronous=True) for _ in range(20)]
    await asyncio.gather(*clients)

    # The adjustment only arrives in response to a heartbeat, so poll until
    # the period actually changes.
    while initial_value == heartbeat_pc.callback_time:
        await asyncio.sleep(0.1)
    assert (newvalue := heartbeat_pc.callback_time) > initial_value
    assert heartbeat_pc.callback_time == scheduler_info_pc.callback_time

    # Dropping the extra clients should bring the period back down.
    await asyncio.gather(*[cc.close() for cc in clients])
    while newvalue == heartbeat_pc.callback_time:
        await asyncio.sleep(0.1)
    assert heartbeat_pc.callback_time == initial_value
    assert heartbeat_pc.callback_time == scheduler_info_pc.callback_time

0 commit comments

Comments
 (0)