Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes.d/6909.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix potential timeout of the play and vr commands for workflows with contact
files, due to an unnecessary remote process check — this check is now only done
if the workflow fails to respond on the network.
16 changes: 16 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,23 @@ def default_for(

{REPLACES}``[suite servers][run host select]rank``.
''')
Conf('process check timeout', VDR.V_INTERVAL, DurationFloat(10),
desc='''
Maximum time for the `cylc play` and `cylc vr` commands to wait
for a remote process that checks if an unresponsive scheduler
is still alive (for workflows with existing contact files).

.. note::

This check involves running ``cylc psutil`` on the run host.
You may need to increase the timeout if shared filesystem
latency (for example) results in slow Python script startup.
Increasing the timeout more than necessary, however, will just
cause these commands to hang for longer in this circumstance.

.. versionadded:: 8.5.2
''')
with Conf('host self-identification', desc=f'''
How Cylc determines and shares the identity of the workflow host.

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import (
ContactFileExists,
SchedulerAlive,
CylcError,
InputError,
PlatformError,
Expand Down Expand Up @@ -123,7 +123,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None:
return
try:
detect_old_contact_file(id_)
except ContactFileExists as exc:
except SchedulerAlive as exc:
raise ServiceFileError(
f"Cannot clean running workflow {id_}.\n\n{exc}"
) from None
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class WorkflowFilesError(CylcError):
bullet = "\n -"


class ContactFileExists(CylcError):
"""Workflow contact file exists."""
class SchedulerAlive(CylcError):
"""Workflow contact file exists and scheduler is alive."""


class FileRemovalError(CylcError):
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
)
from cylc.flow.exceptions import (
ClientTimeout,
ContactFileExists,
SchedulerAlive,
CylcError,
RequestError,
WorkflowStopped,
Expand Down Expand Up @@ -158,6 +158,10 @@ def timeout_handler(self) -> None:
WorkflowStopped: if the workflow has already stopped.
CylcError: if the workflow has moved to a different host/port.
"""
LOG.warning(
f"{self.workflow} {self.host}:{self.port}:"
f" Connection timed out ({self.timeout} ms)"
)
contact_host, contact_port, *_ = get_location(self.workflow)
if (
contact_host != get_fqdn_by_host(self._orig_host)
Expand All @@ -177,7 +181,7 @@ def timeout_handler(self) -> None:
# behind a contact file?
try:
detect_old_contact_file(self.workflow)
except ContactFileExists:
except SchedulerAlive:
# old contact file exists and the workflow process is still alive
return
else:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ def _configure_contact(self) -> None:
"""Create contact file."""
# Make sure another workflow of the same name hasn't started while this
# one is starting
# NOTE: raises ContactFileExists if workflow is running
# NOTE: raises SchedulerAlive if workflow is running
workflow_files.detect_old_contact_file(self.workflow)

# Extract contact data.
Expand Down
33 changes: 21 additions & 12 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@

from cylc.flow import LOG, __version__
from cylc.flow.exceptions import (
ContactFileExists,
CylcError,
ServiceFileError,
WorkflowStopped,
)
from cylc.flow.scripts.ping import run as cylc_ping
import cylc.flow.flags
from cylc.flow.id import upgrade_legacy_ids
from cylc.flow.host_select import select_workflow_host
Expand All @@ -60,7 +61,6 @@
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_files import (
SUITERC_DEPR_MSG,
detect_old_contact_file,
get_workflow_srv_dir,
)
from cylc.flow.terminal import (
Expand Down Expand Up @@ -474,13 +474,29 @@ async def _scheduler_cli_3(
async def _resume(workflow_id, options):
"""Resume the workflow if it is already running."""
try:
detect_old_contact_file(workflow_id)
except ContactFileExists as exc:
print(f"Resuming already-running workflow\n\n{exc}")
pclient = WorkflowRuntimeClient(
workflow_id,
timeout=options.comms_timeout,
)
except WorkflowStopped:
# Not running - don't resume.
return

# Is it running? If yes, send resume command.
try:
await cylc_ping(options, workflow_id, pclient)
except WorkflowStopped:
# Not running, restart instead of resume.
# (Orphaned contact file will be removed by cylc_ping client logic).
return
except CylcError as exc:
# PID check failed - abort.
LOG.error(exc)
LOG.critical('Cannot tell if the workflow is running')
sys.exit(1)
else:
# It's running: resume it and exit.
print("Resuming already-running workflow")
mutation_kwargs = {
'request_string': RESUME_MUTATION,
'variables': {
Expand All @@ -489,13 +505,6 @@ async def _resume(workflow_id, options):
}
await pclient.async_request('graphql', mutation_kwargs)
sys.exit(0)
except CylcError as exc:
LOG.error(exc)
LOG.critical(
'Cannot tell if the workflow is running'
'\nNote, Cylc 8 cannot restart Cylc 7 workflows.'
)
sys.exit(1)


def _version_check(
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def run(*ids: str, opts: 'Values') -> None:
workflows, multi_mode = await scan(workflows, multi_mode)

if not workflows:
LOG.warning(f"No workflows matching {', '.join(ids)}")
LOG.warning(f"No stopped workflows matching {', '.join(ids)}")
return

workflows.sort()
Expand Down
61 changes: 11 additions & 50 deletions cylc/flow/scripts/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@

"""cylc ping [OPTIONS] ARGS

Test communication with a running workflow.
Test communication with running workflows.

If workflow WORKFLOW is running or TASK in WORKFLOW is currently running,
exit with success status, else exit with error status.
Print the HOST:PORT of running workflows.
If any are not running, exit with error status.
"""

from functools import partial
import sys
from typing import Any, Dict, TYPE_CHECKING

import cylc.flow.flags
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.multi import call_multi
from cylc.flow.option_parsers import (
FULL_ID_MULTI_ARG_DOC,
ID_MULTI_ARG_DOC,
CylcOptionParser as COP,
)
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.terminal import cli_function

if TYPE_CHECKING:
Expand All @@ -48,16 +46,6 @@
id
name
port
pubPort
}
}
'''

TASK_QUERY = '''
query ($tProxy: ID!) {
taskProxy (id: $tProxy) {
state
id
}
}
'''
Expand All @@ -67,9 +55,8 @@ def get_option_parser() -> COP:
parser = COP(
__doc__,
comms=True,
multitask=True,
multiworkflow=True,
argdoc=[FULL_ID_MULTI_ARG_DOC],
argdoc=[ID_MULTI_ARG_DOC],
)

return parser
Expand All @@ -78,9 +65,10 @@ def get_option_parser() -> COP:
async def run(
options: 'Values',
workflow_id: str,
*tokens_list,
client=None
) -> Dict:
pclient = get_client(workflow_id, timeout=options.comms_timeout)

pclient = client or get_client(workflow_id, timeout=options.comms_timeout)

ret: Dict[str, Any] = {
'stdout': [],
Expand All @@ -91,38 +79,11 @@ async def run(
'request_string': FLOW_QUERY,
'variables': {'wFlows': [workflow_id]}
}
task_kwargs: Dict[str, Any] = {
'request_string': TASK_QUERY,
}

# ping called on the workflow
result = await pclient.async_request('graphql', flow_kwargs)
msg = ""
for flow in result['workflows']:
w_name = flow['name']
w_port = flow['port']
w_pub_port = flow['pubPort']
if cylc.flow.flags.verbosity > 0:
ret['stdout'].append(
f'{w_name} running on '
f'{pclient.host}:{w_port} {w_pub_port}\n'
)

# ping called with task-like objects
for tokens in tokens_list:
task_kwargs['variables'] = {
'tProxy': tokens.relative_id
}
task_result = await pclient.async_request('graphql', task_kwargs)
string_id = tokens.relative_id
if not task_result.get('taskProxy'):
msg = f"task not found: {string_id}"
elif task_result['taskProxy']['state'] != TASK_STATUS_RUNNING:
msg = f"task not {TASK_STATUS_RUNNING}: {string_id}"
if msg:
ret['stderr'].append(msg)
ret['exit'] = 1

for flow in result['workflows']:
ret['stdout'].append(f"{pclient.host}:{flow['port']}")
return ret


Expand All @@ -144,6 +105,6 @@ def main(
partial(run, options),
*ids,
report=report,
constraint='mixed',
constraint='workflows',
)
sys.exit(all(rets.values()) is False)
5 changes: 3 additions & 2 deletions cylc/flow/scripts/reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ async def reinstall_cli(
# no rsync output == no changes => exit
print(cparse(
'<magenta>'
f'{workflow_id} up to date with {source}'
'No changes made:'
f' {workflow_id} is up to date with {source}'
'</magenta>'
))
return False
Expand Down Expand Up @@ -249,7 +250,7 @@ async def reinstall_cli(
else:
# no reinstall
print(
cparse('<magenta>Reinstall canceled, no changes made.</magenta>')
cparse('<magenta>No changes made: reinstall cancelled.</magenta>')
)
return False

Expand Down
Loading
Loading