Skip to content
1 change: 1 addition & 0 deletions changes.d/7206.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes a memory leak in the data management that impacted some workflows.
60 changes: 44 additions & 16 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
generated at the boundary of an active node's graph walk and registering active
node's parents against them. Once active, these boundary nodes act as the prune
triggers for the associated parent nodes. Set operations are used to do a diff
between the nodes of active paths (paths whose node is in n=0)
and the nodes of flagged paths (whose boundary node(s) have become active).
between the nodes of active paths (paths whose node is in n=0) and the nodes of
flagged paths (whose boundary node(s) have become active).
This method is used to avoid "blinking", where a task becomes non-active then
is removed (along with its window/walk) before a descendant is added, causing
it to disappear then reappear in the store (and, hence, UIs).

Updates are created by the event/task/job managers.

Expand Down Expand Up @@ -658,20 +661,35 @@ def __init__(self, schd, n_edge_distance=1):
# internal delta
self.delta_queues = {self.workflow_id: {}}
self.publish_deltas = []

# internal n-window
self.all_task_pool = set()
self.all_n_window_nodes = set()
self.n_window_nodes = {}
self.n_window_edges = set()
# The walk information for window nodes, which is used for
# pre-populating new walks (if possible) and node depth calculations.
self.n_window_node_walks = {}
self.n_window_completed_walks = set()
self.n_window_depths = {}
self.update_window_depths = False
self.db_load_task_proxies: Dict[str, Tuple[TaskProxy, bool]] = {}
# Family node IDs that have been pruned. Sent with deltas and used for
# exclusion from state total and other calculations.
self.family_pruned_ids = set()
# Boundary nodes are those nodes at the boundary of the window, and
# these are used to trigger pruning for associated nodes.
# self.prune_trigger_nodes collects walk node IDs associated with a
# boundary node so that nodes of isolates, adjacent paths, and its
# own path can be flagged for pruning.
self.prune_trigger_nodes = {}
# Node ids flagged for pruning.
# Those not in active paths (the walk paths of active tasks) will be
# pruned.
self.prune_flagged_nodes = set()
# Set of removed task proxies to avoid applying/sending new deltas.
self.pruned_task_proxies = set()

self.updates_pending = False
self.updates_pending_follow_on = False
self.publish_pending = False
Expand Down Expand Up @@ -1214,15 +1232,10 @@ def increment_graph_window(
boundary_nodes = {active_id}
# associate
for tp_id in boundary_nodes:
try:
self.prune_trigger_nodes.setdefault(tp_id, set()).update(
active_walk['walk_ids']
)
self.prune_trigger_nodes[tp_id].discard(tp_id)
except KeyError:
self.prune_trigger_nodes.setdefault(tp_id, set()).add(
active_id
)
self.prune_trigger_nodes.setdefault(tp_id, set()).update(
active_walk['walk_ids']
)
self.prune_trigger_nodes[tp_id].discard(tp_id)
# flag manual triggers for pruning on deletion.
if is_manual_submit:
self.prune_trigger_nodes.setdefault(active_id, set()).add(
Expand Down Expand Up @@ -1279,20 +1292,23 @@ def remove_pool_node(self, name, point):
self.updates_pending = True
# flagged isolates/end-of-branch nodes for pruning on removal
if (
tp_id in self.prune_trigger_nodes and
tp_id in self.prune_trigger_nodes[tp_id]
tp_id in self.prune_trigger_nodes and
tp_id in self.prune_trigger_nodes[tp_id]
):
self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id])
del self.prune_trigger_nodes[tp_id]
# If, at the time of removal, no descendants are active then only
# flag the node not the entire walk.
elif (
tp_id in self.n_window_nodes and
self.n_window_nodes[tp_id].isdisjoint(self.all_task_pool)
tp_id in self.n_window_nodes and
self.n_window_nodes[tp_id].isdisjoint(self.all_task_pool)
):
self.prune_flagged_nodes.add(tp_id)
elif tp_id in self.n_window_node_walks:
self.prune_flagged_nodes.update(
self.n_window_node_walks[tp_id]['walk_ids']
)
if tp_id in self.prune_trigger_nodes:
del self.prune_trigger_nodes[tp_id]
self.update_window_depths = True
self.updates_pending = True

Expand Down Expand Up @@ -1930,6 +1946,7 @@ def window_resize_rewalk(self) -> None:

# Clear window walks, and walk from scratch.
self.prune_flagged_nodes.clear()
self.prune_trigger_nodes.clear()
self.n_window_node_walks.clear()
for tp_id in self.all_task_pool:
tokens = Tokens(tp_id)
Expand Down Expand Up @@ -2021,6 +2038,17 @@ def prune_data_store(self):
# Absolute triggers may be present in task pool, so recheck.
# Clear the rest.
self.prune_flagged_nodes.intersection_update(self.all_task_pool)
# Clear any boundary prune triggers not in the window.
# This can happen where the graph has paths not taken, i.e.:
# ```
# foo => a
# foo:failed => b
# ```
# So if `foo` then `a`, which when active/removed is the prune trigger
# for `foo`. However, `b` is not used so delete the trigger here.
for trigger_id in set(
self.prune_trigger_nodes).difference(self.all_n_window_nodes):
del self.prune_trigger_nodes[trigger_id]
Comment on lines +2041 to +2051
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the explanation for why the below fix was needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense!


