Skip to content

Commit 9ec6a13

Browse files
committed
wip
1 parent 01c2308 commit 9ec6a13

7 files changed

Lines changed: 6 additions & 112 deletions

File tree

cylc/flow/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def environ_init():
5353

5454
environ_init()
5555

56-
__version__ = '8.5.0.dev'
56+
__version__ = '8.5.0.dev836'
5757

5858

5959
def iter_entry_points(entry_point_name):

cylc/flow/commands.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -462,17 +462,11 @@ async def force_trigger_tasks(
462462
flow: List[str],
463463
flow_wait: bool = False,
464464
flow_descr: Optional[str] = None,
465-
on_resume: bool = False
466465
):
467466
"""Manual task trigger."""
468467
validate.is_tasks(tasks)
469468
validate.flow_opts(flow, flow_wait)
470-
if on_resume:
471-
LOG.warning(
472-
"The --on-resume option is deprecated and will be removed "
473-
"at Cylc 8.5."
474-
)
475469
yield
476470
yield schd.pool.force_trigger_tasks(
477-
tasks, flow, flow_wait, flow_descr, on_resume
471+
tasks, flow, flow_wait, flow_descr
478472
)

cylc/flow/network/schema.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2238,15 +2238,6 @@ class Meta:
22382238
''')
22392239
resolver = partial(mutator, command='force_trigger_tasks')
22402240

2241-
class Arguments(TaskMutation.Arguments, FlowMutationArguments):
2242-
on_resume = Boolean(
2243-
default_value=False,
2244-
description=sstrip('''
2245-
If the workflow is paused, wait until it is resumed before
2246-
running the triggered task(s).
2247-
''')
2248-
)
2249-
22502241

22512242
def _mut_field(cls):
22522243
"""Convert a mutation class into a field.

cylc/flow/scheduler.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1480,10 +1480,6 @@ def release_tasks_to_run(self) -> bool:
14801480
if not self.is_paused:
14811481
# release queued tasks
14821482
pre_prep_tasks.update(self.pool.release_queued_tasks())
1483-
if self.pool.tasks_to_trigger_on_resume:
1484-
# and manually triggered tasks to run once workflow resumed
1485-
pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume)
1486-
self.pool.tasks_to_trigger_on_resume = set()
14871483

14881484
elif (
14891485
(

cylc/flow/scripts/trigger.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,6 @@ def get_option_parser() -> COP:
108108

109109
add_flow_opts(parser)
110110

111-
parser.add_option(
112-
"--on-resume",
113-
help=(
114-
"If the workflow is paused, wait until it is resumed before "
115-
"running the triggered task(s). DEPRECATED - this will be "
116-
"removed at Cylc 8.5."
117-
),
118-
action="store_true",
119-
default=False,
120-
dest="on_resume"
121-
)
122111
return parser
123112

124113

@@ -136,7 +125,6 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
136125
'flow': options.flow,
137126
'flowWait': options.flow_wait,
138127
'flowDescr': options.flow_descr,
139-
'onResume': options.on_resume,
140128
}
141129
}
142130
return await pclient.async_request('graphql', mutation_kwargs)

cylc/flow/task_pool.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ def __init__(
216216

217217
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
218218
self.tasks_to_trigger_now: Set['TaskProxy'] = set()
219-
self.tasks_to_trigger_on_resume: Set['TaskProxy'] = set()
220219

221220
def set_stop_task(self, task_id):
222221
"""Set stop after a task."""
@@ -2299,7 +2298,7 @@ def _get_flow_nums(
22992298
for n in flow
23002299
}
23012300

2302-
def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
2301+
def _force_trigger(self, itask: 'TaskProxy'):
23032302
"""Process a manually triggered task, ready for job submission.
23042303
23052304
Assumes the task is in the pool.
@@ -2311,10 +2310,6 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
23112310
- queue it, if the queue is limiting activity
23122311
- run it, if the queue is not limiting activity
23132312
2314-
After state reset and queue handling:
2315-
- if on_resume is False, add the task to tasks_to_trigger_now
2316-
- if on_resume is True, add the task to tasks_to_trigger_on_resume
2317-
23182313
The scheduler will release tasks from the tasks_to_trigger sets.
23192314
23202315
"""
@@ -2350,15 +2345,7 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
23502345
# If not queued now, record the task as ready to run.
23512346
itask.waiting_on_job_prep = True
23522347

2353-
if on_resume:
2354-
self.tasks_to_trigger_on_resume.add(itask)
2355-
# In case previously triggered without --on-resume.
2356-
# (It should have run already, but just in case).
2357-
self.tasks_to_trigger_now.discard(itask)
2358-
else:
2359-
self.tasks_to_trigger_now.add(itask)
2360-
# In case previously triggered with --on-resume.
2361-
self.tasks_to_trigger_on_resume.discard(itask)
2348+
self.tasks_to_trigger_now.add(itask)
23622349

23632350
# Task may be set running before xtrigger is satisfied,
23642351
# if so check/spawn if xtrigger sequential.
@@ -2370,7 +2357,6 @@ def force_trigger_tasks(
23702357
flow: List[str],
23712358
flow_wait: bool = False,
23722359
flow_descr: Optional[str] = None,
2373-
on_resume: bool = False
23742360
):
23752361
"""Manually trigger tasks.
23762362
@@ -2406,7 +2392,7 @@ def force_trigger_tasks(
24062392
LOG.error(f"[{itask}] ignoring trigger - already active")
24072393
continue
24082394
self.merge_flows(itask, flow_nums)
2409-
self._force_trigger(itask, on_resume)
2395+
self._force_trigger(itask)
24102396

24112397
# Spawn and trigger inactive tasks.
24122398
if not flow:
@@ -2441,7 +2427,7 @@ def force_trigger_tasks(
24412427

24422428
# run it (or run it again for incomplete flow-wait)
24432429
self.add_to_pool(itask)
2444-
self._force_trigger(itask, on_resume)
2430+
self._force_trigger(itask)
24452431

24462432
def spawn_parentless_sequential_xtriggers(self):
24472433
"""Spawn successor(s) of parentless wall clock satisfied tasks."""

tests/integration/test_force_trigger.py

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -88,64 +88,3 @@ async def test_trigger_workflow_paused(
8888
level=logging.ERROR,
8989
contains="ignoring trigger - already active"
9090
)
91-
92-
93-
async def test_trigger_on_resume(
94-
flow: 'Fixture',
95-
scheduler: 'Fixture',
96-
start: 'Fixture',
97-
capture_submission: 'Fixture',
98-
log_filter: Callable
99-
):
100-
"""
101-
Test manual triggering on-resume option when the workflow is paused.
102-
103-
https://github.com/cylc/cylc-flow/issues/6192
104-
105-
"""
106-
id_ = flow({
107-
'scheduling': {
108-
'queues': {
109-
'default': {
110-
'limit': 1,
111-
},
112-
},
113-
'graph': {
114-
'R1': '''
115-
a => x & y & z
116-
''',
117-
},
118-
},
119-
})
120-
schd = scheduler(id_, paused_start=True)
121-
122-
# start the scheduler (but don't set the main loop running)
123-
async with start(schd):
124-
125-
# capture task submissions (prevents real submissions)
126-
submitted_tasks = capture_submission(schd)
127-
128-
# paused at start-up so no tasks should be submitted
129-
assert len(submitted_tasks) == 0
130-
131-
# manually trigger 1/x - it not should be submitted
132-
schd.pool.force_trigger_tasks(['1/x'], [1], on_resume=True)
133-
schd.release_tasks_to_run()
134-
assert len(submitted_tasks) == 0
135-
136-
# manually trigger 1/y - it should not be submitted
137-
# (queue limit reached)
138-
schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True)
139-
schd.release_tasks_to_run()
140-
assert len(submitted_tasks) == 0
141-
142-
# manually trigger 1/y again - it should not be submitted
143-
# (triggering a queued task runs it)
144-
schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True)
145-
schd.release_tasks_to_run()
146-
assert len(submitted_tasks) == 0
147-
148-
# resume the workflow, both tasks should trigger now.
149-
schd.resume_workflow()
150-
schd.release_tasks_to_run()
151-
assert len(submitted_tasks) == 2

0 commit comments

Comments
 (0)