Set up:
Running NeMo in an EKS cluster with 2 g6e.12xlarge instances (8 L40S GPUs):
```python
import nemo_run as run
import json
import argparse
import os
from datetime import datetime
from functools import partial
from typing import Any, Optional
import lightning.pytorch as pl
from nemo.collections import llm
from nemo.utils import logging
from custom_data_module import CustomDataModule
from multiprocessing import Process, set_start_method
from nemo.collections.llm import api

def configure_finetune_recipe(
    exp_name=None,
    work_dir=None,
    peft_scheme=None,
    lora_enabled=None,
    max_steps=None,
    nodes=None,
    gpu_devices=None,
    seq_length=None,
    micro_batch_size=None,
    global_batch_size=None,
):
    finetune_recipe = llm.llama31_8b.finetune_recipe(
        num_nodes=nodes,
        name=exp_name,
        dir=work_dir,
        peft_scheme=peft_scheme
    )
    finetune_recipe.resume.restore_config.path = "/data/fsx/nemo/cache/meta-llama/Llama-3.1-8B"
    finetune_recipe.trainer.devices = gpu_devices
    finetune_recipe.trainer.num_sanity_val_steps = 0
    finetune_recipe.trainer.max_steps = max_steps
    finetune_recipe.trainer.strategy.context_parallel_size = 1
    finetune_recipe.trainer.val_check_interval = 10
    # custom_dataset() is defined elsewhere in the full script (not shown in this snippet).
    finetune_recipe.data = custom_dataset(
        seq_length=seq_length,
        micro_batch_size=micro_batch_size,
        global_batch_size=global_batch_size
    )
    if lora_enabled:
        finetune_recipe.trainer.strategy.ddp = "megatron"
    return finetune_recipe

def skypilot_executor(
    nodes: int,
    pvc_mount: str,
    gpu_devices: int,
    gpus: str = "L40S",
    efa_devices: Optional[int] = None,
    custom_mounts: Optional[dict[str, str]] = None,
    container_image: str = "nvcr.io/nvidia/nemo:25.04.01",
    env_vars_file: str = "env_vars.json",
    pvc_name: str = "fsx-claim",
    lora_enabled: bool = False,
    memory: int = 180
) -> run.SkypilotExecutor:
    mounts = {}
    # Custom mounts are defined here.
    if custom_mounts:
        for k, v in custom_mounts.items():
            mounts[k] = v

    # Env vars for jobs are configured here.
    with open(env_vars_file, 'r') as f:
        env_vars = json.load(f)

    packager = run.GitArchivePackager()

    shared_pod_config = {
        "kubernetes": {
            "pod_config": {
                "spec": {
                    "serviceAccountName": "default",
                    "securityContext": {
                        "runAsUser": 0,
                        "fsGroup": 0
                    },
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [{
                                    "matchExpressions": [{
                                        "key": "nvidia.com/gpu.product",
                                        "operator": "In",
                                        "values": ["NVIDIA-L40S"]
                                    }]
                                }]
                            }
                        }
                    },
                    "containers": [{
                        "volumeMounts": [
                            {"name": "nemo-runs", "mountPath": pvc_mount},
                        ]
                    }],
                    "volumes": [{
                        "name": "nemo-runs",
                        "persistentVolumeClaim": {"claimName": pvc_name}
                    }]
                }
            }
        }
    }

    if efa_devices is not None:
        shared_pod_config["kubernetes"]["pod_config"]["spec"]["containers"][0]["resources"] = {
            "requests": {
                "vpc.amazonaws.com/efa": efa_devices
            },
            "limits": {
                "vpc.amazonaws.com/efa": efa_devices
            }
        }

    # This defines the SkyPilot executor.
    executor = run.SkypilotExecutor(
        cloud="kubernetes",
        gpus=gpus,
        gpus_per_node=gpu_devices,
        num_nodes=nodes,
        packager=packager,
        cluster_config_overrides=shared_pod_config,
        memory=memory
    )
    executor.container_image = container_image
    executor.file_mounts = mounts
    executor.env_vars = env_vars
    executor.env_vars["NEMORUN_HOME"] = pvc_mount
    executor.env_vars["NEMO_HOME"] = f"{pvc_mount}/fsx/nemo"
    executor.env_vars["NEMO_MODELS_CACHE"] = f"{pvc_mount}/fsx/nemo/cache"
    executor.env_vars["HF_HOME"] = f"{pvc_mount}/fsx/huggingface"
    executor.env_vars["HF_HUB_CACHE"] = f"{pvc_mount}/fsx/huggingface/hub"
    executor.env_vars["TRANSFORMERS_CACHE"] = f"{pvc_mount}/fsx/huggingface/transformers"
    executor.env_vars["NEMO_DISABLE_LOCK"] = "1"
    return executor

if __name__ == "__main__":
    pvc_mount = "/data"

    # LoRA is enabled by default, disabled if --disable_lora flag is present.
    lora_enabled = True
    if lora_enabled:
        peft_scheme = "lora"
        exp_name = f"aws-nemo2-lora-finetune-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    else:
        peft_scheme = None
        exp_name = f"aws-nemo2-finetune-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    s3_out_dir = f"{pvc_mount}/experiments/{exp_name}"

    # configure_checkpoint_conversion() is defined elsewhere in the full script (not shown in this snippet).
    import_ckpt = configure_checkpoint_conversion()

    finetune_recipe = configure_finetune_recipe(
        exp_name=exp_name,
        work_dir=pvc_mount,
        peft_scheme=peft_scheme,
        lora_enabled=lora_enabled,
        max_steps=200,
        nodes=2,
        gpu_devices=4,
        seq_length=2048,
        micro_batch_size=1,
        global_batch_size=8
    )

    executor = skypilot_executor(
        nodes=2,
        gpus="l40s",
        gpu_devices=4,
        efa_devices=1,
        container_image="########.dkr.ecr.us-east-1.amazonaws.com/aws-nemo:25.04.01",
        custom_mounts={
            "/root/nemo": "."
        },
        env_vars_file="env_vars.json",
        pvc_name="fsx-claim",
        pvc_mount=pvc_mount,
        lora_enabled=lora_enabled,
        memory=180
    )

    # Set launcher based on number of nodes
    # if args.nodes > 1:
    executor.launcher = "torchrun"
    # executor.env_vars.update({
    #     "FI_PROVIDER": "efa",
    #     "NCCL_PROTO": "simple",
    #     "NCCL_DEBUG": "INFO"
    # })
    executor.env_vars.update({
        "NCCL_DEBUG": "INFO",
        "NCCL_SOCKET_IFNAME": "eth0",
        "NCCL_P2P_DISABLE": "0",
        "NCCL_NET_GDR_LEVEL": "0",
        "FI_PROVIDER": "efa",
    })

    # Set up the executor for the checkpoint conversion.
    import_executor = executor.clone()

    with run.Experiment(exp_name, log_level="INFO") as exp:
        # exp.add(import_ckpt, executor=import_executor, name="checkpoint_conversion")
        exp.add(
            finetune_recipe,
            executor=executor,
            tail_logs=True,
            name="finetuning"
        )
        local_executor = run.LocalExecutor()

        # Run the experiment
        exp.run(sequential=True, detach=True)
```
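For context, `skypilot_executor()` reads `env_vars.json` and injects its contents into the pod environment via `executor.env_vars`, so the file needs to be a flat JSON object mapping environment-variable names to string values. A minimal sketch of such a file is below; the keys and values are placeholders (assumptions), not the actual settings used in this setup:

```python
# Hypothetical example of the env_vars.json file consumed by skypilot_executor().
# The keys/values here are placeholders, not the reporter's real configuration.
import json

example_env_vars = {
    "HF_TOKEN": "hf_xxx",        # e.g. a Hugging Face token, if gated models are pulled
    "WANDB_API_KEY": "xxxx",     # e.g. an optional experiment-tracking key
}

with open("env_vars.json", "w") as f:
    json.dump(example_env_vars, f, indent=2)
```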
Error:
```
(head, rank=0, pid=3139) [default0]:Traceback (most recent call last):
(head, rank=0, pid=3139) [default0]: File "<frozen runpy>", line 198, in _run_module_as_main
(head, rank=0, pid=3139) [default0]: File "<frozen runpy>", line 88, in _run_code
(head, rank=0, pid=3139) [default0]: File "/opt/NeMo-Run/nemo_run/core/runners/fdl_runner.py", line 72, in <module>
(head, rank=0, pid=3139) [default0]: fdl_runner_app()
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/typer/main.py", line 339, in __call__
(head, rank=0, pid=3139) [default0]: raise e
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/typer/main.py", line 322, in __call__
(head, rank=0, pid=3139) [default0]: return get_command(self)(*args, **kwargs)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 1161, in __call__
(head, rank=0, pid=3139) [default0]: return self.main(*args, **kwargs)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/typer/core.py", line 677, in main
(head, rank=0, pid=3139) [default0]: return _main(
(head, rank=0, pid=3139) [default0]: ^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/typer/core.py", line 195, in _main
(head, rank=0, pid=3139) [default0]: rv = self.invoke(ctx)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 1443, in invoke
(head, rank=0, pid=3139) [default0]: return ctx.invoke(self.callback, **ctx.params)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/click/core.py", line 788, in invoke
(head, rank=0, pid=3139) [default0]: return __callback(*args, **kwargs)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/usr/local/lib/python3.12/dist-packages/typer/main.py", line 697, in wrapper
(head, rank=0, pid=3139) [default0]: return callback(**use_params)
(head, rank=0, pid=3139) [default0]: ^^^^^^^^^^^^^^^^^^^^^^
(head, rank=0, pid=3139) [default0]: File "/opt/NeMo-Run/nemo_run/core/runners/fdl_runner.py", line 68, in fdl_direct_run
(head, rank=0, pid=3139) [default0]: fdl_fn()
(head, rank=0, pid=3139) [default0]:TypeError: finetune() got an unexpected keyword argument 'tokenizer'
```