Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmark/conf/hydra/sweeper/multi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
defaults:
  - base

_target_: hydra_plugins.smart_sweeper.smart_benchmark_sweeper.SmartBenchmarkSweeper
# Stop the sweep immediately after the first failed benchmark when true;
# run every combination and collect all results when false (the default).
fail_fast: false
51 changes: 46 additions & 5 deletions benchmark/hydra_plugins/smart_sweeper/smart_benchmark_sweeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from hydra.core.override_parser.overrides_parser import OverridesParser
from hydra.core.override_parser.types import Override
from hydra.core.plugins import Plugins
from hydra.core.utils import JobStatus
from hydra.plugins.launcher import Launcher
from hydra.plugins.sweeper import Sweeper
from hydra.types import TaskFunction
Expand All @@ -18,18 +19,32 @@

@dataclass
class SmartBenchmarkSweeperConf:
    """Configuration for SmartBenchmarkSweeper.

    Registered below in Hydra's ConfigStore under group ``hydra/sweeper`` as
    ``smart_benchmark`` so it can be selected from YAML config.

    Attributes:
        max_batch_size: Maximum number of jobs to run in a single batch (currently unused)
        params: Base parameters to apply to all benchmark configurations
        fail_fast: If True, stops execution immediately after first benchmark failure.
            Use True for quick validation during development/debugging.
            Use False (default) to run all benchmarks and collect all results.
    """

    # Dotted path Hydra instantiates; must resolve to the SmartBenchmarkSweeper class.
    _target_: str = "hydra_plugins.smart_sweeper.smart_benchmark_sweeper.SmartBenchmarkSweeper"
    max_batch_size: Optional[int] = None
    params: Optional[Dict[str, str]] = None
    fail_fast: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth documenting somewhere? Users should know when and why they'd want to enable fail_fast=true. Consider this if it feels reasonable.



ConfigStore.instance().store(group="hydra/sweeper", name="smart_benchmark", node=SmartBenchmarkSweeperConf)


class SmartBenchmarkSweeper(Sweeper):
def __init__(self, max_batch_size: Optional[int] = None, params: Optional[Dict[str, str]] = None):
def __init__(
self, max_batch_size: Optional[int] = None, params: Optional[Dict[str, str]] = None, fail_fast: bool = False
):
self.max_batch_size = max_batch_size
self.params = params or {}
self.fail_fast = fail_fast
self.config: Optional[DictConfig] = None
self.launcher: Optional[Launcher] = None
self.hydra_context: Optional[HydraContext] = None
Expand Down Expand Up @@ -82,11 +97,37 @@ def sweep(self, arguments: List[str]) -> Any:

log.info(f"Generated {len(all_combinations)} total combinations")

returns = []
initial_job_idx = 0
if all_combinations:
self.validate_batch_is_legal(all_combinations)
results = self.launcher.launch(all_combinations, initial_job_idx=initial_job_idx)
self.validate_batch_is_legal(all_combinations)
return self._execute_batches(all_combinations, initial_job_idx)

def _execute_batches(self, all_combinations: List[List[str]], initial_job_idx: int) -> List[Any]:
"""
Execute benchmark combinations in batches.

When fail_fast=False: Launches all combinations in one batch
When fail_fast=True: Launches one combination at a time, stopping on first failure

Args:
all_combinations: List of parameter combinations to execute
initial_job_idx: Starting job index for the launcher

Returns:
List of results from launcher.launch() calls
"""
returns = []
batch_size = 1 if self.fail_fast else len(all_combinations)

for i in range(0, len(all_combinations), batch_size):
batch = all_combinations[i : i + batch_size]
results = self.launcher.launch(batch, initial_job_idx=i)

# Check results immediately if fail_fast enabled
if self.fail_fast:
for r in results:
if r.status == JobStatus.FAILED:
raise r._return_value

returns.append(results)

