generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 234
Expand file tree
/
Copy pathtest_execute_batches.py
More file actions
73 lines (60 loc) · 3.07 KB
/
test_execute_batches.py
File metadata and controls
73 lines (60 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import pytest
from unittest.mock import Mock
from hydra.core.utils import JobReturn, JobStatus
from hydra_plugins.smart_sweeper.smart_benchmark_sweeper import SmartBenchmarkSweeper
class TestExecuteBatches:
    """Unit tests for _execute_batches - the core fail_fast logic."""

    # Three override lists; the middle one pairs stub_mode=s3_client with a
    # throughput cap, which the sweeper is expected to reject at runtime.
    INVALID_COMBINATIONS = [
        ["benchmark_type=fio", "mountpoint.stub_mode=off", "network.maximum_throughput_gbps=100"],
        [
            "benchmark_type=fio",
            "mountpoint.stub_mode=s3_client",
            "network.maximum_throughput_gbps=100",
        ],  # Invalid config
        ["benchmark_type=fio", "mountpoint.stub_mode=off", "network.maximum_throughput_gbps=100"],
    ]

    @staticmethod
    def _success():
        """Build a JobReturn for a job that completed cleanly."""
        return JobReturn(status=JobStatus.COMPLETED, _return_value="success")

    @staticmethod
    def _failure():
        """Build a JobReturn carrying the stub_mode/throughput validation error."""
        return JobReturn(
            status=JobStatus.FAILED,
            _return_value=ValueError(
                "should not use `stub_mode=s3_client` with `maximum_throughput_gbps`, throughput will be limited"
            ),
        )

    def test_fail_fast_true_stops_on_first_failure(self):
        """With fail_fast=True the failed job's error propagates and later batches never run."""
        sweeper = SmartBenchmarkSweeper(fail_fast=True)
        launcher = Mock()
        sweeper.launcher = launcher
        # side_effect yields one batch result per launch() call: pass, fail, pass.
        launcher.launch.side_effect = [
            [self._success()],
            [self._failure()],
            [self._success()],
        ]
        # The second batch's ValueError should escape _execute_batches.
        with pytest.raises(ValueError, match="should not use `stub_mode=s3_client` with `maximum_throughput_gbps`"):
            sweeper._execute_batches(self.INVALID_COMBINATIONS, initial_job_idx=0)
        # Only two launches happened: execution halted at the failure, the 3rd job never ran.
        assert launcher.launch.call_count == 2

    def test_fail_fast_false_continues_through_failures(self):
        """With fail_fast=False everything runs as one batch and failures are captured, not raised."""
        sweeper = SmartBenchmarkSweeper(fail_fast=False)
        launcher = Mock()
        sweeper.launcher = launcher
        # fail_fast=False submits all jobs in a single batch, so one launch() return
        # value covers the whole sweep: pass, fail, pass.
        launcher.launch.return_value = [
            self._success(),
            self._failure(),
            self._success(),
        ]
        # No exception expected despite the failed middle job.
        results = sweeper._execute_batches(self.INVALID_COMBINATIONS, initial_job_idx=0)
        assert launcher.launch.call_count == 1  # exactly one batch for all 3 jobs
        assert len(results) == 1
        assert len(results[0]) == 3
        assert results[0][1].status == JobStatus.FAILED  # failure recorded in results