Description
There appear to be some edge cases, or something close to race conditions, when combining adaptive scaling with the worker lifetime flags that close each worker gracefully.
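Roughly, the setup in question combines the two like this (a sketch only: the pod template, file name, and adapt bounds are placeholders rather than my exact manifest; only the lifetime values match what I used):

```python
from dask_kubernetes import KubeCluster

# Pod template (placeholder) starts each worker with a graceful lifetime, e.g.:
#   dask-worker <scheduler-address> --lifetime 10m --lifetime-stagger 2m --lifetime-restart
cluster = KubeCluster.from_yaml("worker-template.yaml")

# Adaptive scaling independently decides when to retire or add workers
cluster.adapt(minimum=1, maximum=10)
```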
I started a toy Kubernetes cluster running distributed 2.30.1 with some random data and an aggressive lifetime of 10±2 minutes. Eventually the cluster shrank to two workers, and then the following happened:
Time | Msg |
---|---|
**Worker 9 retires normally** | |
15:56:03.893 | distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.1.96.241:40383', name: 9, memory: 1555, processing: 0>} |
15:56:03.895 | distributed.scheduler - INFO - Moving 1555 keys to other workers |
15:56:04.103 | distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.1.96.241:40383', name: 9, memory: 1555, processing: 0> |
15:56:04.103 | distributed.core - INFO - Removing comms to tcp://10.1.96.241:40383 |
**Worker 9 comes back after 20s from a new pod with a new IP** | |
15:56:23.793 | distributed.scheduler - INFO - Register worker <Worker 'tcp://10.1.83.218:42119', name: 9, memory: 0, processing: 0> |
15:56:23.795 | distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.83.218:42119 |
15:56:23.795 | distributed.core - INFO - Starting established connection |
**Worker 9 wants to retire again after 9.5 minutes, without any data** | |
16:05:50.404 | distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.1.83.218:42119', name: 9, memory: 0, processing: 0>} |
16:05:50.408 | distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.1.83.218:42119', name: 9, memory: 0, processing: 0> |
16:05:50.408 | distributed.core - INFO - Removing comms to tcp://10.1.83.218:42119 |
**Adaptive wants to scale down the cluster to one worker, 13s later, presumably while a new worker 9 is still coming up** | |
16:06:03.566 | distributed.scheduler - INFO - Retire worker names (9,) |
16:06:03.566 | distributed.deploy.adaptive - INFO - Retiring workers [9] |
**Worker 8 wants to close gracefully** | |
16:07:21.217 | distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.1.140.67:38495', name: 8, memory: 1555, processing: 0>} |
16:07:21.222 | distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.1.140.67:38495', name: 8, memory: 1555, processing: 0> |
16:07:21.222 | distributed.core - INFO - Removing comms to tcp://10.1.140.67:38495 |
16:07:21.249 | distributed.scheduler - INFO - Lost all workers |
When worker 8 closes, there is no other worker left to move its keys to. Should the scheduler wait for a new worker to come up first, perhaps with a timeout, if this is an adaptive cluster and the shutdown was triggered by the worker itself? In all other cases you could argue that losing them all is not as big of a deal.
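To make the suggestion concrete, the kind of guard I have in mind (only for worker-initiated retirement on an adaptive cluster) would look something like the sketch below. This is hypothetical code against the scheduler, not the actual retirement path, and `replacement_timeout` is a made-up knob:

```python
import asyncio
from time import time

async def retire_with_replacement(scheduler, addresses, replacement_timeout=60):
    """Hypothetical: before retiring the last workers that hold data, give a
    replacement worker a chance to register so the keys have somewhere to go."""
    others = set(scheduler.workers) - set(addresses)
    deadline = time() + replacement_timeout
    while not others and time() < deadline:
        await asyncio.sleep(1)  # poll until a fresh worker registers or we time out
        others = set(scheduler.workers) - set(addresses)
    # Fall back to the existing behaviour; if nobody showed up, the data is lost as today
    return await scheduler.retire_workers(workers=list(addresses))
```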
But this wasn't the only unexpected behaviour. Going back through the logs, I noticed it wasn't even the first time we had lost all workers.
Time | Msg |
---|---|
**We're down to workers 8 and 9, adaptive wants to retire the latter** | |
14:54:55.570 | distributed.scheduler - INFO - Retire worker names (9,) |
14:54:55.570 | distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.1.196.156:35633', name: 9, memory: 0, processing: 0>} |
14:54:55.570 | distributed.scheduler - INFO - Closing worker tcp://10.1.196.156:35633 |
14:54:55.570 | distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.1.196.156:35633', name: 9, memory: 0, processing: 0> |
14:54:55.570 | distributed.core - INFO - Removing comms to tcp://10.1.196.156:35633 |
14:54:55.570 | distributed.deploy.adaptive - INFO - Retiring workers [9] |
**Worker 8 wants to close gracefully** | |
14:55:36.198 | distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.1.115.47:35839', name: 8, memory: 1555, processing: 0>} |
14:55:36.210 | distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.1.115.47:35839', name: 8, memory: 1555, processing: 0> |
14:55:36.210 | distributed.core - INFO - Removing comms to tcp://10.1.115.47:35839 |
14:55:36.234 | distributed.scheduler - INFO - Lost all workers |
**[Repeats 3×: register worker 8, retire worker 8, remove worker 8, warn that all workers are lost]** | |
**Groundhog day: worker 8 comes up for the fourth time** | |
15:25:22.492 | distributed.scheduler - INFO - Register worker <Worker 'tcp://10.1.101.150:32953', name: 8, memory: 0, processing: 1555> |
15:25:22.494 | distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.101.150:32953 |
15:25:22.494 | distributed.core - INFO - Starting established connection |
**Worker 9 comes back from the dead?** | |
15:26:00.801 | distributed.scheduler - INFO - Register worker <Worker 'tcp://10.1.113.173:36021', name: 9, memory: 0, processing: 0> |
15:26:00.803 | distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.113.173:36021 |
So it takes about 30 minutes from the first "Lost all workers" warning for worker 9 to come back. Presumably adaptive brought it back with a scale-up event and it wasn't just in limbo, but that's hard to tell, because adaptive only seems to log scale-down events.
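If scale-up events are logged at a lower level rather than not at all, a standard logging tweak should surface them; I haven't verified at which level, if any, they are emitted:

```python
import logging

logging.basicConfig(level=logging.INFO)
# Assumption: scale-up decisions may be emitted at DEBUG rather than INFO
logging.getLogger("distributed.deploy.adaptive").setLevel(logging.DEBUG)
```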
The scenarios above wouldn't be very common with a lifetime of, say, 24±2 h, and would mostly occur in idle clusters, so there might not even be anyone around to notice the proverbial tree falling temporarily in the forest. Still, it would be nice to reduce the windows during which we're left without workers and, better yet, not to lose the entire contents of distributed memory at all: at some point, users do notice and wonder whether they have done something wrong.
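In the meantime, the only user-side mitigations I can think of are keeping the adaptive floor above one worker and replicating anything worth keeping; a sketch, where `cluster`, `client`, and `futures` stand for whatever is already in scope:

```python
# Mitigations, not a fix for the underlying race:
cluster.adapt(minimum=2, maximum=10)  # always leave at least one worker to receive keys
client.replicate(futures, n=2)        # keep a second in-memory copy of important results
```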
I can try to find my way through the code and work on PRs, but first I'd need to know what kind of solutions would be considered appropriate.