return returns
Expand Down
73 changes: 73 additions & 0 deletions benchmark/tests/test_execute_batches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import pytest
from unittest.mock import Mock
from hydra.core.utils import JobReturn, JobStatus
from hydra_plugins.smart_sweeper.smart_benchmark_sweeper import SmartBenchmarkSweeper


class TestExecuteBatches:
    """Unit tests for _execute_batches - the core fail_fast logic"""

    # Shared fixture: three override lists handed to _execute_batches. The
    # launcher is mocked in every test, so the middle entry is "invalid" only
    # in the sense that the mock reports it as a FAILED job.
    INVALID_COMBINATIONS = [
        ["benchmark_type=fio", "mountpoint.stub_mode=off", "network.maximum_throughput_gbps=100"],
        [
            "benchmark_type=fio",
            "mountpoint.stub_mode=s3_client",
            "network.maximum_throughput_gbps=100",
        ],  # Invalid config
        ["benchmark_type=fio", "mountpoint.stub_mode=off", "network.maximum_throughput_gbps=100"],
    ]

def test_fail_fast_true_stops_on_first_failure(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests look better, though have you verified that it actually behaves this way when called from the CLI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, verified with a real CLI run. Configured a benchmark sweep with 36 total combinations and intentionally used an invalid network interface name (ens129 instead of ens33) to trigger a failure.

Results with fail_fast=true:

  • Job # 0: SUCCESS (fio, single NIC)
  • Job # 1: SUCCESS (fio, single NIC, direct_io=True)
  • Job # 2: FAILED (fio, dual NIC with invalid ens129 interface)
  • Stopped immediately - remaining 33 jobs never executed

The framework correctly caught the CalledProcessError from the mount-s3 command failing with AWS_ERROR_INVALID_ARGUMENT, and stopped execution as expected.

sweeper = SmartBenchmarkSweeper(fail_fast=True)

mock_launcher = Mock()
sweeper.launcher = mock_launcher

mock_launcher.launch.side_effect = [
[JobReturn(status=JobStatus.COMPLETED, _return_value="success")],
[
JobReturn(
status=JobStatus.FAILED,
_return_value=ValueError(
"should not use `stub_mode=s3_client` with `maximum_throughput_gbps`, throughput will be limited"
),
)
],
[JobReturn(status=JobStatus.COMPLETED, _return_value="success")],
]

# Test 1: Should raise error in failed job
with pytest.raises(ValueError, match="should not use `stub_mode=s3_client` with `maximum_throughput_gbps`"):
sweeper._execute_batches(self.INVALID_COMBINATIONS, initial_job_idx=0)

# Test 2: Verify it stopped after 2nd job (didn't run 3rd)
assert mock_launcher.launch.call_count == 2

def test_fail_fast_false_continues_through_failures(self):
sweeper = SmartBenchmarkSweeper(fail_fast=False)

mock_launcher = Mock()
sweeper.launcher = mock_launcher

# Return all results in ONE batch (fail_fast=False batches everything)
mock_launcher.launch.return_value = [
JobReturn(status=JobStatus.COMPLETED, _return_value="success"),
JobReturn(
status=JobStatus.FAILED,
_return_value=ValueError(
"should not use `stub_mode=s3_client` with `maximum_throughput_gbps`, throughput will be limited"
),
),
JobReturn(status=JobStatus.COMPLETED, _return_value="success"),
]

# Test 1: Should not raise any exception
results = sweeper._execute_batches(self.INVALID_COMBINATIONS, initial_job_idx=0)

# Test 2: Should call launcher ONCE with all 3 jobs
assert mock_launcher.launch.call_count == 1
assert len(results) == 1
assert len(results[0]) == 3
assert results[0][0].status == JobStatus.COMPLETED # Verify first job succeeded
assert results[0][1].status == JobStatus.FAILED # Verify failure is captured
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing verification that we have success in the other 2 cases

assert results[0][2].status == JobStatus.COMPLETED # Verify third job succeeded
Loading