Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6506.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Work around caching behaviour observed on NFS filesystems which could cause workflows to appear to be stopped or even to not exist, when they are running.
1 change: 1 addition & 0 deletions changes.d/6577.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug where if you prematurely deleted the job log directory, it would leave tasks permanently in the submitted or running states.
13 changes: 13 additions & 0 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDict


JOB_FILES_REMOVED_MESSAGE = 'ERR_JOB_FILES_REMOVED'


class JobPollContext():
"""Context object for a job poll."""
CONTEXT_ATTRIBUTES = (
Expand Down Expand Up @@ -439,6 +442,16 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err):
def _jobs_poll_status_files(self, job_log_root, job_log_dir):
"""Helper 1 for self.jobs_poll(job_log_root, job_log_dirs)."""
ctx = JobPollContext(job_log_dir)
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
# The job may still be in the job runner and may yet succeed,
# but we assume it failed & exited because it's the best we
# can do as it is no longer possible to poll it.
ctx.run_status = 1
ctx.job_runner_exit_polled = 1
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
return ctx
try:
with open(
os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS)
Expand Down
17 changes: 14 additions & 3 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,21 @@ async def is_active(flow, is_active):
False to filter for stopped and unregistered flows.

"""
contact = flow['path'] / SERVICE / CONTACT
_is_active = contact.exists()
service = flow['path'] / SERVICE
# NOTE: We must list the service directory contents rather than checking
# for the existence of the contact file directly, because listing the
# directory forces NFS filesystems to recompute their local cache.
# See https://github.com/cylc/cylc-flow/issues/6506
try:
contents = await scandir(service)
except FileNotFoundError:
_is_active = False
else:
_is_active = any(
path.name == WorkflowFiles.Service.CONTACT for path in contents
)
if _is_active:
flow['contact'] = contact
flow['contact'] = service / CONTACT
return _is_active == is_active


Expand Down
20 changes: 15 additions & 5 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
is_remote_platform,
)
from cylc.flow.job_file import JobFileWriter
from cylc.flow.job_runner_mgr import JobPollContext
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE, JobPollContext
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
get_host_from_platform,
Expand Down Expand Up @@ -852,7 +852,12 @@ def _poll_task_job_callback_255(self, itask, cmd_ctx, line):
)
self.poll_task_jobs([itask])

def _poll_task_job_callback(self, itask, cmd_ctx, line):
def _poll_task_job_callback(
self,
itask: 'TaskProxy',
cmd_ctx: SubProcContext,
line: str,
):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
Expand Down Expand Up @@ -881,16 +886,21 @@ def _poll_task_job_callback(self, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO

if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
LOG.error(
f"platform: {itask.platform['name']} - job log directory "
f"{job_tokens.relative_id} no longer exists"
)

if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
self.task_events_mgr.process_message(
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
Expand Down
38 changes: 37 additions & 1 deletion cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

"""

from contextlib import suppress
from enum import Enum
import errno
from collections import deque
import os
from pathlib import Path
import re
Expand Down Expand Up @@ -620,8 +622,29 @@ def get_workflow_srv_dir(id_):
return os.path.join(run_d, WorkflowFiles.Service.DIRNAME)


def refresh_nfs_cache(path: Path):
    """Refresh the NFS cache for the directories leading down to <path>.

    On NFS filesystems, the non-existence of files/directories may become
    cached. To work around this, we list the contents of each ancestor
    directory of <path>, which refreshes the NFS cache.

    The directories listed run from the cylc-run directory (as returned by
    get_cylc_run_dir()) down to the immediate parent of <path>. <path> itself
    is not listed, so it may be a file or a not-yet-visible directory.

    See: https://github.com/cylc/cylc-flow/issues/6506

    Arguments:
        path: The file or directory whose ancestors should be refreshed.
            Must be located within the cylc-run directory.

    Raises:
        FileNotFoundError: If any of the directories listed is not present.
        ValueError: If <path> is not within the cylc-run directory.

    """
    run_root = get_cylc_run_dir()
    # Ancestors of the relative path come out deepest-first ("a/b", "a", ".")
    # so reverse them to walk from the top of the tree downwards.
    ancestors = path.relative_to(run_root).parents
    for ancestor in reversed(ancestors):
        # Exhaust the listing; only the act of listing matters, not the
        # entries themselves.
        for _entry in (run_root / ancestor).iterdir():
            pass


