Skip to content

Commit 48d3d38

Browse files
authored
Merge pull request #6660 from cylc/8.4.x-sync
🤖 Merge 8.4.x-sync into master
2 parents eafee44 + 121b0e1 commit 48d3d38

File tree

3 files changed

+40
-16
lines changed

3 files changed

+40
-16
lines changed

changes.d/6656.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where old cycle points could accumulate in the UI.

cylc/flow/data_store_mgr.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,7 +1735,7 @@ def update_data_structure(self):
17351735
self.update_workflow()
17361736

17371737
# Don't process updated deltas of pruned nodes
1738-
self.prune_pruned_updated_nodes()
1738+
self.dedupe_pruned_updated_task_proxies()
17391739

17401740
# Gather deltas
17411741
self.batch_deltas()
@@ -1959,8 +1959,8 @@ def _family_ascent_point_prune(
19591959
if fp_id in parent_ids:
19601960
parent_ids.remove(fp_id)
19611961

1962-
def prune_pruned_updated_nodes(self):
1963-
"""Remove updated nodes that will also be pruned this batch.
1962+
def dedupe_pruned_updated_task_proxies(self):
1963+
"""Remove updated task proxies that will also be pruned in this batch.
19641964
19651965
This will avoid processing and sending deltas that will immediately
19661966
be pruned. Kept separate from other pruning to allow for update
@@ -1970,19 +1970,19 @@ def prune_pruned_updated_nodes(self):
19701970
tp_data = self.data[self.workflow_id][TASK_PROXIES]
19711971
tp_added = self.added[TASK_PROXIES]
19721972
tp_updated = self.updated[TASK_PROXIES]
1973-
j_updated = self.updated[JOBS]
19741973
for tp_id in self.pruned_task_proxies:
19751974
if tp_id in tp_updated:
19761975
if tp_id in tp_data:
1977-
node = tp_data[tp_id]
1976+
node: PbTaskProxy = tp_data[tp_id]
19781977
elif tp_id in tp_added:
19791978
node = tp_added[tp_id]
19801979
else:
19811980
continue
1982-
update_node = tp_updated.pop(tp_id)
1983-
for j_id in list(node.jobs) + list(update_node.jobs):
1984-
if j_id in j_updated:
1985-
del j_updated[j_id]
1981+
update_node: PbTaskProxy = tp_updated.pop(tp_id)
1982+
# Remove this task's added/updated jobs in this batch too:
1983+
for j_id in set(node.jobs).union(update_node.jobs):
1984+
for record in (self.added, self.updated):
1985+
record[JOBS].pop(j_id, None)
19861986
self.n_window_edges.difference_update(update_node.edges)
19871987
self.deltas[EDGES].pruned.extend(update_node.edges)
19881988
self.pruned_task_proxies.clear()

tests/integration/test_data_store_mgr.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,19 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from typing import Iterable, List, TYPE_CHECKING, cast
17+
from logging import INFO
18+
from typing import (
19+
Iterable,
20+
List,
21+
cast,
22+
)
1823

1924
import pytest
2025

26+
from cylc.flow.data_messages_pb2 import (
27+
PbPrerequisite,
28+
PbTaskProxy,
29+
)
2130
from cylc.flow.data_store_mgr import (
2231
EDGES,
2332
FAMILY_PROXIES,
@@ -27,25 +36,22 @@
2736
WORKFLOW,
2837
)
2938
from cylc.flow.id import Tokens
39+
from cylc.flow.scheduler import Scheduler
3040
from cylc.flow.task_events_mgr import TaskEventsManager
3141
from cylc.flow.task_outputs import (
32-
TASK_OUTPUT_SUBMITTED,
3342
TASK_OUTPUT_STARTED,
43+
TASK_OUTPUT_SUBMITTED,
3444
TASK_OUTPUT_SUCCEEDED,
3545
)
3646
from cylc.flow.task_state import (
3747
TASK_STATUS_FAILED,
48+
TASK_STATUS_PREPARING,
3849
TASK_STATUS_SUCCEEDED,
3950
TASK_STATUS_WAITING,
4051
)
4152
from cylc.flow.wallclock import get_current_time_string
4253

4354

44-
if TYPE_CHECKING:
45-
from cylc.flow.scheduler import Scheduler
46-
from cylc.flow.data_messages_pb2 import PbPrerequisite, PbTaskProxy
47-
48-
4955
# NOTE: Some of these tests mutate the data store, so running them in
5056
# isolation may see failures when they actually pass if you run the
5157
# whole file
@@ -502,3 +508,20 @@ def _patch_remove(*args, **kwargs):
502508
TASK_OUTPUT_SUCCEEDED,
503509
}
504510
assert get_delta_outputs() is None
511+
512+
513+
async def test_remove_added_jobs_of_pruned_task(one: Scheduler, start):
514+
"""When a task is pruned, any of its jobs added in the same batch
515+
must be removed from the batch.
516+
517+
See https://github.com/cylc/cylc-flow/pull/6656
518+
"""
519+
async with start(one):
520+
itask = one.pool.get_tasks()[0]
521+
itask.state_reset(TASK_STATUS_PREPARING)
522+
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_SUCCEEDED)
523+
assert not one.data_store_mgr.data[one.id][JOBS]
524+
assert len(one.data_store_mgr.added[JOBS]) == 1
525+
one.data_store_mgr.update_data_structure()
526+
assert not one.data_store_mgr.data[one.id][JOBS]
527+
assert not one.data_store_mgr.added[JOBS]

0 commit comments

Comments
 (0)