Skip to content

Commit 23903a9

Browse files
author
Nissan Pow
committed
add timeout enforcement compliance test
Deploys a flow where a step exceeds its @timeout(seconds=5) and verifies the run fails. The existing test_timeout only checks that @timeout doesn't break normal execution — it never tests that a step actually gets killed. Adds wait_for_deployed_run_allow_failure() helper that returns the Run even when it fails, so tests can assert on run.successful == False.
1 parent e730f1e commit 23903a9

3 files changed

Lines changed: 109 additions & 0 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import time
2+
3+
from metaflow import FlowSpec, step, timeout, project
4+
5+
6+
@project(name="timeout_enforce_flow")
7+
class TimeoutEnforceFlow(FlowSpec):
8+
"""Step exceeds its @timeout — the run MUST fail."""
9+
10+
@step
11+
def start(self):
12+
self.next(self.slow)
13+
14+
@timeout(seconds=5)
15+
@step
16+
def slow(self):
17+
# Sleep well beyond the 5-second timeout to guarantee enforcement.
18+
time.sleep(120)
19+
self.next(self.end)
20+
21+
@step
22+
def end(self):
23+
pass
24+
25+
26+
if __name__ == "__main__":
27+
TimeoutEnforceFlow()

test/ux/core/test_compliance.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from .test_utils import (
2121
deploy_flow_to_scheduler,
2222
wait_for_deployed_run,
23+
wait_for_deployed_run_allow_failure,
2324
)
2425

2526

@@ -311,3 +312,45 @@ def test_nested_foreach_or_skip(
311312
f"Nested foreach produced wrong results: {all_results!r}. "
312313
"Expected ['x-1', 'y-1']."
313314
)
315+
316+
317+
# ---------------------------------------------------------------------------
318+
# test_timeout_enforcement
319+
#
320+
# WHY: The existing test_timeout only verifies that @timeout doesn't break
321+
# normal execution (step sleeps 1s with a 10-minute timeout — always passes).
322+
# If the timeout= kwarg on subprocess.run() is completely broken, that test
323+
# still passes. This test deploys a flow where a step sleeps well beyond its
324+
# @timeout(seconds=5) and verifies the run actually fails.
325+
# ---------------------------------------------------------------------------
326+
327+
328+
@pytest.mark.compliance
329+
@pytest.mark.scheduler_only
330+
def test_timeout_enforcement(exec_mode, decospecs, compute_env, tag, scheduler_config):
331+
"""A step that exceeds its @timeout must be killed — the run must fail."""
332+
if exec_mode != "deployer":
333+
pytest.skip("compliance test requires deployer mode")
334+
335+
test_unique_tag = f"test_compliance_timeout_enforce_{exec_mode}"
336+
combined_tags = tag + [test_unique_tag]
337+
338+
tl_args = {
339+
"env": compute_env,
340+
"decospecs": decospecs,
341+
}
342+
343+
deployed_flow = deploy_flow_to_scheduler(
344+
flow_name="basic/timeout_enforce_flow.py",
345+
tl_args=tl_args,
346+
scheduler_args={"cluster": scheduler_config.cluster},
347+
deploy_args={"tags": combined_tags, **(scheduler_config.deploy_args or {})},
348+
scheduler_type=scheduler_config.scheduler_type,
349+
)
350+
351+
run = wait_for_deployed_run_allow_failure(deployed_flow)
352+
353+
assert not run.successful, (
354+
"Run should have failed because the 'slow' step exceeds its "
355+
"@timeout(seconds=5), but it succeeded. Timeout enforcement may be broken."
356+
)

test/ux/core/test_utils.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,45 @@ def wait_for_deployed_run(
199199
return triggered_run.run
200200

201201

202+
def wait_for_deployed_run_allow_failure(
203+
deployed_flow,
204+
timeout: int = 3600,
205+
run_kwargs: Optional[Dict[str, Any]] = None,
206+
polling_interval: int = 3,
207+
):
208+
"""Trigger a deployed flow and wait for it to finish, even if it fails.
209+
210+
Same as wait_for_deployed_run but does NOT raise on failed status.
211+
Returns the Run regardless of success/failure so the caller can assert
212+
on run.successful.
213+
"""
214+
print(f"Deployed flow {deployed_flow.name}")
215+
run_kwargs = run_kwargs or {}
216+
triggered_run = deployed_flow.trigger(**run_kwargs)
217+
218+
start_time = time.time()
219+
while triggered_run.run is None:
220+
if time.time() - start_time > timeout:
221+
raise RuntimeError(f"Run failed to start within {timeout} seconds")
222+
print("Waiting for run to start...")
223+
time.sleep(polling_interval)
224+
225+
print(f"Run {triggered_run.run.id} started")
226+
227+
while not triggered_run.run.finished:
228+
if time.time() - start_time > timeout:
229+
raise RuntimeError(
230+
f"Run {triggered_run.run.id} failed to complete within {timeout} seconds"
231+
)
232+
print(f"Waiting for run {triggered_run.run.id} to complete...")
233+
time.sleep(polling_interval)
234+
235+
print(
236+
f"Run {triggered_run.run.id} completed (successful={triggered_run.run.successful})"
237+
)
238+
return triggered_run.run
239+
240+
202241
def verify_run_provenance(run: Run, decospecs: Any) -> None:
203242
"""Verify the run used the expected datastore and execution environment.
204243

0 commit comments

Comments
 (0)