Skip to content

Commit 9352f61

Browse files
authored
Merge pull request #6909 from hjoliver/cylc-play-vr-ping
Address play and vr timeouts: ping first
2 parents 6578e3e + 3a73be7 commit 9352f61

File tree

18 files changed

+197
-113
lines changed

18 files changed

+197
-113
lines changed

changes.d/6909.fix.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix potential timeout of the play and vr commands for workflows with contact
2+
files, due to an unnecessary remote process check - now only done if the
3+
workflow fails to respond on the network.

cylc/flow/cfgspec/globalcfg.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,23 @@ def default_for(
10441044
10451045
{REPLACES}``[suite servers][run host select]rank``.
10461046
''')
1047+
Conf('process check timeout', VDR.V_INTERVAL, DurationFloat(10),
1048+
desc='''
1049+
Maximum time for the `cylc play` and `cylc vr` commands to wait
1050+
for a remote process that checks if an unresponsive scheduler
1051+
is still alive (for workflows with existing contact files).
1052+
1053+
.. note::
10471054
1055+
This check involves running ``cylc psutil`` on the run host.
1056+
You may need to increase the timeout if shared filesystem
1057+
latency (for example) results in slow Python script startup.
1058+
Increasing the timeout unnecessarily, however, will just
1059+
cause these commands to hang for an unnecessarily long time
1060+
in this circumstance.
1061+
1062+
.. versionadded:: 8.5.2
1063+
''')
10481064
with Conf('host self-identification', desc=f'''
10491065
How Cylc determines and shares the identity of the workflow host.
10501066

cylc/flow/clean.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from cylc.flow import LOG
4949
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
5050
from cylc.flow.exceptions import (
51-
ContactFileExists,
51+
SchedulerAlive,
5252
CylcError,
5353
InputError,
5454
PlatformError,
@@ -123,7 +123,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None:
123123
return
124124
try:
125125
detect_old_contact_file(id_)
126-
except ContactFileExists as exc:
126+
except SchedulerAlive as exc:
127127
raise ServiceFileError(
128128
f"Cannot clean running workflow {id_}.\n\n{exc}"
129129
) from None

cylc/flow/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ class WorkflowFilesError(CylcError):
136136
bullet = "\n -"
137137

138138

139-
class ContactFileExists(CylcError):
140-
"""Workflow contact file exists."""
139+
class SchedulerAlive(CylcError):
140+
"""Workflow contact file exists and scheduler is alive."""
141141

142142

143143
class FileRemovalError(CylcError):

cylc/flow/network/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
)
4242
from cylc.flow.exceptions import (
4343
ClientTimeout,
44-
ContactFileExists,
44+
SchedulerAlive,
4545
CylcError,
4646
RequestError,
4747
WorkflowStopped,
@@ -158,6 +158,10 @@ def timeout_handler(self) -> None:
158158
WorkflowStopped: if the workflow has already stopped.
159159
CyclError: if the workflow has moved to different host/port.
160160
"""
161+
LOG.warning(
162+
f"{self.workflow} {self.host}:{self.port}:"
163+
f" Connection timed out ({self.timeout} ms)"
164+
)
161165
contact_host, contact_port, *_ = get_location(self.workflow)
162166
if (
163167
contact_host != get_fqdn_by_host(self._orig_host)
@@ -177,7 +181,7 @@ def timeout_handler(self) -> None:
177181
# behind a contact file?
178182
try:
179183
detect_old_contact_file(self.workflow)
180-
except ContactFileExists:
184+
except SchedulerAlive:
181185
# old contact file exists and the workflow process still alive
182186
return
183187
else:

cylc/flow/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ def _configure_contact(self) -> None:
11441144
"""Create contact file."""
11451145
# Make sure another workflow of the same name hasn't started while this
11461146
# one is starting
1147-
# NOTE: raises ContactFileExists if workflow is running
1147+
# NOTE: raises SchedulerAlive if workflow is running
11481148
workflow_files.detect_old_contact_file(self.workflow)
11491149

11501150
# Extract contact data.

cylc/flow/scheduler_cli.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030

3131
from cylc.flow import LOG, __version__
3232
from cylc.flow.exceptions import (
33-
ContactFileExists,
3433
CylcError,
3534
ServiceFileError,
35+
WorkflowStopped,
3636
)
37+
from cylc.flow.scripts.ping import run as cylc_ping
3738
import cylc.flow.flags
3839
from cylc.flow.id import upgrade_legacy_ids
3940
from cylc.flow.host_select import select_workflow_host
@@ -60,7 +61,6 @@
6061
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
6162
from cylc.flow.workflow_files import (
6263
SUITERC_DEPR_MSG,
63-
detect_old_contact_file,
6464
get_workflow_srv_dir,
6565
)
6666
from cylc.flow.terminal import (
@@ -474,13 +474,29 @@ async def _scheduler_cli_3(
474474
async def _resume(workflow_id, options):
475475
"""Resume the workflow if it is already running."""
476476
try:
477-
detect_old_contact_file(workflow_id)
478-
except ContactFileExists as exc:
479-
print(f"Resuming already-running workflow\n\n{exc}")
480477
pclient = WorkflowRuntimeClient(
481478
workflow_id,
482479
timeout=options.comms_timeout,
483480
)
481+
except WorkflowStopped:
482+
# Not running - don't resume.
483+
return
484+
485+
# Is it running? If yes, send resume command.
486+
try:
487+
await cylc_ping(options, workflow_id, pclient)
488+
except WorkflowStopped:
489+
# Not running, restart instead of resume.
490+
# (Orphaned contact file will be removed by cylc_ping client logic).
491+
return
492+
except CylcError as exc:
493+
# PID check failed - abort.
494+
LOG.error(exc)
495+
LOG.critical('Cannot tell if the workflow is running')
496+
sys.exit(1)
497+
else:
498+
# It's running: resume it and exit.
499+
print("Resuming already-running workflow")
484500
mutation_kwargs = {
485501
'request_string': RESUME_MUTATION,
486502
'variables': {
@@ -489,13 +505,6 @@ async def _resume(workflow_id, options):
489505
}
490506
await pclient.async_request('graphql', mutation_kwargs)
491507
sys.exit(0)
492-
except CylcError as exc:
493-
LOG.error(exc)
494-
LOG.critical(
495-
'Cannot tell if the workflow is running'
496-
'\nNote, Cylc 8 cannot restart Cylc 7 workflows.'
497-
)
498-
sys.exit(1)
499508

500509

501510
def _version_check(

cylc/flow/scripts/clean.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async def run(*ids: str, opts: 'Values') -> None:
220220
workflows, multi_mode = await scan(workflows, multi_mode)
221221

222222
if not workflows:
223-
LOG.warning(f"No workflows matching {', '.join(ids)}")
223+
LOG.warning(f"No stopped workflows matching {', '.join(ids)}")
224224
return
225225

226226
workflows.sort()

cylc/flow/scripts/ping.py

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,22 @@
1818

1919
"""cylc ping [OPTIONS] ARGS
2020
21-
Test communication with a running workflow.
21+
Test communication with running workflows.
2222
23-
If workflow WORKFLOW is running or TASK in WORKFLOW is currently running,
24-
exit with success status, else exit with error status.
23+
Print the HOST:PORT of running workflows.
24+
If any are not running, exit with error status.
2525
"""
2626

2727
from functools import partial
2828
import sys
2929
from typing import Any, Dict, TYPE_CHECKING
3030

31-
import cylc.flow.flags
3231
from cylc.flow.network.client_factory import get_client
3332
from cylc.flow.network.multi import call_multi
3433
from cylc.flow.option_parsers import (
35-
FULL_ID_MULTI_ARG_DOC,
34+
ID_MULTI_ARG_DOC,
3635
CylcOptionParser as COP,
3736
)
38-
from cylc.flow.task_state import TASK_STATUS_RUNNING
3937
from cylc.flow.terminal import cli_function
4038

4139
if TYPE_CHECKING:
@@ -48,16 +46,6 @@
4846
id
4947
name
5048
port
51-
pubPort
52-
}
53-
}
54-
'''
55-
56-
TASK_QUERY = '''
57-
query ($tProxy: ID!) {
58-
taskProxy (id: $tProxy) {
59-
state
60-
id
6149
}
6250
}
6351
'''
@@ -67,9 +55,8 @@ def get_option_parser() -> COP:
6755
parser = COP(
6856
__doc__,
6957
comms=True,
70-
multitask=True,
7158
multiworkflow=True,
72-
argdoc=[FULL_ID_MULTI_ARG_DOC],
59+
argdoc=[ID_MULTI_ARG_DOC],
7360
)
7461

7562
return parser
@@ -78,9 +65,10 @@ def get_option_parser() -> COP:
7865
async def run(
7966
options: 'Values',
8067
workflow_id: str,
81-
*tokens_list,
68+
client=None
8269
) -> Dict:
83-
pclient = get_client(workflow_id, timeout=options.comms_timeout)
70+
71+
pclient = client or get_client(workflow_id, timeout=options.comms_timeout)
8472

8573
ret: Dict[str, Any] = {
8674
'stdout': [],
@@ -91,38 +79,11 @@ async def run(
9179
'request_string': FLOW_QUERY,
9280
'variables': {'wFlows': [workflow_id]}
9381
}
94-
task_kwargs: Dict[str, Any] = {
95-
'request_string': TASK_QUERY,
96-
}
9782

98-
# ping called on the workflow
9983
result = await pclient.async_request('graphql', flow_kwargs)
100-
msg = ""
101-
for flow in result['workflows']:
102-
w_name = flow['name']
103-
w_port = flow['port']
104-
w_pub_port = flow['pubPort']
105-
if cylc.flow.flags.verbosity > 0:
106-
ret['stdout'].append(
107-
f'{w_name} running on '
108-
f'{pclient.host}:{w_port} {w_pub_port}\n'
109-
)
110-
111-
# ping called with task-like objects
112-
for tokens in tokens_list:
113-
task_kwargs['variables'] = {
114-
'tProxy': tokens.relative_id
115-
}
116-
task_result = await pclient.async_request('graphql', task_kwargs)
117-
string_id = tokens.relative_id
118-
if not task_result.get('taskProxy'):
119-
msg = f"task not found: {string_id}"
120-
elif task_result['taskProxy']['state'] != TASK_STATUS_RUNNING:
121-
msg = f"task not {TASK_STATUS_RUNNING}: {string_id}"
122-
if msg:
123-
ret['stderr'].append(msg)
124-
ret['exit'] = 1
12584

85+
for flow in result['workflows']:
86+
ret['stdout'].append(f"{pclient.host}:{flow['port']}")
12687
return ret
12788

12889

@@ -144,6 +105,6 @@ def main(
144105
partial(run, options),
145106
*ids,
146107
report=report,
147-
constraint='mixed',
108+
constraint='workflows',
148109
)
149110
sys.exit(all(rets.values()) is False)

cylc/flow/scripts/reinstall.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ async def reinstall_cli(
217217
# no rsync output == no changes => exit
218218
print(cparse(
219219
'<magenta>'
220-
f'{workflow_id} up to date with {source}'
220+
'No changes made:'
221+
f' {workflow_id} is up to date with {source}'
221222
'</magenta>'
222223
))
223224
return False
@@ -249,7 +250,7 @@ async def reinstall_cli(
249250
else:
250251
# no reinstall
251252
print(
252-
cparse('<magenta>Reinstall canceled, no changes made.</magenta>')
253+
cparse('<magenta>No changes made: reinstall cancelled.</magenta>')
253254
)
254255
return False
255256

0 commit comments

Comments
 (0)