Skip to content

Commit 503e2d4

Browse files
authored
Merge pull request #41 from TaekyungHeo/not-enough-nodes
Enhance job submission error handling with custom exceptions
2 parents 8198743 + 5bb6d8c commit 503e2d4

File tree

5 files changed

+184
-25
lines changed

5 files changed

+184
-25
lines changed

src/cloudai/_core/base_runner.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from typing import Dict, List, Optional
2525

2626
from .base_job import BaseJob
27+
from .exceptions import JobSubmissionError
2728
from .system import System
2829
from .test import Test
2930
from .test_scenario import TestScenario
@@ -167,14 +168,14 @@ async def submit_test(self, test: Test):
167168
test (Test): The test to be started.
168169
"""
169170
self.logger.info(f"Starting test: {test.section_name}")
170-
job = self._submit_test(test)
171-
if job:
171+
try:
172+
job = self._submit_test(test)
172173
self.jobs.append(job)
173174
self.test_to_job_map[test] = job
174-
else:
175-
msg = f"Failed to run test {test.section_name}"
176-
self.logger.error(msg)
177-
raise RuntimeError(msg)
175+
except JobSubmissionError as e:
176+
self.logger.error(e)
177+
print(e, file=sys.stdout)
178+
sys.exit(1)
178179

179180
async def delayed_submit_test(self, test: Test, delay: int):
180181
"""
@@ -189,17 +190,17 @@ async def delayed_submit_test(self, test: Test, delay: int):
189190
await self.submit_test(test)
190191

191192
@abstractmethod
def _submit_test(self, test: Test) -> BaseJob:
    """
    Execute a given test and return the job that tracks it.

    This is the runner-specific hook called by submit_test. Concrete runners
    must override it; the base implementation has no job to hand back, so it
    raises instead of returning a placeholder value.

    Args:
        test (Test): The test to be executed.

    Returns:
        BaseJob: A BaseJob object representing the submitted test.

    Raises:
        NotImplementedError: Always, when the base implementation is invoked.
    """
    # Returning the BaseJob class itself would violate the declared return
    # type and let a bogus "job" leak into self.jobs if ever reached.
    raise NotImplementedError("Subclasses must implement _submit_test.")
203204