tp_data = self.data[self.workflow_id][TASK_PROXIES]
tp_added = self.added[TASK_PROXIES]
Expand Down
102 changes: 74 additions & 28 deletions cylc/flow/main_loop/log_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
except ModuleNotFoundError:
PLT = False

from pympler.asizeof import asized
from pympler.asizeof import asizeof


@startup
Expand All @@ -47,7 +47,7 @@ async def init(scheduler, state):
state['objects'] = {}
state['size'] = {}
state['times'] = []
for key, _ in _iter_data_store(scheduler.data_store_mgr.data):
for key, _ in _iter_data_store(scheduler.data_store_mgr):
state['objects'][key] = []
state['size'][key] = []

Expand All @@ -56,12 +56,13 @@ async def init(scheduler, state):
async def log_data_store(scheduler, state):
"""Count the number of objects and the data store size."""
state['times'].append(time())
for key, value in _iter_data_store(scheduler.data_store_mgr.data):
state['objects'][key].append(
len(value)
)
for key, value in _iter_data_store(scheduler.data_store_mgr):
if isinstance(value, (list, dict, set)):
state['objects'][key].append(
len(value)
)
state['size'][key].append(
asized(value).size
asizeof(value)
)


Expand All @@ -72,11 +73,29 @@ async def report(scheduler, state):
_plot(state, scheduler.workflow_run_dir)


def _iter_data_store(data_store):
for item in data_store.values():
for key, value in item.items():
if key != 'workflow':
yield (key, value)
def _iter_data_store(data_store_mgr):
# the data store itself (for a total measurement)
yield ('data_store_mgr (total)', data_store_mgr)

# the top-level attributes of the data store
for key in dir(data_store_mgr):
if (
key != 'data'
and not key.startswith('__')
and isinstance(
value := getattr(data_store_mgr, key),
(list, dict, set)
)
):
yield (key, value)

# the individual components of the "data" attribute
for datum in data_store_mgr.data.values():
for key, value in datum.items():
if key == 'workflow':
yield (f'data.{key}', [value])
else:
yield (f'data.{key}', value)
# there should only be one workflow in the data store
break

Expand All @@ -94,34 +113,61 @@ def _dump(state, path):
return True


def _plot(state, path):
def _plot(state, path, min_size_percent=2):
if (
not PLT
or len(state['times']) < 2
not PLT
or len(state['times']) < 2
):
return False

# extract snapshot times
times = [tick - state['times'][0] for tick in state['times']]
_, ax1 = plt.subplots(figsize=(10, 7.5))

ax1.set_xlabel('Time (s)')
max_size = max(
size
for sizes in state['size'].values()
for size in sizes
)

ax1.set_ylabel('Objects')
lines = [
ax1.plot(times, objects, label=key)[0]
for key, objects in state['objects'].items()
]
# filter attributes by the minimum size
min_size_bytes = max_size * (min_size_percent / 100)
filtered_keys = {
key
for key, sizes in state['size'].items()
if (
any(size > min_size_bytes for size in sizes)
or key.startswith('data.')
)
}

# plot
fig = plt.figure(figsize=(15, 8))
ax1 = fig.add_subplot(111)
fig.suptitle(
f'data_store_mgr data and attrs above {min_size_percent}% of largest'
f' (> {int(min_size_bytes / 1000)}kb)'
)

# plot sizes
ax1.set_xlabel('Time (s)')
ax1.set_ylabel('Size (kb)')
for key, sizes in state['size'].items():
if key in filtered_keys:
ax1.plot(times, [x / 1000 for x in sizes], label=key)

# plot # objects
ax2 = ax1.twinx()
ax2.set_ylabel('Size (kb)')
for sizes in state['size'].values():
ax2.plot(times, [x / 1000 for x in sizes], linestyle=':')
ax2.set_ylabel('Objects')
for key, objects in state['objects'].items():
if objects and key in filtered_keys:
ax2.plot(times, objects, label=key, linestyle=':')

ax1.legend(lines, state['objects'], loc=0)
# legends
ax1.legend(loc=0)
ax2.legend(
(ax1.get_children()[0], ax2.get_children()[0]),
('objects', 'size'),
loc=1
('size', 'objects'),
loc=0
)

# start the x-axis at zero
Expand Down
12 changes: 10 additions & 2 deletions tests/unit/main_loop/test_log_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,16 @@ def test_plot(test_data, tmp_path):


def test_iter_data_store():
class DataStore:
tracker = {'this': 'that'}
data = {'x': {'a': 1, 'workflow': 2, 'c': 3}}
ds = DataStore()
assert (
list(_iter_data_store({'x': {'a': 1, 'workflow': 2, 'c': 3}}))
list(_iter_data_store(ds))
) == [
('a', 1), ('c', 3)
('data_store_mgr (total)', ds),
('tracker', {'this': 'that'}),
('data.a', 1),
('data.workflow', [2]),
('data.c', 3)
]
Loading