Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6639.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that shutdown event handlers are killed if they exceed the process pool timeout.
30 changes: 25 additions & 5 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from tempfile import SpooledTemporaryFile
from threading import RLock
from time import time
from subprocess import DEVNULL, run # nosec
from subprocess import DEVNULL, TimeoutExpired, run # nosec
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set

from cylc.flow import LOG, iter_entry_points
Expand Down Expand Up @@ -365,18 +365,38 @@ def put_command(
)

@classmethod
def run_command(cls, ctx):
def run_command(cls, ctx, callback: Optional[Callable] = None):
"""Execute command in ctx and capture its output and exit status.

Kills the subprocess if it exceeds the subprocess pool timeout.

Arguments:
ctx (cylc.flow.subprocctx.SubProcContext):
A context object containing the command to run and its status.
callback:
Optional callback function.
"""
timeout = glbl_cfg().get(['scheduler', 'process pool timeout'])

proc = cls._run_command_init(ctx)
if proc:
ctx.out, ctx.err = (f.decode() for f in proc.communicate())
if not proc:
return

try:
ctx.out, ctx.err = (
f.decode()
for f in proc.communicate(timeout=float(timeout))
)
except TimeoutExpired:
if _killpg(proc, SIGKILL):
ctx.err = f"killed on timeout ({timeout})"
ctx.ret_code = proc.wait()
else:
ctx.ret_code = proc.wait()
cls._run_command_exit(ctx)

if callback is not None:
callback(ctx)
cls._run_command_exit(ctx)

def set_stopping(self):
"""Stop job submission."""
Expand Down
34 changes: 16 additions & 18 deletions cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,21 @@ def _send_mail(
env=env,
stdin_str=message
)
if self.proc_pool.closed:
# Run command in foreground if process pool is closed
self.proc_pool.run_command(proc_ctx)
self._run_event_handlers_callback(proc_ctx)
self._run_cmd(proc_ctx, callback=self._run_event_mail_callback)

def _run_cmd(self, ctx, callback):
"""Queue or directly run a command and its callback.

Queue the command to the subprocess pool if possible, or else run it
in the foreground (but still subject to the subprocess pool timeout).

"""
if not self.proc_pool.closed:
# Queue it to the subprocess pool.
self.proc_pool.put_command(ctx, callback=callback)
else:
# Run command using process pool otherwise
self.proc_pool.put_command(
proc_ctx, callback=self._run_event_mail_callback)
# Run it in the foreground.
self.proc_pool.run_command(ctx, callback=callback)

def _run_event_custom_handlers(self, schd, template_variables, event):
"""Helper for "run_event_handlers", custom event handlers."""
Expand Down Expand Up @@ -349,23 +356,14 @@ def _run_event_custom_handlers(self, schd, template_variables, event):
env=dict(os.environ),
shell=True # nosec (designed to run user defined code)
)
if self.proc_pool.closed:
# Run command in foreground if abort on failure is set or if
# process pool is closed
self.proc_pool.run_command(proc_ctx)
self._run_event_handlers_callback(proc_ctx)
else:
# Run command using process pool otherwise
self.proc_pool.put_command(
proc_ctx, callback=self._run_event_handlers_callback)
self._run_cmd(proc_ctx, self._run_event_handlers_callback)

@staticmethod
def _run_event_handlers_callback(proc_ctx):
"""Callback on completion of a workflow event handler."""
if proc_ctx.ret_code:
msg = '%s EVENT HANDLER FAILED' % proc_ctx.cmd_key[1]
LOG.error(str(proc_ctx))
LOG.error(msg)
LOG.error(f'{proc_ctx.cmd_key[1]} EVENT HANDLER FAILED')
else:
LOG.info(str(proc_ctx))

Expand Down
50 changes: 48 additions & 2 deletions tests/integration/test_workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from async_timeout import timeout as async_timeout
import pytest
from types import MethodType

from cylc.flow.scheduler import SchedulerError

Expand Down Expand Up @@ -75,8 +76,8 @@ def _schd(config=None, **opts):
async def test_startup_and_shutdown(test_scheduler, run):
"""Test the startup and shutdown events.

* "startup" should fire every time a scheduler is started.
* "shutdown" should fire every time a scheduler exits in a controlled fashion
* "startup" should fire every time a scheduler is started.
* "shutdown" should fire every time a scheduler does a controlled exit.
(i.e. excluding aborts on unexpected internal errors).
"""
schd = test_scheduler()
Expand Down Expand Up @@ -186,3 +187,48 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
async with run(schd2):
await asyncio.sleep(0.1)
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}


async def test_shutdown_handler_timeout_kill(
    test_scheduler, run, monkeypatch, mock_glbl_cfg, caplog
):
    """Test shutdown handlers get killed on the process pool timeout.

    Shutdown handlers run after the subprocess pool has closed, so they take
    the foreground (`run_command`) code path rather than the normal queued
    (`put_command`) one; hence this test has to be done differently from the
    other event-handler tests.
    See GitHub #6639

    """
    def mock_run_event_handlers(self, event, reason=""):
        """To replace scheduler.run_event_handlers(...).

        Run workflow event handlers even in simulation mode.

        """
        self.workflow_event_handler.handle(self, event, str(reason))

    # Configure a long-running shutdown handler.
    # 'sleep 10' comfortably exceeds the PT1S timeout set below, so the
    # handler should be killed before the trailing 'echo' can run.
    schd = test_scheduler({'shutdown handlers': 'sleep 10; echo'})

    # Set a low process pool timeout value.
    # NOTE: patched at the point of use (cylc.flow.subprocpool) so the
    # foreground run_command picks it up.
    mock_glbl_cfg(
        'cylc.flow.subprocpool.glbl_cfg',
        '''
        [scheduler]
        process pool timeout = PT1S
        '''
    )

    # Outer timeout is a safety net so a failure here cannot hang the test
    # suite for the full 10 s of the handler (or longer).
    async with async_timeout(30):
        async with run(schd):
            # Replace a scheduler method, to call handlers in simulation mode.
            monkeypatch.setattr(
                schd,
                'run_event_handlers',
                MethodType(mock_run_event_handlers, schd),
            )
            await asyncio.sleep(0.1)

    # The handler should have been killed on timeout and the kill logged
    # (run_command sets ctx.err to "killed on timeout (...)" in this case).
    assert (
        "[('workflow-event-handler-00', 'shutdown') err] killed on timeout (PT1S)"
        in caplog.text
    )
Loading