Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmark/conf/hydra/sweeper/multi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
defaults:
  - base

_target_: hydra_plugins.smart_sweeper.smart_benchmark_sweeper.SmartBenchmarkSweeper
# When true, combinations are launched one at a time and the sweep stops at
# the first failed job instead of running the remaining combinations.
fail_fast: false
24 changes: 21 additions & 3 deletions benchmark/hydra_plugins/smart_sweeper/smart_benchmark_sweeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ class SmartBenchmarkSweeperConf:
_target_: str = "hydra_plugins.smart_sweeper.smart_benchmark_sweeper.SmartBenchmarkSweeper"
max_batch_size: Optional[int] = None
params: Optional[Dict[str, str]] = None
fail_fast: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth documenting somewhere? Users should know when and why they'd want to enable fail_fast=true. Consider this if it feels reasonable.



ConfigStore.instance().store(group="hydra/sweeper", name="smart_benchmark", node=SmartBenchmarkSweeperConf)


class SmartBenchmarkSweeper(Sweeper):
    def __init__(
        self, max_batch_size: Optional[int] = None, params: Optional[Dict[str, str]] = None, fail_fast: bool = False
    ):
        """Benchmark sweeper.

        Args:
            max_batch_size: Optional cap on jobs per launch batch — presumably
                None means "no cap"; confirm against sweep() usage.
            params: Sweep parameter overrides; an empty dict when omitted.
            fail_fast: When True, jobs are launched one at a time and the sweep
                stops at the first failed job instead of running the remaining
                combinations. Defaults to False (all combinations run).
        """
        self.max_batch_size = max_batch_size
        self.params = params or {}
        self.fail_fast = fail_fast
        # Filled in after construction (presumably by Hydra's sweeper setup);
        # they stay None until then.
        self.config: Optional[DictConfig] = None
        self.launcher: Optional[Launcher] = None
        self.hydra_context: Optional[HydraContext] = None
Expand Down Expand Up @@ -86,8 +90,22 @@ def sweep(self, arguments: List[str]) -> Any:
initial_job_idx = 0
if all_combinations:
self.validate_batch_is_legal(all_combinations)
results = self.launcher.launch(all_combinations, initial_job_idx=initial_job_idx)
returns.append(results)

# Determine batch size: run all at once (fail_fast=False) or one at a time (fail_fast=True)
batch_size = 1 if self.fail_fast else len(all_combinations)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should fail_fast impact the batch size? Wouldn't it be faster to run in parallel but cancel on failure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our SmartBenchmarkSweeper uses Hydra's BasicLauncher by default, which executes jobs sequentially,
not in parallel. With fail_fast=True and batch_size=1, the sweeper passes one job at a
time to the launcher, gets the result back immediately, and can check whether it failed before launching the next job. If we used batch_size=len(all_combinations), the launcher would run all jobs sequentially and only return after completing every single one, meaning we'd waste time running jobs after a failure instead of stopping early. This fits our requirements.

(Regarding your parallel-jobs question: Hydra does support parallel launchers like
JoblibLauncher, but they don't support canceling in-flight jobs on first failure—they run all submitted jobs to completion. I'm also not sure what effect parallel execution might have on the benchmark results in general.)


for i in range(0, len(all_combinations), batch_size):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a test for this.

batch = all_combinations[i : i + batch_size]
results = self.launcher.launch(batch, initial_job_idx=initial_job_idx)

# Check results immediately if fail_fast enabled
# Accessing return_value raises an exception if the job failed (hydra/core/utils.py:251-258)
if self.fail_fast:
for r in results:
_ = r.return_value # Raises on failure, stopping the sweep
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if a job fails? Does the exception message or the logs get captured
clearly enough that users know which benchmark failed and why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does accessing return_value raise on failure?


