Reuse Comm objects in Scheduler.broadcast#9083
Merged
jacobtomlinson merged 2 commits intodask:mainfrom Jun 3, 2025
Merged
Conversation
This updates Scheduler.broadcast to reuse (i.e. *not* close) the Comm object that sends the message to the worker. This is motivated by the UCX comms, which are relatively expensive to create and destroy. We noticed that a simple ``client.run`` (which uses ``Scheduler.broadcast`` internally) took ~100s of ms to complete with UCX, most of which was spent creating and destroying Comm objects. With this change, subsequent ``client.run`` calls take <10ms. I've provided a new config option `distributed.scheduler.reuse-broadcast-comm` for users who want the previous behavior. But based on https://github.com/dask/distributed/pull/3766/files, folks agreed that this was a better default behavior. That change didn't quite get the desired behavior because of the `close=True` passed to `send_recv`.
Member
Author
|
This shows a smaller, but still nice improvement for the default TCP comm: So about 40ms per broadcast. With comm reuse: In [10]: dask.config.set({"distributed.scheduler.reuse-broadcast-comm": True})
Out[10]: <dask.config.set at 0x7f81e8cd65a0>
In [11]: %time _ = client.run(lambda: None)
CPU times: user 18.2 ms, sys: 11.9 ms, total: 30 ms
Wall time: 29.2 ms
In [12]: %time _ = client.run(lambda: None)
CPU times: user 7.64 ms, sys: 3.55 ms, total: 11.2 ms
Wall time: 10.6 msThe initial one is slightly faster (doesn't have to close the sockets?) and subsequent ones are about 4x faster. |
Contributor
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 27 files ± 0 27 suites ±0 11h 11m 53s ⏱️ - 7m 51s For more details on these failures and errors, see this check. Results for commit a14d540. ± Comparison against base commit 6d3f6eb. This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
Closed
jacobtomlinson
approved these changes
Jun 3, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This updates Scheduler.broadcast to reuse (i.e. not close) the Comm object that sends the message to the worker. This is motivated by the UCX comms, which are relatively expensive to create and destroy. We noticed that a simple
client.run(which usesScheduler.broadcastinternally) took ~100s of ms to complete with UCX, most of which was spent creating and destroying Comm objects. With this change, subsequentclient.runcalls take <10ms.I've provided a new config option
distributed.scheduler.reuse-broadcast-commfor users who want the previous behavior. But based on https://github.com/dask/distributed/pull/3766/files, folks agreed that this was a better default behavior. That change didn't quite get the desired behavior because of theclose=Truepassed tosend_recv.