Skip to content

Commit 2af917e

Browse files
committed
Dynamically scale heartbeat and scheduler_info intervals
1 parent 16aa189 commit 2af917e

File tree

3 files changed

+38
-0
lines changed

3 files changed

+38
-0
lines changed

distributed/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,7 @@ def __init__(
11641164
"restart": self._handle_restart,
11651165
"error": self._handle_error,
11661166
"event": self._handle_event,
1167+
"adjust-heartbeat-interval": self._adjust_heartbeat_intervals,
11671168
}
11681169

11691170
self._state_handlers = {
@@ -1643,6 +1644,11 @@ def wait_for_workers(self, n_workers: int, timeout: float | None = None) -> None
16431644

16441645
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
16451646

1647+
def _adjust_heartbeat_intervals(self, interval: float) -> None:
    """Apply a new interval to this client's periodic callbacks.

    Registered in ``__init__`` as the handler for the
    ``"adjust-heartbeat-interval"`` op. Both the ``"heartbeat"`` and the
    ``"scheduler-info"`` periodic callbacks are set to the same period.

    Parameters
    ----------
    interval
        New period. It is multiplied by 1000 before being stored as
        ``callback_time`` (presumably seconds in, milliseconds stored --
        confirm against the periodic-callback implementation).
    """
    callback_time = interval * 1000
    for name in ("heartbeat", "scheduler-info"):
        self._periodic_callbacks[name].callback_time = callback_time
1651+
16461652
def _heartbeat(self):
16471653
# Don't send heartbeat if scheduler comm or cluster are already closed
16481654
if self.scheduler_comm is not None and not (

distributed/scheduler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
def client_heartbeat(self, client: str) -> None:
    """Handle heartbeats from Client

    Records when the client was last seen, then replies on that client's
    comm with an ``"adjust-heartbeat-interval"`` message carrying the
    interval the client should use from now on.

    Parameters
    ----------
    client
        Key identifying the client in ``self.clients`` and
        ``self.client_comms``.
    """
    cs = self.clients[client]
    cs.last_seen = time()
    self.client_comms[client].send(
        {
            "op": "adjust-heartbeat-interval",
            # heartbeat_interval() is used for workers. Clients are not
            # required to heartbeat that often, so its result (computed
            # from the current number of clients) is stretched tenfold.
            "interval": heartbeat_interval(len(self.clients)) * 10,
        }
    )
56845692

56855693
###################
56865694
# Task Validation #

distributed/tests/test_client.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8253,3 +8253,27 @@ def f(x, ev):
82538253
ev.set()
82548254
with pytest.raises(FutureCancelledError):
82558255
future.result()
8256+
8257+
8258+
@gen_cluster(client=True, nthreads=[])
async def test_adjust_hearbeat(c, s):
    """The client's heartbeat interval follows the number of connected clients.

    Connect 20 extra clients and wait for the first client's heartbeat
    period to grow; then close them and wait for it to fall back to the
    starting value. After each adjustment the ``"scheduler-info"`` callback
    must carry the same period as the ``"heartbeat"`` callback.
    """
    assert "heartbeat" in c._periodic_callbacks
    heartbeat_pc = c._periodic_callbacks["heartbeat"]
    scheduler_info_pc = c._periodic_callbacks["scheduler-info"]
    assert 0 < (initial_value := heartbeat_pc.callback_time) < 10_000

    clients = [Client(s.address, asynchronous=True) for _ in range(20)]
    try:
        await asyncio.gather(*clients)

        # The adjustment only arrives in response to a heartbeat, so poll.
        while initial_value == heartbeat_pc.callback_time:
            await asyncio.sleep(0.1)
        assert (newvalue := heartbeat_pc.callback_time) > initial_value
        assert heartbeat_pc.callback_time == scheduler_info_pc.callback_time
    finally:
        # Close the extra clients even when a wait or assertion above
        # fails, so a failing run does not leak them.
        await asyncio.gather(*[cc.close() for cc in clients])

    while newvalue == heartbeat_pc.callback_time:
        await asyncio.sleep(0.1)
    assert heartbeat_pc.callback_time == initial_value
    assert heartbeat_pc.callback_time == scheduler_info_pc.callback_time

0 commit comments

Comments
 (0)