Skip to content

Commit af2517d

Browse files
committed
Add custom benchmark and telemetry support
1 parent 22d46ba commit af2517d

15 files changed

Lines changed: 865 additions & 64 deletions

src/srtctl/benchmarks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
# Import runners to trigger registration
77
from srtctl.benchmarks import (
8+
custom,
89
gpqa,
910
gsm8k,
1011
longbenchv2,
@@ -28,6 +29,7 @@
2829
"list_benchmarks",
2930
"register_benchmark",
3031
# Runners
32+
"custom",
3133
"sa_bench",
3234
"sglang_bench",
3335
"mmlu",

src/srtctl/benchmarks/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ def build_command(
5959
"""
6060
...
6161

62+
def get_container_image(self, config: SrtConfig, runtime: RuntimeContext) -> str | Path:
63+
"""Get the container image used for the benchmark process."""
64+
return runtime.container_image
65+
66+
def get_container_mounts(self, config: SrtConfig, runtime: RuntimeContext) -> dict[Path, Path]:
67+
"""Get mounts used for the benchmark process."""
68+
return runtime.container_mounts
69+
70+
def get_environment(self, config: SrtConfig, runtime: RuntimeContext) -> dict[str, str]:
71+
"""Get benchmark-specific environment variables."""
72+
return {}
73+
6274

6375
class AIPerfBenchmarkRunner(BenchmarkRunner):
6476
"""Base class for AIPerf-driven benchmarks.

src/srtctl/benchmarks/custom.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Custom benchmark runner."""
5+
6+
from __future__ import annotations
7+
8+
from pathlib import Path
9+
10+
from srtctl.benchmarks.base import BenchmarkRunner, register_benchmark
11+
from srtctl.core.runtime import RuntimeContext
12+
from srtctl.core.schema import SrtConfig
13+
14+
15+
@register_benchmark("custom")
16+
class CustomBenchmarkRunner(BenchmarkRunner):
17+
"""Run an arbitrary benchmark command inside a container."""
18+
19+
@property
20+
def name(self) -> str:
21+
return "Custom"
22+
23+
@property
24+
def script_path(self) -> str:
25+
return "<custom command>"
26+
27+
def validate_config(self, config: SrtConfig) -> list[str]:
28+
if config.benchmark.command:
29+
return []
30+
return ["benchmark.command is required for benchmark.type=custom"]
31+
32+
def build_command(self, config: SrtConfig, runtime: RuntimeContext) -> list[str]:
33+
del runtime
34+
assert config.benchmark.command is not None
35+
return ["bash", "-lc", config.benchmark.command]
36+
37+
def get_container_image(self, config: SrtConfig, runtime: RuntimeContext) -> str | Path:
38+
return config.benchmark.container_image or runtime.container_image
39+
40+
def get_environment(self, config: SrtConfig, runtime: RuntimeContext) -> dict[str, str]:
41+
del runtime
42+
return dict(config.benchmark.env)

src/srtctl/cli/do_sweep.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
from dataclasses import dataclass
2222
from pathlib import Path
2323

24-
from srtctl.cli.mixins import BenchmarkStageMixin, FrontendStageMixin, PostProcessStageMixin, WorkerStageMixin
24+
from srtctl.cli.mixins import (
25+
BenchmarkStageMixin,
26+
FrontendStageMixin,
27+
PostProcessStageMixin,
28+
TelemetryStageMixin,
29+
WorkerStageMixin,
30+
)
2531
from srtctl.core.config import load_config
2632
from srtctl.core.health import wait_for_port
2733
from srtctl.core.lockfile import write_lockfile
@@ -42,7 +48,13 @@
4248

4349

4450
@dataclass
45-
class SweepOrchestrator(WorkerStageMixin, FrontendStageMixin, BenchmarkStageMixin, PostProcessStageMixin):
51+
class SweepOrchestrator(
52+
WorkerStageMixin,
53+
FrontendStageMixin,
54+
TelemetryStageMixin,
55+
BenchmarkStageMixin,
56+
PostProcessStageMixin,
57+
):
4658
"""Main orchestrator for benchmark sweeps.
4759
4860
Usage:
@@ -225,6 +237,10 @@ def run(self) -> int:
225237
for proc in frontend_procs:
226238
registry.add_process(proc)
227239

240+
telemetry_procs = self.start_telemetry()
241+
for proc in telemetry_procs:
242+
registry.add_process(proc)
243+
228244
self._print_connection_info()
229245

230246
# Stage 4: Benchmark (status reported AFTER health check passes)

src/srtctl/cli/mixins/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
from srtctl.cli.mixins.benchmark_stage import BenchmarkStageMixin
1515
from srtctl.cli.mixins.frontend_stage import FrontendStageMixin
1616
from srtctl.cli.mixins.postprocess_stage import PostProcessStageMixin
17+
from srtctl.cli.mixins.telemetry_stage import TelemetryStageMixin
1718
from srtctl.cli.mixins.worker_stage import WorkerStageMixin
1819

1920
__all__ = [
2021
"WorkerStageMixin",
2122
"FrontendStageMixin",
23+
"TelemetryStageMixin",
2224
"BenchmarkStageMixin",
2325
"PostProcessStageMixin",
2426
]

src/srtctl/cli/mixins/benchmark_stage.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ def _run_benchmark_script(
178178

179179
cmd = runner.build_command(self.config, self.runtime)
180180
env_to_set = self._get_benchmark_env(runner)
181+
env_to_set.update(runner.get_environment(self.config, self.runtime))
182+
container_image = runner.get_container_image(self.config, self.runtime)
183+
container_mounts = runner.get_container_mounts(self.config, self.runtime)
181184

182185
logger.info("Script: %s", runner.script_path)
183186
logger.info("Command: %s", shlex.join(cmd))
@@ -187,8 +190,8 @@ def _run_benchmark_script(
187190
command=cmd,
188191
nodelist=[self.runtime.nodes.head],
189192
output=str(log_file),
190-
container_image=str(self.runtime.container_image),
191-
container_mounts=self.runtime.container_mounts,
193+
container_image=str(container_image),
194+
container_mounts=container_mounts,
192195
env_to_set=env_to_set,
193196
)
194197

src/srtctl/cli/mixins/postprocess_stage.py

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838

3939
logger = logging.getLogger(__name__)
4040

41+
POSTPROCESS_PARSE_FAILED_EXIT = 20
42+
POSTPROCESS_UPLOAD_FAILED_EXIT = 11
43+
4144

4245
class PostProcessStageMixin:
4346
"""Mixin for post-process stage after benchmark completion.
@@ -254,34 +257,7 @@ def _run_postprocess_container(self) -> tuple[Path | None, str | None]:
254257
endpoint_flag = f"--endpoint-url {s3_config.endpoint_url}" if s3_config.endpoint_url else ""
255258

256259
# Build the post-processing script
257-
script = f"""
258-
set -e
259-
260-
# Install uv, srtlog, and awscli
261-
echo "Installing uv..."
262-
pip install uv
263-
264-
echo "Installing srtlog and awscli..."
265-
cd /tmp
266-
git clone --depth 1 https://github.com/ishandhanani/srtlog.git
267-
uv pip install --system ./srtlog awscli
268-
269-
# Run srtlog to generate parquet
270-
echo "Running srtlog parse..."
271-
cd /logs
272-
srtlog parse .
273-
274-
# Upload entire log directory to S3
275-
echo "Uploading entire log directory to S3..."
276-
aws s3 sync /logs {s3_url} {endpoint_flag}
277-
echo "Upload complete: {s3_url}"
278-
279-
# Report what was uploaded
280-
echo ""
281-
echo "Uploaded files:"
282-
find /logs -type f | wc -l
283-
echo "files total"
284-
"""
260+
script = self._build_postprocess_script(s3_url, endpoint_flag)
285261

286262
# Build env for AWS credentials
287263
env: dict[str, str] = {}
@@ -301,7 +277,7 @@ def _run_postprocess_container(self) -> tuple[Path | None, str | None]:
301277
nodelist=[self.runtime.nodes.head],
302278
output=str(self.runtime.log_dir / "postprocess.log"),
303279
container_image="python:3.11",
304-
container_mounts={str(self.runtime.log_dir): "/logs"},
280+
container_mounts={self.runtime.log_dir: Path("/logs")},
305281
env_to_set=env,
306282
)
307283
proc.wait(timeout=600) # 10 min timeout for install + parse + full sync
@@ -311,6 +287,9 @@ def _run_postprocess_container(self) -> tuple[Path | None, str | None]:
311287
if proc.returncode == 0:
312288
logger.info("Post-processing complete: %s", s3_url)
313289
return parquet_path if parquet_path.exists() else None, s3_url
290+
if proc.returncode == POSTPROCESS_PARSE_FAILED_EXIT:
291+
logger.warning("srtlog parsing failed, but raw logs were still uploaded to %s", s3_url)
292+
return parquet_path if parquet_path.exists() else None, s3_url
314293
else:
315294
logger.warning("Post-processing failed (exit code: %s)", proc.returncode)
316295
return parquet_path if parquet_path.exists() else None, None
@@ -323,6 +302,58 @@ def _run_postprocess_container(self) -> tuple[Path | None, str | None]:
323302
logger.warning("Post-processing container failed: %s", e)
324303
return None, None
325304

305+
def _build_postprocess_script(self, s3_url: str, endpoint_flag: str) -> str:
306+
"""Build the post-processing shell script.
307+
308+
Upload is always attempted if awscli installs successfully. Parsing is
309+
best-effort so raw logs survive parser/tooling failures.
310+
"""
311+
return f"""
312+
set -u
313+
set -o pipefail
314+
315+
PARSE_STATUS=0
316+
UPLOAD_STATUS=0
317+
318+
echo "Installing uv and awscli..."
319+
if ! pip install uv awscli; then
320+
echo "Failed to install uv/awscli"
321+
exit {POSTPROCESS_UPLOAD_FAILED_EXIT}
322+
fi
323+
324+
echo "Installing srtlog..."
325+
if cd /tmp && git clone --depth 1 https://github.com/ishandhanani/srtlog.git && uv pip install --system ./srtlog; then
326+
echo "Running srtlog parse..."
327+
cd /logs
328+
srtlog parse . || PARSE_STATUS=$?
329+
else
330+
echo "Failed to install srtlog; continuing with raw log upload"
331+
PARSE_STATUS=1
332+
fi
333+
334+
cat > /logs/postprocess-status.json <<EOF
335+
{{"parse_status": $PARSE_STATUS, "s3_url": "{s3_url}"}}
336+
EOF
337+
338+
echo "Uploading entire log directory to S3..."
339+
aws s3 sync /logs {s3_url} {endpoint_flag} || UPLOAD_STATUS=$?
340+
341+
if [ "$UPLOAD_STATUS" -ne 0 ]; then
342+
echo "Upload failed with status $UPLOAD_STATUS"
343+
exit {POSTPROCESS_UPLOAD_FAILED_EXIT}
344+
fi
345+
346+
echo "Upload complete: {s3_url}"
347+
echo ""
348+
echo "Uploaded files:"
349+
find /logs -type f | wc -l
350+
echo "files total"
351+
352+
if [ "$PARSE_STATUS" -ne 0 ]; then
353+
exit {POSTPROCESS_PARSE_FAILED_EXIT}
354+
fi
355+
"""
356+
326357
def _report_metrics(self, benchmark_results: dict[str, Any] | None, s3_url: str | None, exit_code: int) -> None:
327358
"""Report metrics to dashboard via status API.
328359
@@ -443,7 +474,7 @@ def _run_ai_analysis(self, config: AIAnalysisConfig) -> None:
443474
nodelist=[self.runtime.nodes.head],
444475
output=str(analysis_log),
445476
container_image="python:3.11",
446-
container_mounts={str(self.runtime.log_dir): "/logs"},
477+
container_mounts={self.runtime.log_dir: Path("/logs")},
447478
env_to_set=env_to_set,
448479
)
449480

0 commit comments

Comments
 (0)