Skip to content

Commit dcd0ed4

Browse files
author
Nissan Pow
committed
test: add regression tests for metaflow-mage QA findings
Adds tests that catch bugs found in the deep-QA run of metaflow-mage (run 20260315-193000). Tests are written to fail against the unfixed code and pass after fixes are applied. test_dag.py: - test_foreach, test_multibody_foreach: assert len(tasks) == N to catch silent foreach_count=1 fallback (D-FOREACH-1) - test_nested_foreach_2x2: 2-outer x 2-inner structure asserting all 4 combinations appear in outer_join and 4 inner tasks exist; the minimal 1-inner case in test_nested_foreach passes even with D-NESTED-1 bug test_compliance.py: - test_timeout_minutes_enforced: verifies @timeout(minutes=1) actually kills the step (D-TIMEOUT-1); skipped on remote backends same as test_timeout_enforcement - test_run_param_not_dropped: deploys flow with retry_count param, triggers with retry_count=42, asserts value arrives at task runtime test_utils.py: - wait_for_deployed_run: assert triggered_run.run is not None, catching run_id mismatches between deployer and init block in every deployer test New flows: - flows/basic/timeout_minutes_flow.py: @timeout(minutes=1) on 120s sleep - flows/basic/reserved_param_flow.py: retry_count Parameter for param passthrough test - flows/dag/nested_foreach_2x2_flow.py: 2x2 nested foreach (groups x items)
1 parent 49b411d commit dcd0ed4

6 files changed

