Skip to content

Commit 0c35fd3

Browse files
authored
Merge pull request #567 from NVIDIA/am/more-slurm-meta
Expand slurm meta to have per-step information
2 parents eac711c + 752e276 commit 0c35fd3

File tree

9 files changed

+179
-96
lines changed

9 files changed

+179
-96
lines changed

src/cloudai/cli/handlers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ def generate_reports(system: System, test_scenario: TestScenario, result_dir: Pa
168168
reporter = reporter_class(system, test_scenario, result_dir, cfg)
169169
reporter.generate()
170170
except Exception as e:
171-
logging.warning(f"Error generating report: {e}")
171+
logging.warning(f"Error generating report '{name}', see debug log for details")
172+
logging.debug(e, stack_info=True)
172173

173174

174175
def handle_non_dse_job(runner: Runner, args: argparse.Namespace) -> None:

src/cloudai/systems/slurm/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
from .metadata import SlurmSystemMetadata
1817
from .single_sbatch_runner import SingleSbatchRunner
1918
from .slurm_command_gen_strategy import SlurmCommandGenStrategy
2019
from .slurm_installer import SlurmInstaller
2120
from .slurm_job import SlurmJob
21+
from .slurm_metadata import SlurmJobMetadata, SlurmStepMetadata, SlurmSystemMetadata
2222
from .slurm_node import SlurmNode, SlurmNodeState
2323
from .slurm_runner import SlurmRunner
24-
from .slurm_system import SlurmGroup, SlurmJobMetadata, SlurmPartition, SlurmSystem, parse_node_list
24+
from .slurm_system import SlurmGroup, SlurmPartition, SlurmSystem, parse_node_list
2525

2626
__all__ = [
2727
"SingleSbatchRunner",
@@ -34,6 +34,7 @@
3434
"SlurmNodeState",
3535
"SlurmPartition",
3636
"SlurmRunner",
37+
"SlurmStepMetadata",
3738
"SlurmSystem",
3839
"SlurmSystemMetadata",
3940
"parse_node_list",

src/cloudai/systems/slurm/single_sbatch_runner.py

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@
2121
from pathlib import Path
2222
from typing import Generator, Optional, cast
2323

24-
import toml
25-
2624
from cloudai.core import JobIdRetrievalError, System, TestRun, TestScenario
25+
from cloudai.systems.slurm.slurm_metadata import SlurmJobMetadata, SlurmStepMetadata
2726
from cloudai.util import CommandShell, format_time_limit, parse_time_limit
2827

2928
from .slurm_command_gen_strategy import SlurmCommandGenStrategy
3029
from .slurm_runner import SlurmJob, SlurmRunner
31-
from .slurm_system import SlurmJobMetadata, SlurmSystem
30+
from .slurm_system import SlurmSystem
3231

3332

3433
class SingleSbatchRunner(SlurmRunner):
@@ -184,7 +183,7 @@ async def run(self):
184183
is_completed = True if self.mode == "dry-run" else self.system.is_job_completed(job)
185184
await asyncio.sleep(self.system.monitor_interval)
186185

187-
await self.job_completion_callback(job)
186+
self.on_job_completion(job)
188187

189188
def _submit_test(self, tr: TestRun) -> SlurmJob:
190189
with open(self.scenario_root / "cloudai_sbatch_script.sh", "w") as f:
@@ -206,25 +205,20 @@ def _submit_test(self, tr: TestRun) -> SlurmJob:
206205
logging.info(f"Submitted slurm job: {job_id}")
207206
return SlurmJob(tr, id=job_id)
208207

209-
def store_job_metadata(self, job: SlurmJob):
210-
logging.debug(f"Storing job metadata for job {job.id}")
211-
res = None if self.mode == "dry-run" else self.system.get_job_status(job)
212-
logging.debug(f"Job status ra: {res}")
213-
214-
job_name, job_state, time_sec = "unknown", "UNKNOWN", 0
215-
if res:
216-
job_name, job_state, time_sec = res[0], res[1], int(res[2])
217-
218-
job_meta = SlurmJobMetadata(
208+
def _get_job_metadata(
209+
self, job: SlurmJob, steps_metadata: list[SlurmStepMetadata]
210+
) -> tuple[Path, SlurmJobMetadata]:
211+
return self.scenario_root / "slurm-job.toml", SlurmJobMetadata(
219212
job_id=int(job.id),
220-
job_name=job_name,
221-
job_state=job_state,
222-
elapsed_time_sec=time_sec,
213+
name=steps_metadata[0].name,
214+
state=steps_metadata[0].state,
215+
exit_code=steps_metadata[0].exit_code,
216+
start_time=steps_metadata[0].start_time,
217+
end_time=steps_metadata[0].end_time,
218+
elapsed_time_sec=steps_metadata[0].elapsed_time_sec,
219+
job_steps=steps_metadata[1:],
223220
srun_cmd="n/a for single sbatch run",
224221
test_cmd="n/a for single sbatch run",
222+
is_single_sbatch=True,
223+
job_root=self.scenario_root.absolute(),
225224
)
226-
227-
job_res = self.scenario_root / "slurm-job.toml"
228-
with job_res.open("w") as job_file:
229-
toml.dump(job_meta.model_dump(), job_file)
230-
logging.debug(f"Saved job metadata: {job_res}")

src/cloudai/systems/slurm/metadata.py renamed to src/cloudai/systems/slurm/slurm_metadata.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,68 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
from pydantic import BaseModel
17+
from __future__ import annotations
18+
19+
from pathlib import Path
20+
21+
from pydantic import BaseModel, ConfigDict, field_serializer
22+
23+
24+
class _SlurmStepMetadataBase(BaseModel):
25+
"""Represents the metadata of a Slurm job step."""
26+
27+
model_config = ConfigDict(extra="forbid")
28+
29+
job_id: int
30+
name: str
31+
state: str
32+
start_time: str
33+
end_time: str
34+
elapsed_time_sec: int
35+
exit_code: str
36+
37+
38+
class SlurmStepMetadata(_SlurmStepMetadataBase):
39+
"""Represents the metadata of a Slurm job step."""
40+
41+
model_config = ConfigDict(extra="forbid")
42+
43+
step_id: str
44+
submit_line: str
45+
46+
@classmethod
47+
def from_sacct_single_line(cls, line: str, delimiter: str) -> SlurmStepMetadata:
48+
data = line.split(delimiter)
49+
if len(data) < 8:
50+
raise ValueError(f"Invalid line: {line}")
51+
52+
job_id, step_id = data[0].split(".") if "." in data[0] else (data[0], "")
53+
54+
return cls(
55+
job_id=int(job_id),
56+
step_id=step_id,
57+
name=data[1],
58+
state=data[2],
59+
exit_code=data[3],
60+
start_time=data[4],
61+
end_time=data[5],
62+
elapsed_time_sec=int(data[6]),
63+
submit_line=data[7],
64+
)
65+
66+
67+
class SlurmJobMetadata(_SlurmStepMetadataBase):
68+
"""Represents the metadata of a Slurm job."""
69+
70+
srun_cmd: str
71+
test_cmd: str
72+
is_single_sbatch: bool = False
73+
job_root: Path
74+
job_steps: list[SlurmStepMetadata]
75+
76+
@field_serializer("job_root")
77+
def _path_serializer(self, v: Path) -> str:
78+
return str(v)
1879

1980

2081
class MetadataSystem(BaseModel):

src/cloudai/systems/slurm/slurm_runner.py

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525

2626
from .slurm_command_gen_strategy import SlurmCommandGenStrategy
2727
from .slurm_job import SlurmJob
28-
from .slurm_system import SlurmJobMetadata, SlurmSystem
28+
from .slurm_metadata import SlurmJobMetadata, SlurmStepMetadata
29+
from .slurm_system import SlurmSystem
2930

3031

3132
class SlurmRunner(BaseRunner):
@@ -59,25 +60,46 @@ def _submit_test(self, tr: TestRun) -> SlurmJob:
5960
logging.info(f"Submitted slurm job: {job_id}")
6061
return SlurmJob(tr, id=job_id)
6162

62-
async def job_completion_callback(self, job: BaseJob) -> None:
63-
self.store_job_metadata(job)
63+
def on_job_completion(self, job: BaseJob) -> None:
64+
logging.debug(f"Job completion callback for job {job.id}")
65+
self.store_job_metadata(cast(SlurmJob, job))
6466

65-
def store_job_metadata(self, job):
66-
jb = cast(SlurmJob, job)
67-
system = cast(SlurmSystem, self.system)
68-
cmd_gen = cast(SlurmCommandGenStrategy, jb.test_run.test.test_template.command_gen_strategy)
69-
res = None if self.mode == "dry-run" else system.get_job_status(jb)
70-
job_name, job_state, time_sec = "unknown", "UNKNOWN", 0
71-
if res:
72-
job_name, job_state, time_sec = res[0], res[1], int(res[2])
73-
job_meta = SlurmJobMetadata(
74-
job_id=int(jb.id),
75-
job_name=job_name,
76-
job_state=job_state,
77-
elapsed_time_sec=time_sec,
78-
srun_cmd=cmd_gen.gen_srun_command(jb.test_run),
79-
test_cmd=" ".join(cmd_gen.generate_test_command({}, {}, jb.test_run)),
67+
def _mock_job_metadata(self) -> SlurmStepMetadata:
68+
return SlurmStepMetadata(
69+
job_id=0,
70+
step_id="",
71+
name="unknown",
72+
state="UNKNOWN",
73+
exit_code="0",
74+
start_time="",
75+
end_time="",
76+
elapsed_time_sec=0,
77+
submit_line="dry-run test",
78+
)
79+
80+
def _get_job_metadata(
81+
self, job: SlurmJob, steps_metadata: list[SlurmStepMetadata]
82+
) -> tuple[Path, SlurmJobMetadata]:
83+
cmd_gen = cast(SlurmCommandGenStrategy, job.test_run.test.test_template.command_gen_strategy)
84+
return job.test_run.output_path / "slurm-job.toml", SlurmJobMetadata(
85+
job_id=int(job.id),
86+
name=steps_metadata[0].name,
87+
state=steps_metadata[0].state,
88+
exit_code=steps_metadata[0].exit_code,
89+
start_time=steps_metadata[0].start_time,
90+
end_time=steps_metadata[0].end_time,
91+
elapsed_time_sec=steps_metadata[0].elapsed_time_sec,
92+
job_steps=steps_metadata[1:],
93+
srun_cmd=cmd_gen.gen_srun_command(job.test_run),
94+
test_cmd=" ".join(cmd_gen.generate_test_command({}, {}, job.test_run)),
95+
job_root=job.test_run.output_path.absolute(),
8096
)
8197

82-
with open(jb.test_run.output_path / "slurm-job.toml", "w") as job_file:
98+
def store_job_metadata(self, job: SlurmJob):
99+
system = cast(SlurmSystem, self.system)
100+
steps_metadata = [self._mock_job_metadata()] if self.mode == "dry-run" else system.get_job_status(job)
101+
slurm_job_file, job_meta = self._get_job_metadata(job, steps_metadata)
102+
103+
logging.debug(f"Storing job metadata for job {job.id} to {slurm_job_file}")
104+
with slurm_job_file.open("w") as job_file:
83105
toml.dump(job_meta.model_dump(), job_file)

src/cloudai/systems/slurm/slurm_system.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
from __future__ import annotations
18+
1719
import logging
1820
import re
1921
from pathlib import Path
@@ -25,20 +27,10 @@
2527
from cloudai.models.scenario import ReportConfig, parse_reports_spec
2628
from cloudai.util import CommandShell
2729

30+
from .slurm_metadata import SlurmStepMetadata
2831
from .slurm_node import SlurmNode, SlurmNodeState
2932

3033

31-
class SlurmJobMetadata(BaseModel):
32-
"""Represents the metadata of a Slurm job."""
33-
34-
job_id: int
35-
job_name: str
36-
job_state: str
37-
elapsed_time_sec: int
38-
srun_cmd: str
39-
test_cmd: str
40-
41-
4234
class DataRepositoryConfig(BaseModel):
4335
"""Configuration for a data repository."""
4436

@@ -308,9 +300,12 @@ def is_job_completed(self, job: BaseJob, retry_threshold: int = 3) -> bool:
308300

309301
return False
310302

311-
def get_job_status(self, job: BaseJob, retry_threshold: int = 3) -> Optional[tuple[str, str, str]]:
303+
def get_job_status(self, job: BaseJob, retry_threshold: int = 3) -> list[SlurmStepMetadata]:
312304
retry_count = 0
313-
command = f"sacct -j {job.id} --format=JobName,State,ElapsedRAW --delimiter=',' -p --noheader"
305+
command = (
306+
f"sacct -j {job.id} --format=JobID,JobName,State,ExitCode,Start,End,ElapsedRAW,SubmitLine "
307+
"--delimiter='|' -p --noheader"
308+
)
314309

315310
while retry_count < retry_threshold:
316311
stdout, stderr = self.cmd_shell.execute(command).communicate()
@@ -326,12 +321,9 @@ def get_job_status(self, job: BaseJob, retry_threshold: int = 3) -> Optional[tup
326321
logging.error(error_message)
327322
raise RuntimeError(error_message)
328323

329-
# sacct produces a single line per job, first line is for overall job
330-
job_states = stdout.strip().splitlines()[0]
331-
data = job_states.split(",")
332-
return data[0], data[1], data[2]
324+
return [SlurmStepMetadata.from_sacct_single_line(line, "|") for line in stdout.splitlines()]
333325

334-
return None
326+
return []
335327

336328
def kill(self, job: BaseJob) -> None:
337329
"""

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def create_autospec_dataclass(dataclass: type) -> Mock:
3232
def cleanup():
3333
yield
3434

35-
for f in {"env_vars.sh", "hostfile.txt"}:
35+
for f in {"env_vars.sh", "hostfile.txt", "start_server_wrapper.sh"}:
3636
(Path.cwd() / f).unlink(missing_ok=True)
3737

3838

tests/test_single_sbatch_runner.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def nccl_tr(slurm_system: SlurmSystem) -> TestRun:
4343
),
4444
num_nodes=2,
4545
nodes=[],
46-
output_path=slurm_system.output_path,
46+
output_path=slurm_system.output_path / "nccl_test",
4747
)
4848
tr.test.test_template.command_gen_strategy = NcclTestSlurmCommandGenStrategy(slurm_system, {})
4949
return tr
@@ -61,7 +61,7 @@ def sleep_tr(slurm_system: SlurmSystem) -> TestRun:
6161
),
6262
num_nodes=1,
6363
nodes=[],
64-
output_path=slurm_system.output_path,
64+
output_path=slurm_system.output_path / "sleep_test",
6565
)
6666
tr.test.test_template.command_gen_strategy = SleepSlurmCommandGenStrategy(slurm_system, {})
6767
tr.output_path.mkdir(parents=True, exist_ok=True)
@@ -480,8 +480,12 @@ def test_store_job_metadata(nccl_tr: TestRun, slurm_system: SlurmSystem) -> None
480480
assert out_file.exists()
481481
sjm = SlurmJobMetadata.model_validate(toml.load(out_file))
482482
assert sjm.job_id == 1
483+
assert sjm.is_single_sbatch is True
483484
assert sjm.srun_cmd == "n/a for single sbatch run"
484485
assert sjm.test_cmd == "n/a for single sbatch run"
486+
assert sjm.job_root == runner.scenario_root.absolute()
487+
488+
assert sjm == SlurmJobMetadata.model_validate(toml.loads(toml.dumps(sjm.model_dump())))
485489

486490

487491
def test_pre_test(nccl_tr: TestRun, sleep_tr: TestRun, slurm_system: SlurmSystem) -> None:

0 commit comments

Comments
 (0)