204205
async def check_start_post_init_dependencies(self):
205206
"""

src/cloudai/_core/exceptions.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
class JobSubmissionError(Exception):
    """
    Exception raised for errors that occur during job submission.

    Attributes
        test_name (str): The name of the test associated with the job.
        command (str): The command that was executed to submit the job.
        stdout (str): The standard output from the command execution, with
            surrounding whitespace removed.
        stderr (str): The standard error from the command execution, with
            surrounding whitespace removed.
        message (str): A custom message describing the error.
    """

    def __init__(self, test_name: str, command: str, stdout: str, stderr: str, message: str):
        """
        Initialize a JobSubmissionError instance.

        Args:
            test_name (str): The name of the test associated with the job.
            command (str): The command that was executed to submit the job.
            stdout (str): The standard output from the command execution.
                None is tolerated and stored as an empty string.
            stderr (str): The standard error from the command execution.
                None is tolerated and stored as an empty string.
            message (str): A custom message describing the error.
        """
        super().__init__(message)
        self.test_name = test_name
        self.command = command
        # A stream that was not captured is reported as None; normalise it so
        # that constructing the error can never fail with an AttributeError.
        self.stdout = (stdout or "").strip()
        self.stderr = (stderr or "").strip()
        self.message = message

    def __reduce__(self):
        """
        Describe how to rebuild this exception for copy and pickle.

        The default reduction replays only ``args`` (the message), which does
        not match the five-argument constructor, so every field is supplied.

        Returns
            tuple: The class and the constructor arguments needed to recreate it.
        """
        return (
            self.__class__,
            (self.test_name, self.command, self.stdout, self.stderr, self.message),
        )

    def __str__(self):
        """
        Return a formatted string representation of the JobSubmissionError instance.

        Returns
            str: A formatted string with detailed error information.
        """
        return (
            "\nERROR: Job Submission Failed\n"
            f"\tTest Name: {self.test_name}\n"
            f"\tMessage: {self.message}\n"
            f"\tCommand: '{self.command}'\n"
            f"\tstdout: '{self.stdout}'\n"
            f"\tstderr: '{self.stderr}'\n"
        )
61+
62+
63+
class JobIdRetrievalError(JobSubmissionError):
    """
    Raised when no job ID can be retrieved after a job has been submitted.

    Attributes
        Identical to those of JobSubmissionError; nothing is added here.
    """

src/cloudai/runner/slurm/slurm_runner.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Optional, cast
15+
from typing import cast
1616

1717
from cloudai._core.base_job import BaseJob
1818
from cloudai._core.base_runner import BaseRunner
19+
from cloudai._core.exceptions import JobIdRetrievalError
1920
from cloudai._core.system import System
2021
from cloudai._core.test import Test
2122
from cloudai._core.test_scenario import TestScenario
@@ -56,28 +57,33 @@ def __init__(self, mode: str, system: System, test_scenario: TestScenario) -> No
5657
self.slurm_system: SlurmSystem = cast(SlurmSystem, system)
5758
self.cmd_shell = CommandShell()
5859

59-
def _submit_test(self, test: Test) -> Optional[SlurmJob]:
60+
def _submit_test(self, test: Test) -> SlurmJob:
    """
    Submit a test on Slurm and wrap the resulting job ID in a SlurmJob.

    The command is always generated and logged. It is only executed when the
    runner mode is "run"; in any other mode the job ID is 0.

    Args:
        test (Test): The test to be executed.

    Returns:
        SlurmJob: A SlurmJob object for the submitted test.

    Raises:
        JobIdRetrievalError: If the test cannot extract a job ID from the
            output of the submission command.
    """
    self.logger.info(f"Running test: {test.section_name}")
    output_dir = self.get_job_output_path(test)
    exec_cmd = test.gen_exec_command(output_dir)
    self.logger.info(f"Executing command for test {test.section_name}: {exec_cmd}")

    # Nothing is launched outside of "run" mode, so there is no real job ID.
    if self.mode != "run":
        return SlurmJob(0, test)

    process = self.cmd_shell.execute(exec_cmd)
    stdout, stderr = process.communicate()
    job_id = test.get_job_id(stdout, stderr)
    if job_id is not None:
        return SlurmJob(job_id, test)

    raise JobIdRetrievalError(
        test_name=str(test.section_name),
        command=exec_cmd,
        stdout=stdout,
        stderr=stderr,
        message="Failed to retrieve job ID from command output.",
    )
8187

8288
def is_job_running(self, job: BaseJob) -> bool:
8389
"""

src/cloudai/runner/standalone/standalone_runner.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Optional, cast
15+
from typing import cast
1616

