@@ -16,6 +16,8 @@
import base64
import datetime
import os
import shlex
import sys
from dataclasses import dataclass
from typing import Optional

@@ -134,6 +136,16 @@ def _set_nested_optionally_overriding(
"""


def _get_launcher_command() -> str:
"""Return the launcher CLI command as a single string.

sys.argv already starts at the entrypoint (e.g. ``nemo-evaluator-launcher run ...``),
so shell-level ``export KEY=VALUE`` prefixes are never included.
"""
argv = [os.path.basename(sys.argv[0])] + sys.argv[1:]
return shlex.join(argv)

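# Illustrative sketch (not part of the diff): shlex.join shell-quotes each
# argument, so the recorded command can be pasted back into a shell verbatim.
# With an assumed sys.argv of
#   ["/usr/bin/nemo-evaluator-launcher", "run", "-o", "target.api_key_name=MY KEY"]
# _get_launcher_command() would return
#   "nemo-evaluator-launcher run -o 'target.api_key_name=MY KEY'"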

def get_eval_factory_config(
cfg: DictConfig,
user_task_config: DictConfig,
@@ -252,6 +264,11 @@ def get_eval_factory_command(
["metadata", "versioning"],
get_versions(),
)
_set_nested_optionally_overriding(
merged_nemo_evaluator_config,
["metadata", "launcher_command"],
_get_launcher_command(),
)
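# Illustrative (shape assumed, not asserted by the diff): after these calls the
# merged config's metadata block carries both keys, e.g.
#   metadata:
#     versioning: <get_versions() output>
#     launcher_command: nemo-evaluator-launcher run --config-name my_eval
# which is what the exporters later read back from artifacts/metadata.yaml.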

# Now get the pre_cmd/post_cmd either from `evaluation.*` or task-level. Note the
# order -- task level wins.
(next file)

@@ -30,8 +30,10 @@
copy_artifacts,
extract_accuracy_metrics,
get_model_id,
get_results_dir_from_job_data,
load_benchmark_info,
load_config_from_metadata,
load_launcher_command_from_metadata,
)


@@ -268,10 +270,14 @@ def _get_jobs_in_dir(self, invocation_dir: Path) -> Dict[str, JobData]:
job_id = generate_job_id(invocation_id, task_index)

# Build job data dict
hostname = config.get("execution", {}).get("hostname")
job_data_dict = {
"output_dir": str(job_subdir),
"in_database": False,
}
if hostname:
job_data_dict["hostname"] = hostname
job_data_dict["remote_rundir_path"] = str(job_subdir)

# Create JobData object
job_data = JobData(
@@ -331,6 +337,8 @@ def prepare_data_for_export(self, job_data: JobData) -> DataForExport | None:
harness, task = load_benchmark_info(artifacts_dir)
container = job_data.data.get("eval_image", None)
model_id = get_model_id(artifacts_dir)
launcher_command = load_launcher_command_from_metadata(artifacts_dir)
results_dir = get_results_dir_from_job_data(job_data.data)

return DataForExport(
artifacts_dir=artifacts_dir,
@@ -346,6 +354,8 @@ def prepare_data_for_export(self, job_data: JobData) -> DataForExport | None:
job_id=job_data.job_id,
timestamp=job_data.timestamp,
job_data=job_data.data,
launcher_command=launcher_command,
results_dir=results_dir,
)
except Exception as e:
logger.error(
(next file)

@@ -125,6 +125,7 @@ def _json_upsert(
"scores": data.metrics,
"timestamp": data.timestamp,
"executor": data.executor,
"results_dir": data.results_dir,
}
if benchmark_name not in merged_data["benchmarks"]:
# new benchmark
@@ -174,6 +175,7 @@ def _csv_upsert(
"Container",
"Invocation ID",
"Job ID",
"Results Dir",
]

skipped_jobs = []
@@ -214,6 +216,7 @@ def _csv_upsert(
"Container": data.container,
"Invocation ID": data.invocation_id,
"Job ID": data.job_id,
"Results Dir": data.results_dir,
}
# Add metrics (as columns)
for k, v in (data.metrics or {}).items():
(next file)

@@ -154,6 +154,10 @@ def _export_one_job(self, job_data: DataForExport) -> str:
"executor": job_data.executor,
"timestamp": str(job_data.timestamp),
}
if job_data.launcher_command is not None:
all_params["launcher_command"] = job_data.launcher_command
if job_data.results_dir is not None:
all_params["results_dir"] = job_data.results_dir

# Add extra metadata if provided
if self.config.extra_metadata:
(next file)

@@ -76,6 +76,8 @@ class DataForExport:
timestamp: float

job_data: Optional[Dict[str, Any]] = None
launcher_command: Optional[str] = None
results_dir: Optional[str] = None


def get_relevant_artifacts() -> List[str]:
@@ -132,6 +134,7 @@ class MetricConflictError(Exception):
RESULTS_FILE = "results.yml"
METADATA_FILE = "metadata.yaml"
METADATA_CONFIG_KEY = "launcher_resolved_config"
METADATA_LAUNCHER_COMMAND_KEY = "launcher_command"
NE_CONFIG_FILE = "run_config.yml"


@@ -191,6 +194,36 @@ def load_config_from_metadata(artifacts_dir: Path) -> Dict[str, Any]:
return metadata[METADATA_CONFIG_KEY]


def get_results_dir_from_job_data(job_data_dict: Dict[str, Any]) -> Optional[str]:
"""Return the output directory string from job data.

For Slurm jobs returns ``hostname:remote_rundir_path`` (scp-ready).
For local jobs returns the local ``output_dir`` path.
"""
remote_path = job_data_dict.get("remote_rundir_path")
if remote_path:
hostname = job_data_dict.get("hostname")
if hostname:
return f"{hostname}:{remote_path}"
return str(remote_path)
local_path = job_data_dict.get("output_dir")
if local_path:
return str(local_path)
return None

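# Behavior sketched with assumed inputs:
#   get_results_dir_from_job_data({"hostname": "login01", "remote_rundir_path": "/scratch/r1.0"})
#       -> "login01:/scratch/r1.0"   # scp-ready: scp -r login01:/scratch/r1.0 .
#   get_results_dir_from_job_data({"output_dir": "/home/me/results/r1.0"})
#       -> "/home/me/results/r1.0"
#   get_results_dir_from_job_data({})
#       -> None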

def load_launcher_command_from_metadata(artifacts_dir: Path) -> Optional[str]:
"""Load the launcher CLI command from artifacts/metadata.yaml if present."""
try:
with open(artifacts_dir / METADATA_FILE, "r", encoding="utf-8") as f:
metadata = yaml.safe_load(f)
if not isinstance(metadata, dict):
return None
return metadata.get(METADATA_LAUNCHER_COMMAND_KEY)
except Exception:
return None

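# The metadata.yaml consumed here would contain (illustrative values):
#   launcher_resolved_config: {...}
#   launcher_command: nemo-evaluator-launcher run --config-name my_eval
# A missing file, unparseable YAML, or a non-dict document all yield None.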

def load_benchmark_info(artifacts_dir: Path) -> Tuple[str, str]:
"""Load benchmark info from ne config file."""
config_file = artifacts_dir / NE_CONFIG_FILE
(next file)

@@ -363,6 +363,11 @@ def _create_wandb_run(
run_config["harness"] = data[0].harness
run_config["benchmark"] = data[0].task

if data[0].launcher_command is not None:
run_config["launcher_command"] = data[0].launcher_command
if data[0].results_dir is not None:
run_config["results_dir"] = data[0].results_dir

run_config.update(self.config.extra_metadata)
run_args["config"] = run_config

(next file)

@@ -590,3 +590,5 @@ def test_data_for_export_optional_fields(self, tmp_path):
assert data.logs_dir is None
assert data.harness is None
assert data.job_data is None
assert data.launcher_command is None
assert data.results_dir is None
(next file)

@@ -15,6 +15,7 @@
#
"""Tests for local functionality."""

import json
from pathlib import Path
from typing import List, Tuple
from unittest.mock import patch
@@ -78,6 +79,13 @@ def test_export_with_format_json(tmp_path: Path, mock_execdb, prepare_local_job)
job_export_dir = output_dir / inv / j1.job_id
assert job_export_dir.exists()

# results_dir is present in output even when None (no data dict keys set)
out = json.loads(
(output_dir / inv / j1.job_id / "processed_results.json").read_text()
)
entry = out["benchmarks"]["simple_evals.mmlu"]["models"]["test-model"][0]
assert "results_dir" in entry


def test_export_with_format_csv(tmp_path: Path, mock_execdb, prepare_local_job):
"""Test export with CSV format."""
(next file)

@@ -140,7 +140,7 @@ def test_export_with_update_existing(
assert result.successful_jobs == [jd.job_id]

def test_log_config_params_flattens_config(
self, monkeypatch, mlflow_fake, make_mlflow_job, tmp_path
):
"""Test that log_config_params=True flattens the config into MLflow params."""
_ML, _RunCtx = mlflow_fake
@@ -154,6 +154,9 @@ def test_log_config_params_flattens_config(
},
}
jd = make_mlflow_job("test004", config=config)
(tmp_path / "test004" / "test004.0" / "artifacts" / "metadata.yaml").write_text(
"launcher_command: nel run test004\n"
)

logged_params = {}
monkeypatch.setattr(
@@ -170,6 +173,8 @@ def test_log_config_params_flattens_config(
assert logged_params["config.deployment.tensor_parallel_size"] == "8"
assert "config.deployment.model" in logged_params
assert logged_params["config.deployment.model"] == "test-model"
assert logged_params.get("launcher_command") == "nel run test004"
assert "results_dir" in logged_params

def test_log_config_params_with_max_depth(
self, monkeypatch, mlflow_fake, make_mlflow_job
(next file)

@@ -268,6 +268,8 @@ def test_per_task_ok(self, monkeypatch, wandb_fake, tmp_path):
job_id="i1.0",
timestamp=0.0,
job_data={},
launcher_command="nel run i1",
results_dir="/tmp/results/i1",
)
successful, failed, skipped = WandBExporter(
{
(next file)

@@ -81,9 +81,10 @@ def test_get_eval_factory_command_basic(monkeypatch):
assert merged["config"]["type"] == "my_task"
assert merged["config"]["output_dir"] == "/results"

# Metadata is populated, including resolved launcher config, versioning, and launcher_command
assert "metadata" in merged and isinstance(merged["metadata"], dict)
assert merged["metadata"]["versioning"] == "TEST_VER"
assert "launcher_command" in merged["metadata"]
# Validate a few salient fields from the resolved config got embedded
resolved = merged["metadata"]["launcher_resolved_config"]
assert resolved["deployment"]["type"] == "none"