Skip to content

Commit d6b9ff7

Browse files
authored
Merge pull request #6698 from cylc/8.4.x-sync
🤖 Merge 8.4.x-sync into master
2 parents 4b9442d + d4c88d3 commit d6b9ff7

File tree

4 files changed

+91
-26
lines changed

4 files changed

+91
-26
lines changed

changes.d/6639.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ensure that shutdown event handlers are killed if they exceed the process pool timeout.

cylc/flow/subprocpool.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from tempfile import SpooledTemporaryFile
2626
from threading import RLock
2727
from time import time
28-
from subprocess import DEVNULL, run # nosec
28+
from subprocess import DEVNULL, TimeoutExpired, run # nosec
2929
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set
3030

3131
from cylc.flow import LOG, iter_entry_points
@@ -365,18 +365,38 @@ def put_command(
365365
)
366366

367367
@classmethod
368-
def run_command(cls, ctx):
368+
def run_command(cls, ctx, callback: Optional[Callable] = None):
369369
"""Execute command in ctx and capture its output and exit status.
370370
371+
Kills the subprocess if it exceeds the subprocess pool timeout.
372+
371373
Arguments:
372374
ctx (cylc.flow.subprocctx.SubProcContext):
373375
A context object containing the command to run and its status.
376+
callback:
377+
Optional callback function.
374378
"""
379+
timeout = glbl_cfg().get(['scheduler', 'process pool timeout'])
380+
375381
proc = cls._run_command_init(ctx)
376-
if proc:
377-
ctx.out, ctx.err = (f.decode() for f in proc.communicate())
382+
if not proc:
383+
return
384+
385+
try:
386+
ctx.out, ctx.err = (
387+
f.decode()
388+
for f in proc.communicate(timeout=float(timeout))
389+
)
390+
except TimeoutExpired:
391+
if _killpg(proc, SIGKILL):
392+
ctx.err = f"killed on timeout ({timeout})"
393+
ctx.ret_code = proc.wait()
394+
else:
378395
ctx.ret_code = proc.wait()
379-
cls._run_command_exit(ctx)
396+
397+
if callback is not None:
398+
callback(ctx)
399+
cls._run_command_exit(ctx)
380400

381401
def set_stopping(self):
382402
"""Stop job submission."""

cylc/flow/workflow_events.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,21 @@ def _send_mail(
302302
env=env,
303303
stdin_str=message
304304
)
305-
if self.proc_pool.closed:
306-
# Run command in foreground if process pool is closed
307-
self.proc_pool.run_command(proc_ctx)
308-
self._run_event_handlers_callback(proc_ctx)
305+
self._run_cmd(proc_ctx, callback=self._run_event_mail_callback)
306+
307+
def _run_cmd(self, ctx, callback):
308+
"""Queue or directly run a command and its callback.
309+
310+
Queue the command to the subprocess pool if possible, or else run it
311+
in the foreground (but still subject to the subprocess pool timeout).
312+
313+
"""
314+
if not self.proc_pool.closed:
315+
# Queue it to the subprocess pool.
316+
self.proc_pool.put_command(ctx, callback=callback)
309317
else:
310-
# Run command using process pool otherwise
311-
self.proc_pool.put_command(
312-
proc_ctx, callback=self._run_event_mail_callback)
318+
# Run it in the foreground.
319+
self.proc_pool.run_command(ctx, callback=callback)
313320

314321
def _run_event_custom_handlers(self, schd, template_variables, event):
315322
"""Helper for "run_event_handlers", custom event handlers."""
@@ -349,23 +356,14 @@ def _run_event_custom_handlers(self, schd, template_variables, event):
349356
env=dict(os.environ),
350357
shell=True # nosec (designed to run user defined code)
351358
)
352-
if self.proc_pool.closed:
353-
# Run command in foreground if abort on failure is set or if
354-
# process pool is closed
355-
self.proc_pool.run_command(proc_ctx)
356-
self._run_event_handlers_callback(proc_ctx)
357-
else:
358-
# Run command using process pool otherwise
359-
self.proc_pool.put_command(
360-
proc_ctx, callback=self._run_event_handlers_callback)
359+
self._run_cmd(proc_ctx, self._run_event_handlers_callback)
361360

362361
@staticmethod
363362
def _run_event_handlers_callback(proc_ctx):
364363
"""Callback on completion of a workflow event handler."""
365364
if proc_ctx.ret_code:
366-
msg = '%s EVENT HANDLER FAILED' % proc_ctx.cmd_key[1]
367365
LOG.error(str(proc_ctx))
368-
LOG.error(msg)
366+
LOG.error(f'{proc_ctx.cmd_key[1]} EVENT HANDLER FAILED')
369367
else:
370368
LOG.info(str(proc_ctx))
371369

tests/integration/test_workflow_events.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import sys
1919

2020
import pytest
21+
from types import MethodType
2122

2223
from cylc.flow.scheduler import SchedulerError
2324

@@ -80,9 +81,9 @@ def _schd(config=None, **opts):
8081
async def test_startup_and_shutdown(test_scheduler, run):
8182
"""Test the startup and shutdown events.
8283
83-
* "statup" should fire every time a scheduler is started.
84-
* "shutdown" should fire every time a scheduler exits in a controlled
85-
fashion (i.e. excluding aborts on unexpected internal errors).
84+
* "startup" should fire every time a scheduler is started.
85+
* "shutdown" should fire every time a scheduler does a controlled exit
86+
(i.e. excluding aborts on unexpected internal errors).
8687
"""
8788
schd = test_scheduler()
8889
async with run(schd):
@@ -191,3 +192,48 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
191192
async with run(schd2):
192193
await asyncio.sleep(0.1)
193194
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}
195+
196+
197+
async def test_shutdown_handler_timeout_kill(
198+
test_scheduler, run, monkeypatch, mock_glbl_cfg, caplog
199+
):
200+
"""Test shutdown handlers get killed on the process pool timeout.
201+
202+
Handled differently from pooled commands: the process pool is closed during shutdown, so the handler runs in the foreground.
203+
See GitHub #6639
204+
205+
"""
206+
def mock_run_event_handlers(self, event, reason=""):
207+
"""To replace scheduler.run_event_handlers(...).
208+
209+
Run workflow event handlers even in simulation mode.
210+
211+
"""
212+
self.workflow_event_handler.handle(self, event, str(reason))
213+
214+
# Configure a long-running shutdown handler.
215+
schd = test_scheduler({'shutdown handlers': 'sleep 10; echo'})
216+
217+
# Set a low process pool timeout value.
218+
mock_glbl_cfg(
219+
'cylc.flow.subprocpool.glbl_cfg',
220+
'''
221+
[scheduler]
222+
process pool timeout = PT1S
223+
'''
224+
)
225+
226+
async with async_timeout(30):
227+
async with run(schd):
228+
# Replace a scheduler method, to call handlers in simulation mode.
229+
monkeypatch.setattr(
230+
schd,
231+
'run_event_handlers',
232+
MethodType(mock_run_event_handlers, schd),
233+
)
234+
await asyncio.sleep(0.1)
235+
236+
assert (
237+
"[('workflow-event-handler-00', 'shutdown') err] killed on timeout (PT1S)"
238+
in caplog.text
239+
)

0 commit comments

Comments
 (0)