1717
from cloudai._core.base_job import BaseJob
1818
from cloudai._core.base_runner import BaseRunner
19+
from cloudai._core.exceptions import JobIdRetrievalError
1920
from cloudai._core.system import System
2021
from cloudai._core.test import Test
2122
from cloudai._core.test_scenario import TestScenario
@@ -55,28 +56,33 @@ def __init__(
5556
super().__init__(mode, system, test_scenario)
5657
self.cmd_shell = CommandShell()
5758

58-
def _submit_test(self, test: Test) -> Optional[StandaloneJob]:
59+
def _submit_test(self, test: Test) -> StandaloneJob:
    """
    Submit a test on Standalone and wrap the resulting job ID in a StandaloneJob.

    The command is always generated and logged. It is only executed when the
    runner mode is "run"; in any other mode the job ID is 0. The PID of the
    launched process is handed to the test as the text to derive the ID from.

    Args:
        test (Test): The test to be executed.

    Returns:
        StandaloneJob: A StandaloneJob object for the submitted test.

    Raises:
        JobIdRetrievalError: If the test cannot derive a job ID from the PID.
    """
    self.logger.info(f"Running test: {test.section_name}")
    output_dir = self.get_job_output_path(test)
    exec_cmd = test.gen_exec_command(output_dir)
    self.logger.info(f"Executing command for test {test.section_name}: {exec_cmd}")

    # Nothing is launched outside of "run" mode, so there is no real job ID.
    if self.mode != "run":
        return StandaloneJob(0, test)

    process = self.cmd_shell.execute(exec_cmd)
    job_id = test.get_job_id(str(process.pid), "")
    if job_id is not None:
        return StandaloneJob(job_id, test)

    raise JobIdRetrievalError(
        test_name=str(test.section_name),
        command=exec_cmd,
        stdout="",
        stderr="",
        message="Failed to retrieve job ID from command output.",
    )
8086

8187
def is_job_running(self, job: BaseJob) -> bool:
8288
"""

tests/test_job_submission_error.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import subprocess
2+
from unittest.mock import MagicMock, Mock
3+
4+
import pytest
5+
from cloudai._core.exceptions import JobIdRetrievalError
6+
from cloudai._core.test import Test
7+
from cloudai._core.test_scenario import TestScenario
8+
from cloudai._core.test_template import TestTemplate
9+
from cloudai.runner.slurm.slurm_runner import SlurmRunner
10+
from cloudai.schema.system import SlurmSystem
11+
from cloudai.schema.system.slurm import SlurmNode, SlurmNodeState
12+
from cloudai.util import CommandShell
13+
14+
15+
class MockCommandShell(CommandShell):
    """CommandShell replacement whose commands always report an sbatch failure."""

    def execute(self, command):
        """Return a mocked Popen whose communicate() yields empty stdout and an sbatch error."""
        sbatch_error = "sbatch: error: Batch job submission failed: Requested node configuration is not available"
        process = Mock(spec=subprocess.Popen)
        process.communicate.return_value = ("", sbatch_error)
        return process
23+
24+
25+
class MockTest(Test):
    """Test double with fixed attributes, a canned command and no retrievable job ID."""

    def __init__(self, section_name):
        """Populate the attributes directly; the parent initializer is not called."""
        # NOTE: section_name is accepted but not used; the section is always "Tests.1".
        self.section_name = "Tests.1"
        self.name = "Mock Test"
        self.description = "A mock test description"
        self.current_iteration = 0
        self.test_template = MagicMock(spec=TestTemplate)
        self.env_vars = {}
        self.extra_env_vars = {}
        self.cmd_args = {}
        self.extra_cmd_args = ""

    def gen_exec_command(self, output_path):
        """Return a fixed sbatch command regardless of the output path."""
        return "sbatch mock_script.sh"

    def get_job_id(self, stdout, stderr):
        """Report that no job ID could be found, whatever the output was."""
        return None
42+
43+
44+
@pytest.fixture
def slurm_system(tmpdir):
    """Provide a SlurmSystem with a two-node "main" partition rooted at tmpdir."""
    main_partition = [
        SlurmNode(name=node_name, partition="main", state=SlurmNodeState.UNKNOWN_STATE)
        for node_name in ("nodeA001", "nodeB001")
    ]
    return SlurmSystem(
        name="test_system",
        install_path=tmpdir,
        output_path=tmpdir,
        default_partition="main",
        partitions={"main": main_partition},
    )
58+
59+
60+
@pytest.fixture
def slurm_runner(slurm_system):
    """Provide a "run"-mode SlurmRunner whose command shell is the failing mock."""
    mock_test = MockTest(section_name="Mock Test")
    scenario = TestScenario(name="Test Scenario", tests=[mock_test])
    runner = SlurmRunner(mode="run", system=slurm_system, test_scenario=scenario)
    # Swap in the mock so no real sbatch command is ever executed.
    runner.cmd_shell = MockCommandShell()
    return runner
66+
67+
68+
def test_job_id_retrieval_error(slurm_runner):
    """Check that a missing job ID raises JobIdRetrievalError carrying the message and stderr."""
    expected_fragments = (
        "Failed to retrieve job ID from command output.",
        "sbatch: error: Batch job submission failed: Requested node configuration is not available",
    )
    first_test = slurm_runner.test_scenario.tests[0]
    with pytest.raises(JobIdRetrievalError) as excinfo:
        slurm_runner._submit_test(first_test)
    rendered = str(excinfo.value)
    for fragment in expected_fragments:
        assert fragment in rendered

0 commit comments

Comments
 (0)