Lines changed: 264 additions & 1 deletion

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Flow with a parameter named 'retry_count' to test parameter passing correctness.
2+
3+
When deployed to Mage, parameters were silently dropped when trigger variables
4+
had None values or when JSON serialization lost the value. This flow verifies
5+
that parameter values arrive correctly at task runtime.
6+
"""
7+
8+
from metaflow import FlowSpec, Parameter, step, project
9+
10+
11+
@project(name="reserved_param_flow")
12+
class ReservedParamFlow(FlowSpec):
13+
retry_count = Parameter(
14+
"retry_count",
15+
default=0,
16+
type=int,
17+
help="Number of retries (named retry_count to test parameter passing)",
18+
)
19+
20+
@step
21+
def start(self):
22+
self.stored_retry_count = self.retry_count
23+
self.next(self.end)
24+
25+
@step
26+
def end(self):
27+
pass
28+
29+
30+
if __name__ == "__main__":
31+
ReservedParamFlow()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Flow with @timeout(minutes=1) on a step that sleeps 120 seconds.
2+
3+
Used to verify that @timeout(minutes=N) is correctly parsed and enforced —
4+
not silently treated as @timeout(seconds=0).
5+
"""
6+
7+
import time
8+
9+
from metaflow import FlowSpec, step, timeout, project
10+
11+
12+
@project(name="timeout_minutes_flow")
13+
class TimeoutMinutesFlow(FlowSpec):
14+
"""Step with @timeout(minutes=1) sleeps 2 minutes — must be killed after 1 minute."""
15+
16+
@step
17+
def start(self):
18+
self.next(self.slow)
19+
20+
@timeout(minutes=1)
21+
@step
22+
def slow(self):
23+
# Sleep 2 minutes — should be killed after 1 minute timeout.
24+
time.sleep(120)
25+
self.done = True
26+
self.next(self.end)
27+
28+
@step
29+
def end(self):
30+
pass
31+
32+
33+
if __name__ == "__main__":
34+
TimeoutMinutesFlow()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Nested foreach flow with 2 outer x 2 inner items — tests full 2x2 fanout."""
2+
3+
import os
4+
5+
from metaflow import FlowSpec, step, project
6+
7+
8+
@project(name="nested_foreach_2x2_flow")
9+
class NestedForeach2x2Flow(FlowSpec):
10+
@step
11+
def start(self):
12+
self.execution_env = os.environ.get("KUBERNETES_SERVICE_HOST", "")
13+
self.groups = ["x", "y"]
14+
self.next(self.outer, foreach="groups")
15+
16+
@step
17+
def outer(self):
18+
self.group = self.input
19+
self.items = [1, 2]
20+
self.next(self.inner, foreach="items")
21+
22+
@step
23+
def inner(self):
24+
self.result = "%s-%d" % (self.group, self.input)
25+
self.next(self.inner_join)
26+
27+
@step
28+
def inner_join(self, inputs):
29+
self.inner_results = sorted([i.result for i in inputs])
30+
self.next(self.outer_join)
31+
32+
@step
33+
def outer_join(self, inputs):
34+
self.all_results = sorted([r for i in inputs for r in i.inner_results])
35+
self.next(self.end)
36+
37+
@step
38+
def end(self):
39+
pass
40+
41+
42+
if __name__ == "__main__":
43+
NestedForeach2x2Flow()

test/ux/core/test_compliance.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,100 @@ def test_timeout_enforcement(exec_mode, decospecs, compute_env, tag, scheduler_c
359359
"Run should have failed because the 'slow' step exceeds its "
360360
"@timeout(seconds=5), but it succeeded. Timeout enforcement may be broken."
361361
)
362+
363+
364+
# ---------------------------------------------------------------------------
# test_timeout_minutes_enforced
#
# Regression test for D-TIMEOUT-1: _get_timeout_seconds only read the
# 'seconds' attribute and silently ignored 'minutes', so @timeout(minutes=1)
# produced no timeout at all and the step ran indefinitely.
# ---------------------------------------------------------------------------


@pytest.mark.compliance
@pytest.mark.scheduler_only
@pytest.mark.skip(
    reason="@timeout enforcement on remote backends (argo/sfn/airflow) is not "
    "reliable — the run may hang instead of failing. Needs backend-specific "
    "timeout mechanisms (e.g. activeDeadlineSeconds for k8s). See #XXXX."
)
def test_timeout_minutes_enforced(
    exec_mode, decospecs, compute_env, tag, scheduler_config
):
    """WHY: _get_timeout_seconds only read 'seconds' attribute, silently ignoring 'minutes'.
    @timeout(minutes=1) produced no timeout at all.
    This test verifies that minute-based timeouts are actually enforced (D-TIMEOUT-1).
    """
    if exec_mode != "deployer":
        pytest.skip("compliance test requires deployer mode")

    # Tag the run uniquely so this test's runs are identifiable per exec mode.
    run_tags = [*tag, f"test_compliance_timeout_minutes_{exec_mode}"]

    deployed_flow = deploy_flow_to_scheduler(
        flow_name="basic/timeout_minutes_flow.py",
        tl_args={"env": compute_env, "decospecs": decospecs},
        scheduler_args={"cluster": scheduler_config.cluster},
        deploy_args={"tags": run_tags, **(scheduler_config.deploy_args or {})},
        scheduler_type=scheduler_config.scheduler_type,
    )

    # The flow's 'slow' step sleeps 120s under @timeout(minutes=1), so an
    # enforced timeout must make the run end in failure.
    run = wait_for_deployed_run_allow_failure(deployed_flow)

    assert not run.successful, (
        "@timeout(minutes=1) was NOT enforced — the step ran for 2+ minutes without being killed. "
        "Check that _get_timeout_seconds correctly computes minutes*60+seconds."
    )
413+
414+
415+
# ---------------------------------------------------------------------------
# test_run_param_not_dropped
#
# Parameters were silently dropped when the trigger variables dict carried
# None values or when JSON serialization lost the value. Verify that a
# parameter value set at trigger time arrives intact at task runtime.
# ---------------------------------------------------------------------------


@pytest.mark.compliance
@pytest.mark.scheduler_only
def test_run_param_not_dropped(
    exec_mode, decospecs, compute_env, tag, scheduler_config
):
    """WHY: Parameters were silently dropped when trigger variables dict had None values
    or when JSON serialization lost the value. Verify parameter values arrive correctly.
    """
    if exec_mode != "deployer":
        pytest.skip("compliance test requires deployer mode")

    # Tag the run uniquely so this test's runs are identifiable per exec mode.
    run_tags = [*tag, f"test_compliance_run_param_not_dropped_{exec_mode}"]

    deployed_flow = deploy_flow_to_scheduler(
        flow_name="basic/reserved_param_flow.py",
        tl_args={"env": compute_env, "decospecs": decospecs},
        scheduler_args={"cluster": scheduler_config.cluster},
        deploy_args={"tags": run_tags, **(scheduler_config.deploy_args or {})},
        scheduler_type=scheduler_config.scheduler_type,
    )

    # Trigger with an explicit retry_count so the default (0) cannot mask a drop.
    run = wait_for_deployed_run(deployed_flow, run_kwargs={"retry_count": 42})

    assert run.successful, "Run was not successful"
    # The flow stores the parameter it actually received as an artifact.
    observed = run["start"].task.data.stored_retry_count
    assert observed == 42, (
        "Expected retry_count=42, got %r. "
        "Parameter may have been dropped or not passed correctly."
        % observed
    )

test/ux/core/test_dag.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ def test_foreach(exec_mode, decospecs, compute_env, tag, scheduler_config):
3636
)
3737

3838
assert run.successful, "Run was not successful"
39+
# Verify exact fanout count — catches silent foreach_count=1 fallback (D-FOREACH-1)
40+
process_tasks = list(run["process"].tasks())
41+
assert len(process_tasks) == 3, (
42+
"Expected 3 foreach tasks for items=[1,2,3], got %d. "
43+
"This may indicate foreach_count fell back to 1." % len(process_tasks)
44+
)
3945
assert run["join"].task.data.results == [
4046
2,
4147
4,
@@ -56,6 +62,10 @@ def test_multibody_foreach(exec_mode, decospecs, compute_env, tag, scheduler_con
5662
)
5763

5864
assert run.successful, "Run was not successful"
65+
process_tasks = list(run["process"].tasks())
66+
assert len(process_tasks) == 3, "Expected 3 foreach process tasks, got %d" % len(
67+
process_tasks
68+
)
5969
assert run["join"].task.data.results == [
6070
3,
6171
5,
@@ -153,3 +163,46 @@ def test_nested_foreach(exec_mode, decospecs, compute_env, tag, scheduler_config
153163
"x-1",
154164
"y-1",
155165
], "Nested foreach all_results didn't match"
166+
167+
168+
def test_nested_foreach_2x2(exec_mode, decospecs, compute_env, tag, scheduler_config):
    """Verify nested foreach with 2 outer x 2 inner items — catches D-NESTED-1 semantic bug.

    The minimal 1-inner case in test_nested_foreach passes even with the
    D-NESTED-1 bug, so this test asserts the full 2x2 fanout: all four
    group/item combinations must reach outer_join, and exactly four inner
    tasks must exist.
    """
    try:
        run = execute_test_flow(
            flow_name="dag/nested_foreach_2x2_flow.py",
            exec_mode=exec_mode,
            decospecs=decospecs,
            tag=tag,
            scheduler_config=scheduler_config,
            test_name="nested_foreach_2x2",
            tl_args_extra={"env": compute_env},
        )
    # Fix: `except (MetaflowException, Exception)` was redundant — Exception
    # already subsumes MetaflowException — which also made the local
    # `from metaflow.exception import MetaflowException` import pointless.
    except Exception as e:
        msg = str(e).lower()
        # Skip only when the scheduler explicitly rejects nested foreach;
        # re-raise every other failure so real bugs still surface.
        if exec_mode == "deployer" and (
            "not supported" in msg or "not yet supported" in msg
        ):
            pytest.skip(
                f"{scheduler_config.scheduler_type} does not support nested foreach: {e}"
            )
        raise

    assert run.successful, "Run was not successful"
    # Must have all 4 combinations: x-1, x-2, y-1, y-2
    assert run["outer_join"].task.data.all_results == [
        "x-1",
        "x-2",
        "y-1",
        "y-2",
    ], (
        "Expected 4 results from 2x2 nested foreach, got: %s. "
        "This may indicate nested_foreach_join is not aggregating all outer items correctly."
        % run["outer_join"].task.data.all_results
    )
    # Verify inner task count: 4 inner tasks total (2 outer x 2 inner)
    inner_tasks = list(run["inner"].tasks())
    assert (
        len(inner_tasks) == 4
    ), "Expected 4 inner tasks for 2x2 foreach, got %d" % len(inner_tasks)

test/ux/core/test_utils.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ def wait_for_deployed_run(
182182
print("Waiting for run to start...")
183183
time.sleep(polling_interval)
184184

185-
print(f"Run {triggered_run.run.id} started")
185+
run = triggered_run.run
186+
assert run is not None, (
187+
"triggered_run.run returned None — run_id mismatch between deployer and init block. "
188+
"Check that pipeline_run_id kwarg is injected by the scheduler."
189+
)
190+
print(f"Run {run.id} started")
186191

187192
while not triggered_run.run.finished:
188193
if time.time() - start_time > timeout:

0 commit comments

Comments
 (0)