def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
"""Load contact file. Return data as key=value dict."""
if not run_dir:
path = Path(get_contact_file_path(id_))
else:
Expand All @@ -630,6 +653,14 @@ def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT
)

if not path.exists():
# work around NFS caching issues
try:
refresh_nfs_cache(path)
except FileNotFoundError as exc:
raise ServiceFileError("Couldn't load contact file") from exc

try:
with open(path) as f:
file_content = f.read()
Expand Down Expand Up @@ -919,6 +950,11 @@ def infer_latest_run(
except ValueError:
raise ValueError(f"{path} is not in the cylc-run directory") from None

if not path.exists():
# work around NFS caching issues
with suppress(FileNotFoundError):
refresh_nfs_cache(path)

if not path.exists():
raise InputError(
f'Workflow ID not found: {id_}\n(Directory not found: {path})'
Expand Down
55 changes: 10 additions & 45 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test execution time limit polling.
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
. "$(dirname "$0")/test_header"
#-------------------------------------------------------------------------------
set_test_number 4

set_test_number 5
create_test_global_config '' "
[platforms]
[[$CYLC_TEST_PLATFORM]]
Expand All @@ -28,51 +28,16 @@ create_test_global_config '' "
execution time limit polling intervals = PT5S
"
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
#-------------------------------------------------------------------------------

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
#-------------------------------------------------------------------------------
# shellcheck disable=SC2317
cmp_times () {
# Test if the times $1 and $2 are within $3 seconds of each other.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser
parser = TimePointParser()
time_1 = parser.parse(sys.argv[1])
time_2 = parser.parse(sys.argv[2])
if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
__PYTHON__
}
time_offset () {
# Add an ISO8601 duration to an ISO8601 date-time.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser, DurationParser
print(
TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
__PYTHON__
}
#-------------------------------------------------------------------------------

LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
# Test logging of the "next job poll" message when task starts.
TEST_NAME="${TEST_NAME_BASE}-log-entry"
LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
# Determine poll times.
PREDICTED_POLL_TIME=$(time_offset \
"$(cut -d ' ' -f 1 <<< "${LINE}")" \
"PT10S") # PT5S time limit + PT5S polling interval
ACTUAL_POLL_TIME=$(sed -n \
's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")

# Test execution timeout polling.
# Main loop is roughly 1 second, but integer rounding may give an apparent 2
# seconds delay, so set threshold as 2 seconds.
run_ok "${TEST_NAME_BASE}-poll-time" \
cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
#-------------------------------------------------------------------------------
log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
"\[1/foo/01:submitted\] => running" \
"\[1/foo/01:running\] poll now, (next in PT5S" \
"\[1/foo/01:running\] (polled)failed/XCPU"

purge
exit
12 changes: 2 additions & 10 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@
[runtime]
[[foo]]
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
init-script = cylc__job__disable_fail_signals ERR EXIT
script = """
cylc__job__wait_cylc_message_started
# give it a while for the started message to get picked up by
# the scheduler
sleep 10
exit 1
"""
[[[job]]]
execution time limit = PT5S
script = sleep 20
execution time limit = PT10S
[[bar]]
37 changes: 36 additions & 1 deletion tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import suppress
import json
import logging
from typing import Any as Fixture
from unittest.mock import Mock

from cylc.flow import CYLC_LOG
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
)


async def test_run_job_cmd_no_hosts_error(
Expand Down Expand Up @@ -223,3 +229,32 @@ async def test_broadcast_platform_change(
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
# ... and that remote init failed because all hosts bad:
assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)")


async def test_poll_job_deleted_log_folder(
    one_conf, flow, scheduler, start, log_filter
):
    """Capture a task error caused by polling finding the job log dir deleted.

    A hand-built poll result line is passed directly to the poll callback;
    the task must end up failed and an error must be logged.

    https://github.com/cylc/cylc-flow/issues/6425
    """
    # Poll context fields describing a failed job that is no longer in the
    # job runner, flagged with the "job files removed" signal.
    response = {
        'run_signal': JOB_FILES_REMOVED_MESSAGE,
        'run_status': 1,
        'job_runner_exit_polled': 1,
    }
    schd: Scheduler = scheduler(flow(one_conf))
    async with start(schd):
        itask = schd.pool.get_tasks()[0]
        # NOTE(review): presumably set so that the task's submit number
        # matches the job '01' used below - confirm.
        itask.submit_num = 1
        job_id = itask.tokens.duplicate(job='01').relative_id
        # The line passed in has the form: <timestamp>|<job id>|<JSON context>
        schd.task_job_mgr._poll_task_job_callback(
            itask,
            cmd_ctx=Mock(),
            line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
        )
        assert itask.state(TASK_STATUS_FAILED)

    # The callback should also have logged why the task was failed.
    assert log_filter(
        logging.ERROR, f"job log directory {job_id} no longer exists"
    )
78 changes: 78 additions & 0 deletions tests/unit/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.job_runner_mgr import (
JobRunnerManager, JOB_FILES_REMOVED_MESSAGE)

# Single manager instance shared by all of the tests in this module.
jrm = JobRunnerManager()


# Contents of an example job.status file, written to disk and polled by
# test__job_poll_status_files. The first non-blank line deliberately has no
# "=" sign (presumably to check that such lines are skipped - confirm).
SAMPLE_STATUS = """
ignore me, I have no = sign
CYLC_JOB_RUNNER_NAME=pbs
CYLC_JOB_ID=2361713
CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z
CYLC_JOB_PID=2361713
CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z
CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31
CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z
CYLC_JOB_EXIT=SUCCEEDED
CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z
"""


def test__job_poll_status_files(tmp_path):
    """Good path: a valid job.status file is parsed into the poll context."""
    job_log_dir = tmp_path / 'sub'
    job_log_dir.mkdir()
    (job_log_dir / 'job.status').write_text(SAMPLE_STATUS)

    ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')

    # Context attribute -> value expected from the SAMPLE_STATUS content.
    expected = {
        'job_runner_name': 'pbs',
        'job_id': '2361713',
        'job_runner_exit_polled': 1,
        'pid': '2361713',
        'time_submit_exit': '2025-01-28T14:46:04Z',
        'time_run': '2025-01-28T14:46:05Z',
        'time_run_exit': '2025-01-28T14:46:38Z',
        'run_status': 0,
        'messages': ['2025-01-28T14:46:05Z|INFO|sleep 31'],
    }
    for attr, value in expected.items():
        assert getattr(ctx, attr) == value, attr


def test__job_poll_status_files_task_failed(tmp_path):
    """A job.status file with CYLC_JOB_EXIT=FOO is polled as a failed run.

    The poll context should have a run status of 1 and carry the exit value
    ("FOO") as the run signal.
    """
    status_file = tmp_path / 'sub' / 'job.status'
    status_file.parent.mkdir()
    status_file.write_text("CYLC_JOB_EXIT=FOO")

    ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')

    assert (ctx.run_status, ctx.run_signal) == (1, 'FOO')


def test__job_poll_status_files_deleted_logdir(tmp_path):
    """The log dir has been deleted whilst the task is still active.

    Return the context with the message that the task has failed.
    """
    # 'bar' is never created beneath the fresh tmp_path, so the job log
    # directory is guaranteed to be missing whatever the current working
    # directory happens to contain.
    ctx = jrm._jobs_poll_status_files(str(tmp_path), 'bar')
    assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
    assert ctx.run_status == 1
    assert ctx.job_runner_exit_polled == 1


def test__job_poll_status_files_ioerror(tmp_path, capsys):
    """The job log dir exists but holds no job.status file.

    Polling should report the "No such file or directory" error on stderr.
    """
    tmp_path.joinpath('sub').mkdir()
    jrm._jobs_poll_status_files(str(tmp_path), 'sub')
    stderr = capsys.readouterr().err
    assert '[Errno 2] No such file or directory' in stderr