Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,90 @@ def _visit(
]
required_deps = []

# The in_funcs of a join step need special handling, as they can form disjoint sets of always-executing and conditional branches.
if node.type == "join" and any(
self._is_conditional_node(self.graph[fn]) for fn in node.in_funcs
):

def _split_switch_ancestors(step_name, first_ancestor):
    """
    Collect the split-switch steps that lie upstream of `step_name`.

    The graph is walked backwards through each step's `in_funcs`. The walk
    does not continue past `first_ancestor`, but `first_ancestor` itself is
    still recorded when it is a split-switch.

    Parameters
    ----------
    step_name : str
        Name of the step whose ancestors are inspected.
    first_ancestor : str
        Name of the step at which the upward walk stops.

    Returns
    -------
    List[str]
        Names of the split-switch ancestors, each listed once, in the order
        they are first encountered.
    """
    found = []
    # Steps already expanded. A step reachable through several paths is only
    # visited once, which avoids re-walking shared ancestors and recording
    # the same split-switch repeatedly.
    # NOTE(review): if the graph can contain cycles (e.g. a switch that
    # transitions back to an earlier step), this also guarantees that the
    # walk terminates - confirm whether such graphs reach this code path.
    seen = set()

    def _walk(name):
        for in_fn in self.graph[name].in_funcs:
            if in_fn in seen:
                continue
            seen.add(in_fn)
            if self.graph[in_fn].type == "split-switch":
                found.append(in_fn)
            if in_fn != first_ancestor:
                _walk(in_fn)

    _walk(step_name)
    return found
Comment thread
saikonen marked this conversation as resolved.

node_groups = {}
node_switch_ancestors = {}
for fn in node.in_funcs:
if self.graph[fn].split_branches:
# This is the latest split in the DAG.
last_split = self.graph[fn].split_branches[-1]
switch_ancestors = _split_switch_ancestors(
fn, node.split_parents[-1]
)
Comment thread
saikonen marked this conversation as resolved.
if switch_ancestors:
node_switch_ancestors[fn] = switch_ancestors
new_funcs = node_groups.get(last_split, [])
new_funcs.append(fn)
node_groups[last_split] = new_funcs

def build_ancestor_tree(node_groups, switch_ancestors):
    """
    Arrange the steps under each split into chains of nested switch ancestry.

    For every entry of `node_groups`, the steps are first bucketed by their
    set of split-switch ancestors. Buckets are then combined into chains: a
    bucket joins the first existing chain whose leading (largest) ancestor
    set is a superset of its own, otherwise it starts a new chain.

    Parameters
    ----------
    node_groups : Dict[str, List[str]]
        Maps a split step name to the flat list of step names grouped
        under it.
    switch_ancestors : Dict[str, Iterable[str]]
        Maps a step name to its split-switch ancestors. Steps without an
        entry are treated as having no such ancestors.

    Returns
    -------
    Dict[str, List[List[List[str]]]]
        For each key of `node_groups`, a list of chains. Each chain is a
        list of step-name groups, ordered from the group with the most
        switch ancestors to the group with the fewest.
    """
    result = {}
    for parent, children in node_groups.items():
        # `children` is a flat list of step names (the caller only ever
        # appends individual names), so no flattening is required here.

        # Group nodes by their ancestor set
        by_anc = defaultdict(list)
        for n in children:
            by_anc[frozenset(switch_ancestors.get(n, []))].append(n)

        # Sort from most specific (most ancestors) to least. The sort is
        # stable, so equally sized sets keep their insertion order.
        groups = sorted(by_anc.items(), key=lambda x: len(x[0]), reverse=True)

        # Greedily build chains: add to a chain if this key is a subset of
        # its first (largest) key
        chains = []
        for key, grp in groups:
            for chain in chains:
                if key <= chain[0][0]:
                    chain.append((key, grp))
                    break
            else:
                # No existing chain covers this ancestor set.
                chains.append([(key, grp)])

        result[parent] = [[g for _, g in chain] for chain in chains]
    return result

if node_groups:
conditional_deps = []
required_deps = []
for parent, chains in build_ancestor_tree(
node_groups, node_switch_ancestors
).items():
parts = []
for chain in chains:
groups = [
"({})".format(
" || ".join(
"%s.Succeeded" % self._sanitize(g)
for g in grp
)
)
for grp in chain
]
parts.append("({})".format(" || ".join(groups)))
Comment thread
saikonen marked this conversation as resolved.
required_deps.append("&&".join(parts))
Comment thread
saikonen marked this conversation as resolved.
Comment thread
saikonen marked this conversation as resolved.

both_conditions = required_deps and conditional_deps

depends_str = "{required}{_and}{conditional}".format(
Expand Down
Loading