From 18410e3eb8bf706b68f7ffb5124571d8c2f88b90 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 14 Jan 2026 12:20:22 +0100 Subject: [PATCH 01/16] [WIP] Initial DeepSeek reference implementation --- moe_pretraining/nemo/Dockerfile | 72 +++ moe_pretraining/nemo/callback.py | 609 ++++++++++++++++++ .../nemo/config_GB300_64x4x32xtp1pp4cp1.sh | 82 +++ .../nemo/pretrain_deepseek_v3_671b.py | 315 +++++++++ moe_pretraining/nemo/requirements.txt | 16 + moe_pretraining/nemo/run_deepseek_v3_671b.sh | 134 ++++ 6 files changed, 1228 insertions(+) create mode 100644 moe_pretraining/nemo/Dockerfile create mode 100644 moe_pretraining/nemo/callback.py create mode 100644 moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh create mode 100644 moe_pretraining/nemo/pretrain_deepseek_v3_671b.py create mode 100644 moe_pretraining/nemo/requirements.txt create mode 100644 moe_pretraining/nemo/run_deepseek_v3_671b.sh diff --git a/moe_pretraining/nemo/Dockerfile b/moe_pretraining/nemo/Dockerfile new file mode 100644 index 000000000..77cc6f113 --- /dev/null +++ b/moe_pretraining/nemo/Dockerfile @@ -0,0 +1,72 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + + +ARG FROM_IMAGE_NAME=nvcr.io/nvidia/pytorch:25.12-py3 + +FROM ${FROM_IMAGE_NAME} + +ENV PIP_CONSTRAINT="" + +RUN git config --global user.name "a" && \ + git config --global user.email "a" + +## 0. Pytorch Checkpoint size patch +WORKDIR /workspace/ + + +RUN pip install numcodecs==0.16.3 +RUN pip install nvidia-mathdx==25.1.1 + + +## 1. NeMo-Run +ARG NEMORUN_REVISION=v0.5.0 +ENV CUSTOM_NEMORUN_REVISION ${NEMORUN_REVISION} +RUN git clone https://github.com/NVIDIA/NeMo-Run.git && \ + cd NeMo-Run && \ + git checkout ${NEMORUN_REVISION} && \ + echo NEMORUN_COMMIT_HASH=$(git rev-parse HEAD) && \ + pip install -e . + +## 2. Megatron-bridge and megatron-core +ARG MBRIDGE_REVISION=main +RUN pip uninstall -y megatron-core && \ + git clone https://github.com/NVIDIA-NeMo/Megatron-Bridge.git /workspace/Megatron-Bridge && \ + cd /workspace/Megatron-Bridge && \ + git checkout ${MBRIDGE_REVISION} && \ + git submodule update --init --recursive && \ + echo MBRIDGE_COMMIT_HASH=$(git rev-parse HEAD) && \ + echo $(git rev-parse HEAD) > /MBRIDGE_COMMIT_HASH.env && \ + cd /workspace/Megatron-Bridge/3rdparty/Megatron-LM && \ + echo MCORE_COMMIT_HASH=$(git rev-parse HEAD) && \ + echo $(git rev-parse HEAD) > /MCORE_COMMIT_HASH.env && \ + pip install -e . 
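+
+# Clarifying note (comment only, no functional change): the `pip install -e .` above is executed
+# from the Megatron-LM submodule directory, so it installs megatron-core in editable mode;
+# Megatron-Bridge itself is not pip-installed and is instead made importable through the
+# PYTHONPATH export below.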
+ +ENV PYTHONPATH "/workspace/Megatron-Bridge/src:/workspace/Megatron-Bridge/3rdparty/Megatron-LM:${PYTHONPATH}" + +## 3. Benchmark dependencies +RUN pip uninstall transformers -y +COPY requirements.txt . +RUN pip install --no-cache-dir -U -r requirements.txt + + +WORKDIR /workspace/code + +COPY . . diff --git a/moe_pretraining/nemo/callback.py b/moe_pretraining/nemo/callback.py new file mode 100644 index 000000000..f1d9e7abd --- /dev/null +++ b/moe_pretraining/nemo/callback.py @@ -0,0 +1,609 @@ +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from functools import wraps +from typing import Any, Callable, List, Optional, Protocol, Union + +import torch +from megatron.bridge.training.config import ConfigContainer +from megatron.bridge.training.forward_step_func_types import ForwardStepCallable +from megatron.bridge.training.state import GlobalState +from megatron.core import parallel_state as mpu +from megatron.core.full_cuda_graph import FullCudaGraphWrapper +from megatron.core.inference.communication_utils import broadcast_from_last_pipeline_stage +from megatron.core.num_microbatches_calculator import get_num_microbatches +from megatron.core.optimizer import MegatronOptimizer +from megatron.core.optimizer_param_scheduler import OptimizerParamScheduler +from megatron.core.pipeline_parallel import get_forward_backward_func +from megatron.core.process_groups_config import ProcessGroupCollection +from megatron.core.rerun_state_machine import RerunDataIterator +from megatron.core.transformer import MegatronModule +from mlperf_common.frameworks.pyt import PyTCommunicationHandler +from mlperf_common.logging import MLLoggerWrapper + + +logger = logging.getLogger(__name__) + + +def get_last_pp_rank(): + """Check if current rank is the last pipeline parallel rank.""" + is_last_pp = mpu.is_pipeline_last_stage(ignore_virtual=True) + is_first_dp = mpu.get_data_parallel_rank() == 0 + is_first_tp = mpu.get_tensor_model_parallel_rank() == 0 + is_first_cp = mpu.get_context_parallel_rank() == 0 + return is_last_pp and is_first_dp and is_first_tp and is_first_cp + + +def broadcast_loss(loss_reduced): + """Broadcast loss from last pipeline stage to all ranks.""" + if "lm loss" in loss_reduced: + loss_tensor = loss_reduced["lm loss"] + else: + loss_tensor = None + + loss_synced = broadcast_from_last_pipeline_stage( + size=[1], + dtype=torch.float32, + tensor=loss_tensor.unsqueeze(0) if loss_tensor is not None else None, + ) + + return loss_synced.item() + + +mllogger = MLLoggerWrapper(PyTCommunicationHandler()) + + +class DeltaTimer: + """Timer for measuring time deltas.""" + + def __init__(self): + self.reset() + + def reset(self): + self.start_time = time.perf_counter() + return self.start_time + + def get_delta(self): + prev_time = self.start_time + return self.reset() - prev_time + + +# ============================================================================= +# Callback Protocol and Manager +# 
============================================================================= + +class TrainingCallback(Protocol): + """Protocol defining all available callback hooks.""" + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called once at the start of training.""" + pass + + def on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called once at the end of training.""" + pass + + def on_train_batch_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + """Called before each training step.""" + pass + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ret: Any, + ): + """Called after each training step.""" + pass + + def on_validation_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ): + """Called before validation begins.""" + pass + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + """Called after validation ends.""" + pass + + +class CallbackManager: + """Manages callbacks and provides function wrappers.""" + + def __init__(self): + self.callbacks: List[TrainingCallback] = [] + + def register(self, callback: TrainingCallback) -> None: + self.callbacks.append(callback) + + def trigger_on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_start"): + callback.on_train_start( + global_state, + forward_step_func, + model, + optimizer, + scheduler, + ) + + def trigger_on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + for callback in self.callbacks: + if hasattr(callback, "on_train_end"): + callback.on_train_end(global_state, forward_step_func, model, optimizer, scheduler) + + def trigger_on_train_batch_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_batch_start"): + callback.on_train_batch_start(global_state, forward_step_func, model, optimizer, scheduler) + + def trigger_on_train_batch_end( + self, + global_state: GlobalState, + iteration: int, + loss_dict: dict[str, torch.Tensor], + optimizer: MegatronOptimizer, + model: list[MegatronModule], + ret: Any, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_train_batch_end"): + callback.on_train_batch_end(global_state, iteration, loss_dict, optimizer, model, ret) + + def trigger_on_validation_start( + self, + global_state: GlobalState, + forward_step_func: 
ForwardStepCallable, + model: list[MegatronModule], + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_validation_start"): + callback.on_validation_start(global_state, forward_step_func, model) + + def trigger_on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ) -> None: + for callback in self.callbacks: + if hasattr(callback, "on_validation_end"): + callback.on_validation_end(global_state, forward_step_func, model, ret) + + def wrap_train(self, train_func: Callable) -> Callable: + """Wrap the train() function to add on_train_start and on_train_end hooks.""" + + @wraps(train_func) + def wrapped_train( + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + train_data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + valid_data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + global_state: GlobalState, + checkpointing_context: dict[str, Any], + pg_collection: ProcessGroupCollection, + process_non_loss_data_func: Optional[Callable] = None, + non_loss_data_func: Optional[Callable] = None, + ) -> None: + self.trigger_on_train_start(global_state, forward_step_func, model, optimizer, scheduler) + train_func( + forward_step_func, + model, + optimizer, + scheduler, + train_data_iterator, + valid_data_iterator, + global_state, + checkpointing_context, + pg_collection, + process_non_loss_data_func, + non_loss_data_func, + ) + self.trigger_on_train_end(global_state, forward_step_func, model, optimizer, scheduler) + + return wrapped_train + + def wrap_train_step(self, train_step_func: Callable) -> Callable: + """Wrap the train_step() function to add batch-level hooks.""" + + @wraps(train_step_func) + def wrapped_train_step( + forward_step_func: ForwardStepCallable, + data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + global_state: GlobalState, + pg_collection: ProcessGroupCollection, + forward_backward_func: Callable, + ): + self.trigger_on_train_batch_start(global_state, forward_step_func, model, optimizer, scheduler) + ret = train_step_func( + forward_step_func, + data_iterator, + model, + optimizer, + scheduler, + global_state, + pg_collection, + forward_backward_func, + ) + self.trigger_on_train_batch_end(global_state, forward_step_func, model, optimizer, scheduler, ret) + return ret + + return wrapped_train_step + + def wrap_evaluate(self, evaluate_func: Callable) -> Callable: + """Wrap the evaluate() function to add validation hooks.""" + + @wraps(evaluate_func) + def wrapped_evaluate( + state: GlobalState, + forward_step_func: ForwardStepCallable, + data_iterator: Optional[Union[RerunDataIterator, list[RerunDataIterator]]], + model: list[MegatronModule], + process_non_loss_data_func: Optional[Callable], + config: ConfigContainer, + verbose: bool = False, + non_loss_data_func: Optional[Callable] = None, + ): + self.trigger_on_validation_start(state, forward_step_func, model) + ret = evaluate_func( + state, + forward_step_func, + data_iterator, + model, + process_non_loss_data_func, + config, + verbose, + non_loss_data_func, + ) + self.trigger_on_validation_end(state, forward_step_func, model, ret) + + return ret + + return wrapped_evaluate + + +_callback_manager = CallbackManager() + + +def register_callback(callback: 
TrainingCallback) -> None: + """Register a callback globally.""" + _callback_manager.register(callback) + + +def install_callbacks() -> None: + """Install callbacks by wrapping the train, train_step, and evaluate functions.""" + import sys + + from megatron.bridge.training import eval as eval_module + from megatron.bridge.training import train as train_module + + train_module.train = _callback_manager.wrap_train(train_module.train) + train_module.train_step = _callback_manager.wrap_train_step(train_module.train_step) + eval_module.evaluate = _callback_manager.wrap_evaluate(eval_module.evaluate) + + if "megatron.bridge.training.pretrain" in sys.modules: + pretrain_module = sys.modules["megatron.bridge.training.pretrain"] + pretrain_module.train = train_module.train + + +class MLPerfLoggingCallback: + """MLPerf logging callback for compliance logging.""" + + def __init__(self, cfg): + self.cfg = cfg + self.global_batch_size = self.cfg.model.global_batch_size + self.train_block_started = True + self.train_current_block = 0 + self.force_success = cfg.custom.force_success_status + self.previous_step = 0 + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + mllogger.log_init_stop_run_start() + global_state.should_stop = False + mllogger.start( + mllogger.constants.BLOCK_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: self.cfg.trainer.val_check_interval * self.global_batch_size, + "step": global_state.train_state.step, + }, + ) + self.timer = DeltaTimer() + + def on_train_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + if self.train_block_started: + self._end_train_block(global_state) + + FullCudaGraphWrapper.cuda_graph = None + + def on_validation_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ): + """Log validation start.""" + if hasattr(global_state, "warmup") and global_state.warmup: + return + self._log_train_step_time(global_state) + if self.train_block_started: + self._end_train_block(global_state) + + mllogger.start( + mllogger.constants.EVAL_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: self._get_samples_count(global_state), + "step": self._get_step(global_state), + }, + ) + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + if hasattr(global_state, "warmup") and global_state.warmup: + return + self._log_custom_timedelta("validation_time", self._get_step(global_state)) + + samples_count = self._get_samples_count(global_state) + if self.cfg.model.pipeline_model_parallel_size > 1: + loss = broadcast_loss(ret[0]) + else: + loss = ret[0]["lm loss"].item() + + mllogger.event( + key=mllogger.constants.EVAL_ACCURACY, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count}, + value=loss, + ) + mllogger.end( + mllogger.constants.EVAL_STOP, + metadata={ + mllogger.constants.SAMPLES_COUNT: samples_count, + "step": self._get_step(global_state), + }, + ) + + if loss < self.cfg.custom.target_log_ppl: + global_state.should_stop = True + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "success"}, + ) + elif global_state.train_state.step >= 
self.cfg.trainer.max_steps: + global_state.should_stop = True + status = "success" if self.force_success else "aborted" + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": status}, + ) + if not os.environ.get("VAL_CHECK_INTERVAL"): + global_state.cfg.train.eval_interval = self.cfg.default_val_check_interval + + if not global_state.should_stop: + self._start_train_block(global_state) + else: + global_state.train_state.step = self.cfg.trainer.max_steps + 1 + global_state.train_state.do_valid = False + global_state.train_state.do_test = False + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ret: Any, + ): + step = global_state.train_state.step + 1 + last_step = step >= self.cfg.trainer.max_steps + eval_after_this_step = step % global_state.cfg.train.eval_interval == 0 + if last_step and not eval_after_this_step: + samples_count = self._get_samples_count(global_state) + status = "success" if self.force_success else "aborted" + self._end_train_block(global_state) + mllogger.end( + mllogger.constants.RUN_STOP, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": status}, + ) + self.train_block_started = False + global_state.should_stop = True + global_state.train_state.do_valid = False + global_state.train_state.do_test = False + + def _start_train_block(self, global_state: GlobalState) -> None: + self.train_block_started = True + mllogger.start( + mllogger.constants.BLOCK_START, + metadata={ + mllogger.constants.SAMPLES_COUNT: global_state.cfg.train.eval_interval * self.global_batch_size, + "step": self._get_step(global_state), + }, + ) + + def _end_train_block(self, global_state: GlobalState) -> None: + mllogger.end( + mllogger.constants.BLOCK_STOP, + metadata={ + mllogger.constants.SAMPLES_COUNT: global_state.cfg.train.eval_interval * self.global_batch_size, + "step": self._get_step(global_state), + }, + ) + self.train_block_started = False + + def _log_train_step_time( + self, + global_state: GlobalState, + ) -> None: + delta_t = self.timer.get_delta() + global_step = self._get_step(global_state) + delta_step = global_step - self.previous_step + mllogger.event( + key="tracked_stats", + metadata={mllogger.constants.SAMPLES_COUNT: delta_step * self.global_batch_size}, + value={ + "train_step_time": delta_t / (delta_step + 1e-8), + }, + ) + + self.previous_step = global_step + + def _log_custom_timedelta(self, value_key, step: int = 0): + mllogger.event( + key="tracked_stats", + metadata={"step": step}, + value={value_key: self.timer.get_delta()}, + ) + + def _get_step(self, global_state): + return global_state.train_state.step + + def _get_samples_count(self, global_state): + return self._get_step(global_state) * self.global_batch_size + + +class DeltaTimingCallback: + """Callback for tracking training step timing.""" + + def __init__(self, cfg): + self.t0 = 0 + self.total_train_step_time = [0, 0] + self.global_batch_size = cfg.model.global_batch_size + self.log_every_n_steps = cfg.trainer.log_every_n_steps + + def on_train_start( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ): + self.t0 = time.time() + + def on_train_batch_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: 
list[MegatronModule], + optimizer: MegatronOptimizer, + scheduler: OptimizerParamScheduler, + ret: Any, + ): + t1 = time.time() + d = t1 - self.t0 + self.total_train_step_time[0] += d + self.total_train_step_time[1] += 1 + self.t0 = t1 + + if global_state.train_state.step % self.log_every_n_steps == 0 and get_last_pp_rank(): + mllogger.event( + key="tracked_stats", + metadata={mllogger.constants.SAMPLES_COUNT: self.global_batch_size * global_state.train_state.step}, + value={ + "train_step_time": d, + "reduced_train_loss": ret[0]["lm loss"].item(), + }, + unique=False, + ) + + def on_validation_end( + self, + global_state: GlobalState, + forward_step_func: ForwardStepCallable, + model: list[MegatronModule], + ret: Any, + ): + """Reset timer after validation to avoid including validation time in first train step.""" + self.t0 = time.time() diff --git a/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh new file mode 100644 index 000000000..b07be7399 --- /dev/null +++ b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh @@ -0,0 +1,82 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SSH: username that connects to the remote cluster +export USER="DUMMY" +# SSH: remote cluster URL +export HOST="DUMMY" +# Slurm: account for job submission +export ACCOUNT="DUMMY" +# Slurm: partition for job submission +export PARTITION="DUMMY" +# Slurm: job time limit, defaults to 8 hours +export TIME="08:00:00" +# Slurm: --nodes arguments, default to use 288 nodes +export NNODES=64 +# Slurm: --gpus_per_node and --ntasks_per_node argument, defaults to 8 GPUs per node +export GPUS_PER_NODE=4 +# Slurm: max job retries for transient job failures, defaults to retry 3 times +export MAX_RETRIES=1 + +# Folder mapping: +# Output directory that holds logs, any path that you like. 
+export JOB_DIR="/workspace/code/logs" +# Image / container path, either local cache file or remote URL +export IMAGE="DUMMY" +# Dataset: C4 dataset location that contains the dataset after preprocessing +# export ORIGINAL_C4_PATH="/data/data/C4" + +# This corresponds to the PREPROCESSED_PATH in README section 3's dataset download part +export PREPROCESSED_PATH="/data/deepseek_v3_671b/data/C4_processed" +export MERGED_C4_PATH="/data/deepseek_v3_671b/data/C4_merged" +# Dataset: Numpy index working directory, contains shuffled dataset +# This path must be able to hold >400GB data +export TMP_NPY_INDEX="/data/npy_indices" +# Dataset: Tokenizer path +# This corresponds to the TOKENIZER_PATH in README section 3's tokenizer download part +export TOKENIZER_PATH="/data/deepseek_v3_671b/model/DeepSeek-V3-671B-Base" +# export TOKENIZER_PATH="/data/llama3_405b_ref/tokenizer" + +export MODEL_CKPT="$TOKENIZER_PATH" + +# Training Configs: +# Dataloader: Global batch size +export GBS=1024 +# Dataloader: Micro batch size +export MBS=1 +export MAX_LR="2e-4" +export WARMUP_STEPS=256 +export EVAL_CHECK_INTERVAL=10 # every $EVAL_CHECK_INTERVAL steps +export EVAL_BATCHES=1 # evaluate on $EVAL_BATCHES * $GBS samples + + +export TENSOR_PARALLEL_SIZE=1 +export PIPELINE_PARALLEL_SIZE=4 +export CONTEXT_PARALLEL_SIZE=1 +export EXPERT_PARALLEL_SIZE=64 +export EXPERT_TENSOR_PARALLEL_SIZE=1 +export RECOMPUTE_MODULES="mlp,moe_act" +export CUDA_GRAPH_IMPLEMENTATION="transformer_engine" +export CUDA_GRAPH_SCOPE="attn" + +# Experiment manager: Number of experiments to launch +export NEXP=1 +# Experiment manager: how many consecutive jobs we want for each experiment +export NPAR=1 +# Experiment manager: provides seeds to the launched experiments, use space as delimiter, such as "1234 1235 1236" +# The training script will discard all excessive seeds, and generate seeds if given seeds < NEXP. +# To preserve randomness, we recommend not to set this value so that each time seeds can be randomly generated. + + +export DGXSYSTEM=$(basename $(readlink -f ${BASH_SOURCE[0]}) | sed 's/^config_//' | sed 's/\.sh$//' ) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py new file mode 100644 index 000000000..27566c088 --- /dev/null +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -0,0 +1,315 @@ +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
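+
+# Worked example (illustrative only; it mirrors the arithmetic in log_hyperparams() below and
+# assumes the accompanying GB300 64x4 config: TP=1, PP=4, CP=1, GBS=1024, MBS=1, 64 nodes x 4 GPUs):
+#   data-parallel size       dp = (64 * 4) / (TP * PP * CP) = 256 / 4 = 64
+#   per-replica mini-batch      = GBS / dp = 1024 / 64 = 16 samples
+#   gradient accumulation       = mini-batch / MBS = 16 / 1 = 16 microbatches per optimizer step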
+ +import argparse +import math +import os +from typing import Optional + +import torch + +from megatron.bridge.recipes.deepseek import deepseek_v3_pretrain_config +from megatron.bridge.training.config import GPTDatasetConfig, ConfigContainer +from megatron.bridge.training.gpt_step import forward_step +from megatron.bridge.training.pretrain import pretrain + +from callback import ( + MLPerfLoggingCallback, + DeltaTimingCallback, + mllogger, + install_callbacks, + register_callback, +) + + +def get_rank(): + """Get the current process rank.""" + import torch.distributed as dist + if dist.is_available() and dist.is_initialized(): + return dist.get_rank() + return 0 + + +def init_logging(): + """Initialize logging configuration.""" + import logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + +def get_data(seq_length: int, seed): + """Configure dataset paths and parameters.""" + dataset_path = os.getenv("PREPROCESSED_PATH", "/preproc_data") + val_test_path = f"{dataset_path}/c4-validation-91205-samples.en_text_document" + train_datasets = [f"{dataset_path}/c4-train.en_{idx}_text_document" for idx in [6, 7]] + train_datasets_weights = [50] * 2 + + data_paths = [ + (train_datasets, train_datasets_weights), + ([val_test_path], None), + ([val_test_path], None) + ] + + return GPTDatasetConfig( + dataloader_type="single", + blend_per_split=data_paths, + sequence_length=seq_length, + random_seed=seed, + num_workers=8, + path_to_cache="/npy_index", + reset_position_ids=False, + reset_attention_mask=False, + eod_mask_loss=False, + ) + + +def log_hyperparams(args, mbridge_config: ConfigContainer): + """Log hyperparameters for MLPerf compliance.""" + bmark = mllogger.constants.DEEPSEEK_V3 + opt_lr_decay_steps = args.max_steps - args.warmup_steps + mllogger.mlperf_submission_log(bmark) + + # Compute gradient accumulation steps + tp = args.tensor_parallel_size + pp = args.pipeline_parallel_size + cp = args.context_parallel_size + dp = (args.nodes * args.gpus_per_node) // (tp * pp * cp) + mini_batch_size = args.gbs // dp + grad_accumulation_steps = mini_batch_size // args.mbs + + logging_configs = { + mllogger.constants.SEED: args.seed, + mllogger.constants.GLOBAL_BATCH_SIZE: args.gbs, + mllogger.constants.GRADIENT_ACCUMULATION_STEPS: grad_accumulation_steps, + mllogger.constants.MAX_SEQUENCE_LENGTH: args.sequence_length, + mllogger.constants.EVAL_SAMPLES: args.gbs * args.eval_batches, + mllogger.constants.TRAIN_SAMPLES: 1574207408, + mllogger.constants.INIT_CHECKPOINT_STEP: 0, + mllogger.constants.OPT_NAME: mllogger.constants.ADAMW, + mllogger.constants.OPT_BASE_LR: mbridge_config.optimizer.lr, + mllogger.constants.OPT_ADAMW_BETA_1: mbridge_config.optimizer.adam_beta1, + mllogger.constants.OPT_ADAMW_BETA_2: mbridge_config.optimizer.adam_beta2, + mllogger.constants.OPT_ADAMW_EPSILON: mbridge_config.optimizer.adam_eps, + mllogger.constants.OPT_ADAMW_WEIGHT_DECAY: mbridge_config.optimizer.weight_decay, + mllogger.constants.OPT_GRADIENT_CLIP_NORM: mbridge_config.optimizer.clip_grad, + mllogger.constants.OPT_END_LR: args.min_lr, + mllogger.constants.OPT_LR_WARMUP_STEPS: mbridge_config.scheduler.lr_warmup_iters, + mllogger.constants.OPT_LR_DECAY_STEPS: opt_lr_decay_steps, + mllogger.constants.MAX_STEPS: args.max_steps, + mllogger.constants.OPT_LR_DECAY_SCHEDULE: "cosine with linear warmup", + "target_accuracy": args.target_log_ppl, + } + + for key, value in logging_configs.items(): + mllogger.event(key=key, value=value) + + +def 
create_config(args): + """Create the training configuration from arguments.""" + config = deepseek_v3_pretrain_config( + pipeline_model_parallel_size=args.pipeline_parallel_size, + virtual_pipeline_parallel_size=args.virtual_pipeline_parallel_size, + ) + + # Model parallelism configuration (hardcoded for DeepSeek V3) + model_cfg = config.model + model_cfg.tensor_model_parallel_size = args.tensor_parallel_size + model_cfg.context_parallel_size = args.context_parallel_size + model_cfg.expert_model_parallel_size = args.expert_model_parallel_size + model_cfg.expert_tensor_parallel_size = args.expert_tensor_parallel_size + model_cfg.sequence_parallel = args.tensor_parallel_size > 1 + model_cfg.seq_length = args.sequence_length + model_cfg.recompute_modules = args.recompute_modules.split(",") if args.recompute_modules else [] + model_cfg.cuda_graph_implementation = args.cuda_graph_implementation + model_cfg.cuda_graph_scope = args.cuda_graph_scope.split(",") if args.cuda_graph_scope else [] + + # MoE parameters (hardcoded for DeepSeek V3) + model_cfg.moe_token_dispatcher_type = args.moe_token_dispatcher_type + model_cfg.moe_grouped_gemm = args.moe_grouped_gemm + model_cfg.moe_permute_fusion = args.moe_permute_fusion + model_cfg.moe_router_fusion = args.moe_router_fusion + model_cfg.moe_router_force_load_balancing = False + + # Training configuration + train_cfg = config.train + train_cfg.micro_batch_size = args.mbs + train_cfg.global_batch_size = args.gbs + train_cfg.train_iters = args.max_steps + + # Compute eval intervals + eval_every_n_batches = math.ceil(args.eval_every / args.gbs) + eval_batches = math.ceil(args.eval_tokens / args.gbs) + train_cfg.eval_interval = eval_every_n_batches + train_cfg.eval_iters = eval_batches + + # Optimizer configuration + optimizer_cfg = config.optimizer + optimizer_cfg.lr = args.lr + optimizer_cfg.min_lr = args.min_lr + + # Scheduler configuration + scheduler_cfg = config.scheduler + scheduler_cfg.lr_warmup_iters = args.warmup_steps + + # RNG configuration + rng_cfg = config.rng + rng_cfg.seed = args.seed + + # Dataset configuration + config.dataset = get_data( + seq_length=args.sequence_length, + seed=args.seed, + ) + + # Checkpoint configuration + checkpoint_cfg = config.checkpoint + checkpoint_cfg.load = "/checkpoint" + checkpoint_cfg.load_optim = False + checkpoint_cfg.load_rng = False + checkpoint_cfg.exit_on_missing_checkpoint = True + + # Logger configuration + logger_cfg = config.logger + logger_cfg.log_interval = 1 + + return config + + +def get_parser() -> argparse.ArgumentParser: + """Create argument parser with same structure as llama31 pretrain script.""" + parser = argparse.ArgumentParser(description="DeepSeek V3 Pretraining") + parser.add_argument("--tag", type=str, help="Optional experiment tag", required=False, default="") + + # Slurm and executor related + slurm_group = parser.add_argument_group("Slurm executor arguments") + slurm_group.add_argument('--user', type=str, required=True, help="Remote cluster SSH user name") + slurm_group.add_argument("--host", type=str, required=True, help="Remote cluster host address") + slurm_group.add_argument("--job_dir", type=str, required=True, help="Remote job directory") + slurm_group.add_argument("--account", type=str, required=True, help="Account to be used for Slurm job submission") + slurm_group.add_argument("--partition", type=str, required=True, help="Partition to be used for Slurm job submission") + slurm_group.add_argument("--nodes", type=int, required=True, help="Number of nodes to be used") + 
slurm_group.add_argument("--gpus_per_node", type=int, required=True, help="Number of GPUs per node") + slurm_group.add_argument("--time", type=str, required=True, help="Time limit for the job") + slurm_group.add_argument("--dependencies", nargs="*", help="list of dependencies for the job") + slurm_group.add_argument("--max_retries", type=int, default=0) + slurm_group.add_argument("--run_slurm", action="store_true", help="run in slurm executor instead of locally") + slurm_group.add_argument( + "--mounts", + type=str, + required=True, + help=( + "Custom mount paths, formatted as a string of :[,:], " + "and should contain one path for /output, dataset path: /preproc_data, /npy_index" + )) + slurm_group.add_argument("--envvars", type=str, help="Environment variables to be added", default=None) + slurm_group.add_argument("--image", type=str, required=True, help="Container image path, either remote or local") + + # Model arguments + model_group = parser.add_argument_group("Model arguments") + model_group.add_argument("--tensor_parallel_size", type=int, required=True, help="Tensor parallel size") + model_group.add_argument("--pipeline_parallel_size", type=int, required=True, help="Pipeline parallel size") + model_group.add_argument("--context_parallel_size", type=int, required=True, help="Context parallel size") + model_group.add_argument("--expert_model_parallel_size", type=int, required=True, help="Expert model parallel size") + model_group.add_argument("--expert_tensor_parallel_size", type=int, required=True, help="Expert tensor parallel size") + model_group.add_argument("--recompute_modules", type=str, help="Recompute modules") + model_group.add_argument("--cuda_graph_implementation", type=str, help="CUDA graph implementation") + model_group.add_argument("--cuda_graph_scope", type=str, help="CUDA graph scope") + model_group.add_argument("--moe_token_dispatcher_type", type=str, help="MoE token dispatcher type", default="alltoall") + model_group.add_argument("--moe_grouped_gemm", type=bool, help="MoE grouped GEMM", default=True) + model_group.add_argument("--moe_permute_fusion", type=bool, help="MoE permute fusion", default=False) + model_group.add_argument("--moe_router_fusion", type=bool, help="MoE router fusion", default=False) + model_group.add_argument("--moe_router_force_load_balancing", type=bool, help="MoE router force load balancing", default=False) + model_group.add_argument("--sequence_length", type=int, help="Sequence length", default=4096) + + + # Training arguments + training_group = parser.add_argument_group("Training arguments") + training_group.add_argument("--gbs", type=int, default=1024, help="Global batch size") + training_group.add_argument("--mbs", type=int, default=1, help="Micro batch size") + training_group.add_argument("--lr", type=float, default=2e-4, help="Initial learning rate after warmup.") + training_group.add_argument("--min_lr", type=float, default=5e-6, help="Minimum learning rate") + training_group.add_argument('--max_steps', type=int, default=1000, help="Maximum number of steps") + training_group.add_argument('--warmup_steps', type=int, default=0, help="Number of steps for LR warmup") + training_group.add_argument("--seed", type=int, default=1234, help="Random seed") + training_group.add_argument("--eval_check_interval", type=int, default=10, help="Evaluate every N steps") + training_group.add_argument("--eval_batches", type=int, default=1, help="Evaluate N batches") + + + # Experiment management arguments + experiment_group = 
parser.add_argument_group("Experiment management arguments") + experiment_group.add_argument("--dryrun", action="store_true", help="Whether we are launching dryrun or actual runs") + experiment_group.add_argument("--seeds", type=int, nargs="*", default=[], help="random seeds") + experiment_group.add_argument("--num_exps", type=int, default=1) + experiment_group.add_argument("--num_pars", type=int, default=1) + experiment_group.add_argument("--target_log_ppl", type=float, default=1.0, help="Target log perplexity") + experiment_group.add_argument("--step_time_atol", type=int, default=1600, help="train step time atol") + + return parser + + +class ArgsConfig: + """Configuration object that wraps args to match expected interface.""" + def __init__(self, args): + self.args = args + + # Create nested config structure matching what callbacks expect + self.model = type('ModelConfig', (), { + 'global_batch_size': args.gbs, + 'micro_batch_size': args.mbs, + 'pipeline_model_parallel_size': args.pipeline_parallel_size, + 'encoder_seq_length': args.sequence_length, + 'seed': args.seed, + })() + + self.trainer = type('TrainerConfig', (), { + 'max_steps': args.max_steps, + 'val_check_interval': args.eval_check_interval, + 'limit_val_batches': args.eval_batches, + 'log_every_n_steps': 1, + 'warmup_train_steps': 0, + })() + + self.custom = type('CustomConfig', (), { + 'target_log_ppl': args.target_log_ppl, + })() + + self.default_val_check_interval = self.trainer.val_check_interval + + +def main(): + """Main entry point for DeepSeek V3 pretraining.""" + args = get_parser().parse_args() + + if args.tag and not args.tag.startswith("-"): + args.tag = "-" + args.tag + + init_logging() + config = create_config(args) + cfg = ArgsConfig(args) + + if get_rank() == 0: + log_hyperparams(args, config) + mllogger.start(key=mllogger.constants.INIT_START) + + register_callback(DeltaTimingCallback(cfg)) + register_callback(MLPerfLoggingCallback(cfg)) + install_callbacks() + + pretrain(config, forward_step_func=forward_step) + + +if __name__ == "__main__": + main() diff --git a/moe_pretraining/nemo/requirements.txt b/moe_pretraining/nemo/requirements.txt new file mode 100644 index 000000000..4762e3dbf --- /dev/null +++ b/moe_pretraining/nemo/requirements.txt @@ -0,0 +1,16 @@ +git+https://github.com/denys-fridman/logging.git@dfridman/deepseek-v3 # TODO(dfridman): revert to main repo once merged +git+https://github.com/NVIDIA/mlperf-common.git@b86d175a05849d650a8ff69c1e2c37b9f4e61d51 +transformers==4.57.1 +blobfile==3.0.0 +prettytable==3.11.0 +# WAR for MultiStorageClient +multi-storage-client<0.26.0 + +# TODO(dfridman): below are needed by megatron-bridge. Dependencies need to be fixed on MBridge side: +# some of the below packages are in `uv.lock`, but not in `pyproject.toml` - so we can't easily pip install them. +wandb==0.23.0 +nvidia-modelopt[all]==0.39.0 +nvidia-resiliency-ext==0.5.0 +megatron-energon==6.0.1 +bitstring==4.3.1 +filetype==1.2.0 diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh new file mode 100644 index 000000000..496dbb14c --- /dev/null +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -0,0 +1,134 @@ +#!/bin/bash + +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +#git config --global --add safe.directory /workspace/llama31 + +# Vars without defaults +# Slurm settings +: "${USER:?USER not set}" +: "${HOST:?HOST not set}" +: "${ACCOUNT:?ACCOUNT not set}" +: "${PARTITION:?PARTITION not set}" +: "${REMOTE:=0}" + +# Job settings +: "${JOB_DIR:?JOB_DIR not set}" +: "${IMAGE:?IMAGE not set}" + +# Dataset settings +: "${PREPROCESSED_PATH:?PREPROCESSED_PATH not set}" +: "${TOKENIZER_PATH:?TOKENIZER_PATH not set}" + +# Vars with defaults +# Slurm settings +: "${TIME:="04:00:00"}" +: "${NNODES:=1}" +: "${GPUS_PER_NODE:=8}" +: "${DEPENDENCIES:=""}" + +# Job settings +: "${NEMO_RUN_DIR:=""}" # Provide customized NeMo-Run path here +: "${TMP_NPY_INDEX:=""}" # Provide temporary NNumpy Index saving directory +: "${MAX_RETRIES:=0}" + +# Model settings +: "${GBS:=1024}" +: "${MBS:=1}" + +# Eval settings +: "${EVAL_CHECK_INTERVAL:=10}" +: "${EVAL_BATCHES:=1}" + +# Dataloader settings +: "${MAX_STEPS:="1200000"}" + +# Experiment settings +: "${SEEDS:=""}" +IFS=" " read -ra seeds <<< $SEEDS +: "${NEXP:=1}" +: "${NPAR:=1}" +: "${TAG:=""}" +: "${TARGET:="1.0"}" # TODO(dfridman): update once determined +: "${STEP_TIME_ATOL:="18000"}" # maximum tolerable step time, setting to 5hr by default + +# Run + +MOUNTS="${JOB_DIR}:/output,${JOB_DIR}:/mlperf-outputs,${PREPROCESSED_PATH}:/preproc_data,${TOKENIZER_PATH}:/tokenizer,${MODEL_CKPT}:/checkpoint" + +CMD_SUFFIX="" + +if [ ! $NEMO_RUN_DIR = "" ]; then + MOUNTS="${MOUNTS},${NEMO_RUN_DIR}:/opt/NeMo-Run" +fi + +if [ ! $TMP_NPY_INDEX = "" ]; then + MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" +fi + +if [ ! $DEPENDENCIES = "" ]; then + CMD_SUFFIX="${CMD_SUFFIX} --dependencies ${DEPENDENCIES}" +fi + +if [ ! $MAX_STEPS = "" ]; then + CMD_SUFFIX="${CMD_SUFFIX} --max_steps ${MAX_STEPS}" +fi + +if [ ! $TAG = "" ]; then + CMD_SUFFIX="${CMD_SUFFIX} --tag ${TAG}" +fi + +if [ $REMOTE -gt 0 ]; then + CMD_SUFFIX="${CMD_SUFFIX} --run_slurm" +fi + +# Allows MLLogger objects to be constructed locally +if [ ! 
-d /mlperf-outputs ]; then mkdir /mlperf-outputs; fi + +set -x + +python3 pretrain_llama31.py \ +--user $USER --host $HOST \ +--job_dir $JOB_DIR \ +--account $ACCOUNT --partition $PARTITION \ +--nodes $NNODES --gpus_per_node $GPUS_PER_NODE \ +--time $TIME \ +--max_retries $MAX_RETRIES \ +--mounts $MOUNTS \ +--image $IMAGE \ +--size $SIZE \ +--gbs $GBS \ +--mbs $MBS \ +--seeds ${seeds[@]} \ +--num_exps $NEXP \ +--num_pars $NPAR \ +--tokenizer_path $TOKENIZER_PATH \ +--target_log_ppl $TARGET \ +--step_time_atol $STEP_TIME_ATOL \ +--warmup_steps $WARMUP_STEPS \ +--tensor_parallel_size $TENSOR_PARALLEL_SIZE \ +--pipeline_parallel_size $PIPELINE_PARALLEL_SIZE \ +--context_parallel_size $CONTEXT_PARALLEL_SIZE \ +--expert_model_parallel_size $EXPERT_PARALLEL_SIZE \ +--expert_tensor_parallel_size $EXPERT_TENSOR_PARALLEL_SIZE \ +--recompute_modules $RECOMPUTE_MODULES \ +--cuda_graph_implementation $CUDA_GRAPH_IMPLEMENTATION \ +--cuda_graph_scope $CUDA_GRAPH_SCOPE \ +--lr $LR \ +--eval_check_interval $EVAL_CHECK_INTERVAL \ +--eval_batches $EVAL_BATCHES \ +$CMD_SUFFIX From 412ae5395ba4e74a608b9d934e9e08dbcd185247 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Tue, 20 Jan 2026 11:14:35 +0100 Subject: [PATCH 02/16] Add NemoRun launcher for DeepSeek V3 pretraining --- .../nemo/pretrain_deepseek_v3_671b.py | 1 + moe_pretraining/nemo/run_deepseek.py | 329 ++++++++++++++++++ 2 files changed, 330 insertions(+) create mode 100644 moe_pretraining/nemo/run_deepseek.py diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index 27566c088..6e24faf5d 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -221,6 +221,7 @@ def get_parser() -> argparse.ArgumentParser: model_group = parser.add_argument_group("Model arguments") model_group.add_argument("--tensor_parallel_size", type=int, required=True, help="Tensor parallel size") model_group.add_argument("--pipeline_parallel_size", type=int, required=True, help="Pipeline parallel size") + model_group.add_argument("--virtual_pipeline_parallel_size", type=int, default=None, help="Virtual pipeline parallel size") model_group.add_argument("--context_parallel_size", type=int, required=True, help="Context parallel size") model_group.add_argument("--expert_model_parallel_size", type=int, required=True, help="Expert model parallel size") model_group.add_argument("--expert_tensor_parallel_size", type=int, required=True, help="Expert tensor parallel size") diff --git a/moe_pretraining/nemo/run_deepseek.py b/moe_pretraining/nemo/run_deepseek.py new file mode 100644 index 000000000..d6c0bda84 --- /dev/null +++ b/moe_pretraining/nemo/run_deepseek.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NemoRun launcher script for DeepSeek V3 pretraining. 
+ +This script uses NemoRun's Slurm executor to launch pretrain_deepseek_v3_671b.py +on a Slurm cluster. + +Example usage: + python run_deepseek.py \ + --account my_account \ + --partition my_partition \ + --nodes 64 \ + --gpus_per_node 4 \ + --time_limit 04:00:00 \ + --container_image nvcr.io/nvidia/nemo:dev \ + --log_dir /path/to/logs \ + --mounts /data:/data,/checkpoints:/checkpoints \ + -- \ + --tensor_parallel_size 1 \ + --pipeline_parallel_size 4 \ + --context_parallel_size 1 \ + --expert_model_parallel_size 64 \ + --expert_tensor_parallel_size 1 \ + --gbs 2048 \ + --max_steps 1000 +""" + +import argparse +import logging +import os +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional + +import nemo_run as run +from nemo_run.config import get_nemorun_home, set_nemorun_home +from nemo_run.core.execution.launcher import SlurmTemplate + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SCRIPT_DIR = Path(__file__).parent.resolve() +PRETRAIN_SCRIPT = "pretrain_deepseek_v3_671b.py" + +# Inline bash template for Slurm +INLINE_TEMPLATE = r""" +#!/usr/bin/env bash +set -euo pipefail + +bash -c '{{ pre_cmds }} {{ command }}' +""" + +# Default environment variables for performance +PERF_ENV_VARS = { + "TORCH_NCCL_AVOID_RECORD_STREAMS": "1", + "TRANSFORMERS_OFFLINE": "0", + "TOKENIZERS_PARALLELISM": "False", + "NCCL_NVLS_ENABLE": "0", + "NVTE_NORM_FWD_USE_CUDNN": "1", + "NVTE_NORM_BWD_USE_CUDNN": "1", + "TORCH_NCCL_HIGH_PRIORITY": "1", + "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", +} + + +def slurm_executor( + gpu: str, + account: str, + partition: str, + log_dir: str, + nodes: int, + num_gpus_per_node: int, + time_limit: str = "04:00:00", + container_image: str = "nvcr.io/nvidia/nemo:dev", + custom_mounts: List[str] = None, + custom_env_vars: Dict[str, str] = None, + custom_srun_args: List[str] = None, + gres: Optional[str] = None, +) -> run.SlurmExecutor: + """ + Create a Slurm executor for DeepSeek V3 pretraining. + + Args: + gpu: GPU type (e.g., "gb200", "gb300", "h100") + account: Slurm account + partition: Slurm partition + log_dir: Directory for logs + nodes: Number of nodes + num_gpus_per_node: GPUs per node + time_limit: Job time limit + container_image: Container image to use + custom_mounts: Additional container mounts + custom_env_vars: Additional environment variables + custom_srun_args: Additional srun arguments + gres: GPU resource specification + """ + custom_mounts = custom_mounts or [] + custom_env_vars = custom_env_vars or {} + custom_srun_args = custom_srun_args or [] + custom_bash_cmds = [] + + mounts = [] + srun_args = custom_srun_args.copy() + [ + "--mpi=pmix", + "--no-container-mount-home", + ] + + if log_dir is not None: + set_nemorun_home(log_dir) + else: + if os.environ.get("NEMORUN_HOME") is None: + logger.warning( + f"Logs will be written to {get_nemorun_home()}. " + "Set NEMORUN_HOME or use --log_dir to change this." 
+ ) + + env_vars = PERF_ENV_VARS.copy() + + # GPU-specific settings + if gpu.lower() in ["gb200", "gb300"]: + env_vars["NCCL_NET_GDR_LEVEL"] = "PHB" + env_vars["NCCL_NET_GDR_C2C"] = "1" + + env_vars.update(custom_env_vars) + mounts.extend(custom_mounts) + + # Mount the script directory + mounts.append(f"{SCRIPT_DIR}:{SCRIPT_DIR}") + + # Compute segment for GB200/GB300 + segment = None + if num_gpus_per_node == 4: + if nodes <= 18: + segment = nodes + else: + for segment_candidate in range(18, 0, -1): + if nodes % segment_candidate == 0: + segment = segment_candidate + break + + # NUMA binding + numa_divisor = 2 if gpu.lower() in ["gb200", "gb300"] else 4 + numa_cmd = f"numactl --cpunodebind=$((SLURM_LOCALID/{numa_divisor})) --membind=$((SLURM_LOCALID/{numa_divisor}))" + custom_bash_cmds.append(numa_cmd) + + launcher = SlurmTemplate( + template_inline=INLINE_TEMPLATE, + template_vars={"pre_cmds": " ; ".join(custom_bash_cmds)}, + ) + + executor = run.SlurmExecutor( + account=account, + partition=partition, + tunnel=run.LocalTunnel(job_dir=os.path.join(get_nemorun_home(), "experiments")), + nodes=nodes, + ntasks_per_node=num_gpus_per_node, + gres=gres, + container_image=container_image, + container_mounts=mounts, + env_vars=env_vars, + srun_args=srun_args, + time=time_limit, + mem="0", + exclusive=True, + packager=run.GitArchivePackager(), + segment=segment, + launcher=launcher, + ) + + return executor + + +def get_parser() -> argparse.ArgumentParser: + """Create argument parser for the launcher.""" + parser = argparse.ArgumentParser( + description="NemoRun launcher for DeepSeek V3 pretraining", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + # Slurm configuration + slurm_group = parser.add_argument_group("Slurm configuration") + slurm_group.add_argument("--account", type=str, required=True, help="Slurm account") + slurm_group.add_argument("--partition", type=str, required=True, help="Slurm partition") + slurm_group.add_argument("--nodes", type=int, required=True, help="Number of nodes") + slurm_group.add_argument("--gpus_per_node", type=int, default=4, help="GPUs per node") + slurm_group.add_argument("--time_limit", type=str, default="04:00:00", help="Job time limit") + slurm_group.add_argument("--gpu", type=str, default="gb300", help="GPU type (gb200, gb300, h100)") + slurm_group.add_argument("--gres", type=str, default=None, help="GPU resource specification") + + # Container configuration + container_group = parser.add_argument_group("Container configuration") + container_group.add_argument( + "--container_image", + type=str, + default="nvcr.io/nvidia/nemo:dev", + help="Container image" + ) + container_group.add_argument( + "--mounts", + type=str, + default="", + help="Container mounts (comma-separated, format: src:dst)" + ) + container_group.add_argument( + "--envvars", + type=str, + default="", + help="Environment variables (comma-separated, format: KEY=VALUE)" + ) + + # Logging configuration + log_group = parser.add_argument_group("Logging configuration") + log_group.add_argument("--log_dir", type=str, default=None, help="Log directory") + log_group.add_argument("--exp_name", type=str, default=None, help="Experiment name") + + # Execution control + exec_group = parser.add_argument_group("Execution control") + exec_group.add_argument("--dryrun", action="store_true", help="Print command without executing") + exec_group.add_argument("--detach", action="store_true", help="Detach after submitting job") + + return parser + + +def parse_mounts(mounts_str: str) -> List[str]: + 
"""Parse comma-separated mounts string.""" + if not mounts_str: + return [] + return [m.strip() for m in mounts_str.split(",") if m.strip()] + + +def parse_envvars(envvars_str: str) -> Dict[str, str]: + """Parse comma-separated environment variables string.""" + if not envvars_str: + return {} + result = {} + for item in envvars_str.split(","): + if "=" in item: + key, value = item.split("=", 1) + result[key.strip()] = value.strip() + return result + + +def main(): + """Main entry point.""" + # Split arguments: before '--' are for launcher, after are for pretrain script + if "--" in sys.argv: + split_idx = sys.argv.index("--") + launcher_args = sys.argv[1:split_idx] + pretrain_args = sys.argv[split_idx + 1:] + else: + launcher_args = sys.argv[1:] + pretrain_args = [] + + parser = get_parser() + args = parser.parse_args(launcher_args) + + # Parse mounts and env vars + custom_mounts = parse_mounts(args.mounts) + custom_env_vars = parse_envvars(args.envvars) + + # Create executor + executor = slurm_executor( + gpu=args.gpu, + account=args.account, + partition=args.partition, + log_dir=args.log_dir, + nodes=args.nodes, + num_gpus_per_node=args.gpus_per_node, + time_limit=args.time_limit, + container_image=args.container_image, + custom_mounts=custom_mounts, + custom_env_vars=custom_env_vars, + gres=args.gres, + ) + + # Build the pretrain script path + pretrain_script_path = SCRIPT_DIR / PRETRAIN_SCRIPT + if not pretrain_script_path.is_file(): + logger.error(f"Pretrain script not found: {pretrain_script_path}") + sys.exit(1) + + # Create NemoRun script + nemorun_script = run.Script( + path=str(pretrain_script_path), + entrypoint="python", + env={"PYTHONPATH": f"{SCRIPT_DIR}:$PYTHONPATH"}, + args=pretrain_args, + ) + + # Generate experiment name + exp_name = args.exp_name or f"deepseek_v3_{args.nodes}x{args.gpus_per_node}gpu" + + logger.info(f"Launching: {' '.join(nemorun_script.to_command())}") + + # Run the experiment + run.run( + nemorun_script, + executor=executor, + dryrun=args.dryrun, + detach=args.detach, + name=exp_name, + ) + + if args.dryrun: + logger.info("Dryrun complete. No job submitted.") + elif args.detach: + logger.info(f"Job submitted. Experiment name: {exp_name}") + else: + logger.info("Job completed.") + + +if __name__ == "__main__": + main() From 3ac907cd3bef8be68e25737249dcfcb66e5d510d Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Tue, 20 Jan 2026 11:24:16 +0100 Subject: [PATCH 03/16] Update run_deepseek_v3_671b.sh to use NemoRun launcher --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 170 +++++++++++-------- 1 file changed, 96 insertions(+), 74 deletions(-) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index 496dbb14c..2e54e0722 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -1,6 +1,6 @@ #!/bin/bash -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -16,119 +16,141 @@ set -e -#git config --global --add safe.directory /workspace/llama31 - # Vars without defaults # Slurm settings -: "${USER:?USER not set}" -: "${HOST:?HOST not set}" : "${ACCOUNT:?ACCOUNT not set}" : "${PARTITION:?PARTITION not set}" -: "${REMOTE:=0}" # Job settings -: "${JOB_DIR:?JOB_DIR not set}" +: "${LOG_DIR:?LOG_DIR not set}" : "${IMAGE:?IMAGE not set}" # Dataset settings : "${PREPROCESSED_PATH:?PREPROCESSED_PATH not set}" -: "${TOKENIZER_PATH:?TOKENIZER_PATH not set}" +: "${MODEL_CKPT:?MODEL_CKPT not set}" # Vars with defaults # Slurm settings : "${TIME:="04:00:00"}" -: "${NNODES:=1}" -: "${GPUS_PER_NODE:=8}" -: "${DEPENDENCIES:=""}" +: "${NNODES:=64}" +: "${GPUS_PER_NODE:=4}" +: "${GPU:="gb300"}" # Job settings -: "${NEMO_RUN_DIR:=""}" # Provide customized NeMo-Run path here -: "${TMP_NPY_INDEX:=""}" # Provide temporary NNumpy Index saving directory -: "${MAX_RETRIES:=0}" +: "${TMP_NPY_INDEX:=""}" # Model settings -: "${GBS:=1024}" +: "${GBS:=2048}" : "${MBS:=1}" - -# Eval settings +: "${TENSOR_PARALLEL_SIZE:=1}" +: "${PIPELINE_PARALLEL_SIZE:=4}" +: "${VIRTUAL_PIPELINE_PARALLEL_SIZE:=""}" +: "${CONTEXT_PARALLEL_SIZE:=1}" +: "${EXPERT_PARALLEL_SIZE:=64}" +: "${EXPERT_TENSOR_PARALLEL_SIZE:=1}" +: "${SEQUENCE_LENGTH:=4096}" +: "${RECOMPUTE_MODULES:="mlp,moe_act"}" +: "${CUDA_GRAPH_IMPLEMENTATION:="transformer_engine"}" +: "${CUDA_GRAPH_SCOPE:="attn"}" +: "${MOE_TOKEN_DISPATCHER_TYPE:="alltoall"}" +: "${MOE_GROUPED_GEMM:=True}" +: "${MOE_PERMUTE_FUSION:=False}" +: "${MOE_ROUTER_FUSION:=False}" + +# Training settings +: "${MAX_STEPS:=1000}" +: "${WARMUP_STEPS:=0}" +: "${LR:="2e-4"}" +: "${MIN_LR:="5e-6"}" +: "${SEED:=1234}" + +# Eval settings : "${EVAL_CHECK_INTERVAL:=10}" : "${EVAL_BATCHES:=1}" -# Dataloader settings -: "${MAX_STEPS:="1200000"}" - # Experiment settings -: "${SEEDS:=""}" -IFS=" " read -ra seeds <<< $SEEDS -: "${NEXP:=1}" -: "${NPAR:=1}" -: "${TAG:=""}" -: "${TARGET:="1.0"}" # TODO(dfridman): update once determined -: "${STEP_TIME_ATOL:="18000"}" # maximum tolerable step time, setting to 5hr by default +: "${EXP_NAME:=""}" +: "${TARGET:="1.0"}" +: "${DRYRUN:=0}" +: "${DETACH:=0}" -# Run +# Build mounts +MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${PREPROCESSED_PATH}:/preproc_data,${MODEL_CKPT}:/checkpoint" -MOUNTS="${JOB_DIR}:/output,${JOB_DIR}:/mlperf-outputs,${PREPROCESSED_PATH}:/preproc_data,${TOKENIZER_PATH}:/tokenizer,${MODEL_CKPT}:/checkpoint" +if [ -n "$TMP_NPY_INDEX" ]; then + MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" +fi -CMD_SUFFIX="" +# Build environment variables +ENVVARS="PREPROCESSED_PATH=/preproc_data" + +# Build launcher arguments +LAUNCHER_ARGS="--account $ACCOUNT --partition $PARTITION" +LAUNCHER_ARGS="$LAUNCHER_ARGS --nodes $NNODES --gpus_per_node $GPUS_PER_NODE" +LAUNCHER_ARGS="$LAUNCHER_ARGS --gpu $GPU" +LAUNCHER_ARGS="$LAUNCHER_ARGS --time_limit $TIME" +LAUNCHER_ARGS="$LAUNCHER_ARGS --container_image $IMAGE" +LAUNCHER_ARGS="$LAUNCHER_ARGS --log_dir $LOG_DIR" +LAUNCHER_ARGS="$LAUNCHER_ARGS --mounts $MOUNTS" +LAUNCHER_ARGS="$LAUNCHER_ARGS --envvars $ENVVARS" + +if [ -n "$EXP_NAME" ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --exp_name $EXP_NAME" +fi -if [ ! $NEMO_RUN_DIR = "" ]; then - MOUNTS="${MOUNTS},${NEMO_RUN_DIR}:/opt/NeMo-Run" +if [ "$DRYRUN" -gt 0 ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --dryrun" fi -if [ ! $TMP_NPY_INDEX = "" ]; then - MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" +if [ "$DETACH" -gt 0 ]; then + LAUNCHER_ARGS="$LAUNCHER_ARGS --detach" fi -if [ ! 
$DEPENDENCIES = "" ]; then - CMD_SUFFIX="${CMD_SUFFIX} --dependencies ${DEPENDENCIES}" +# Build pretrain arguments +PRETRAIN_ARGS="--tensor_parallel_size $TENSOR_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --pipeline_parallel_size $PIPELINE_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --context_parallel_size $CONTEXT_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --expert_model_parallel_size $EXPERT_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --expert_tensor_parallel_size $EXPERT_TENSOR_PARALLEL_SIZE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --sequence_length $SEQUENCE_LENGTH" +PRETRAIN_ARGS="$PRETRAIN_ARGS --gbs $GBS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --mbs $MBS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --lr $LR" +PRETRAIN_ARGS="$PRETRAIN_ARGS --min_lr $MIN_LR" +PRETRAIN_ARGS="$PRETRAIN_ARGS --max_steps $MAX_STEPS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --warmup_steps $WARMUP_STEPS" +PRETRAIN_ARGS="$PRETRAIN_ARGS --seed $SEED" +PRETRAIN_ARGS="$PRETRAIN_ARGS --eval_check_interval $EVAL_CHECK_INTERVAL" +PRETRAIN_ARGS="$PRETRAIN_ARGS --eval_batches $EVAL_BATCHES" +PRETRAIN_ARGS="$PRETRAIN_ARGS --target_log_ppl $TARGET" + +if [ -n "$VIRTUAL_PIPELINE_PARALLEL_SIZE" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --virtual_pipeline_parallel_size $VIRTUAL_PIPELINE_PARALLEL_SIZE" fi -if [ ! $MAX_STEPS = "" ]; then - CMD_SUFFIX="${CMD_SUFFIX} --max_steps ${MAX_STEPS}" +if [ -n "$RECOMPUTE_MODULES" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --recompute_modules $RECOMPUTE_MODULES" fi -if [ ! $TAG = "" ]; then - CMD_SUFFIX="${CMD_SUFFIX} --tag ${TAG}" +if [ -n "$CUDA_GRAPH_IMPLEMENTATION" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --cuda_graph_implementation $CUDA_GRAPH_IMPLEMENTATION" fi -if [ $REMOTE -gt 0 ]; then - CMD_SUFFIX="${CMD_SUFFIX} --run_slurm" +if [ -n "$CUDA_GRAPH_SCOPE" ]; then + PRETRAIN_ARGS="$PRETRAIN_ARGS --cuda_graph_scope $CUDA_GRAPH_SCOPE" fi +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_token_dispatcher_type $MOE_TOKEN_DISPATCHER_TYPE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_grouped_gemm $MOE_GROUPED_GEMM" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_permute_fusion $MOE_PERMUTE_FUSION" +PRETRAIN_ARGS="$PRETRAIN_ARGS --moe_router_fusion $MOE_ROUTER_FUSION" + # Allows MLLogger objects to be constructed locally -if [ ! -d /mlperf-outputs ]; then mkdir /mlperf-outputs; fi +if [ ! 
-d /mlperf-outputs ]; then mkdir -p /mlperf-outputs 2>/dev/null || true; fi set -x -python3 pretrain_llama31.py \ ---user $USER --host $HOST \ ---job_dir $JOB_DIR \ ---account $ACCOUNT --partition $PARTITION \ ---nodes $NNODES --gpus_per_node $GPUS_PER_NODE \ ---time $TIME \ ---max_retries $MAX_RETRIES \ ---mounts $MOUNTS \ ---image $IMAGE \ ---size $SIZE \ ---gbs $GBS \ ---mbs $MBS \ ---seeds ${seeds[@]} \ ---num_exps $NEXP \ ---num_pars $NPAR \ ---tokenizer_path $TOKENIZER_PATH \ ---target_log_ppl $TARGET \ ---step_time_atol $STEP_TIME_ATOL \ ---warmup_steps $WARMUP_STEPS \ ---tensor_parallel_size $TENSOR_PARALLEL_SIZE \ ---pipeline_parallel_size $PIPELINE_PARALLEL_SIZE \ ---context_parallel_size $CONTEXT_PARALLEL_SIZE \ ---expert_model_parallel_size $EXPERT_PARALLEL_SIZE \ ---expert_tensor_parallel_size $EXPERT_TENSOR_PARALLEL_SIZE \ ---recompute_modules $RECOMPUTE_MODULES \ ---cuda_graph_implementation $CUDA_GRAPH_IMPLEMENTATION \ ---cuda_graph_scope $CUDA_GRAPH_SCOPE \ ---lr $LR \ ---eval_check_interval $EVAL_CHECK_INTERVAL \ ---eval_batches $EVAL_BATCHES \ -$CMD_SUFFIX +python3 run_deepseek.py \ + $LAUNCHER_ARGS \ + -- \ + $PRETRAIN_ARGS From b7bb8cd0316021bd7eb35417d9bf3e3a08f8f957 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 10:04:56 +0100 Subject: [PATCH 04/16] cd to current dir --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index 2e54e0722..6badb0daa 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+ +cd $(dirname $0) + set -e # Vars without defaults From ff7e1605f2b71afdcfe3a4f754b5ff1450012157 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 10:09:43 +0100 Subject: [PATCH 05/16] detach = True by default --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index 6badb0daa..caee9946d 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -75,7 +75,7 @@ set -e : "${EXP_NAME:=""}" : "${TARGET:="1.0"}" : "${DRYRUN:=0}" -: "${DETACH:=0}" +: "${DETACH:=1}" # Build mounts MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${PREPROCESSED_PATH}:/preproc_data,${MODEL_CKPT}:/checkpoint" From 38e318c8e7a4cf412fc833cfed91f307eb53f90b Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 01:22:01 -0800 Subject: [PATCH 06/16] Fix env vars --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index caee9946d..8c6da9723 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -29,7 +29,7 @@ set -e : "${IMAGE:?IMAGE not set}" # Dataset settings -: "${PREPROCESSED_PATH:?PREPROCESSED_PATH not set}" +: "${DATA_DIR:?DATA_DIR not set}" : "${MODEL_CKPT:?MODEL_CKPT not set}" # Vars with defaults @@ -78,14 +78,12 @@ set -e : "${DETACH:=1}" # Build mounts -MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${PREPROCESSED_PATH}:/preproc_data,${MODEL_CKPT}:/checkpoint" +MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${DATA_DIR}:/preproc_data,${MODEL_CKPT}:/checkpoint,${DATA_DIR}/tokenizer:/tokenizer" if [ -n "$TMP_NPY_INDEX" ]; then MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" fi -# Build environment variables -ENVVARS="PREPROCESSED_PATH=/preproc_data" # Build launcher arguments LAUNCHER_ARGS="--account $ACCOUNT --partition $PARTITION" @@ -95,7 +93,6 @@ LAUNCHER_ARGS="$LAUNCHER_ARGS --time_limit $TIME" LAUNCHER_ARGS="$LAUNCHER_ARGS --container_image $IMAGE" LAUNCHER_ARGS="$LAUNCHER_ARGS --log_dir $LOG_DIR" LAUNCHER_ARGS="$LAUNCHER_ARGS --mounts $MOUNTS" -LAUNCHER_ARGS="$LAUNCHER_ARGS --envvars $ENVVARS" if [ -n "$EXP_NAME" ]; then LAUNCHER_ARGS="$LAUNCHER_ARGS --exp_name $EXP_NAME" From 200f49674e6ff24216fa1c954c24a73440b6d911 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 11:22:35 +0100 Subject: [PATCH 07/16] Remove unused arguments and fix eval config bug --- .../nemo/pretrain_deepseek_v3_671b.py | 77 +++++-------------- 1 file changed, 18 insertions(+), 59 deletions(-) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index 6e24faf5d..9fd800d92 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -13,11 +13,7 @@ # limitations under the License. 
import argparse -import math import os -from typing import Optional - -import torch from megatron.bridge.recipes.deepseek import deepseek_v3_pretrain_config from megatron.bridge.training.config import GPTDatasetConfig, ConfigContainer @@ -149,11 +145,9 @@ def create_config(args): train_cfg.global_batch_size = args.gbs train_cfg.train_iters = args.max_steps - # Compute eval intervals - eval_every_n_batches = math.ceil(args.eval_every / args.gbs) - eval_batches = math.ceil(args.eval_tokens / args.gbs) - train_cfg.eval_interval = eval_every_n_batches - train_cfg.eval_iters = eval_batches + # Eval configuration + train_cfg.eval_interval = args.eval_check_interval + train_cfg.eval_iters = args.eval_batches # Optimizer configuration optimizer_cfg = config.optimizer @@ -189,33 +183,13 @@ def create_config(args): def get_parser() -> argparse.ArgumentParser: - """Create argument parser with same structure as llama31 pretrain script.""" + """Create argument parser for DeepSeek V3 pretraining.""" parser = argparse.ArgumentParser(description="DeepSeek V3 Pretraining") - parser.add_argument("--tag", type=str, help="Optional experiment tag", required=False, default="") - - # Slurm and executor related - slurm_group = parser.add_argument_group("Slurm executor arguments") - slurm_group.add_argument('--user', type=str, required=True, help="Remote cluster SSH user name") - slurm_group.add_argument("--host", type=str, required=True, help="Remote cluster host address") - slurm_group.add_argument("--job_dir", type=str, required=True, help="Remote job directory") - slurm_group.add_argument("--account", type=str, required=True, help="Account to be used for Slurm job submission") - slurm_group.add_argument("--partition", type=str, required=True, help="Partition to be used for Slurm job submission") - slurm_group.add_argument("--nodes", type=int, required=True, help="Number of nodes to be used") - slurm_group.add_argument("--gpus_per_node", type=int, required=True, help="Number of GPUs per node") - slurm_group.add_argument("--time", type=str, required=True, help="Time limit for the job") - slurm_group.add_argument("--dependencies", nargs="*", help="list of dependencies for the job") - slurm_group.add_argument("--max_retries", type=int, default=0) - slurm_group.add_argument("--run_slurm", action="store_true", help="run in slurm executor instead of locally") - slurm_group.add_argument( - "--mounts", - type=str, - required=True, - help=( - "Custom mount paths, formatted as a string of :[,:], " - "and should contain one path for /output, dataset path: /preproc_data, /npy_index" - )) - slurm_group.add_argument("--envvars", type=str, help="Environment variables to be added", default=None) - slurm_group.add_argument("--image", type=str, required=True, help="Container image path, either remote or local") + + # Cluster arguments (used for logging) + cluster_group = parser.add_argument_group("Cluster arguments") + cluster_group.add_argument("--nodes", type=int, required=True, help="Number of nodes") + cluster_group.add_argument("--gpus_per_node", type=int, required=True, help="Number of GPUs per node") # Model arguments model_group = parser.add_argument_group("Model arguments") @@ -228,35 +202,24 @@ def get_parser() -> argparse.ArgumentParser: model_group.add_argument("--recompute_modules", type=str, help="Recompute modules") model_group.add_argument("--cuda_graph_implementation", type=str, help="CUDA graph implementation") model_group.add_argument("--cuda_graph_scope", type=str, help="CUDA graph scope") - 
model_group.add_argument("--moe_token_dispatcher_type", type=str, help="MoE token dispatcher type", default="alltoall") - model_group.add_argument("--moe_grouped_gemm", type=bool, help="MoE grouped GEMM", default=True) - model_group.add_argument("--moe_permute_fusion", type=bool, help="MoE permute fusion", default=False) - model_group.add_argument("--moe_router_fusion", type=bool, help="MoE router fusion", default=False) - model_group.add_argument("--moe_router_force_load_balancing", type=bool, help="MoE router force load balancing", default=False) - model_group.add_argument("--sequence_length", type=int, help="Sequence length", default=4096) - + model_group.add_argument("--moe_token_dispatcher_type", type=str, default="alltoall", help="MoE token dispatcher type") + model_group.add_argument("--moe_grouped_gemm", type=bool, default=True, help="MoE grouped GEMM") + model_group.add_argument("--moe_permute_fusion", type=bool, default=False, help="MoE permute fusion") + model_group.add_argument("--moe_router_fusion", type=bool, default=False, help="MoE router fusion") + model_group.add_argument("--sequence_length", type=int, default=4096, help="Sequence length") # Training arguments training_group = parser.add_argument_group("Training arguments") training_group.add_argument("--gbs", type=int, default=1024, help="Global batch size") training_group.add_argument("--mbs", type=int, default=1, help="Micro batch size") - training_group.add_argument("--lr", type=float, default=2e-4, help="Initial learning rate after warmup.") + training_group.add_argument("--lr", type=float, default=2e-4, help="Initial learning rate after warmup") training_group.add_argument("--min_lr", type=float, default=5e-6, help="Minimum learning rate") - training_group.add_argument('--max_steps', type=int, default=1000, help="Maximum number of steps") - training_group.add_argument('--warmup_steps', type=int, default=0, help="Number of steps for LR warmup") + training_group.add_argument("--max_steps", type=int, default=1000, help="Maximum number of steps") + training_group.add_argument("--warmup_steps", type=int, default=0, help="Number of steps for LR warmup") training_group.add_argument("--seed", type=int, default=1234, help="Random seed") training_group.add_argument("--eval_check_interval", type=int, default=10, help="Evaluate every N steps") training_group.add_argument("--eval_batches", type=int, default=1, help="Evaluate N batches") - - - # Experiment management arguments - experiment_group = parser.add_argument_group("Experiment management arguments") - experiment_group.add_argument("--dryrun", action="store_true", help="Whether we are launching dryrun or actual runs") - experiment_group.add_argument("--seeds", type=int, nargs="*", default=[], help="random seeds") - experiment_group.add_argument("--num_exps", type=int, default=1) - experiment_group.add_argument("--num_pars", type=int, default=1) - experiment_group.add_argument("--target_log_ppl", type=float, default=1.0, help="Target log perplexity") - experiment_group.add_argument("--step_time_atol", type=int, default=1600, help="train step time atol") + training_group.add_argument("--target_log_ppl", type=float, default=1.0, help="Target log perplexity") return parser @@ -293,10 +256,6 @@ def __init__(self, args): def main(): """Main entry point for DeepSeek V3 pretraining.""" args = get_parser().parse_args() - - if args.tag and not args.tag.startswith("-"): - args.tag = "-" + args.tag - init_logging() config = create_config(args) cfg = ArgsConfig(args) From 
b0423ff1fb4e6828a45af629b4638da072fd0d7c Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 11:37:32 +0100 Subject: [PATCH 08/16] add missing pretrain args --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index 8c6da9723..be54a41e0 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -107,7 +107,8 @@ if [ "$DETACH" -gt 0 ]; then fi # Build pretrain arguments -PRETRAIN_ARGS="--tensor_parallel_size $TENSOR_PARALLEL_SIZE" +PRETRAIN_ARGS="--nodes $NNODES --gpus_per_node $GPUS_PER_NODE" +PRETRAIN_ARGS="$PRETRAIN_ARGS --tensor_parallel_size $TENSOR_PARALLEL_SIZE" PRETRAIN_ARGS="$PRETRAIN_ARGS --pipeline_parallel_size $PIPELINE_PARALLEL_SIZE" PRETRAIN_ARGS="$PRETRAIN_ARGS --context_parallel_size $CONTEXT_PARALLEL_SIZE" PRETRAIN_ARGS="$PRETRAIN_ARGS --expert_model_parallel_size $EXPERT_PARALLEL_SIZE" From c59fd892a10b16b57734685f0100561c68e69ee2 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 11:45:41 +0100 Subject: [PATCH 09/16] fix pipeline layout + tokenizer config --- .../nemo/pretrain_deepseek_v3_671b.py | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index 9fd800d92..ac2e8e3b4 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -15,10 +15,11 @@ import argparse import os -from megatron.bridge.recipes.deepseek import deepseek_v3_pretrain_config +from megatron.bridge.recipes.deepseek import deepseek_v3_pretrain_config, set_deepseek_v3_pipeline_model_parallel_layout from megatron.bridge.training.config import GPTDatasetConfig, ConfigContainer from megatron.bridge.training.gpt_step import forward_step from megatron.bridge.training.pretrain import pretrain +from megatron.bridge.training.tokenizers.config import TokenizerConfig from callback import ( MLPerfLoggingCallback, @@ -113,31 +114,23 @@ def log_hyperparams(args, mbridge_config: ConfigContainer): mllogger.event(key=key, value=value) +def get_tokenizer_config(): + return TokenizerConfig( + tokenizer_type="HuggingFaceTokenizer", + tokenizer_model="/tokenizer", + hf_tokenizer_kwargs={"use_fast": True}, + ) + + def create_config(args): """Create the training configuration from arguments.""" - config = deepseek_v3_pretrain_config( - pipeline_model_parallel_size=args.pipeline_parallel_size, - virtual_pipeline_parallel_size=args.virtual_pipeline_parallel_size, - ) + config = deepseek_v3_pretrain_config() # Model parallelism configuration (hardcoded for DeepSeek V3) model_cfg = config.model - model_cfg.tensor_model_parallel_size = args.tensor_parallel_size - model_cfg.context_parallel_size = args.context_parallel_size - model_cfg.expert_model_parallel_size = args.expert_model_parallel_size - model_cfg.expert_tensor_parallel_size = args.expert_tensor_parallel_size - model_cfg.sequence_parallel = args.tensor_parallel_size > 1 - model_cfg.seq_length = args.sequence_length - model_cfg.recompute_modules = args.recompute_modules.split(",") if args.recompute_modules else [] - model_cfg.cuda_graph_implementation = args.cuda_graph_implementation - model_cfg.cuda_graph_scope = args.cuda_graph_scope.split(",") if args.cuda_graph_scope else [] - - # MoE parameters (hardcoded for DeepSeek V3) - 
model_cfg.moe_token_dispatcher_type = args.moe_token_dispatcher_type - model_cfg.moe_grouped_gemm = args.moe_grouped_gemm - model_cfg.moe_permute_fusion = args.moe_permute_fusion - model_cfg.moe_router_fusion = args.moe_router_fusion - model_cfg.moe_router_force_load_balancing = False + model_cfg.pipeline_model_parallel_size = args.pipeline_parallel_size + model_cfg.virtual_pipeline_model_parallel_size = args.virtual_pipeline_parallel_size + set_deepseek_v3_pipeline_model_parallel_layout(model_cfg) # Training configuration train_cfg = config.train @@ -168,6 +161,9 @@ def create_config(args): seed=args.seed, ) + # Tokenizer configuration + config.tokenizer = get_tokenizer_config() + # Checkpoint configuration checkpoint_cfg = config.checkpoint checkpoint_cfg.load = "/checkpoint" From 48fd489d5baebdd7d0177bfa3668be24f56ab31a Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 13:04:54 +0100 Subject: [PATCH 10/16] fix import --- moe_pretraining/nemo/pretrain_deepseek_v3_671b.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index ac2e8e3b4..83fb3ed4d 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -15,7 +15,7 @@ import argparse import os -from megatron.bridge.recipes.deepseek import deepseek_v3_pretrain_config, set_deepseek_v3_pipeline_model_parallel_layout +from megatron.bridge.recipes.deepseek.deepseek_v3 import deepseek_v3_pretrain_config, set_deepseek_v3_pipeline_model_parallel_layout from megatron.bridge.training.config import GPTDatasetConfig, ConfigContainer from megatron.bridge.training.gpt_step import forward_step from megatron.bridge.training.pretrain import pretrain From 55ac23c02d7cd698bce008e43740fbdbee7111d6 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 13:35:37 +0100 Subject: [PATCH 11/16] remove force_success_status var --- moe_pretraining/nemo/callback.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/moe_pretraining/nemo/callback.py b/moe_pretraining/nemo/callback.py index f1d9e7abd..6a7478f0f 100644 --- a/moe_pretraining/nemo/callback.py +++ b/moe_pretraining/nemo/callback.py @@ -368,7 +368,6 @@ def __init__(self, cfg): self.global_batch_size = self.cfg.model.global_batch_size self.train_block_started = True self.train_current_block = 0 - self.force_success = cfg.custom.force_success_status self.previous_step = 0 def on_train_start( @@ -462,10 +461,9 @@ def on_validation_end( ) elif global_state.train_state.step >= self.cfg.trainer.max_steps: global_state.should_stop = True - status = "success" if self.force_success else "aborted" mllogger.end( mllogger.constants.RUN_STOP, - metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": status}, + metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "aborted"}, ) if not os.environ.get("VAL_CHECK_INTERVAL"): global_state.cfg.train.eval_interval = self.cfg.default_val_check_interval @@ -491,11 +489,10 @@ def on_train_batch_end( eval_after_this_step = step % global_state.cfg.train.eval_interval == 0 if last_step and not eval_after_this_step: samples_count = self._get_samples_count(global_state) - status = "success" if self.force_success else "aborted" self._end_train_block(global_state) mllogger.end( mllogger.constants.RUN_STOP, - metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": status}, + 
metadata={mllogger.constants.SAMPLES_COUNT: samples_count, "status": "aborted"}, ) self.train_block_started = False global_state.should_stop = True From 0932bad196c8acb3bcbcf2626cfddf3e857d32f4 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 13:48:25 +0100 Subject: [PATCH 12/16] fix args --- .../nemo/pretrain_deepseek_v3_671b.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index 83fb3ed4d..f4b3b09f4 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -132,6 +132,26 @@ def create_config(args): model_cfg.virtual_pipeline_model_parallel_size = args.virtual_pipeline_parallel_size set_deepseek_v3_pipeline_model_parallel_layout(model_cfg) + model_cfg.tensor_model_parallel_size = args.tensor_parallel_size + model_cfg.context_parallel_size = cfg.model.context_parallel_size + model_cfg.expert_model_parallel_size = args.expert_model_parallel_size + model_cfg.expert_tensor_parallel_size = args.expert_tensor_parallel_size + model_cfg.sequence_parallel = args.tensor_parallel_size > 1 + model_cfg.seq_length = args.sequence_length + model_cfg.recompute_modules = args.recompute_modules.split(",") if args.recompute_modules else [] + if args.cuda_graph_implementation: + model_cfg.cuda_graph_implementation = args.cuda_graph_implementation + if args.cuda_graph_scope: + model_cfg.cuda_graph_scope = args.cuda_graph_scope + # MoE params + model_cfg.moe_token_dispatcher_type = args.moe_token_dispatcher_type + model_cfg.moe_grouped_gemm = args.moe_grouped_gemm + model_cfg.moe_permute_fusion = args.moe_permute_fusion + model_cfg.moe_router_fusion = args.moe_router_fusion + model_cfg.moe_router_force_load_balancing = False + + + # Training configuration train_cfg = config.train train_cfg.micro_batch_size = args.mbs From 99df10116a38f75c4170ff9777bac31db2266b56 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 14:37:39 +0100 Subject: [PATCH 13/16] fix args --- moe_pretraining/nemo/pretrain_deepseek_v3_671b.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index f4b3b09f4..0902e417c 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -133,7 +133,7 @@ def create_config(args): set_deepseek_v3_pipeline_model_parallel_layout(model_cfg) model_cfg.tensor_model_parallel_size = args.tensor_parallel_size - model_cfg.context_parallel_size = cfg.model.context_parallel_size + model_cfg.context_parallel_size = args.context_parallel_size model_cfg.expert_model_parallel_size = args.expert_model_parallel_size model_cfg.expert_tensor_parallel_size = args.expert_tensor_parallel_size model_cfg.sequence_parallel = args.tensor_parallel_size > 1 From 6a458411bf58ed35ba75c45c24b50429bde5bf88 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 15:27:48 +0100 Subject: [PATCH 14/16] use 8b data paths --- moe_pretraining/nemo/pretrain_deepseek_v3_671b.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py index 0902e417c..a34a8017e 100644 --- a/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py +++ b/moe_pretraining/nemo/pretrain_deepseek_v3_671b.py @@ -51,8 +51,8 @@ def get_data(seq_length: 
int, seed): """Configure dataset paths and parameters.""" dataset_path = os.getenv("PREPROCESSED_PATH", "/preproc_data") val_test_path = f"{dataset_path}/c4-validation-91205-samples.en_text_document" - train_datasets = [f"{dataset_path}/c4-train.en_{idx}_text_document" for idx in [6, 7]] - train_datasets_weights = [50] * 2 + train_datasets = [f"{dataset_path}/c4-train.en_{idx}_text_document" for idx in [6]] + train_datasets_weights = [50] * len(train_datasets) data_paths = [ (train_datasets, train_datasets_weights), From f166413330ae9b332c6aad4c1d1e2bc2695b0a5f Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 15:59:13 +0100 Subject: [PATCH 15/16] use TMP_NPY_INDEX --- moe_pretraining/nemo/run_deepseek_v3_671b.sh | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/moe_pretraining/nemo/run_deepseek_v3_671b.sh b/moe_pretraining/nemo/run_deepseek_v3_671b.sh index be54a41e0..66e67bf28 100644 --- a/moe_pretraining/nemo/run_deepseek_v3_671b.sh +++ b/moe_pretraining/nemo/run_deepseek_v3_671b.sh @@ -39,8 +39,6 @@ set -e : "${GPUS_PER_NODE:=4}" : "${GPU:="gb300"}" -# Job settings -: "${TMP_NPY_INDEX:=""}" # Model settings : "${GBS:=2048}" @@ -80,10 +78,9 @@ set -e # Build mounts MOUNTS="${LOG_DIR}:/output,${LOG_DIR}:/mlperf-outputs,${DATA_DIR}:/preproc_data,${MODEL_CKPT}:/checkpoint,${DATA_DIR}/tokenizer:/tokenizer" -if [ -n "$TMP_NPY_INDEX" ]; then - MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" -fi - +TMP_NPY_INDEX="$LOG_DIR/npy_index" +mkdir -p "$TMP_NPY_INDEX" +MOUNTS="${MOUNTS},${TMP_NPY_INDEX}:/npy_index" # Build launcher arguments LAUNCHER_ARGS="--account $ACCOUNT --partition $PARTITION" From ea2b48f969f2848251a62e7b11d20212be7a4db5 Mon Sep 17 00:00:00 2001 From: Denys Fridman Date: Wed, 21 Jan 2026 16:53:51 +0100 Subject: [PATCH 16/16] update config --- .../nemo/config_GB300_64x4x32xtp1pp4cp1.sh | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh index b07be7399..e340cce45 100644 --- a/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh +++ b/moe_pretraining/nemo/config_GB300_64x4x32xtp1pp4cp1.sh @@ -12,43 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -# SSH: username that connects to the remote cluster -export USER="DUMMY" -# SSH: remote cluster URL -export HOST="DUMMY" +# SLURM: Username on a cluster +export USER="" # Slurm: account for job submission -export ACCOUNT="DUMMY" +export ACCOUNT="" # Slurm: partition for job submission -export PARTITION="DUMMY" +export PARTITION="" # Slurm: job time limit, defaults to 8 hours -export TIME="08:00:00" -# Slurm: --nodes arguments, default to use 288 nodes +export TIME="02:00:00" +# Slurm: --nodes arguments export NNODES=64 -# Slurm: --gpus_per_node and --ntasks_per_node argument, defaults to 8 GPUs per node +# Slurm: --gpus_per_node and --ntasks_per_node argument export GPUS_PER_NODE=4 -# Slurm: max job retries for transient job failures, defaults to retry 3 times +# Slurm: max job retries for transient job failures export MAX_RETRIES=1 # Folder mapping: # Output directory that holds logs, any path that you like. 
-export JOB_DIR="/workspace/code/logs" +export LOG_DIR="" # Image / container path, either local cache file or remote URL -export IMAGE="DUMMY" +export IMAGE="" # Dataset: C4 dataset location that contains the dataset after preprocessing -# export ORIGINAL_C4_PATH="/data/data/C4" - -# This corresponds to the PREPROCESSED_PATH in README section 3's dataset download part -export PREPROCESSED_PATH="/data/deepseek_v3_671b/data/C4_processed" -export MERGED_C4_PATH="/data/deepseek_v3_671b/data/C4_merged" -# Dataset: Numpy index working directory, contains shuffled dataset -# This path must be able to hold >400GB data -export TMP_NPY_INDEX="/data/npy_indices" -# Dataset: Tokenizer path -# This corresponds to the TOKENIZER_PATH in README section 3's tokenizer download part -export TOKENIZER_PATH="/data/deepseek_v3_671b/model/DeepSeek-V3-671B-Base" -# export TOKENIZER_PATH="/data/llama3_405b_ref/tokenizer" - -export MODEL_CKPT="$TOKENIZER_PATH" +export DATA_DIR="" +# Model checkpoint path +export MODEL_CKPT="" # Training Configs: # Dataloader: Global batch size @@ -66,7 +53,7 @@ export PIPELINE_PARALLEL_SIZE=4 export CONTEXT_PARALLEL_SIZE=1 export EXPERT_PARALLEL_SIZE=64 export EXPERT_TENSOR_PARALLEL_SIZE=1 -export RECOMPUTE_MODULES="mlp,moe_act" +export RECOMPUTE_MODULES="mlp,moe" export CUDA_GRAPH_IMPLEMENTATION="transformer_engine" export CUDA_GRAPH_SCOPE="attn"
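+
+# Rough sanity check on the parallelism settings above, assuming the standard
+# Megatron decomposition (data-parallel size = total GPUs / (TP * PP * CP)) and
+# the default GBS=2048 / MBS=1 from run_deepseek_v3_671b.sh:
+#   total GPUs             = NNODES * GPUS_PER_NODE      = 64 * 4 = 256
+#   data-parallel size     = 256 / (TP=1 * PP=4 * CP=1)  = 64
+#   micro-batches per step = GBS / (MBS * DP) = 2048 / (1 * 64) = 32
+# consistent with the 64x4x32xtp1pp4cp1 part of this config's file name.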