Description
Note: If I do the below with adapt(minimum=32, maximum=32), it works repeatably with no failures.
I submit ~100 tasks to an AWS EC2Cluster with adapt(minimum=1, maximum=32) enabled.
All tasks take roughly the same time, ~4 minutes. I wait for them using this:
import traceback

from dask.distributed import as_completed

for future in as_completed(futures):
    print(future, future.done())
    try:
        print(future.result())
    except Exception:
        if future.traceback():
            traceback.print_tb(future.traceback())
        print(future.exception())
    del future  # attempt to clear any state off the remote machine
    print()
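For context, the cluster setup looks roughly like this (a sketch; the EC2Cluster keyword arguments, the process_day function, and the dates list are placeholders, not my exact configuration):

from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

# Placeholder setup -- the real config passes more EC2Cluster kwargs.
cluster = EC2Cluster(n_workers=1)
cluster.adapt(minimum=1, maximum=32)  # with minimum=32, maximum=32 everything completes cleanly

client = Client(cluster)
# ~100 roughly equal-length tasks, ~4 minutes each (function/data names are hypothetical)
futures = client.map(process_day, dates)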
What I observe fairly consistently is that we ramp up quickly to 32 machines, then, as the tasks are mostly complete, ramp down again, which is fine. However, immediately after ramping down, the running task count bounces straight back up again, and so does adapt. I am GUESSING that already-completed tasks are being forced into some error state and rescheduled.
Any guidance much appreciated.
On the dashboard I see something like this (approximate; it's moving very fast at the end):
Progress: total: 109, in-memory: 96, processing: 4, waiting: 0, erred: 0
bounces back to
Progress: total: 109, in-memory: 5, processing: 96, waiting: 0, erred: 8
(FWIW, I'm assuming "in-memory" means the task is complete and its result is stored in memory?)
On the client side, I see that 85 futures have already completed successfully BEFORE it "bounces":
<Future: finished, type: NoneType, key: X_2022-08-01_40> True
2022-08-11 18:56:28,676 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'X_2022-08-01_40': ('tcp://172.31.3.223:46345',)}
None
<Future: lost, type: NoneType, key: X_2022-08-01_19> False
On the cluster (with some debug logging added in adapt(), just before it calls scale_up/scale_down):
2022-08-11 18:52:20,416 - distributed.deploy.adaptive_core - INFO - adapt up 32 {'n': 32}
2022-08-11 18:56:27,415 - distributed.deploy.adaptive_core - INFO - adapt down 2 {'workers': [256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 127]}
2022-08-11 18:56:28,415 - distributed.deploy.adaptive_core - INFO - adapt down 1 {'workers': [249, 250, 251, 252]}
2022-08-11 18:56:29,423 - distributed.deploy.adaptive_core - INFO - adapt up 32 {'n': 32}
Note that it's only a couple of seconds before we bounce from 1-2 workers back up to 32.
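For reference, the "adapt up"/"adapt down" lines above come from a local patch roughly like this (a sketch; the surrounding code in distributed/deploy/adaptive_core.py is paraphrased from memory, not the exact method body):

import logging

logger = logging.getLogger("distributed.deploy.adaptive_core")  # module's existing logger

# Inside AdaptiveCore.adapt(), just before it acts on the recommendation:
target = await self.safe_target()
recommendations = await self.recommendations(target)
status = recommendations.pop("status")
if status == "up":
    logger.info("adapt up %s %s", target, recommendations)
    await self.scale_up(**recommendations)
elif status == "down":
    logger.info("adapt down %s %s", target, recommendations)
    await self.scale_down(**recommendations)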
Scheduler logs:
2022-08-11 18:56:27,420 - distributed.scheduler - INFO - Retire worker names (256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 127)
2022-08-11 18:56:28,420 - distributed.scheduler - INFO - Retire worker names (249, 250, 251, 252)
2022-08-11 18:56:28,665 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.3.223:46345', name: dask-795b1015-worker-dee01138, status: closed, memory: 0, processing: 0>
2022-08-11 18:56:28,672 - distributed.scheduler - ERROR - Couldn't gather keys {'X_2022-08-01_40': ['tcp://172.31.3.223:46345']} state: ['processing'] workers: ['tcp://172.31.3.223:46345'] NoneType: None
2022-08-11 18:56:28,672 - distributed.scheduler - ERROR - Shut down workers that don't have promised key: ['tcp://172.31.3.223:46345'], X_2022-08-01_40 NoneType: None
2022-08-11 18:56:28,700 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.7.74:35551', name: dask-795b1015-worker-9f250668, status: closed, memory: 0, processing: 0>
2022-08-11 18:56:28,707 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.5.212:40673', name: dask-795b1015-worker-b27d4673, status: closed, memory: 0, processing: 0>
2022-08-11 18:56:28,751 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.0.248:45767', name: dask-795b1015-worker-70beb522, status: closed, memory: 0, processing: 0>
2022-08-11 18:56:28,757 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.12.113:35357', name: dask-795b1015-worker-157f0b85, status: closed, memory: 0, processing: 1>
2022-08-11 18:56:28,813 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.6.221:43037', name: dask-795b1015-worker-7c7e95f1, status: closed, memory: 0, processing: 2>
2022-08-11 18:56:28,828 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.14.230:45023', name: dask-795b1015-worker-77518fb4, status: closed, memory: 0, processing: 4>
2022-08-11 18:56:28,895 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.4.182:45027', name: dask-795b1015-worker-f0cd045a, status: closed, memory: 0, processing: 0>
2022-08-11 18:56:28,901 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.12.41:34147', name: dask-795b1015-worker-11788aa1, status: closed, memory: 0, processing: 1>
2022-08-11 18:56:28,907 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.3.232:43185', name: dask-795b1015-worker-e6d89dbd, status: closed, memory: 0, processing: 1>
2022-08-11 18:56:28,921 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.15.94:45413', name: dask-795b1015-worker-92635204, status: closed, memory: 0, processing: 1>
2022-08-11 18:56:28,952 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.5.252:46497', name: dask-795b1015-worker-9519c294, status: closed, memory: 0, processing: 2>
2022-08-11 18:56:28,962 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.10.64:35701', name: dask-795b1015-worker-1ae17e2a, status: closed, memory: 0, processing: 2>
2022-08-11 18:56:28,968 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.15.1:41063', name: dask-795b1015-worker-0a70e1a1, status: closed, memory: 0, processing: 3>
2022-08-11 18:56:28,997 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.5.160:39325', name: dask-795b1015-worker-160c008f, status: closed, memory: 0, processing: 3>
2022-08-11 18:56:29,005 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.6.214:46331', name: dask-795b1015-worker-c456854c, status: closed, memory: 0, processing: 3>
2022-08-11 18:56:29,125 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.0.134:37915', name: dask-795b1015-worker-8cba3e5d, status: closed, memory: 0, processing: 4>
2022-08-11 18:56:29,182 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.15.102:39231', name: dask-795b1015-worker-e1c15f88, status: closed, memory: 0, processing: 4>
2022-08-11 18:56:29,193 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.7.227:34479', name: dask-795b1015-worker-f967a37e, status: closed, memory: 0, processing: 4>
2022-08-11 18:56:29,293 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.4.20:38309', name: dask-795b1015-worker-8e2ab392, status: closed, memory: 0, processing: 5>
2022-08-11 18:56:29,388 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.8.222:42175', name: dask-795b1015-worker-52131d68, status: closed, memory: 0, processing: 5>
2022-08-11 18:56:29,396 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.12.166:45859', name: dask-795b1015-worker-256e9334, status: closed, memory: 0, processing: 7>
2022-08-11 18:56:29,473 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.11.184:40847', name: dask-795b1015-worker-8ba27687, status: closed, memory: 0, processing: 8>
2022-08-11 18:56:29,524 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.7.62:41523', name: dask-795b1015-worker-06db4e55, status: closed, memory: 0, processing: 8>
2022-08-11 18:56:29,545 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.3.141:36171', name: dask-795b1015-worker-b1414db5, status: closed, memory: 0, processing: 10>
2022-08-11 18:56:29,545 - distributed.scheduler - INFO - Task X_2022-08-01_49 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,555 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.5.193:34067', name: dask-795b1015-worker-7ba8fe50, status: closed, memory: 0, processing: 11>
2022-08-11 18:56:29,555 - distributed.scheduler - INFO - Task X_2022-08-01_55 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,603 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.2.121:38459', name: dask-795b1015-worker-fedcd3b1, status: closed, memory: 0, processing: 16>
2022-08-11 18:56:29,603 - distributed.scheduler - INFO - Task X_2022-08-01_73 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,613 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.5.80:39737', name: dask-795b1015-worker-f9c0fcd6, status: closed, memory: 0, processing: 18>
2022-08-11 18:56:29,613 - distributed.scheduler - INFO - Task X_2022-08-01_25 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,613 - distributed.scheduler - INFO - Task X_2022-08-01_101 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,614 - distributed.scheduler - INFO - Task X_2022-08-01_12 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,648 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.31.15.213:34131', name: dask-795b1015-worker-7176d462, status: closed, memory: 0, processing: 23>
2022-08-11 18:56:29,648 - distributed.scheduler - INFO - Task X_2022-08-01_94 marked as failed because 3 workers died while trying to run it
2022-08-11 18:56:29,649 - distributed.scheduler - INFO - Task X_2022-08-01_23 marked as failed because 3 workers died while trying to run it