
Commit 4056592

Workaround for test failures
Shutting down a spec cluster shortly after scaling it down and back up results in `KeyError`s. As a workaround, explicitly synchronize after scaling down, such that all distributed components have "settled" and we can be more confident that a shutdown in the near future will succeed. (As I understand it, "remove" operations from the scale-down are still arriving when the shutdown happens, and they are no longer expected, because the state was changed by scaling up again in between, forgetting about the workers that were in the process of being removed.) See dask/distributed#9064 for the upstream reproducer and proposed fix.
1 parent 7f463ae commit 4056592
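
As a rough illustration of the race and of the settle-wait workaround described above, here is a minimal sketch against a plain dask.distributed `LocalCluster` (a spec cluster) rather than the LiberTEM executor; the worker counts, the 0.1 s poll interval and the 3 second timeout are illustrative only and simply mirror the values used in the tests below.

import time

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# scale down (roughly what snoozing does)
cluster.scale(0)

# workaround: poll until the scheduler has actually processed the worker
# removals, so that late "remove" operations cannot race with a scale-up
# or shutdown that happens right afterwards
t0 = time.monotonic()
while len(cluster.workers) > 0 and time.monotonic() < t0 + 3:
    time.sleep(0.1)

# scale back up ("unsnooze") and shut down; per the description above,
# doing this shortly after the scale-down without the wait could fail
# with a KeyError (see dask/distributed#9064)
cluster.scale(2)
client.wait_for_workers(2)
client.close()
cluster.close()

The tests below express the same loop against `ctx.executor.client.cluster.workers` (or `executor_state.executor._wrapped.client.cluster.workers`), with 1 as the target worker count, since they expect a single worker to remain after snoozing.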

File tree

  tests/executor/test_dask.py
  tests/server/test_state.py

2 files changed: +60 additions, -1 deletion

tests/executor/test_dask.py

Lines changed: 10 additions & 1 deletion
@@ -297,7 +297,16 @@ def test_local_cluster_snooze():
     num_workers = len(ctx.executor.get_available_workers())
     assert num_workers == 2 + 1 # +service worker
     ctx.executor.snooze_manager.snooze()
-    time.sleep(1.)
+
+    # workaround: cannot call `ctx.close` before the `snooze` operation has
+    # completely finished, so we need to wait here
+    # NOTE: once this is fixed upstream in distributed, this should be
+    # removed, as it also means we are not testing the real sequence here,
+    # which doesn't wait!
+    t0 = time.monotonic()
+    while len(ctx.executor.client.cluster.workers) > 1 and time.monotonic() < t0 + 3:
+        time.sleep(0.1)
+
     assert len(ctx.executor.get_available_workers()) == 1
     ctx.executor.snooze_manager.unsnooze()
     assert len(ctx.executor.get_available_workers()) == 2 + 1

tests/server/test_state.py

Lines changed: 50 additions & 0 deletions
@@ -3,6 +3,7 @@
 import pytest
 import queue
 from unittest import mock
+import time

 from libertem.executor.base import AsyncAdapter
 from libertem.executor.dask import DaskJobExecutor
@@ -144,6 +145,18 @@ async def test_get_executor_unsnooze():
     executor_state.executor.snooze_manager.snooze()
     assert executor_state.executor.snooze_manager.is_snoozing

+    # workaround: cannot call `executor.close` before the `snooze`
+    # operation has completely finished, so we need to wait here.
+    # NOTE: once this is fixed upstream in distributed, this should be
+    # removed, as it also means we are not testing the real sequence here,
+    # which doesn't wait!
+    t0 = time.monotonic()
+    while (
+        len(executor_state.executor._wrapped.client.cluster.workers) > 1
+        and time.monotonic() < t0 + 3
+    ):
+        time.sleep(0.1)
+
     # Getting the executor brings it out of snooze
     await executor_state.get_executor()
     assert not executor_state.executor.snooze_manager.is_snoozing
@@ -189,6 +202,18 @@ async def test_snooze_explicit_keep_alive():
     snoozer.snooze()
     assert snoozer.is_snoozing

+    # workaround: cannot call `executor.close` before the `snooze`
+    # operation has completely finished, so we need to wait here.
+    # NOTE: once this is fixed upstream in distributed, this should be
+    # removed, as it also means we are not testing the real sequence here,
+    # which doesn't wait!
+    t0 = time.monotonic()
+    while (
+        len(executor_state.executor._wrapped.client.cluster.workers) > 1
+        and time.monotonic() < t0 + 3
+    ):
+        time.sleep(0.1)
+
     snoozer.unsnooze()
     assert not snoozer.is_snoozing
     # these two work without raising an exception:
@@ -225,6 +250,18 @@ async def test_snooze_by_activity(local_cluster_url):
     # opportunities to snooze in between:
     assert snoozer.is_snoozing

+    # workaround: cannot call `executor.close` before the `snooze`
+    # operation has completely finished, so we need to wait here.
+    # NOTE: once this is fixed upstream in distributed, this should be
+    # removed, as it also means we are not testing the real sequence here,
+    # which doesn't wait!
+    t0 = time.monotonic()
+    while (
+        len(executor_state.executor._wrapped.client.cluster.workers) > 1
+        and time.monotonic() < t0 + 3
+    ):
+        time.sleep(0.1)
+
     # and this should directly unsnooze the executor
     # (we need to change the timeout etc. here, before we trigger the unsnooze,
     # to make sure we don't directly snooze again):
@@ -260,6 +297,19 @@ async def test_messages():
     await executor_state.set_executor(executor, params)

     await asyncio.sleep(0.1)
+
+    # workaround: cannot call `executor.close` before the `snooze` operation has
+    # completely finished, so we need to wait here
+    # NOTE: once this is fixed upstream in distributed, this should be
+    # removed, as it also means we are not testing the real sequence here,
+    # which doesn't wait!
+    t0 = time.monotonic()
+    while (
+        len(executor_state.executor._wrapped.client.cluster.workers) > 1
+        and time.monotonic() < t0 + 3
+    ):
+        await asyncio.sleep(0.1)
+
     # these two work without raising an exception:
     await executor_state.get_executor()
     messages = []
