Description
I was profiling a large cluster of roughly 800 workers running a workload that uses worker_client. The scheduler CPU was hopelessly pinned at 100% utilization.
Profiling this quickly reveals the culprit to be Scheduler.identity, which is triggered by every client on connection and again by every client every 2s, i.e. on a cluster like this the scheduler is hit with roughly 400 requests per second (800 worker clients polling every 2s). The identity payload is only used to render a nice HTML view of the cluster, so this work is entirely meaningless for a worker client.
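The 2s cadence is governed by the distributed.client.scheduler-info-interval config value (the same knob used in the workaround at the bottom); with a default installation you can confirm it directly:

```python
import dask

# Every connected Client refreshes its copy of the scheduler info at this
# interval; with default config this prints "2s".
print(dask.config.get("distributed.client.scheduler-info-interval"))
```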
I performed a couple of measurements and did some math to get an estimate of how expensive this really is...
The Scheduler.identity payload is huge: it returns data for every single worker that a worker client never uses, and for an 800-worker cluster that's a dict of about 5-7 MB in memory.
Dumping this through our protocol takes about 1.83 ms, i.e. to serve 400 RPS the scheduler spends about 73.2% of its CPU time just serializing this payload (not even accounting for the overhead of looping over 800 workers, calling methods, etc.).
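For reference, here is a rough sketch of how these numbers can be reproduced against a live cluster. It assumes `client` is connected to the cluster from the repro below and uses the public Client.scheduler_info(), which carries essentially the same per-worker data as Scheduler.identity; exact figures will vary.

```python
import timeit

from distributed.protocol import dumps

# Assumes `client` is the Client connected to the large cluster.
info = client.scheduler_info()  # essentially the per-worker payload in question
size_mb = sum(map(len, dumps(info))) / 1e6
print(f"{len(info['workers'])} workers, ~{size_mb:.1f} MB serialized")

# One serialization pass, extrapolated to ~400 identity requests per second.
per_call = timeit.timeit(lambda: dumps(info), number=20) / 20
print(f"{per_call * 1e3:.2f} ms per dumps -> ~{per_call * 400 * 100:.0f}% of one core at 400 RPS")
```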
On top of that, to compute the "identity" of a worker we recompute the "host" of a Worker over and over again (i.e. get_address_host("tcp://1.2.3.4:5678") => "1.2.3.4"). We're calling this about 320,000 (400 x 800) times per second in this example. At roughly 800 ns per call, that's another 25.6% of CPU time, i.e. at this point we're at roughly 99% of CPU time spent on trivial, meaningless work.
(Assuming that the scheduler network could even send that data out)
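The host-parsing overhead can be checked in isolation with a standalone micro-benchmark of the same call (per-call cost will of course vary by machine):

```python
import timeit

from distributed.comm.addressing import get_address_host

# The same parsing that the identity handler repeats once per worker per request.
n = 100_000
per_call = timeit.timeit(lambda: get_address_host("tcp://1.2.3.4:5678"), number=n) / n

calls_per_second = 400 * 800  # ~400 identity requests/s x 800 workers
print(f"{per_call * 1e9:.0f} ns per call")
print(f"~{per_call * calls_per_second * 100:.1f}% of one core at {calls_per_second} calls/s")
```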
Related issues / PRs (a minimal sketch of the caching idea follows this list):
- Cache WorkerState.host (Cache WorkerState host property #9044)
- Reduce the size of Scheduler.identity (Reduce size of scheduler_info #9045)
- Disable update-identity on worker clients / make it dynamic so it scales with the number of clients (Scheduler not robust to many connected clients #9043)
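As a rough illustration of the caching direction (a standalone sketch, not the actual patch in #9044): since a worker's address does not change for the lifetime of its WorkerState, the parsed host only needs to be computed once per worker instead of once per identity request.

```python
from functools import cached_property

from distributed.comm.addressing import get_address_host


class CachedHostSketch:
    """Toy stand-in for WorkerState, only to illustrate caching the parsed host."""

    def __init__(self, address: str):
        self.address = address

    @cached_property
    def host(self) -> str:
        # Computed once per instance instead of on every identity request.
        return get_address_host(self.address)


ws = CachedHostSketch("tcp://1.2.3.4:5678")
assert ws.host == "1.2.3.4"
```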
This can easily be reproduced with the code snippet below (likely with fewer workers as well). Without setting any additional config, a cluster like this is rendered entirely dysfunctional.
```python
from coiled import Cluster

cluster = Cluster(
    n_workers=800,
    worker_vm_types=["m6g.medium", "m6g.large"],
    scheduler_vm_types=["m6g.large"],
    spot_policy="spot_with_fallback",
)
client = cluster.get_client()
```
```python
from dask.utils import SerializableLock
from distributed import get_worker


def func_spawn_clients(_, lock, clients_per_task):
    # This uses the worker-client machinery, but the point is to stress the scheduler
    from distributed.client import _global_clients

    worker = get_worker()
    with lock:
        if not hasattr(worker, "clients"):
            worker.clients = []
        for _ in range(clients_per_task):
            # Create a fresh client, then detach it so the next call creates another one
            worker._get_client()
            _global_clients.clear()
            worker.clients.append(worker._client)
            worker._client = None
    return len(worker.clients)


futs = client.map(
    func_spawn_clients,
    range(1000),
    lock=SerializableLock(),
    clients_per_task=1,
)
_ = client.gather(futs)
```
For people running into this, setting the config value below makes the problem go away:
```python
import dask

dask.config.set({
    "distributed.client.scheduler-info-interval": "24h",
})
```