Skip to content

Commit 22bebef

Browse files
author
Nissan Pow
committed
test: add fail_flow and split_in_branch UX tests for windmill QA findings
- fail_flow.py: flow that raises RuntimeError mid-step; used to verify that schedulers report FAILED status (not RUNNING) when a step crashes before reaching end (catches A03-1 in metaflow-windmill QA).
- split_in_branch_flow.py: outer split with a nested inner split inside branch_a; verifies that the outer join is compiled correctly (catches A02-2, where _find_join_step returned inner_join instead of outer_join).
- test_basic.py: adds test_fail_flow_reports_failed_status and test_split_in_branch_deployer, both marked scheduler_only + deployer.
1 parent dcd0ed4 commit 22bebef

3 files changed

Lines changed: 160 additions & 0 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""Flow that deliberately fails mid-step.
2+
3+
Used to verify that schedulers report FAILED status (not RUNNING or PENDING)
4+
when a flow step raises an unhandled exception.
5+
"""
6+
7+
from metaflow import FlowSpec, step
8+
9+
10+
class FailFlow(FlowSpec):
    """Three-step flow whose middle step always raises.

    The graph is ``start -> failing_step -> end``. ``failing_step`` raises a
    ``RuntimeError`` unconditionally, so a run is expected to stop there and
    never execute ``end``.
    """

    @step
    def start(self):
        """Transition directly to the step that fails."""
        self.next(self.failing_step)

    @step
    def failing_step(self):
        """Raise unconditionally so the run fails before ``end``."""
        raise RuntimeError("Deliberate failure for testing scheduler status reporting.")
        # Never executed. Metaflow derives the flow graph statically from the
        # source and expects every step other than ``end`` to finish with a
        # ``self.next(...)`` transition; without it the flow may be rejected
        # at validation/deploy time instead of failing at run time.
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; only present to complete the graph."""
        pass


if __name__ == "__main__":
    FailFlow()
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Flow with a nested split inside a branch.
2+
3+
Graph:
4+
start → [branch_a, branch_b] → outer_join → end
5+
branch_a → [inner_x, inner_y] → inner_join → outer_join
6+
7+
Used to verify that schedulers correctly compile and execute complex graph
8+
topologies where a split exists inside one branch of an outer split.
9+
Without correct join detection, outer_join and end are silently dropped.
10+
"""
11+
12+
from metaflow import FlowSpec, step
13+
14+
15+
class SplitInBranchFlow(FlowSpec):
    """Flow with a split nested inside one branch of an outer split.

    Graph:
        start -> [branch_a, branch_b] -> outer_join -> end
        branch_a -> [inner_x, inner_y] -> inner_join -> outer_join
    """

    @step
    def start(self):
        """Open the outer split into ``branch_a`` and ``branch_b``."""
        self.next(self.branch_a, self.branch_b)

    @step
    def branch_a(self):
        """Tag this branch as ``"a"`` and open the inner split."""
        self.label = "a"
        self.next(self.inner_x, self.inner_y)

    @step
    def inner_x(self):
        """Inner branch that records ``sub_label = "x"``."""
        self.sub_label = "x"
        self.next(self.inner_join)

    @step
    def inner_y(self):
        """Inner branch that records ``sub_label = "y"``."""
        self.sub_label = "y"
        self.next(self.inner_join)

    @step
    def inner_join(self, inputs):
        """Join the inner split and collect the sorted ``sub_label`` values."""
        self.sub_labels = sorted(i.sub_label for i in inputs)
        # A join step does not inherit artifacts from its inputs, so carry
        # ``label`` forward explicitly. ``outer_join`` reads ``label`` from
        # every input, including this step. Both inner branches received the
        # same value from ``branch_a``, so either input can supply it.
        self.label = inputs[0].label
        self.next(self.outer_join)

    @step
    def branch_b(self):
        """Tag this branch as ``"b"`` and go straight to the outer join."""
        self.label = "b"
        self.next(self.outer_join)

    @step
    def outer_join(self, inputs):
        """Join the outer split and collect the sorted ``label`` values."""
        self.labels = sorted(i.label for i in inputs)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; performs no work."""
        pass


if __name__ == "__main__":
    SplitInBranchFlow()

test/ux/core/test_basic.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,85 @@ def test_resources_cpu(exec_mode, decospecs, compute_env, tag, scheduler_config)
201201
), "Message didn't match"
202202

203203

