Description
Describe the issue:
After upgrading from dask-cloudprovider 2022.8.0 to 2022.10.0, I've been running into a sporadic issue when attempting to start a FargateCluster. The scheduler task will start, but then I see the following error:
# redacted ip
RuntimeError: Cluster failed to start: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 30 s
I've noticed that the scheduler Fargate task starts, but quickly dies. Here are its logs:
| timestamp | message |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1686169718457 | + '[' '' ']' |
| 1686169718457 | + '[' '' == true ']' |
| 1686169718457 | + CONDA_BIN=/opt/conda/bin/conda |
| 1686169718457 | + '[' -e /opt/app/environment.yml ']' |
| 1686169718457 | + echo 'no environment.yml' |
| 1686169718457 | + '[' '' ']' |
| 1686169718457 | + '[' '' ']' |
| 1686169718457 | + exec dask-scheduler --idle-timeout '10 minutes' |
| 1686169718459 | no environment.yml |
| 1686169719866 | /opt/conda/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py:140: FutureWarning: dask-scheduler is deprecated and will be removed in a future release; use `dask scheduler` instead |
| 1686169719866 | warnings.warn( |
| 1686169719866 | 2023-06-07 20:28:39,866 - distributed.scheduler - INFO - ----------------------------------------------- |
I never had this error on dask-cloudprovider 2022.8.0. On 2022.10.0 I can sometimes get the client to connect to the scheduler, but it only works maybe 10% of the time.
I thought that increasing the timeout might help, so I tried:
import dask
dask.config.set({"distributed.comm.timeouts.connect": '3600s'})
dask.config.set({"distributed.comm.timeouts.tcp": '3600s'})
But when I launch the FargateCluster I get the same error, now reporting the new timeout value. The larger timeout is not actually respected, though - the error still comes up after a few minutes, not an hour: OSError: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s
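If it would help, the same timeout could presumably also be pushed into the scheduler and worker tasks through the environment kwarg that ECSCluster/FargateCluster accept, using Dask's config-to-environment-variable naming. This is only a sketch of what I mean - I haven't verified that it changes the behaviour, and the variable name is just my reading of that convention:

cluster = FargateCluster(
    # Dask reads nested config keys from DASK_-prefixed, double-underscore
    # separated environment variables, so this should map to
    # distributed.comm.timeouts.connect inside the containers (assumption).
    environment={"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3600s"},
    # ... plus the same kwargs as in the MCVE below
)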
Minimal Complete Verifiable Example:
import os

from dask_cloudprovider.aws import FargateCluster
# some values partially redacted
cluster_kwargs = dict(
cluster_arn="arn:aws:ecs:us-east-1:<ACCNT>:cluster/datascientist-dev",
execution_role_arn="arn:aws:iam::<ACCNT>:role/datascientist/DataScienceEcsTaskExecutionRole",
task_role_arn="arn:aws:iam::<ACCNT>:role/datascientist/DataScienceDaskRole",
cloudwatch_logs_group="/datascientist/datascientist-dev",
vpc="vpc-<VPC>",
subnets=["subnet-<SUBNET>"],
security_groups=["sg-<SG>"],
)
cluster = FargateCluster(
fargate_use_private_ip=True,
n_workers=2,
# Cluster image name/tag are baked into the client image
image=f"{os.getenv('CLUSTER_IMAGE')}",
scheduler_timeout="10 minutes",
skip_cleanup=True,
**cluster_kwargs,
)
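For reference, on the runs where the cluster does come up, connecting a client works as usual; a minimal check along these lines (standard distributed API) returns scheduler info without issues:

from distributed import Client

client = Client(cluster)
# Confirms the client can actually talk to the scheduler once it is up.
print(client.scheduler_info())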
Anything else we need to know?:
I'm using the same library versions on the client and cluster side. Also, I can confirm that keeping all of the other software libraries the same and just downgrading dask-cloudprovider to 2022.8.0 makes everything work smoothly.
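On a run where the cluster does start, the version match can be double-checked with the standard distributed API (sketch only):

from distributed import Client

client = Client(cluster)
# Warns if client, scheduler and worker package versions disagree.
client.get_versions(check=True)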
Here's the full error traceback:
---------------------------------------------------------------------------
StreamClosedError Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:225, in TCP.read(self, deserializers)
224 try:
--> 225 frames_nbytes = await stream.read_bytes(fmt_size)
226 (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
CommClosedError Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/comm/core.py:373, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
370 try:
371 # This would be better, but connections leak if worker is closed quickly
372 # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
--> 373 handshake = await wait_for(comm.read(), time_left())
374 await wait_for(comm.write(local_info), time_left())
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:1878, in wait_for(fut, timeout)
1877 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1878 return await asyncio.wait_for(fut, timeout)
File /opt/conda/lib/python3.8/asyncio/tasks.py:494, in wait_for(fut, timeout, loop)
493 if fut.done():
--> 494 return fut.result()
495 else:
File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:241, in TCP.read(self, deserializers)
240 if not sys.is_finalizing():
--> 241 convert_stream_closed_error(self, e)
242 except BaseException:
243 # Some OSError, CancelledError or a another "low-level" exception.
244 # We do not really know what was already read from the underlying
245 # socket, so it is not even safe to retry here using the same stream.
246 # The only safe thing to do is to abort.
247 # (See also GitHub #4133, #6548).
File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:144, in convert_stream_closed_error(obj, exc)
143 else:
--> 144 raise CommClosedError(f"in {obj}: {exc}") from exc
CommClosedError: in <TCP (closed) local=tcp://172.18.0.2:59150 remote=tcp://xx.yy.zz.xyza:8786>: Stream is closed
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:331, in SpecCluster._start(self)
326 self.scheduler_comm = rpc(
327 getattr(self.scheduler, "external_address", None)
328 or self.scheduler.address,
329 connection_args=self.security.get_connection_args("client"),
330 )
--> 331 await super()._start()
332 except Exception as e: # pragma: no cover
File /opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py:127, in Cluster._start(self)
126 async def _start(self):
--> 127 comm = await self.scheduler_comm.live_comm()
128 comm.name = "Cluster worker status"
File /opt/conda/lib/python3.8/site-packages/distributed/core.py:1235, in rpc.live_comm(self)
1234 if not open or comm.closed():
-> 1235 comm = await connect(
1236 self.address,
1237 self.timeout,
1238 deserialize=self.deserialize,
1239 **self.connection_args,
1240 )
1241 comm.name = "rpc"
File /opt/conda/lib/python3.8/site-packages/distributed/comm/core.py:378, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
377 await comm.close()
--> 378 raise OSError(
379 f"Timed out during handshake while connecting to {addr} after {timeout} s"
380 ) from exc
382 comm.remote_info = handshake
OSError: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[4], line 42
34 tags = {
35 "Environment": "DEVELOPMENT",
36 "Purpose": "Experimenting", # or "Ad-hoc"
37 "Function": "Play", # or "Test"
38 "Initiative": "Other",
39 }
41 # This may take several minutes
---> 42 cluster = FargateCluster(
43 fargate_use_private_ip=True,
44 n_workers=2,
45 # Cluster image name/tag are baked into the client image
46 image=f"{os.getenv('CLUSTER_IMAGE')}",
47 scheduler_timeout="10 minutes",
48 # find_address_timeout=120,
49 tags=tags,
50 skip_cleanup=True,
51 **cluster_kwargs,
52 )
File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:1489, in FargateCluster.__init__(self, **kwargs)
1488 def __init__(self, **kwargs):
-> 1489 super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:800, in ECSCluster.__init__(self, fargate_scheduler, fargate_workers, fargate_spot, image, scheduler_cpu, scheduler_mem, scheduler_port, scheduler_timeout, scheduler_extra_args, scheduler_task_definition_arn, scheduler_task_kwargs, scheduler_address, worker_cpu, worker_nthreads, worker_mem, worker_gpu, worker_extra_args, worker_task_definition_arn, worker_task_kwargs, n_workers, workers_name_start, workers_name_step, cluster_arn, cluster_name_template, execution_role_arn, task_role_arn, task_role_policies, cloudwatch_logs_group, cloudwatch_logs_stream_prefix, cloudwatch_logs_default_retention, vpc, subnets, security_groups, environment, tags, skip_cleanup, aws_access_key_id, aws_secret_access_key, region_name, platform_version, fargate_use_private_ip, mount_points, volumes, mount_volumes_on_scheduler, **kwargs)
798 self._lock = asyncio.Lock()
799 self.session = get_session()
--> 800 super().__init__(**kwargs)
File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:291, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
289 if not called_from_running_loop:
290 self._loop_runner.start()
--> 291 self.sync(self._start)
292 try:
293 self.sync(self._correct_state)
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
349 return future
350 else:
--> 351 return sync(
352 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
353 )
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
416 if error:
417 typ, exc, tb = error
--> 418 raise exc.with_traceback(tb)
419 else:
420 return result
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:391, in sync.<locals>.f()
389 future = wait_for(future, callback_timeout)
390 future = asyncio.ensure_future(future)
--> 391 result = yield future
392 except Exception:
393 error = sys.exc_info()
File /opt/conda/lib/python3.8/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:999, in ECSCluster._start(self)
991 self.scheduler = SchedulerAddress()
993 with warn_on_duration(
994 "10s",
995 "Creating your cluster is taking a surprisingly long time. "
996 "This is likely due to pending resources on AWS. "
997 "Hang tight! ",
998 ):
--> 999 await super()._start()
File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
333 self.status = Status.failed
334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e
RuntimeError: Cluster failed to start: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s
Environment:
- Dask version: 2023.5.0
- Python version: 3.8.16 | packaged by conda-forge | (default, Feb 1 2023, 16:01:55) [GCC 11.3.0]
- Operating System: Linux (running in docker container on Mac OS)
- Install method (conda, pip, source): conda, from within the dask-docker container.
Activity
jacobtomlinson commented on Jun 12, 2023
@tamas4sunairio @pwerth do either of you have some time to look into this?
jgdwyer commented on Jun 23, 2023
@jacobtomlinson @tamas4sunairio @pwerth If you have any suggestions for adding logging/debugging, I can re-run and provide more details.
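For example, I could turn up client-side logging with something like this before creating the cluster (plain Python logging; the dask_cloudprovider logger name is my assumption about the namespace it logs under):

import logging

# Attach a root handler so DEBUG records are actually emitted somewhere.
logging.basicConfig(level=logging.INFO)
# Turn up the libraries involved in cluster startup and comms.
logging.getLogger("distributed").setLevel(logging.DEBUG)
logging.getLogger("dask_cloudprovider").setLevel(logging.DEBUG)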