initial_job_idx += len(batch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is initial_job_idx the same as i?

returns.append(results)

return returns

Expand Down
126 changes: 126 additions & 0 deletions benchmark/tests/test_fail_fast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from unittest.mock import Mock, PropertyMock
from hydra.core.override_parser.overrides_parser import OverridesParser
from hydra_plugins.smart_sweeper.smart_benchmark_sweeper import SmartBenchmarkSweeper


class TestFailFastBehavior:
    """
    Tests for the fail_fast feature in SmartBenchmarkSweeper.

    fail_fast controls whether benchmark execution stops on first failure:
    - fail_fast=False: all combinations run in one batch (default)
    - fail_fast=True: combinations run one at a time, stopping on first error
    """

    def setup_method(self):
        # A fresh parser per test keeps the tests independent of each other.
        self.parser = OverridesParser.create()

    def test_fail_fast_configuration(self):
        """fail_fast defaults to False and can be enabled explicitly."""
        assert not SmartBenchmarkSweeper().fail_fast, "Default should be fail_fast=False"
        assert SmartBenchmarkSweeper(fail_fast=True).fail_fast

    def test_fail_fast_affects_batch_size(self):
        """
        fail_fast controls batch size:
        - fail_fast=False -> batch_size = number of combinations (single batch)
        - fail_fast=True  -> batch_size = 1 (one job at a time)
        """
        # Mock the launcher: these tests exercise combination generation and
        # the batching rule only, never actual benchmark execution.
        mock_launcher = Mock()
        mock_launcher.launch = Mock(return_value=[])

        sweeper_normal = SmartBenchmarkSweeper(fail_fast=False)
        sweeper_normal.launcher = mock_launcher

        # Fake config: the sweeper only needs a sweep directory path here.
        sweeper_normal.config = Mock()
        sweeper_normal.config.hydra.sweep.dir = "/tmp/test"

        # "Run the FIO benchmark with 1 worker, then 2 workers, then 3 workers."
        parsed = self.parser.parse_overrides(['benchmark_type=fio', 'application_workers=1,2,3'])

        combinations = sweeper_normal._generate_combinations_for_type("fio", parsed)
        assert len(combinations) == 3, f"Expected 3 combinations, got {len(combinations)}"

        sweeper_normal._load_benchmark_params = Mock(return_value=[])
        sweeper_normal._extract_benchmark_types = Mock(return_value=['fio'])

        # Mirrors the batching rule inside SmartBenchmarkSweeper.sweep().
        batch_size = 1 if sweeper_normal.fail_fast else len(combinations)
        assert batch_size == 3, f"With fail_fast=False, batch_size should be {len(combinations)}, got {batch_size}"

        sweeper_fast_fail = SmartBenchmarkSweeper(fail_fast=True)
        batch_size_fast_fail = 1 if sweeper_fast_fail.fail_fast else len(combinations)
        assert batch_size_fast_fail == 1, f"With fail_fast=True, batch_size should be 1, got {batch_size_fast_fail}"

    def test_fail_fast_stops_on_first_error(self):
        """A failed job's result raises when checked, halting the sweep."""
        # Hydra's JobReturn.return_value is a property: it returns normally for
        # a COMPLETED job and re-raises the stored exception for a failed one
        # (hydra/core/utils.py). The fail_fast path relies on that behavior.
        sweeper_fast_fail = SmartBenchmarkSweeper(fail_fast=True)
        assert sweeper_fast_fail.fail_fast

        # Successful job: accessing return_value must not raise.
        mock_result_success = Mock()
        mock_result_success.return_value = None  # simulates a completed job
        for r in [mock_result_success]:
            _ = r.return_value  # would raise here if the job had failed

        # Failed job: PropertyMock simulates Hydra's raising property. Mock
        # gives every instance its own subclass, so patching type(mock) does
        # not leak into other mocks.
        mock_result_failure = Mock()
        type(mock_result_failure).return_value = PropertyMock(side_effect=Exception("Job failed!"))

        raised = False
        try:
            for r in [mock_result_failure]:
                _ = r.return_value  # should raise
        except Exception:
            raised = True
        assert raised, "Failed result should raise exception with fail_fast=True"
Loading