diff --git a/moe_pretraining/nemo/Dockerfile b/moe_pretraining/nemo/Dockerfile new file mode 100644 index 000000000..77cc6f113 --- /dev/null +++ b/moe_pretraining/nemo/Dockerfile @@ -0,0 +1,72 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + + +ARG FROM_IMAGE_NAME=nvcr.io/nvidia/pytorch:25.12-py3 + +FROM ${FROM_IMAGE_NAME} + +ENV PIP_CONSTRAINT="" + +RUN git config --global user.name "a" && \ + git config --global user.email "a" + +## 0. Pytorch Checkpoint size patch +WORKDIR /workspace/ + + +RUN pip install numcodecs==0.16.3 +RUN pip install nvidia-mathdx==25.1.1 + + +## 1. NeMo-Run +ARG NEMORUN_REVISION=v0.5.0 +ENV CUSTOM_NEMORUN_REVISION ${NEMORUN_REVISION} +RUN git clone https://github.com/NVIDIA/NeMo-Run.git && \ + cd NeMo-Run && \ + git checkout ${NEMORUN_REVISION} && \ + echo NEMORUN_COMMIT_HASH=$(git rev-parse HEAD) && \ + pip install -e . + +## 2. Megatron-bridge and megatron-core +ARG MBRIDGE_REVISION=main +RUN pip uninstall -y megatron-core && \ + git clone https://github.com/NVIDIA-NeMo/Megatron-Bridge.git /workspace/Megatron-Bridge && \ + cd /workspace/Megatron-Bridge && \ + git checkout ${MBRIDGE_REVISION} && \ + git submodule update --init --recursive && \ + echo MBRIDGE_COMMIT_HASH=$(git rev-parse HEAD) && \ + echo $(git rev-parse HEAD) > /MBRIDGE_COMMIT_HASH.env && \ + cd /workspace/Megatron-Bridge/3rdparty/Megatron-LM && \ + echo MCORE_COMMIT_HASH=$(git rev-parse HEAD) && \ + echo $(git rev-parse HEAD) > /MCORE_COMMIT_HASH.env && \ + pip install -e . + +ENV PYTHONPATH "/workspace/Megatron-Bridge/src:/workspace/Megatron-Bridge/3rdparty/Megatron-LM:${PYTHONPATH}" + +## 3. Benchmark dependencies +RUN pip uninstall transformers -y +COPY requirements.txt . +RUN pip install --no-cache-dir -U -r requirements.txt + + +WORKDIR /workspace/code + +COPY . . diff --git a/moe_pretraining/nemo/callback.py b/moe_pretraining/nemo/callback.py new file mode 100644 index 000000000..6a7478f0f --- /dev/null +++ b/moe_pretraining/nemo/callback.py @@ -0,0 +1,606 @@ +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from functools import wraps +from typing import Any, Callable, List, Optional, Protocol, Union + +import torch +from megatron.bridge.training.config import ConfigContainer +from megatron.bridge.training.forward_step_func_types import ForwardStepCallable +from megatron.bridge.training.state import GlobalState +from megatron.core import parallel_state as mpu +from megatron.core.full_cuda_graph import FullCudaGraphWrapper +from megatron.core.inference.communication_utils import broadcast_from_last_pipeline_stage +from megatron.core.num_microbatches_calculator import get_num_microbatches +from megatron.core.optimizer import MegatronOptimizer +from megatron.core.optimizer_param_scheduler import OptimizerParamScheduler +from megatron.core.pipeline_parallel import get_forward_backward_func +from megatron.core.process_groups_config import ProcessGroupCollection +from megatron.core.rerun_state_machine import RerunDataIterator +from megatron.core.transformer import MegatronModule +from mlperf_common.frameworks.pyt import PyTCommunicationHandler +from mlperf_common.logging import MLLoggerWrapper + + +logger = logging.getLogger(__name__) + + +def get_last_pp_rank(): + """Check if current rank is the last pipeline parallel rank.""" + is_last_pp = mpu.is_pipeline_last_stage(ignore_virtual=True) + is_first_dp = mpu.get_data_parallel_rank() == 0 + is_first_tp = mpu.get_tensor_model_parallel_rank() == 0 + is_first_cp = mpu.get_context_parallel_rank() == 0 + return is_last_pp and is_first_dp and is_first_tp and is_first_cp + + +def broadcast_loss(loss_reduced): + """Broadcast loss from last pipeline stage to all ranks.""" + if "lm loss" in loss_reduced: + loss_tensor = loss_reduced["lm loss"] + else: + loss_tensor = None + + loss_synced = broadcast_from_last_pipeline_stage( + size=[1], + dtype=torch.float32, + tensor=loss_tensor.unsqueeze(0) if loss_tensor is not None else None, + ) + + return loss_synced.item() + + +mllogger = MLLoggerWrapper(PyTCommunicationHandler()) + + +class DeltaTimer: + """Timer for measuring time deltas.""" + + def __init__(self): + self.reset() + + def reset(self): + self.start_time = time.perf_counter() + return self.start_time + + def get_delta(self): + prev_time = self.start_time + return self.reset() - prev_time + + +# ============================================================================= +# Callback Protocol and Manager +# ============================================================================= + +class TrainingCallback(Protocol): + """Protocol defining all available callback hooks.""" + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called once at the start of training.""" + pass + + def on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called once at the end of training.""" + pass + + def 
on_train_batch_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called before each training step.""" + pass + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ret: Any, + ): + """Called after each training step.""" + pass + + def on_validation_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ): + """Called before validation begins.""" + pass + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + """Called after validation ends.""" + pass + + +class CallbackManager: + """Manages callbacks and provides function wrappers.""" + + def __init__(self): + self.callbacks: List[TrainingCallback] = [] + + def register(self, callback: TrainingCallback) -> None: + self.callbacks.append(callback) + + def trigger_on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_start"): + callback.on_train_start( + global_state, + forward_step_func, + model, + optimizer, + scheduler, + ) + + def trigger_on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + for callback in self.callbacks: + if hasattr(callback, "on_train_end"): + callback.on_train_end(global_state, forward_step_func, model, optimizer, scheduler) + + def trigger_on_train_batch_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_batch_start"): + callback.on_train_batch_start(global_state, forward_step_func, model, optimizer, scheduler) + + def trigger_on_train_batch_end( + self, + global_state: GlobalState, + iteration: int, + loss_dict: dict[str, torch.Tensor], + optimizer: MegatronOptimizer, + model: list[MegatronModule], + ret: Any, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_batch_end"): + callback.on_train_batch_end(global_state, iteration, loss_dict, optimizer, model, ret) + + def trigger_on_validation_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_validation_start"): + callback.on_validation_start(global_state, forward_step_func, model) + + def trigger_on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_validation_end"): + callback.on_validation_end(global_state, forward_step_func, model, ret) + + def wrap_train(self, train_func: Callable) -> Callable: + """Wrap the train() function to add on_train_start and on_train_end hooks.""" + + @wraps(train_func) + def 
wrapped_train( + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + train_data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + valid_data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + global_state: GlobalState, + checkpointing_context: dict[str, Any], + pg_collection: ProcessGroupCollection, + process_non_loss_data_func: Optional[Callable] = None, + non_loss_data_func: Optional[Callable] = None, + ) -> None: + self.trigger_on_train_start(global_state, forward_step_func, model, optimizer, scheduler) + train_func( + forward_step_func, + model, + optimizer, + scheduler, + train_data_iterator, + valid_data_iterator, + global_state, + checkpointing_context, + pg_collection, + process_non_loss_data_func, + non_loss_data_func, + ) + self.trigger_on_train_end(global_state, forward_step_func, model, optimizer, scheduler) + + return wrapped_train + + def wrap_train_step(self, train_step_func: Callable) -> Callable: + """Wrap the train_step() function to add batch-level hooks.""" + + @wraps(train_step_func) + def wrapped_train_step( + forward_step_func: ForwardStepCallable, + data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + global_state: GlobalState, + pg_collection: ProcessGroupCollection, + forward_backward_func: Callable, + ): + self.trigger_on_train_batch_start(global_state, forward_step_func, model, optimizer, scheduler) + ret = train_step_func( + forward_step_func, + data_iterator, + model, + optimizer, + scheduler, + global_state, + pg_collection, + forward_backward_func, + ) + self.trigger_on_train_batch_end(global_state, forward_step_func, model, optimizer, scheduler, ret) + return ret + + return wrapped_train_step + + def wrap_evaluate(self, evaluate_func: Callable) -> Callable: + """Wrap the evaluate() function to add validation hooks.""" + + @wraps(evaluate_func) + def wrapped_evaluate( + state: GlobalState, + forward_step_func: ForwardStepCallable, + data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + model: list[MegatronModule], + process_non_loss_data_func: Optional[Callable], + config: ConfigContainer, + verbose: bool = False, + non_loss_data_func: Optional[Callable] = None, + ): + self.trigger_on_validation_start(state, forward_step_func, model) + ret = evaluate_func( + state, + forward_step_func, + data_iterator, + model, + process_non_loss_data_func, + config, + verbose, + non_loss_data_func, + ) + self.trigger_on_validation_end(state, forward_step_func, model, ret) + + return ret + + return wrapped_evaluate + + +_callback_manager = CallbackManager() + + +def register_callback(callback: TrainingCallback) -> None: + """Register a callback globally.""" + _callback_manager.register(callback) + + +def install_callbacks() -> None: + """Install callbacks by wrapping the train, train_step, and evaluate functions.""" + import sys + + from megatron.bridge.training import eval as eval_module + from megatron.bridge.training import train as train_module + + train_module.train = _callback_manager.wrap_train(train_module.train) + train_module.train_step = _callback_manager.wrap_train_step(train_module.train_step) + eval_module.evaluate = _callback_manager.wrap_evaluate(eval_module.evaluate) + + if "megatron.bridge.training.pretrain" in sys.modules: + pretrain_module = 
sys.modules["megatron.bridge.training.pretrain"] + pretrain_module.train = train_module.train + + +class MLPerfLoggingCallback: + """MLPerf logging callback for compliance logging.""" + + def __init__(self, cfg): + self.cfg = cfg + self.global_batch_size = self.cfg.model.global_batch_size + self.train_block_started = True + self.train_current_block = 0 + self.previous_step = 0 + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + mllogger.log_init_stop_run_start() + global_state.should_stop = False + mllogger.start( + mllogger.constants.BLOCK_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: self.cfg.trainer.val_check_interval * self.global_batch_size, + "step": global_state.train_state.step, + }, + ) + self.timer = DeltaTimer() + + def on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + if self.train_block_started: + self._end_train_block(global_state) + + FullCudaGraphWrapper.cuda_graph = None + + def on_validation_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ): + """Log validation start.""" + if hasattr(global_state, "warmup") and global_state.warmup: + return + self._log_train_step_time(global_state) + if self.train_block_started: + self._end_train_block(global_state) + + mllogger.start( + mllogger.constants.EVAL_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: self._get_samples_count(global_state), + "step": self._get_step(global_state), + }, + ) + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + if hasattr(global_state, "warmup") and global_state.warmup: + return + self._log_custom_timedelta("validation_time", self._get_step(global_state)) + + samples_count = self._get_samples_count(global_state) + if self.cfg.model.pipeline_model_parallel_size > 1: + loss = broadcast_loss(ret[0]) + else: + loss = ret[0]["lm loss"].item() + + mllogger.event( + key=mllogger.constants.EVAL_ACCURACY, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count}, + value=loss, + ) + mllogger.end( + mllogger.constants.EVAL_STOP, + metadata={ + mllogger.constants.SAMPLES_COUNT: samples_count, + "step": self._get_step(global_state), + }, + ) + + if loss < self.cfg.custom.target_log_ppl: + global_state.should_stop = True + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "success"}, + ) + elif global_state.train_state.step >= self.cfg.trainer.max_steps: + global_state.should_stop = True + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "aborted"}, + ) + if not os.environ.get("VAL_CHECK_INTERVAL"): + global_state.cfg.train.eval_interval = self.cfg.default_val_check_interval + + if not global_state.should_stop: + self._start_train_block(global_state) + else: + global_state.train_state.step = self.cfg.trainer.max_steps + 1 + global_state.train_state.do_valid = False + global_state.train_state.do_test = False + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: 
OptimizerParamScheduler, + ret: Any, + ): + step = global_state.train_state.step + 1 + last_step = step >= self.cfg.trainer.max_steps + eval_after_this_step = step % global_state.cfg.train.eval_interval == 0 + if last_step and not eval_after_this_step: + samples_count = self._get_samples_count(global_state) + self._end_train_block(global_state) + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "aborted"}, + ) + self.train_block_started = False + global_state.should_stop = True + global_state.train_state.do_valid = False + global_state.train_state.do_test = False + + def _start_train_block(self, global_state: GlobalState) -> None: + self.train_block_started = True + mllogger.start( + mllogger.constants.BLOCK_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: global_state.cfg.train.eval_interval * self.global_batch_size, + "step": self._get_step(global_state), + }, + ) + + def _end_train_block(self, global_state: GlobalState) -> None: + mllogger.end( + mllogger.constants.BLOCK_STOP, + metadata={ + mllogger.constants.SAMPLES_COUNT: global_state.cfg.train.eval_interval * self.global_batch_size, + "step": self._get_step(global_state), + }, + ) + self.train_block_started = False + + def _log_train_step_time( + self, + global_state: GlobalState, + ) -> None: + delta_t = self.timer.get_delta() + global_step = self._get_step(global_state) + delta_step = global_step - self.previous_step + mllogger.event( + key="tracked_stats", + metadata={mllogger.constants.SAMPLES_COUNT: delta_step * self.global_batch_size}, + value={ + "train_step_time": delta_t / (delta_step + 1e-8), + }, + ) + + self.previous_step = global_step + + def _log_custom_timedelta(self, value_key, step: int = 0): + mllogger.event( + key="tracked_stats", + metadata={"step": step}, + value={value_key: self.timer.get_delta()}, + ) + + def _get_step(self, global_state): + return global_state.train_state.step + + def _get_samples_count(self, global_state): + return self._get_step(global_state) * self.global_batch_size + + +class DeltaTimingCallback: + """Callback for tracking training step timing.""" + + def __init__(self, cfg): + self.t0 = 0 + self.total_train_step_time = [0, 0] + self.global_batch_size = cfg.model.global_batch_size + self.log_every_n_steps = cfg.trainer.log_every_n_steps + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + self.t0 = time.time() + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ret: Any, + ): + t1 = time.time() + d = t1 - self.t0 + self.total_train_step_time[0] += d + self.total_train_step_time[1] += 1 + self.t0 = t1 + + if global_state.train_state.step % self.log_every_n_steps == 0 and get_last_pp_rank(): + mllogger.event( + key="tracked_stats", + metadata={mllogger.constants.SAMPLES_COUNT: self.global_batch_size * global_state.train_state.step}, + value={ + "train_step_time": d, + "reduced_train_loss": ret[0]["lm loss"].item(), + }, + unique=False, + ) + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + """Reset timer after validation to avoid including validation time in first train step.""" + self.t0 = time.time() 
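Note on how the hooks defined in callback.py are consumed: install_callbacks() monkey-patches megatron.bridge.training.train.train, train.train_step, and eval.evaluate, and CallbackManager dispatches to every registered object that defines the corresponding hook (it checks hasattr before calling, so partial callbacks work). A minimal sketch of a custom callback follows, assuming the hook signatures of the TrainingCallback protocol above; the StepPrinterCallback name is illustrative and not part of this change:

import time

from callback import install_callbacks, register_callback


class StepPrinterCallback:
    """Toy callback: prints the wall-clock time of each training step."""

    def on_train_batch_start(self, global_state, forward_step_func, model, optimizer, scheduler):
        self._t0 = time.perf_counter()

    def on_train_batch_end(self, global_state, forward_step_func, model, optimizer, scheduler, ret):
        print(f"step {global_state.train_state.step}: {time.perf_counter() - self._t0:.3f}s")


# Register and install before megatron.bridge.training.pretrain() is called,
# exactly as pretrain_deepseek_v3_671b.py does for the MLPerf callbacks.
register_callback(StepPrinterCallback())
install_callbacks()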
diff --git a/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh
new file mode 100644
index 000000000..e340cce45
--- /dev/null
+++ b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh
@@ -0,0 +1,69 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# SLURM: Username on a cluster
+export USER=""
+# Slurm: account for job submission
+export ACCOUNT=""
+# Slurm: partition for job submission
+export PARTITION=""
+# Slurm: job time limit
+export TIME="02:00:00"
+# Slurm: --nodes argument
+export NNODES=64
+# Slurm: --gpus_per_node and --ntasks_per_node arguments
+export GPUS_PER_NODE=4
+# Slurm: max job retries for transient job failures
+export MAX_RETRIES=1
+
+# Folder mapping:
+# Output directory that holds logs; any path you like.
+export LOG_DIR=""
+# Image / container path, either local cache file or remote URL
+export IMAGE=""
+# Dataset: location of the preprocessed C4 dataset
+export DATA_DIR=""
+# Model checkpoint path
+export MODEL_CKPT=""
+
+# Training Configs:
+# Dataloader: Global batch size
+export GBS=1024
+# Dataloader: Micro batch size
+export MBS=1
+export MAX_LR="2e-4"
+export WARMUP_STEPS=256
+export EVAL_CHECK_INTERVAL=10 # every $EVAL_CHECK_INTERVAL steps
+export EVAL_BATCHES=1 # evaluate on $EVAL_BATCHES * $GBS samples
+
+
+export TENSOR_PARALLEL_SIZE=1
+export PIPELINE_PARALLEL_SIZE=4
+export CONTEXT_PARALLEL_SIZE=1
+export EXPERT_PARALLEL_SIZE=64
+export EXPERT_TENSOR_PARALLEL_SIZE=1
+export RECOMPUTE_MODULES="mlp,moe"
+export CUDA_GRAPH_IMPLEMENTATION="transformer_engine"
+export CUDA_GRAPH_SCOPE="attn"
+
+# Experiment manager: Number of experiments to launch
+export NEXP=1
+# Experiment manager: how many consecutive jobs we want for each experiment
+export NPAR=1
+# Experiment manager: provides seeds to the launched experiments; use a space as the delimiter, e.g. "1234 1235 1236"
+# The training script discards any excess seeds and generates new ones if fewer than NEXP seeds are given.
+# To preserve randomness, we recommend leaving this unset so that seeds are generated randomly on each run.
+
+
+export DGXSYSTEM=$(basename $(readlink -f ${BASH_SOURCE[0]}) | sed 's/^config_//' | sed 's/\.sh$//' )
diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py
new file mode 100644
index 000000000..a34a8017e
--- /dev/null
+++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py
@@ -0,0 +1,291 @@
+# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os + +from megatron.bridge.recipes.deepseek.deepseek_v3 import deepseek_v3_pretrain_config, set_deepseek_v3_pipeline_model_parallel_layout +from megatron.bridge.training.config import GPTDatasetConfig, ConfigContainer +from megatron.bridge.training.gpt_step import forward_step +from megatron.bridge.training.pretrain import pretrain +from megatron.bridge.training.tokenizers.config import TokenizerConfig + +from callback import ( + MLPerfLoggingCallback, + DeltaTimingCallback, + mllogger, + install_callbacks, + register_callback, +) + + +def get_rank(): + """Get the current process rank.""" + import torch.distributed as dist + if dist.is_available() and dist.is_initialized(): + return dist.get_rank() + return 0 + + +def init_logging(): + """Initialize logging configuration.""" + import logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + +def get_data(seq_length: int, seed): + """Configure dataset paths and parameters.""" + dataset_path = os.getenv("PREPROCESSED_PATH", "/preproc_data") + val_test_path = f"{dataset_path}/c4-validation-91205-samples.en_text_document" + train_datasets = [f"{dataset_path}/c4-train.en_{idx}_text_document" for idx in [6]] + train_datasets_weights = [50] * len(train_datasets) + + data_paths = [ + (train_datasets, train_datasets_weights), + ([val_test_path], None), + ([val_test_path], None) + ] + + return GPTDatasetConfig( + dataloader_type="single", + blend_per_split=data_paths, + sequence_length=seq_length, + random_seed=seed, + num_workers=8, + path_to_cache="/npy_index", + reset_position_ids=False, + reset_attention_mask=False, + eod_mask_loss=False, + ) + + +def log_hyperparams(args, mbridge_config: ConfigContainer): + """Log hyperparameters for MLPerf compliance.""" + bmark = mllogger.constants.DEEPSEEK_V3 + opt_lr_decay_steps = args.max_steps - args.warmup_steps + mllogger.mlperf_submission_log(bmark) + + # Compute gradient accumulation steps + tp = args.tensor_parallel_size + pp = args.pipeline_parallel_size + cp = args.context_parallel_size + dp = (args.nodes * args.gpus_per_node) // (tp * pp * cp) + mini_batch_size = args.gbs // dp + grad_accumulation_steps = mini_batch_size // args.mbs + + logging_configs = { + mllogger.constants.SEED: args.seed, + mllogger.constants.GLOBAL_BATCH_SIZE: args.gbs, + mllogger.constants.GRADIENT_ACCUMULATION_STEPS: grad_accumulation_steps, + mllogger.constants.MAX_SEQUENCE_LENGTH: args.sequence_length, + mllogger.constants.EVAL_SAMPLES: args.gbs * args.eval_batches, + mllogger.constants.TRAIN_SAMPLES: 1574207408, + mllogger.constants.INIT_CHECKPOINT_STEP: 0, + mllogger.constants.OPT_NAME: mllogger.constants.ADAMW, + mllogger.constants.OPT_BASE_LR: mbridge_config.optimizer.lr, + mllogger.constants.OPT_ADAMW_BETA_1: mbridge_config.optimizer.adam_beta1, + mllogger.constants.OPT_ADAMW_BETA_2: mbridge_config.optimizer.adam_beta2, + mllogger.constants.OPT_ADAMW_EPSILON: mbridge_config.optimizer.adam_eps, + mllogger.constants.OPT_ADAMW_WEIGHT_DECAY: mbridge_config.optimizer.weight_decay, + 
mllogger.constants.OPT_GRADIENT_CLIP_NORM: mbridge_config.optimizer.clip_grad, + mllogger.constants.OPT_END_LR: args.min_lr, + mllogger.constants.OPT_LR_WARMUP_STEPS: mbridge_config.scheduler.lr_warmup_iters, + mllogger.constants.OPT_LR_DECAY_STEPS: opt_lr_decay_steps, + mllogger.constants.MAX_STEPS: args.max_steps, + mllogger.constants.OPT_LR_DECAY_SCHEDULE: "cosine with linear warmup", + "target_accuracy": args.target_log_ppl, + } + + for key, value in logging_configs.items(): + mllogger.event(key=key, value=value) + + +def get_tokenizer_config(): + return TokenizerConfig( + tokenizer_type="HuggingFaceTokenizer", + tokenizer_model="/tokenizer", + hf_tokenizer_kwargs={"use_fast": True}, + ) + + +def create_config(args): + """Create the training configuration from arguments.""" + config = deepseek_v3_pretrain_config() + + # Model parallelism configuration (hardcoded for DeepSeek V3) + model_cfg = config.model + model_cfg.pipeline_model_parallel_size = args.pipeline_parallel_size + model_cfg.virtual_pipeline_model_parallel_size = args.virtual_pipeline_parallel_size + set_deepseek_v3_pipeline_model_parallel_layout(model_cfg) + + model_cfg.tensor_model_parallel_size = args.tensor_parallel_size + model_cfg.context_parallel_size = args.context_parallel_size + model_cfg.expert_model_parallel_size = args.expert_model_parallel_size + model_cfg.expert_tensor_parallel_size = args.expert_tensor_parallel_size + model_cfg.sequence_parallel = args.tensor_parallel_size > 1 + model_cfg.seq_length = args.sequence_length + model_cfg.recompute_modules = args.recompute_modules.split(",") if args.recompute_modules else [] + if args.cuda_graph_implementation: + model_cfg.cuda_graph_implementation = args.cuda_graph_implementation + if args.cuda_graph_scope: + model_cfg.cuda_graph_scope = args.cuda_graph_scope + # MoE params + model_cfg.moe_token_dispatcher_type = args.moe_token_dispatcher_type + model_cfg.moe_grouped_gemm = args.moe_grouped_gemm + model_cfg.moe_permute_fusion = args.moe_permute_fusion + model_cfg.moe_router_fusion = args.moe_router_fusion + model_cfg.moe_router_force_load_balancing = False + + + + # Training configuration + train_cfg = config.train + train_cfg.micro_batch_size = args.mbs + train_cfg.global_batch_size = args.gbs + train_cfg.train_iters = args.max_steps + + # Eval configuration + train_cfg.eval_interval = args.eval_check_interval + train_cfg.eval_iters = args.eval_batches + + # Optimizer configuration + optimizer_cfg = config.optimizer + optimizer_cfg.lr = args.lr + optimizer_cfg.min_lr = args.min_lr + + # Scheduler configuration + scheduler_cfg = config.scheduler + scheduler_cfg.lr_warmup_iters = args.warmup_steps + + # RNG configuration + rng_cfg = config.rng + rng_cfg.seed = args.seed + + # Dataset configuration + config.dataset = get_data( + seq_length=args.sequence_length, + seed=args.seed, + ) + + # Tokenizer configuration + config.tokenizer = get_tokenizer_config() + + # Checkpoint configuration + checkpoint_cfg = config.checkpoint + checkpoint_cfg.load = "/checkpoint" + checkpoint_cfg.load_optim = False + checkpoint_cfg.load_rng = False + checkpoint_cfg.exit_on_missing_checkpoint = True + + # Logger configuration + logger_cfg = config.logger + logger_cfg.log_interval = 1 + + return config + + +def get_parser() -> argparse.ArgumentParser: + """Create argument parser for DeepSeek V3 pretraining.""" + parser = argparse.ArgumentParser(description="DeepSeek V3 Pretraining") + + # Cluster arguments (used for logging) + cluster_group = parser.add_argument_group("Cluster 
arguments") + cluster_group.add_argument("--nodes", type=int, required=True, help="Number of nodes") + cluster_group.add_argument("--gpus_per_node", type=int, required=True, help="Number of GPUs per node") + + # Model arguments + model_group = parser.add_argument_group("Model arguments") + model_group.add_argument("--tensor_parallel_size", type=int, required=True, help="Tensor parallel size") + model_group.add_argument("--pipeline_parallel_size", type=int, required=True, help="Pipeline parallel size") + model_group.add_argument("--virtual_pipeline_parallel_size", type=int, default=None, help="Virtual pipeline parallel size") + model_group.add_argument("--context_parallel_size", type=int, required=True, help="Context parallel size") + model_group.add_argument("--expert_model_parallel_size", type=int, required=True, help="Expert model parallel size") + model_group.add_argument("--expert_tensor_parallel_size", type=int, required=True, help="Expert tensor parallel size") + model_group.add_argument("--recompute_modules", type=str, help="Recompute modules") + model_group.add_argument("--cuda_graph_implementation", type=str, help="CUDA graph implementation") + model_group.add_argument("--cuda_graph_scope", type=str, help="CUDA graph scope") + model_group.add_argument("--moe_token_dispatcher_type", type=str, default="alltoall", help="MoE token dispatcher type") + model_group.add_argument("--moe_grouped_gemm", type=bool, default=True, help="MoE grouped GEMM") + model_group.add_argument("--moe_permute_fusion", type=bool, default=False, help="MoE permute fusion") + model_group.add_argument("--moe_router_fusion", type=bool, default=False, help="MoE router fusion") + model_group.add_argument("--sequence_length", type=int, default=4096, help="Sequence length") + + # Training arguments + training_group = parser.add_argument_group("Training arguments") + training_group.add_argument("--gbs", type=int, default=1024, help="Global batch size") + training_group.add_argument("--mbs", type=int, default=1, help="Micro batch size") + training_group.add_argument("--lr", type=float, default=2e-4, help="Initial learning rate after warmup") + training_group.add_argument("--min_lr", type=float, default=5e-6, help="Minimum learning rate") + training_group.add_argument("--max_steps", type=int, default=1000, help="Maximum number of steps") + training_group.add_argument("--warmup_steps", type=int, default=0, help="Number of steps for LR warmup") + training_group.add_argument("--seed", type=int, default=1234, help="Random seed") + training_group.add_argument("--eval_check_interval", type=int, default=10, help="Evaluate every N steps") + training_group.add_argument("--eval_batches", type=int, default=1, help="Evaluate N batches") + training_group.add_argument("--target_log_ppl", type=float, default=1.0, help="Target log perplexity") + + return parser + + +class ArgsConfig: + """Configuration object that wraps args to match expected interface.""" + def __init__(self, args): + self.args = args + + # Create nested config structure matching what callbacks expect + self.model = type('ModelConfig', (), { + 'global_batch_size': args.gbs, + 'micro_batch_size': args.mbs, + 'pipeline_model_parallel_size': args.pipeline_parallel_size, + 'encoder_seq_length': args.sequence_length, + 'seed': args.seed, + })() + + self.trainer = type('TrainerConfig', (), { + 'max_steps': args.max_steps, + 'val_check_interval': args.eval_check_interval, + 'limit_val_batches': args.eval_batches, + 'log_every_n_steps': 1, + 'warmup_train_steps': 0, + })() + 
+ self.custom = type('CustomConfig', (), { + 'target_log_ppl': args.target_log_ppl, + })() + + self.default_val_check_interval = self.trainer.val_check_interval + + +def main(): + """Main entry point for DeepSeek V3 pretraining.""" + args = get_parser().parse_args() + init_logging() + config = create_config(args) + cfg = ArgsConfig(args) + + if get_rank() == 0: + log_hyperparams(args, config) + mllogger.start(key=mllogger.constants.INIT_START) + + register_callback(DeltaTimingCallback(cfg)) + register_callback(MLPerfLoggingCallback(cfg)) + install_callbacks() + + pretrain(config, forward_step_func=forward_step) + + +if __name__ == "__main__": + main() diff --git a/moe_pretraining/nemo/requirements.txt b/moe_pretraining/nemo/requirements.txt new file mode 100644 index 000000000..4762e3dbf --- /dev/null +++ b/moe_pretraining/nemo/requirements.txt @@ -0,0 +1,16 @@ +git+https://github.com/denys-fridman/logging.git@dfridman/deepseek-v3 # TODO(dfridman): revert to main repo once merged +git+https://github.com/NVIDIA/mlperf-common.git@b86d175a05849d650a8ff69c1e2c37b9f4e61d51 +transformers==4.57.1 +blobfile==3.0.0 +prettytable==3.11.0 +# WAR for MultiStorageClient +multi-storage-client<0.26.0 + +# TODO(dfridman): below are needed by megatron-bridge. Dependencies need to be fixed on MBridge side: +# some of the below packages are in `uv.lock`, but not in `pyproject.toml` - so we can't easily pip install them. +wandb==0.23.0 +nvidia-modelopt[all]==0.39.0 +nvidia-resiliency-ext==0.5.0 +megatron-energon==6.0.1 +bitstring==4.3.1 +filetype==1.2.0 diff --git a/moe_pretraining/nemo/run_deepseek.py b/moe_pretraining/nemo/run_deepseek.py new file mode 100644 index 000000000..d6c0bda84 --- /dev/null +++ b/moe_pretraining/nemo/run_deepseek.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NemoRun launcher script for DeepSeek V3 pretraining. + +This script uses NemoRun's Slurm executor to launch pretrain_deepseek_v3_671b.py +on a Slurm cluster. 
+ +Example usage: + python run_deepseek.py \ + --account my_account \ + --partition my_partition \ + --nodes 64 \ + --gpus_per_node 4 \ + --time_limit 04:00:00 \ + --container_image nvcr.io/nvidia/nemo:dev \ + --log_dir /path/to/logs \ + --mounts /data:/data,/checkpoints:/checkpoints \ + -- \ + --tensor_parallel_size 1 \ + --pipeline_parallel_size 4 \ + --context_parallel_size 1 \ + --expert_model_parallel_size 64 \ + --expert_tensor_parallel_size 1 \ + --gbs 2048 \ + --max_steps 1000 +""" + +import argparse +import logging +import os +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional + +import nemo_run as run +from nemo_run.config import get_nemorun_home, set_nemorun_home +from nemo_run.core.execution.launcher import SlurmTemplate + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SCRIPT_DIR = Path(__file__).parent.resolve() +PRETRAIN_SCRIPT = "pretrain_deepseek_v3_671b.py" + +# Inline bash template for Slurm +INLINE_TEMPLATE = r""" +#!/usr/bin/env bash +set -euo pipefail + +bash -c '{{ pre_cmds }} {{ command }}' +""" + +# Default environment variables for performance +PERF_ENV_VARS = { + "TORCH_NCCL_AVOID_RECORD_STREAMS": "1", + "TRANSFORMERS_OFFLINE": "0", + "TOKENIZERS_PARALLELISM": "False", + "NCCL_NVLS_ENABLE": "0", + "NVTE_NORM_FWD_USE_CUDNN": "1", + "NVTE_NORM_BWD_USE_CUDNN": "1", + "TORCH_NCCL_HIGH_PRIORITY": "1", + "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", +} + + +def slurm_executor( + gpu: str, + account: str, + partition: str, + log_dir: str, + nodes: int, + num_gpus_per_node: int, + time_limit: str = "04:00:00", + container_image: str = "nvcr.io/nvidia/nemo:dev", + custom_mounts: List[str] = None, + custom_env_vars: Dict[str, str] = None, + custom_srun_args: List[str] = None, + gres: Optional[str] = None, +) -> run.SlurmExecutor: + """ + Create a Slurm executor for DeepSeek V3 pretraining. + + Args: + gpu: GPU type (e.g., "gb200", "gb300", "h100") + account: Slurm account + partition: Slurm partition + log_dir: Directory for logs + nodes: Number of nodes + num_gpus_per_node: GPUs per node + time_limit: Job time limit + container_image: Container image to use + custom_mounts: Additional container mounts + custom_env_vars: Additional environment variables + custom_srun_args: Additional srun arguments + gres: GPU resource specification + """ + custom_mounts = custom_mounts or [] + custom_env_vars = custom_env_vars or {} + custom_srun_args = custom_srun_args or [] + custom_bash_cmds = [] + + mounts = [] + srun_args = custom_srun_args.copy() + [ + "--mpi=pmix", + "--no-container-mount-home", + ] + + if log_dir is not None: + set_nemorun_home(log_dir) + else: + if os.environ.get("NEMORUN_HOME") is None: + logger.warning( + f"Logs will be written to {get_nemorun_home()}. " + "Set NEMORUN_HOME or use --log_dir to change this." 
+ ) + + env_vars = PERF_ENV_VARS.copy() + + # GPU-specific settings + if gpu.lower() in ["gb200", "gb300"]: + env_vars["NCCL_NET_GDR_LEVEL"] = "PHB" + env_vars["NCCL_NET_GDR_C2C"] = "1" + + env_vars.update(custom_env_vars) + mounts.extend(custom_mounts) + + # Mount the script directory + mounts.append(f"{SCRIPT_DIR}:{SCRIPT_DIR}") + + # Compute segment for GB200/GB300 + segment = None + if num_gpus_per_node == 4: + if nodes <= 18: + segment = nodes + else: + for segment_candidate in range(18, 0, -1): + if nodes % segment_candidate == 0: + segment = segment_candidate + break + + # NUMA binding + numa_divisor = 2 if gpu.lower() in ["gb200", "gb300"] else 4 + numa_cmd = f"numactl --cpunodebind=$((SLURM_LOCALID/{numa_divisor})) --membind=$((SLURM_LOCALID/{numa_divisor}))" + custom_bash_cmds.append(numa_cmd) + + launcher = SlurmTemplate( + template_inline=INLINE_TEMPLATE, + template_vars={"pre_cmds": " ; ".join(custom_bash_cmds)}, + ) + + executor = run.SlurmExecutor( + account=account, + partition=partition, + tunnel=run.LocalTunnel(job_dir=os.path.join(get_nemorun_home(), "experiments")), + nodes=nodes, + ntasks_per_node=num_gpus_per_node, + gres=gres, + container_image=container_image, + container_mounts=mounts, + env_vars=env_vars, + srun_args=srun_args, + time=time_limit, + mem="0", + exclusive=True, + packager=run.GitArchivePackager(), + segment=segment, + launcher=launcher, + ) + + return executor + + +def get_parser() -> argparse.ArgumentParser: + """Create argument parser for the launcher.""" + parser = argparse.ArgumentParser( + description="NemoRun launcher for DeepSeek V3 pretraining", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + # Slurm configuration + slurm_group = parser.add_argument_group("Slurm configuration") + slurm_group.add_argument("--account", type=str, required=True, help="Slurm account") + slurm_group.add_argument("--partition", type=str, required=True, help="Slurm partition") + slurm_group.add_argument("--nodes", type=int, required=True, help="Number of nodes") + slurm_group.add_argument("--gpus_per_node", type=int, default=4, help="GPUs per node") + slurm_group.add_argument("--time_limit", type=str, default="04:00:00", help="Job time limit") + slurm_group.add_argument("--gpu", type=str, default="gb300", help="GPU type (gb200, gb300, h100)") + slurm_group.add_argument("--gres", type=str, default=None, help="GPU resource specification") + + # Container configuration + container_group = parser.add_argument_group("Container configuration") + container_group.add_argument( + "--container_image", + type=str, + default="nvcr.io/nvidia/nemo:dev", + help="Container image" + ) + container_group.add_argument( + "--mounts", + type=str, + default="", + help="Container mounts (comma-separated, format: src:dst)" + ) + container_group.add_argument( + "--envvars", + type=str, + default="", + help="Environment variables (comma-separated, format: KEY=VALUE)" + ) + + # Logging configuration + log_group = parser.add_argument_group("Logging configuration") + log_group.add_argument("--log_dir", type=str, default=None, help="Log directory") + log_group.add_argument("--exp_name", type=str, default=None, help="Experiment name") + + # Execution control + exec_group = parser.add_argument_group("Execution control") + exec_group.add_argument("--dryrun", action="store_true", help="Print command without executing") + exec_group.add_argument("--detach", action="store_true", help="Detach after submitting job") + + return parser + + +def parse_mounts(mounts_str: str) -> List[str]: + 
"""Parse comma-separated mounts string.""" + if not mounts_str: + return [] + return [m.strip() for m in mounts_str.split(",") if m.strip()] + + +def parse_envvars(envvars_str: str) -> Dict[str, str]: + """Parse comma-separated environment variables string.""" + if not envvars_str: + return {} + result = {} + for item in envvars_str.split(","): + if "=" in item: + key, value = item.split("=", 1) + result[key.strip()] = value.strip() + return result + + +def main(): + """Main entry point.""" + # Split arguments: before '--' are for launcher, after are for pretrain script + if "--" in sys.argv: + split_idx = sys.argv.index("--") + launcher_args = sys.argv[1:split_idx] + pretrain_args = sys.argv[split_idx + 1:] + else: + launcher_args = sys.argv[1:] + pretrain_args = [] + + parser = get_parser() + args = parser.parse_args(launcher_args) + + # Parse mounts and env vars + custom_mounts = parse_mounts(args.mounts) + custom_env_vars = parse_envvars(args.envvars) + + # Create executor + executor = slurm_executor( + gpu=args.gpu, + account=args.account, + partition=args.partition, + log_dir=args.log_dir, + nodes=args.nodes, + num_gpus_per_node=args.gpus_per_node, + time_limit=args.time_limit, + container_image=args.container_image, + custom_mounts=custom_mounts, + custom_env_vars=custom_env_vars, + gres=args.gres, + ) + + # Build the pretrain script path + pretrain_script_path = SCRIPT_DIR / PRETRAIN_SCRIPT + if not pretrain_script_path.is_file(): + logger.error(f"Pretrain script not found: {pretrain_script_path}") + sys.exit(1) + + # Create NemoRun script + nemorun_script = run.Script( + path=str(pretrain_script_path), + entrypoint="python", + env={"PYTHONPATH": f"{SCRIPT_DIR}:$PYTHONPATH"}, + args=pretrain_args, + ) + + # Generate experiment name + exp_name = args.exp_name or f"deepseek_v3_{args.nodes}x{args.gpus_per_node}gpu" + + logger.info(f"Launching: {' '.join(nemorun_script.to_command())}") + + # Run the experiment + run.run( + nemorun_script, + executor=executor, + dryrun=args.dryrun, + detach=args.detach, + name=exp_name, + ) + + if args.dryrun: + logger.info("Dryrun complete. No job submitted.") + elif args.detach: + logger.info(f"Job submitted. Experiment name: {exp_name}") + else: + logger.info("Job completed.") + + +if __name__ == "__main__": + main() diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh new file mode 100644 index 000000000..66e67bf28 --- /dev/null +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -0,0 +1,154 @@ +#!/bin/bash + +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +cd $(dirname $0) + +set -e + +# Vars without defaults +# Slurm settings +: "${ACCOUNT:?ACCOUNT not set}" +: "${PARTITION:?PARTITION not set}" + +# Job settings +: "${LOG_DIR:?LOG_DIR not set}" +: "${IMAGE:?IMAGE not set}" + +# Dataset settings +: "${DATA_DIR:?DATA_DIR not set}" +: "${MODEL_CKPT:?MODEL_CKPT not set}" + +# Vars with defaults +# Slurm settings +: "${TIME:="04:00:00"}" +: "${NNODES:=64}" +: "${GPUS_PER_NODE:=4}" +: "${GPU:="gb300"}" + + +# Model settings +: "${GBS:=2048}" +: "${MBS:=1}" +: "${TENSOR_PARALLEL_SIZE:=1}" +: "${PIPELINE_PARALLEL_SIZE:=4}" +: "${VIRTUAL_PIPELINE_PARALLEL_SIZE:=""}" +: "${CONTEXT_PARALLEL_SIZE:=1}" +: "${EXPERT_PARALLEL_SIZE:=64}" +: "${EXPERT_TENSOR_PARALLEL_SIZE:=1}" +: "${SEQUENCE_LENGTH:=4096}" +: "${RECOMPUTE_MODULES:="mlp,moe_act"}" +: "${CUDA_GRAPH_IMPLEMENTATION:="transformer_engine"}" +: "${CUDA_GRAPH_SCOPE:="attn"}" +: "${MOE_TOKEN_DISPATCHER_TYPE:="alltoall"}" +: "${MOE_GROUPED_GEMM:=True}" +: "${MOE_PERMUTE_FUSION:=False}" +: "${MOE_ROUTER_FUSION:=False}" + +# Training settings +: "${MAX_STEPS:=1000}" +: "${WARMUP_STEPS:=0}" +: "${LR:="2e-4"}" +: "${MIN_LR:="5e-6"}" +: "${SEED:=1234}" + +# Eval settings +: "${EVAL_CHECK_INTERVAL:=10}" +: "${EVAL_BATCHES:=1}" + +# Experiment settings +: "${EXP_NAME:=""}" +: "${TARGET:="1.0"}" +: "${DRYRUN:=0}" +: "${DETACH:=1}" + +# Build mounts +MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${DATA_DIR}:/preproc_data,${MODEL_CKPT}:/checkpoint,${DATA_DIR}/tokenizer:/tokenizer" + +TMP_NPY_INDEX="$LOG_DIR/npy_index" +mkdir -p "$TMP_NPY_INDEX" +MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" + +# Build launcher arguments +LAUNCHER_ARGS="--account $ACCOUNT --partition $PARTITION" +LAUNCHER_ARGS="$LAUNCHER_ARGS --nodes $NNODES --gpus_per_node $GPUS_PER_NODE" +LAUNCHER_ARGS="$LAUNCHER_ARGS --gpu $GPU" +LAUNCHER_ARGS="$LAUNCHER_ARGS --time_limit $TIME" +LAUNCHER_ARGS="$LAUNCHER_ARGS --container_image $IMAGE" +LAUNCHER_ARGS="$LAUNCHER_ARGS --log_dir $LOG_DIR" +LAUNCHER_ARGS="$LAUNCHER_ARGS --mounts $MOUNTS" + +if [ -n "$EXP_NAME" ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --exp_name $EXP_NAME" +fi + +if [ "$DRYRUN" -gt 0 ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --dryrun" +fi + +if [ "$DETACH" -gt 0 ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --detach" +fi + +# Build pretrain arguments +PRETRAIN_ARGS="--nodes $NNODES --gpus_per_node $GPUS_PER_NODE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --tensor_parallel_size $TENSOR_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --pipeline_parallel_size $PIPELINE_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --context_parallel_size $CONTEXT_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --expert_model_parallel_size $EXPERT_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --expert_tensor_parallel_size $EXPERT_TENSOR_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --sequence_length $SEQUENCE_LENGTH" +PRETRAIN_ARGS="$PRETRAIN_ARGS --gbs $GBS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --mbs $MBS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --lr $LR" +PRETRAIN_ARGS="$PRETRAIN_ARGS --min_lr $MIN_LR" +PRETRAIN_ARGS="$PRETRAIN_ARGS --max_steps $MAX_STEPS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --warmup_steps $WARMUP_STEPS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --seed $SEED" +PRETRAIN_ARGS="$PRETRAIN_ARGS --eval_check_interval $EVAL_CHECK_INTERVAL" +PRETRAIN_ARGS="$PRETRAIN_ARGS --eval_batches $EVAL_BATCHES" +PRETRAIN_ARGS="$PRETRAIN_ARGS --target_log_ppl $TARGET" + +if [ -n "$VIRTUAL_PIPELINE_PARALLEL_SIZE" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --virtual_pipeline_parallel_size $VIRTUAL_PIPELINE_PARALLEL_SIZE" +fi + +if [ -n 
"$RECOMPUTE_MODULES" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --recompute_modules $RECOMPUTE_MODULES" +fi + +if [ -n "$CUDA_GRAPH_IMPLEMENTATION" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --cuda_graph_implementation $CUDA_GRAPH_IMPLEMENTATION" +fi + +if [ -n "$CUDA_GRAPH_SCOPE" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --cuda_graph_scope $CUDA_GRAPH_SCOPE" +fi + +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_token_dispatcher_type $MOE_TOKEN_DISPATCHER_TYPE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_grouped_gemm $MOE_GROUPED_GEMM" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_permute_fusion $MOE_PERMUTE_FUSION" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_router_fusion $MOE_ROUTER_FUSION" + +# Allows MLLogger objects to be constructed locally +if [ ! -d /mlperf-outputs ]; then mkdir -p /mlperf-outputs 2>/dev/null || true; fi + +set -x + +python3 run_deepseek.py \ + $LAUNCHER_ARGS \ + -- \ + $PRETRAIN_ARGS