Describe the issue:
I'm trying to run Dask in a SageMaker training/processing job. Everything works fine, except that the scheduler and workers raise CommClosedError when I call client.shutdown().
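As I understand it, client.shutdown() stops the connected scheduler and workers and then closes the client, so both ends of the worker-to-scheduler comms are torn down at roughly the same time. A sketch of a gentler teardown I could try instead (hypothetical ordering, untested in this job; futures refers to the futures in the example below):

# Hypothetical, more gradual teardown:
# 1. Release the futures so the scheduler no longer tracks their results,
#    which should avoid the "lose already computed task(s)" warnings.
client.cancel(futures)
# 2. Retire all workers so the scheduler removes them gracefully before
#    the TCP comms are closed.
client.retire_workers(list(client.scheduler_info()["workers"]))
# 3. Finally shut down the scheduler and close the client.
client.shutdown()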
Worker Error:
2025-03-27 00:16:54,367 - distributed.worker - INFO - Stopping worker at tcp://10.0.146.252:43319. Reason: scheduler-close
2025-03-27 00:16:54,367 - distributed.worker - INFO - Removing Worker plugin shuffle
2025-03-27 00:16:54,368 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 298, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 308, in write
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>: Stream is closed
2025-03-27 00:16:54,372 - distributed.core - INFO - Received 'close-stream' from tcp://algo-2:8786; closing.
Scheduler Error:
2025-03-27 00:16:54,365 - distributed.scheduler - INFO - Closing scheduler. Reason: unknown
2025-03-27 00:16:54,366 - distributed.scheduler - INFO - Scheduler closing all comms
2025-03-27 00:16:54,367 - distributed.core - INFO - Connection to tcp://10.0.172.211:47714 has been closed.
2025-03-27 00:16:54,367 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.172.211:46717 name: tcp://10.0.172.211:46717 (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,367 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.172.211:46717' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-370249c7007c98cd91d71a31c040470f', 'compute-09b07314d50d8280d4975166465796dd', 'compute-3e7069bd725a0d319b3c4f74619d1a4c', 'compute-a026990cf4b6bd2562eb91ae24fd9257', 'compute-634bdcb70319d9d55105495cf828dc9f'} (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,368 - distributed.core - INFO - Connection to tcp://10.0.146.252:32892 has been closed.
2025-03-27 00:16:54,368 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.146.252:43319 name: tcp://10.0.146.252:43319 (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,368 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.146.252:43319' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-a298a11d0634ccbe5bd1502400ad2a94', 'compute-e5c305c5a3f20519438085130e73ff4d', 'compute-ab4f324c0dbfa011c899b004ba886942', 'compute-85c7289d7ac8c7d7dfaf09bc8546e9ef', 'compute-2de9831faeee24c2827286de6550358d'} (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,369 - distributed.scheduler - INFO - Lost all workers
2025-03-27 00:16:54,370 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://10.0.185.14:8786 remote=tcp://10.0.162.85:34804>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 263, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
Minimal Complete Verifiable Example:
import asyncio
import json
import os
import time
from typing import Optional

from dask.distributed import Client
from distributed import Scheduler, Worker

SCHEDULER_PORT = 8786


def setup_dask_cluster() -> Optional[Client]:
    hosts = json.loads(os.environ["SM_HOSTS"])
    current_host = os.environ["SM_CURRENT_HOST"]
    # hosts[0] acts as the client node, hosts[1] as the scheduler; the rest run workers.
    client_host, scheduler_host = hosts[0], hosts[1]
    scheduler_address = f"tcp://{scheduler_host}:{SCHEDULER_PORT}"

    if current_host == client_host:
        client = Client(scheduler_address)
        client.wait_for_workers(len(hosts) - 2, timeout=30)
        return client

    async def run_scheduler():
        async with Scheduler(host=scheduler_host, port=SCHEDULER_PORT) as scheduler:
            await scheduler.finished()

    async def run_worker():
        async with Worker(scheduler_address) as worker:
            await worker.finished()

    # Scheduler and worker nodes block here until the cluster shuts down.
    asyncio.run(run_scheduler() if current_host == scheduler_host else run_worker())


def main():
    client = setup_dask_cluster()
    if not client:
        return  # Scheduler or worker node

    def compute(x):
        time.sleep(1)
        return 2 * x

    futures = client.map(compute, range(10))
    results = client.gather(futures)
    print(sum(results))
    client.shutdown()


if __name__ == '__main__':
    main()

Anything else we need to know?:
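For clarity, the script assigns roles purely by position in SM_HOSTS: hosts[0] is the client node, hosts[1] is the scheduler, and the remaining hosts run workers. A hypothetical four-instance job would look like:

# Hypothetical SM_HOSTS layout for a 4-instance job (instance names assumed):
# SM_HOSTS='["algo-1", "algo-2", "algo-3", "algo-4"]'
# algo-1          -> runs main() as the Dask client
# algo-2          -> runs the Scheduler on port 8786
# algo-3, algo-4  -> run Workers, so wait_for_workers(len(hosts) - 2) waits for 2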
Environment:
- AWS SageMaker processing job
- Dask version: 2025.3.0
- Python version: 3.12
- Operating System: Linux
- Install method (conda, pip, source): pip