Skip to content

Commit 0e6fef7

Browse files
committed
add a test of resume with different queue size
1 parent f4f94ae commit 0e6fef7

1 file changed

Lines changed: 71 additions & 0 deletions

File tree

tests/test_resume.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,77 @@ def test_resume(dynamic, delay_frac, with_pool, dyn_pool):
232232
pass
233233

234234

235+
@pytest.mark.parametrize(
236+
"delay_frac",
237+
[.2, .5, .75, .9],
238+
)
239+
@pytest.mark.xdist_group(name="resume_group")
240+
def test_resume_queue_size(delay_frac):
241+
"""
242+
Test we can interrupt and resume nested runs
243+
Note that I used xdist_group here in order to guarantee that if all the
244+
tests are run in parallel, this one is executed in one thread because
245+
I want to only use one getlogz() call.
246+
"""
247+
fname = get_fname(inspect.currentframe().f_code.co_name)
248+
249+
save_every = 1
250+
cache_dt, cache_logz = getlogz(fname, save_every)
251+
npool = 2
252+
curdt, curlogz = [_[False, True] for _ in [cache_dt, cache_logz]]
253+
save_every = min(save_every, curdt / 10)
254+
curdt *= delay_frac
255+
256+
# For spawn context, we need to account for startup time
257+
ready_file = fname + '.ready'
258+
259+
try:
260+
# Always use spawn context to match actual usage
261+
fit_proc = mp.get_context('spawn').Process(
262+
target=fit_main,
263+
args=(fname, False, save_every, npool, False, NEFF0, ready_file))
264+
start_time = time.time()
265+
fit_proc.start()
266+
267+
# Wait for spawn process to be ready before starting timer
268+
while not os.path.exists(ready_file):
269+
time.sleep(0.01)
270+
if time.time() - start_time > 5: # Safety timeout
271+
raise RuntimeError("Process failed to start")
272+
# Account for startup time in the timeout
273+
startup_time = time.time() - start_time
274+
actual_timeout = curdt + startup_time
275+
276+
fit_proc.join(actual_timeout)
277+
# Proceed to terminate only if the process did not finish in time.
278+
if fit_proc.is_alive():
279+
print('terminating', file=sys.stderr)
280+
fit_proc.terminate()
281+
if np.allclose(delay_frac, .2) and not os.path.exists(fname):
282+
warnings.warn(
283+
"The checkpoint file was not created I'm skipping the test"
284+
)
285+
return
286+
287+
with mp.get_context('spawn').Pool(npool + 1) as pool:
288+
fit_resume(fname, False, curlogz, pool=pool)
289+
else:
290+
assert fit_proc.exitcode == 0
291+
finally:
292+
try:
293+
os.unlink(fname)
294+
except: # noqa
295+
pass
296+
try:
297+
os.unlink(fname + '.tmp')
298+
except: # noqa
299+
pass
300+
try:
301+
os.unlink(ready_file)
302+
except: # noqa
303+
pass
304+
305+
235306
@pytest.mark.parametrize("dynamic", [False, True])
236307
def test_save(dynamic):
237308
"""

0 commit comments

Comments
 (0)