204+
@pytest.mark.scheduler_only
@pytest.mark.deployer
def test_fail_flow_reports_failed_status(
    exec_mode, decospecs, compute_env, tag, scheduler_config
):
    """Verify schedulers report FAILED (not RUNNING/PENDING) when a step raises.

    Catches A03-1: _check_sysroot_completion returns RUNNING forever for flows
    that crash before reaching the end step, because end/ dir never appears.

    The test deploys ``basic/fail_flow.py``, triggers it, and polls the
    triggered run's ``status`` until it is ``"FAILED"`` or ``"SUCCEEDED"`` or
    a 120-second budget is exhausted. A timeout is reported separately from a
    wrong terminal status, and the last observed status is included so a run
    stuck in a non-terminal state is visible in the failure message.
    """
    import time

    from .test_utils import deploy_flow_to_scheduler

    scheduler_type = scheduler_config.scheduler_type
    if scheduler_type is None:
        pytest.skip("No scheduler configured — requires a scheduler_type")

    deployed_flow = deploy_flow_to_scheduler(
        flow_name="basic/fail_flow.py",
        tl_args={"decospecs": decospecs, "env": compute_env},
        scheduler_args={"cluster": scheduler_config.cluster},
        deploy_args={"tags": tag, **(scheduler_config.deploy_args or {})},
        scheduler_type=scheduler_type,
    )

    triggered = deployed_flow.trigger()

    terminal_statuses = ("FAILED", "SUCCEEDED")
    timeout_seconds = 120
    poll_interval_seconds = 5

    # Use a monotonic clock so wall-clock adjustments cannot stretch or
    # shorten the polling budget.
    deadline = time.monotonic() + timeout_seconds
    last_status = triggered.status
    # Sleep first, then re-read, so the status is always checked once more
    # after the final sleep before giving up.
    while last_status not in terminal_statuses and time.monotonic() < deadline:
        time.sleep(poll_interval_seconds)
        last_status = triggered.status

    assert last_status in terminal_statuses, (
        "Run did not reach a terminal status within %d seconds; "
        "last observed status was %r" % (timeout_seconds, last_status)
    )
    assert last_status == "FAILED", (
        "A flow that raises RuntimeError mid-step should report FAILED, got %r"
        % last_status
    )
244+
245+
246+
@pytest.mark.scheduler_only
@pytest.mark.deployer
def test_split_in_branch_deployer(
    exec_mode, decospecs, compute_env, tag, scheduler_config
):
    """Check that a split nested inside a branch deploys and runs to completion.

    Catches A02-2: _find_join_step's while loop follows only out_funcs[0],
    so it returns the inner join instead of the outer join. Without the fix,
    outer_join and end are silently dropped from the compiled flow.

    The test deploys ``basic/split_in_branch_flow.py``, waits for the run, and
    checks the artifacts written by both join steps.
    """
    from .test_utils import deploy_flow_to_scheduler, wait_for_deployed_run

    scheduler_type = scheduler_config.scheduler_type
    if scheduler_type is None:
        pytest.skip("No scheduler configured — requires a scheduler_type")

    # Scheduler-specific deploy arguments are layered on top of the tag.
    deploy_args = {"tags": tag}
    deploy_args.update(scheduler_config.deploy_args or {})

    top_level_args = {"decospecs": decospecs, "env": compute_env}
    scheduler_args = {"cluster": scheduler_config.cluster}

    deployed_flow = deploy_flow_to_scheduler(
        flow_name="basic/split_in_branch_flow.py",
        tl_args=top_level_args,
        scheduler_args=scheduler_args,
        deploy_args=deploy_args,
        scheduler_type=scheduler_type,
    )

    run = wait_for_deployed_run(deployed_flow)
    assert run.successful, "SplitInBranchFlow should complete successfully"

    outer_labels = sorted(run["outer_join"].task.data.labels)
    assert outer_labels == [
        "a",
        "b",
    ], "outer_join should receive results from both branch_a and branch_b"

    inner_sub_labels = sorted(run["inner_join"].task.data.sub_labels)
    assert inner_sub_labels == [
        "x",
        "y",
    ], "inner_join should receive results from inner_x and inner_y"
281+
282+
204283
@pytest.mark.scheduler_only
205284
@pytest.mark.skip(reason="devstack has no GPU nodes")
206285
def test_resources_gpu(exec_mode, decospecs, compute_env, tag, scheduler_config):

0 commit comments

Comments
 (0)