Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6639.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that shutdown event handlers are killed if they exceed the process pool timeout.
10 changes: 8 additions & 2 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,22 @@ def put_command(
)

@classmethod
def run_command(cls, ctx):
def run_command(cls, ctx, timeout: Optional[float] = None):
"""Execute command in ctx and capture its output and exit status.

Raises subprocess.TimeoutExpired (via subprocess.communicate) if
the command gets killed for exceeding a given timeout.

Arguments:
ctx (cylc.flow.subprocctx.SubProcContext):
A context object containing the command to run and its status.
timeout:
Timeout in seconds, after which to kill the command.
"""
proc = cls._run_command_init(ctx)
if proc:
ctx.out, ctx.err = (f.decode() for f in proc.communicate())
ctx.out, ctx.err = (
f.decode() for f in proc.communicate(timeout=timeout))
ctx.ret_code = proc.wait()
cls._run_command_exit(ctx)

Expand Down
42 changes: 24 additions & 18 deletions cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from enum import Enum
import os
from shlex import quote
from subprocess import TimeoutExpired
from typing import Any, Dict, List, Union, TYPE_CHECKING

from cylc.flow import LOG
Expand Down Expand Up @@ -223,6 +224,8 @@

def __init__(self, proc_pool):
self.proc_pool = proc_pool
self.proc_timeout = (
glbl_cfg().get(['scheduler', 'process pool timeout']))

@staticmethod
def get_events_conf(
Expand Down Expand Up @@ -302,14 +305,26 @@
env=env,
stdin_str=message
)
if self.proc_pool.closed:
# Run command in foreground if process pool is closed
self.proc_pool.run_command(proc_ctx)
self._run_event_handlers_callback(proc_ctx)
self._run_cmd(proc_ctx, callback=self._run_event_mail_callback)

def _run_cmd(self, ctx, callback):
"""Queue or directly run a command and its callback.

Queue the command to the subprocess pool if possible, or otherwise
run it in the foreground but subject to the subprocess pool timeout.

"""
if not self.proc_pool.closed:
# Queue it to the subprocess pool.
self.proc_pool.put_command(ctx, callback=callback)
else:
# Run command using process pool otherwise
self.proc_pool.put_command(
proc_ctx, callback=self._run_event_mail_callback)
# Run it in the foreground, but use the subprocess pool timeout.
try:
self.proc_pool.run_command(ctx, float(self.proc_timeout))
except TimeoutExpired:
ctx.ret_code = -9
ctx.err = f"killed on timeout ({self.proc_timeout})"

Check warning on line 326 in cylc/flow/workflow_events.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/workflow_events.py#L324-L326

Added lines #L324 - L326 were not covered by tests
callback(ctx)

def _run_event_custom_handlers(self, schd, template_variables, event):
"""Helper for "run_event_handlers", custom event handlers."""
Expand Down Expand Up @@ -349,23 +364,14 @@
env=dict(os.environ),
shell=True # nosec (designed to run user defined code)
)
if self.proc_pool.closed:
# Run command in foreground if abort on failure is set or if
# process pool is closed
self.proc_pool.run_command(proc_ctx)
self._run_event_handlers_callback(proc_ctx)
else:
# Run command using process pool otherwise
self.proc_pool.put_command(
proc_ctx, callback=self._run_event_handlers_callback)
self._run_cmd(proc_ctx, self._run_event_handlers_callback)

@staticmethod
def _run_event_handlers_callback(proc_ctx):
"""Callback on completion of a workflow event handler."""
if proc_ctx.ret_code:
msg = '%s EVENT HANDLER FAILED' % proc_ctx.cmd_key[1]
LOG.error(str(proc_ctx))
LOG.error(msg)
LOG.error(f'{proc_ctx.cmd_key[1]} EVENT HANDLER FAILED')

Check warning on line 374 in cylc/flow/workflow_events.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/workflow_events.py#L374

Added line #L374 was not covered by tests
else:
LOG.info(str(proc_ctx))

Expand Down
46 changes: 44 additions & 2 deletions tests/integration/test_workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from async_timeout import timeout as async_timeout
import pytest
from types import MethodType

from cylc.flow.scheduler import SchedulerError

Expand Down Expand Up @@ -75,8 +76,8 @@ def _schd(config=None, **opts):
async def test_startup_and_shutdown(test_scheduler, run):
"""Test the startup and shutdown events.

* "statup" should fire every time a scheduler is started.
* "shutdown" should fire every time a scheduler exits in a controlled fassion
* "startup" should fire every time a scheduler is started.
* "shutdown" should fire every time a scheduler does a controlled exit.
(i.e. excluding aborts on unexpected internal errors).
"""
schd = test_scheduler()
Expand Down Expand Up @@ -186,3 +187,44 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
async with run(schd2):
await asyncio.sleep(0.1)
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}


async def test_shutdown_handler_timeout_kill(
test_scheduler, run, monkeypatch, caplog
):
"""Test shutdown handlers get killed on the process pool timeout.

Has to be done differently as the process pool is closed during shutdown.
See GitHub #6639

"""
def mock_run_event_handlers(self, event, reason=""):
"""To replace scheduler.run_event_handlers(...).

Run workflow event handlers even in simulation mode.

"""
self.workflow_event_handler.handle(self, event, str(reason))

# Configure a long-running shutdown handler.
schd = test_scheduler({'shutdown handlers': 'sleep 10; echo'})

async with async_timeout(30):
async with run(schd):
# (schd doesn't have a workflow_event_handler prior to this)
# Set a low timeout value.
monkeypatch.setattr(
schd.workflow_event_handler, 'proc_timeout', 0.0
)
# Replace a scheduler method, to call handlers in simulation mode.
monkeypatch.setattr(
schd,
'run_event_handlers',
MethodType(mock_run_event_handlers, schd),
)
await asyncio.sleep(0.1)

assert (
"[('workflow-event-handler-00', 'shutdown') err] killed on timeout (0.0)"
in caplog.text
)
Loading