diff --git a/.github/workflows/gaudi1.yml b/.github/workflows/gaudi1.yml new file mode 100644 index 00000000000..197d4dd24ab --- /dev/null +++ b/.github/workflows/gaudi1.yml @@ -0,0 +1,77 @@ +name: Gaudi1 tests (scheduled) + +on: + workflow_dispatch: + schedule: + - cron: "0 2 * * *" + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +jobs: + run_gaudi1_tests: + name: Test on Gaudi1 + runs-on: + group: aws-dl1-24xlarge + + container: + image: docker://vault.habana.ai/gaudi-docker/1.20.0/ubuntu22.04/habanalabs/pytorch-installer-2.6.0:latest + options: --runtime=habana --shm-size=64G --cap-add=sys_nice --env HABANA_VISIBLE_DEVICES=0,1 + env: + OMPI_MCA_btl_vader_single_copy_mechanism: none + PT_ENABLE_INT64_SUPPORT: 1 + PT_HPU_LAZY_MODE: 0 + RUN_SLOW: 1 + + steps: + - name: HL-SMI (1) + run: | + hl-smi + echo "HABANA_VISIBLE_DEVICES=${HABANA_VISIBLE_DEVICES}" + echo "HABANA_VISIBLE_MODULES=${HABANA_VISIBLE_MODULES}" + + - name: Extract HPU visible modules + id: add-modules + run: | + export HABANA_VISIBLE_MODULES=$(hl-smi -Q module_id -f csv,noheader | tr '\n' ',' | sed 's/,$//') + echo "HABANA_VISIBLE_MODULES=${HABANA_VISIBLE_MODULES}" >> $GITHUB_ENV + + - name: HL-SMI (2) + run: | + hl-smi + echo "HABANA_VISIBLE_DEVICES=${HABANA_VISIBLE_DEVICES}" + echo "HABANA_VISIBLE_MODULES=${HABANA_VISIBLE_MODULES}" + + - name: Checkout to Accelerate + uses: actions/checkout@v4 + + - name: Install Accelerate with Transformers & DeepSpeed + run: | + pip install -e .[testing] \ + git+https://github.com/HabanaAI/DeepSpeed.git@1.20.0 \ + git+https://github.com/huggingface/transformers.git@hpu-support + + - name: Run CLI tests + run: | + make test_cli + + - name: Run Core tests + run: | + make test_core + + - name: Run Big Modeling tests + run: | + make test_big_modeling + + - name: Run FSDP integration tests + run: | + make test_fsdp + + - name: Run DeepSpeed integration tests + run: | + make test_deepspeed + + - 
name: Run Examples tests + run: | + make test_examples diff --git a/Makefile b/Makefile index f3876138519..54321c246f1 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ test_big_modeling: test_core: python -m pytest -s -v ./tests/ --ignore=./tests/test_examples.py --ignore=./tests/deepspeed --ignore=./tests/test_big_modeling.py \ - --ignore=./tests/fsdp --ignore=./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",) + --ignore=./tests/fsdp --ignore=./tests/tp --ignore=./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_core.log",) test_cli: python -m pytest -s -v ./tests/test_cli.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_cli.log",) @@ -39,6 +39,9 @@ test_deepspeed: test_fsdp: python -m pytest -s -v ./tests/fsdp $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_fsdp.log",) +test_tp: + python -m pytest -s -v ./tests/tp $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_tp.log",) + # Since the new version of pytest will *change* how things are collected, we need `deepspeed` to # run after test_core and test_cli test: @@ -47,13 +50,14 @@ test: $(MAKE) test_big_modeling $(MAKE) test_deepspeed $(MAKE) test_fsdp + $(MAKE) test_tp test_examples: python -m pytest -s -v ./tests/test_examples.py $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_examples.log",) # Broken down example tests for the CI runners test_integrations: - python -m pytest -s -v ./tests/deepspeed ./tests/fsdp $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_integrations.log",) + python -m pytest -s -v ./tests/deepspeed ./tests/fsdp ./tests/tp $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_integrations.log",) test_example_differences: python -m pytest -s -v ./tests/test_examples.py::ExampleDifferenceTests $(if $(IS_GITHUB_CI),--report-log "$(PYTORCH_VERSION)_example_diff.log",) diff --git a/examples/inference/distributed/stable_diffusion.py b/examples/inference/distributed/stable_diffusion.py index 
0c5e6579b94..3c5ccd63306 100644 --- a/examples/inference/distributed/stable_diffusion.py +++ b/examples/inference/distributed/stable_diffusion.py @@ -18,7 +18,7 @@ from accelerate import PartialState # Can also be Accelerator or AcceleratorState -pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16) +pipe = DiffusionPipeline.from_pretrained("stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16) distributed_state = PartialState() pipe.to(distributed_state.device) diff --git a/examples/inference/pippy/bert.py b/examples/inference/pippy/bert.py index 474409f5d0f..f19cb03723d 100644 --- a/examples/inference/pippy/bert.py +++ b/examples/inference/pippy/bert.py @@ -17,9 +17,15 @@ from transformers import AutoModelForMaskedLM from accelerate import PartialState, prepare_pippy +from accelerate.test_utils import torch_device from accelerate.utils import set_seed +if torch_device == "hpu": + synchronize_func = torch.hpu.synchronize +else: + synchronize_func = torch.cuda.synchronize + # Set the random seed to have reproducable outputs set_seed(42) @@ -60,25 +66,25 @@ ) # Move the inputs to the first device -input = input.to("cuda:0") +input = input.to(torch_device) # Take an average of 5 times # Measure first batch -torch.cuda.synchronize() +synchronize_func() start_time = time.time() with torch.no_grad(): output = model(input) -torch.cuda.synchronize() +synchronize_func() end_time = time.time() first_batch = end_time - start_time -# Now that CUDA is init, measure after -torch.cuda.synchronize() +# Now that hpu is init, measure after +synchronize_func() start_time = time.time() for i in range(5): with torch.no_grad(): output = model(input) -torch.cuda.synchronize() +synchronize_func() end_time = time.time() # The outputs are only on the final process by default diff --git a/examples/inference/pippy/gpt2.py b/examples/inference/pippy/gpt2.py index d1f232b51de..20aed845586 100644 --- 
a/examples/inference/pippy/gpt2.py +++ b/examples/inference/pippy/gpt2.py @@ -17,9 +17,16 @@ from transformers import AutoModelForSequenceClassification from accelerate import PartialState, prepare_pippy +from accelerate.test_utils import torch_device from accelerate.utils import set_seed +if torch_device == "hpu": + synchronize_func = torch.hpu.synchronize +else: + synchronize_func = torch.cuda.synchronize + + # Set the random seed to have reproducable outputs set_seed(42) @@ -59,25 +66,25 @@ ) # Move the inputs to the first device -input = input.to("cuda:0") +input = input.to(torch_device) # Take an average of 5 times # Measure first batch -torch.cuda.synchronize() +synchronize_func() start_time = time.time() with torch.no_grad(): output = model(input) -torch.cuda.synchronize() +synchronize_func() end_time = time.time() first_batch = end_time - start_time -# Now that CUDA is init, measure after -torch.cuda.synchronize() +# Now that device/backend is init, measure after +synchronize_func() start_time = time.time() for i in range(5): with torch.no_grad(): output = model(input) -torch.cuda.synchronize() +synchronize_func() end_time = time.time() # The outputs are only on the final process by default diff --git a/setup.py b/setup.py index efeecd47feb..5a56cf810be 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ "ruff ~= 0.6.4", ] extras["docs"] = [] -extras["test_prod"] = ["pytest>=7.2.0,<=8.0.0", "pytest-xdist", "pytest-subtests", "parameterized"] +extras["test_prod"] = ["pytest>=7.2.0,<=8.0.0", "pytest-xdist", "pytest-subtests", "parameterized", "pytest-order"] extras["test_dev"] = [ "datasets", "diffusers", diff --git a/src/accelerate/accelerator.py b/src/accelerate/accelerator.py index 1abcee49345..b213f34e49d 100755 --- a/src/accelerate/accelerator.py +++ b/src/accelerate/accelerator.py @@ -174,7 +174,7 @@ class Accelerator: """ - Creates an instance of an accelerator for distributed training (on multi-GPU, TPU) or mixed precision training. 
+ Creates an instance of an accelerator for distributed training or mixed precision training. Args: device_placement (`bool`, *optional*, defaults to `True`): @@ -534,9 +534,16 @@ def __init__( and self.distributed_type not in (DistributedType.DEEPSPEED, DistributedType.MEGATRON_LM) ): self.native_amp = True - if self.device.type not in ("xpu", "cuda", "npu", "xla", "mlu", "musa", "sdaa") or is_torch_xla_available( - check_is_tpu=True - ): + if self.device.type not in ( + "xpu", + "cuda", + "npu", + "xla", + "mlu", + "musa", + "hpu", + "sdaa", + ) or is_torch_xla_available(check_is_tpu=True): raise ValueError(f"fp16 mixed precision requires a GPU (not {self.device.type!r}).") kwargs = self.scaler_handler.to_kwargs() if self.scaler_handler is not None else {} self.scaler = get_grad_scaler(self.distributed_type, **kwargs) @@ -545,7 +552,7 @@ def __init__( DistributedType.DEEPSPEED, DistributedType.MEGATRON_LM, ): - if self.device.type in ["cpu", "xpu"]: + if self.device.type in ["cpu", "xpu", "hpu"]: self.native_amp = True else: self.native_amp = is_bf16_available(True) @@ -1202,6 +1209,7 @@ def join_uneven_inputs(self, joinables, even_batches=None): DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, DistributedType.MULTI_XPU, + DistributedType.MULTI_HPU, ): dl_even_batches_values = [] @@ -1437,6 +1445,7 @@ def prepare_model(self, model: torch.nn.Module, device_placement: bool = None, e """ if device_placement is None: device_placement = self.device_placement and self.distributed_type != DistributedType.FSDP + self._models.append(model) # TODO: Look at enabling native TP training directly with a proper config @@ -1515,12 +1524,16 @@ def prepare_model(self, model: torch.nn.Module, device_placement: bool = None, e DistributedType.MULTI_MUSA, DistributedType.MULTI_NPU, DistributedType.MULTI_XPU, + DistributedType.MULTI_HPU, ): if any(p.requires_grad for p in model.parameters()): kwargs = self.ddp_handler.to_kwargs() if self.ddp_handler is not None else {} # TODO: 
Look at enabling native TP training directly with a proper config if os.environ.get("ACCELERATE_BYPASS_DEVICE_MAP", "false") != "true": - device_ids, output_device = [self.local_process_index], self.local_process_index + if self.device.type == "hpu": + device_ids, output_device = [self.device.index], self.device.index + else: + device_ids, output_device = [self.local_process_index], self.local_process_index else: device_ids, output_device = None, None @@ -1920,13 +1933,25 @@ def _prepare_deepspeed(self, *args): if self.deepspeed_config["zero_optimization"].get("offload_optimizer", {}).get( "device", "none" ) != "none" and self.deepspeed_config.get("zero_force_ds_cpu_optimizer", True): + if self.device.type == "hpu" and os.environ.get("PT_HPU_LAZY_MODE", "1") == "1": + raise ValueError( + "You can't use an Offload Optimizer with HPU in Lazy Mode. " + "Please set the environment variable `PT_HPU_LAZY_MODE` to `0`." + ) + optimizer = map_pytorch_optim_to_deepspeed(optimizer) kwargs["optimizer"] = optimizer if scheduler is not None: if type(scheduler).__name__ in deepspeed.runtime.lr_schedules.VALID_LR_SCHEDULES: kwargs["lr_scheduler"] = scheduler + if self.device.type == "hpu": + # This env variable is initialized here to make sure it is set to "true" + # It should be done by the launcher but it does not work for multi-node runs + os.environ["DEEPSPEED_USE_HPU"] = "true" + engine, optimizer, _, lr_scheduler = ds_initialize(**kwargs) + if compare_versions("deepspeed", ">=", "0.14.4") and self.state.dynamo_plugin.backend != DynamoBackend.NO: compile_kwargs = self.state.dynamo_plugin.to_kwargs() engine.compile(backend=compile_kwargs.pop("backend"), compile_kwargs=compile_kwargs) @@ -3318,6 +3343,7 @@ def _inner(folder): DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, DistributedType.MULTI_NPU, + DistributedType.MULTI_HPU, ): map_location = "on_device" else: diff --git a/src/accelerate/checkpointing.py b/src/accelerate/checkpointing.py index 
34cdae7c5b3..f8ddb866b59 100644 --- a/src/accelerate/checkpointing.py +++ b/src/accelerate/checkpointing.py @@ -32,6 +32,8 @@ SCHEDULER_NAME, WEIGHTS_NAME, get_pretty_name, + is_cuda_available, + is_hpu_available, is_mlu_available, is_musa_available, is_sdaa_available, @@ -158,7 +160,9 @@ def save_accelerator_state( states["torch_sdaa_manual_seed"] = torch.sdaa.get_rng_state_all() elif is_musa_available(): states["torch_musa_manual_seed"] = torch.musa.get_rng_state_all() - else: + if is_hpu_available(): + states["torch_hpu_manual_seed"] = torch.hpu.get_rng_state_all() + if is_cuda_available(): states["torch_cuda_manual_seed"] = torch.cuda.get_rng_state_all() if is_torch_xla_available(): states["xm_seed"] = xm.get_rng_state() diff --git a/src/accelerate/commands/config/default.py b/src/accelerate/commands/config/default.py index 0ef8ceaddf3..596ddcd171f 100644 --- a/src/accelerate/commands/config/default.py +++ b/src/accelerate/commands/config/default.py @@ -18,7 +18,14 @@ import torch -from ...utils import is_mlu_available, is_musa_available, is_npu_available, is_sdaa_available, is_xpu_available +from ...utils import ( + is_hpu_available, + is_mlu_available, + is_musa_available, + is_npu_available, + is_sdaa_available, + is_xpu_available, +) from .config_args import ClusterConfig, default_json_config_file from .config_utils import SubcommandHelpFormatter @@ -81,6 +88,14 @@ def write_basic_config(mixed_precision="no", save_location: str = default_json_c config["distributed_type"] = "MULTI_MUSA" else: config["distributed_type"] = "NO" + elif is_hpu_available(): + num_hpus = torch.hpu.device_count() + config["num_processes"] = num_hpus + config["use_cpu"] = False + if num_hpus > 1: + config["distributed_type"] = "MULTI_HPU" + else: + config["distributed_type"] = "NO" elif torch.cuda.is_available(): num_gpus = torch.cuda.device_count() config["num_processes"] = num_gpus diff --git a/src/accelerate/commands/launch.py b/src/accelerate/commands/launch.py index 
c460ea3ffb5..14437da3d3c 100644 --- a/src/accelerate/commands/launch.py +++ b/src/accelerate/commands/launch.py @@ -39,6 +39,7 @@ convert_dict_to_env_variables, is_bf16_available, is_deepspeed_available, + is_hpu_available, is_mlu_available, is_musa_available, is_npu_available, @@ -1019,6 +1020,7 @@ def _validate_launch_command(args): DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, DistributedType.MULTI_XPU, + DistributedType.MULTI_HPU, ) else False ) @@ -1098,6 +1100,8 @@ def _validate_launch_command(args): args.num_processes = torch.musa.device_count() elif is_npu_available(): args.num_processes = torch.npu.device_count() + elif is_hpu_available(): + args.num_processes = torch.hpu.device_count() else: args.num_processes = torch.cuda.device_count() warned.append(f"\t`--num_processes` was set to a value of `{args.num_processes}`") @@ -1107,12 +1111,13 @@ def _validate_launch_command(args): not args.multi_gpu and args.num_processes > 1 and ( - (args.use_xpu and is_xpu_available() and torch.xpu.device_count() > 1) + (is_xpu_available() and torch.xpu.device_count() > 1) + or (is_npu_available() and torch.npu.device_count() > 1) + or (is_hpu_available() and torch.hpu.device_count() > 1) or (is_mlu_available() and torch.mlu.device_count() > 1) or (is_sdaa_available() and torch.sdaa.device_count() > 1) or (is_musa_available() and torch.musa.device_count() > 1) - or (is_npu_available() and torch.npu.device_count() > 1) - or (torch.cuda.device_count() > 1) + or (torch.cuda.is_available() and torch.cuda.device_count() > 1) ) ): warned.append( diff --git a/src/accelerate/local_sgd.py b/src/accelerate/local_sgd.py index 263e0d15d0b..40b198d46aa 100644 --- a/src/accelerate/local_sgd.py +++ b/src/accelerate/local_sgd.py @@ -71,6 +71,7 @@ def __init__(self, accelerator: Accelerator, model: torch.nn.Module, local_sgd_s DistributedType.MULTI_GPU, DistributedType.MULTI_XPU, DistributedType.MULTI_MLU, + DistributedType.MULTI_HPU, DistributedType.MULTI_SDAA, 
DistributedType.MULTI_MUSA, DistributedType.MULTI_NPU, diff --git a/src/accelerate/state.py b/src/accelerate/state.py index bb136f971e9..f233aa052e0 100644 --- a/src/accelerate/state.py +++ b/src/accelerate/state.py @@ -29,8 +29,8 @@ DistributedType, DynamoBackend, GradientAccumulationPlugin, + check_cuda_fp8_capability, check_cuda_p2p_ib_support, - check_fp8_capability, deepspeed_required, get_ccl_version, get_cpu_distributed_information, @@ -39,6 +39,8 @@ is_datasets_available, is_deepspeed_available, is_fp8_available, + is_habana_gaudi1, + is_hpu_available, is_ipex_available, is_mlu_available, is_mps_available, @@ -69,6 +71,7 @@ if is_npu_available(check_device=False): import torch_npu # noqa: F401 + logger = logging.getLogger(__name__) @@ -221,6 +224,7 @@ def __init__(self, cpu: bool = False, **kwargs): local_rank = os.environ.get("LOCAL_RANK", -1) torch.sdaa.set_device(f"sdaa:{local_rank}") torch.distributed.init_process_group(backend=self.backend, **kwargs) + # XPU and CPU require special env configs to be set if self.distributed_type in (DistributedType.MULTI_XPU, DistributedType.MULTI_CPU): dist_information = get_cpu_distributed_information() @@ -301,6 +305,7 @@ def __init__(self, cpu: bool = False, **kwargs): 'Please set `NCCL_P2P_DISABLE="1"` and `NCCL_IB_DISABLE="1" or use `accelerate launch` which ' "will do this automatically." 
) + # Important: This should be the *only* code outside of `self.initialized!` self.fork_launched = parse_flag_from_env("FORK_LAUNCHED", 0) @@ -381,6 +386,7 @@ def wait_for_everyone(self): DistributedType.MULTI_NPU, DistributedType.MULTI_XPU, DistributedType.MULTI_CPU, + DistributedType.MULTI_HPU, DistributedType.DEEPSPEED, DistributedType.FSDP, ): @@ -700,6 +706,7 @@ def default_device(self) -> torch.device: - SDAA if `is_sdaa_available()` - MUSA if `is_musa_available()` - NPU if `is_npu_available()` + - HPU if `is_hpu_available()` - CPU otherwise """ if is_mps_available(): @@ -715,6 +722,8 @@ def default_device(self) -> torch.device: # See issue #3020: https://github.com/huggingface/accelerate/issues/3020 elif is_npu_available(): return torch.device("npu") + elif is_hpu_available(): + return torch.device("hpu") elif torch.cuda.is_available(): return torch.device("cuda") elif is_xpu_available(): @@ -735,6 +744,7 @@ def _prepare_backend( elif is_torch_xla_available(): backend = "xla" distributed_type = DistributedType.XLA + elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu: if is_mlu_available(): backend = "cncl" @@ -750,6 +760,10 @@ def _prepare_backend( elif is_npu_available(): backend = "hccl" distributed_type = DistributedType.MULTI_NPU + elif is_hpu_available(init_hccl=True): + if backend is None: + backend = "hccl" + distributed_type = DistributedType.MULTI_HPU elif torch.cuda.is_available(): if backend is None: backend = "nccl" @@ -794,12 +808,14 @@ def set_device(self): self.device = torch.device("cpu") if self._cpu else self.default_device return device = str(self.distributed_type).split(".")[-1].replace("MULTI_", "").lower() - if device not in ("cpu", "gpu", "mlu", "musa", "npu", "xpu", "xla", "sdaa"): + if device not in ("cpu", "gpu", "mlu", "musa", "npu", "xpu", "xla", "hpu", "sdaa"): raise ValueError( f"Can't set device for {self.distributed_type} ({device}), verify we should be calling `_set_device()` for it!" 
) if device == "xla": self.device = xm.xla_device() + elif device == "hpu": + self.device = torch.device("hpu", torch.hpu.current_device()) else: if device == "gpu": device = "cuda" @@ -890,17 +906,24 @@ def __init__( else mixed_precision.lower() ) if mixed_precision == "fp8": + # this is confusing, why is is_fp8_available only checks for library availability ? if not is_fp8_available(): raise ValueError( "Using `fp8` precision requires `transformer_engine` or `MS-AMP` to be installed." ) - elif not check_fp8_capability(): + elif torch.cuda.is_available() and not check_cuda_fp8_capability(): logger.warning( f"The current device has compute capability of {torch.cuda.get_device_capability()} which is " "insufficient for FP8 mixed precision training (requires a GPU Hopper/Ada Lovelace " "or higher, compute capability of 8.9 or higher). Will use FP16 instead." ) mixed_precision = "fp16" + elif is_habana_gaudi1(): + logger.warning( + "The current HPU device is Gaudi1 which does not support FP8 mixed precision training (requires " + "Gaudi2 or higher). Will use BF16 instead." 
+ ) + mixed_precision = "bf16" self.dynamo_plugin = dynamo_plugin if not _from_accelerator: @@ -930,6 +953,7 @@ def __init__( DistributedType.MULTI_MUSA, DistributedType.MULTI_NPU, DistributedType.MULTI_XPU, + DistributedType.MULTI_HPU, ]: if os.environ.get("ACCELERATE_USE_FSDP", "false") == "true" or fsdp_plugin is not None: self.distributed_type = DistributedType.FSDP diff --git a/src/accelerate/test_utils/__init__.py b/src/accelerate/test_utils/__init__.py index 94f71006f96..a68aff2d1ff 100644 --- a/src/accelerate/test_utils/__init__.py +++ b/src/accelerate/test_utils/__init__.py @@ -24,7 +24,10 @@ require_bnb, require_cpu, require_cuda, + require_cuda_or_hpu, require_cuda_or_xpu, + require_fp8, + require_fp16, require_huggingface_suite, require_mlu, require_mps, @@ -33,6 +36,7 @@ require_multi_xpu, require_musa, require_non_cpu, + require_non_hpu, require_non_torch_xla, require_non_xpu, require_npu, @@ -47,6 +51,7 @@ require_tpu, require_transformer_engine, require_xpu, + run_first, skip, slow, torch_device, diff --git a/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py b/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py index 5293f41ea5e..ef412e6dc55 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py @@ -23,7 +23,14 @@ from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed from accelerate import Accelerator, DistributedType -from accelerate.utils import is_mlu_available, is_musa_available, is_npu_available, is_sdaa_available, is_xpu_available +from accelerate.utils import ( + is_hpu_available, + is_mlu_available, + is_musa_available, + is_npu_available, + is_sdaa_available, + is_xpu_available, +) from accelerate.utils.deepspeed import DummyOptim, DummyScheduler @@ -64,6 +71,10 @@ def __enter__(self): torch.xpu.empty_cache() 
torch.xpu.reset_max_memory_allocated() # reset the peak gauge to zero self.begin = torch.xpu.memory_allocated() + elif is_hpu_available(): + # torch.hpu.empty_cache() # not available on hpu as it reserves all device memory for the current process + torch.hpu.reset_peak_memory_stats() # reset the peak gauge to zero + self.begin = torch.hpu.memory_allocated() return self def __exit__(self, *exc): @@ -74,15 +85,15 @@ def __exit__(self, *exc): self.peak = torch.cuda.max_memory_allocated() elif is_mlu_available(): torch.mlu.empty_cache() - torch.mlu.memory_allocated() # reset the peak gauge to zero + self.end = torch.mlu.memory_allocated() self.begin = torch.mlu.max_memory_allocated() elif is_sdaa_available(): torch.sdaa.empty_cache() - torch.sdaa.memory_allocated() # reset the peak gauge to zero + self.end = torch.sdaa.memory_allocated() self.begin = torch.sdaa.max_memory_allocated() elif is_musa_available(): torch.musa.empty_cache() - torch.musa.memory_allocated() # reset the peak gauge to zero + self.end = torch.musa.memory_allocated() self.begin = torch.musa.max_memory_allocated() elif is_npu_available(): torch.npu.empty_cache() @@ -92,6 +103,10 @@ def __exit__(self, *exc): torch.xpu.empty_cache() self.end = torch.xpu.memory_allocated() self.peak = torch.xpu.max_memory_allocated() + elif is_hpu_available(): + # torch.hpu.empty_cache() # not available on hpu as it reserves all device memory for the current process + self.end = torch.hpu.memory_allocated() + self.peak = torch.hpu.max_memory_allocated() self.used = b2mb(self.end - self.begin) self.peaked = b2mb(self.peak - self.begin) # print(f"delta used/peak {self.used:4d}/{self.peaked:4d}") diff --git a/src/accelerate/test_utils/scripts/external_deps/test_performance.py b/src/accelerate/test_utils/scripts/external_deps/test_performance.py index 7b6f21350fd..18b36738ca5 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_performance.py +++ 
b/src/accelerate/test_utils/scripts/external_deps/test_performance.py @@ -21,9 +21,10 @@ from datasets import load_dataset from torch.optim import AdamW from torch.utils.data import DataLoader -from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed +from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup from accelerate import Accelerator, DistributedType +from accelerate.utils import SAFE_WEIGHTS_NAME, set_seed from accelerate.utils.deepspeed import DummyOptim, DummyScheduler @@ -216,7 +217,7 @@ def training_function(config, args): accelerator.save_model(model, args.output_dir) accelerator.wait_for_everyone() assert Path( - args.output_dir, "model.safetensors" + args.output_dir, SAFE_WEIGHTS_NAME ).exists(), "Model was not saved when calling `Accelerator.save_model`" accelerator.end_training() diff --git a/src/accelerate/test_utils/scripts/external_deps/test_pippy.py b/src/accelerate/test_utils/scripts/external_deps/test_pippy.py index 389b963e0cb..2ef461a92cb 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_pippy.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_pippy.py @@ -21,6 +21,7 @@ from accelerate import PartialState from accelerate.inference import prepare_pippy +from accelerate.test_utils import torch_device from accelerate.utils import DistributedType, set_seed @@ -50,7 +51,7 @@ def test_bert(batch_size: int = 2): model, trace_input, inference_inputs = get_model_and_data_for_text("bert", "cpu", batch_size) model = prepare_pippy(model, example_args=(trace_input,), no_split_module_classes=model._no_split_modules) # For inference args need to be a tuple - inputs = inference_inputs.to("cuda") + inputs = inference_inputs.to(torch_device) with torch.no_grad(): output = model(inputs) # Zach: Check that we just grab the real outputs we need at the end @@ -66,7 +67,7 @@ def test_gpt2(batch_size: int = 2): model, 
trace_input, inference_inputs = get_model_and_data_for_text("gpt2", "cpu", batch_size) model = prepare_pippy(model, example_args=(trace_input,), no_split_module_classes=model._no_split_modules) # For inference args need to be a tuple - inputs = inference_inputs.to("cuda") + inputs = inference_inputs.to(torch_device) with torch.no_grad(): output = model(inputs) # Zach: Check that we just grab the real outputs we need at the end @@ -87,7 +88,7 @@ def test_gpt2(batch_size: int = 2): # example_args=(input_tensor,), # ) # inference_inputs = torch.rand(batch_size, 3, 224, 224) -# inputs = send_to_device(inference_inputs, "cuda:0") +# inputs = send_to_device(inference_inputs, torch_device) # with torch.no_grad(): # output = model(inputs) # # Zach: Check that we just grab the real outputs we need at the end @@ -101,7 +102,7 @@ def test_gpt2(batch_size: int = 2): state = PartialState() state.print("Testing pippy integration...") try: - if state.distributed_type == DistributedType.MULTI_GPU: + if state.distributed_type in [DistributedType.MULTI_GPU, DistributedType.MULTI_HPU]: state.print("Testing GPT2...") test_gpt2() # Issue: When modifying the tokenizer for batch GPT2 inference, there's an issue diff --git a/src/accelerate/test_utils/scripts/external_deps/test_zero3_integration.py b/src/accelerate/test_utils/scripts/external_deps/test_zero3_integration.py index 2bbb324c8cd..f5352b19980 100644 --- a/src/accelerate/test_utils/scripts/external_deps/test_zero3_integration.py +++ b/src/accelerate/test_utils/scripts/external_deps/test_zero3_integration.py @@ -27,7 +27,13 @@ @require_huggingface_suite def init_torch_dist_then_launch_deepspeed(): - backend = "ccl" if torch_device == "xpu" else "nccl" + if torch_device == "xpu": + backend = "ccl" + elif torch_device == "hpu": + backend = "hccl" + else: + backend = "nccl" + torch.distributed.init_process_group(backend=backend) deepspeed_config = { "zero_optimization": { diff --git 
a/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py b/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py index 01c939db15b..0db5844e026 100644 --- a/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py +++ b/src/accelerate/test_utils/scripts/test_ddp_comm_hook.py @@ -14,6 +14,7 @@ import torch from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs, PartialState +from accelerate.utils import is_hpu_available class MockModel(torch.nn.Module): @@ -69,6 +70,12 @@ def main(): (DDPCommunicationHookType.BATCHED_POWER_SGD, DDPCommunicationHookType.FP16, {}), (DDPCommunicationHookType.BATCHED_POWER_SGD, DDPCommunicationHookType.BF16, {}), ]: + if is_hpu_available(): + HPU_UNSUPPORTED_COMM_HOOKS = {DDPCommunicationHookType.FP16, DDPCommunicationHookType.BF16} + if comm_hook in HPU_UNSUPPORTED_COMM_HOOKS or comm_wrapper in HPU_UNSUPPORTED_COMM_HOOKS: + print(f"Skipping test DDP comm hook: {comm_hook}, comm wrapper: {comm_wrapper} on HPU") + continue + print(f"Test DDP comm hook: {comm_hook}, comm wrapper: {comm_wrapper}") test_ddp_comm_hook(comm_hook, comm_wrapper, comm_state_option) PartialState().destroy_process_group() diff --git a/src/accelerate/test_utils/scripts/test_distributed_data_loop.py b/src/accelerate/test_utils/scripts/test_distributed_data_loop.py index 899dc6e3f87..ce76435f097 100644 --- a/src/accelerate/test_utils/scripts/test_distributed_data_loop.py +++ b/src/accelerate/test_utils/scripts/test_distributed_data_loop.py @@ -383,20 +383,21 @@ def main(): test_pickle_accelerator() dataset = DummyDataset() - # Conventional Dataloader with shuffle=False + + accelerator.print("Test DataLoader with shuffle=False") loader = DataLoader(dataset, shuffle=False, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS) test_data_loader(loader, accelerator) - # Conventional Dataloader with shuffle=True + accelerator.print("Test DataLoader with shuffle=True") loader = DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE, 
num_workers=NUM_WORKERS) test_data_loader(loader, accelerator) - # Dataloader with batch_sampler + accelerator.print("Test DataLoader with batch_sampler") sampler = BatchSampler(RandomSampler(dataset), batch_size=BATCH_SIZE, drop_last=False) loader = DataLoader(dataset, batch_sampler=sampler, num_workers=NUM_WORKERS) test_data_loader(loader, accelerator) - # Dataloader with sampler as an instance of `BatchSampler` + accelerator.print("Test DataLoader with sampler as an instance of `BatchSampler`") sampler = BatchSampler(RandomSampler(dataset), batch_size=BATCH_SIZE, drop_last=False) loader = DataLoader(dataset, sampler=sampler, batch_size=None, collate_fn=default_collate, num_workers=NUM_WORKERS) test_data_loader(loader, accelerator) diff --git a/src/accelerate/test_utils/scripts/test_merge_weights.py b/src/accelerate/test_utils/scripts/test_merge_weights.py index a1390864047..8671cf99ecc 100644 --- a/src/accelerate/test_utils/scripts/test_merge_weights.py +++ b/src/accelerate/test_utils/scripts/test_merge_weights.py @@ -25,6 +25,7 @@ from accelerate import Accelerator, FullyShardedDataParallelPlugin from accelerate.commands.merge import merge_command, merge_command_parser from accelerate.state import AcceleratorState +from accelerate.test_utils import torch_device from accelerate.test_utils.training import RegressionDataset from accelerate.utils import merge_fsdp_weights, patch_environment, save_fsdp_model @@ -78,10 +79,10 @@ def mock_training(accelerator, model): def check_weights(operation, state_1, state_2): for weight_1, weight_2 in zip(state_1.values(), state_2.values()): - if str(weight_1.device) != "cuda": - weight_1 = weight_1.to("cuda") - if str(weight_2.device) != "cuda": - weight_2 = weight_2.to("cuda") + if str(weight_1.device) != torch_device: + weight_1 = weight_1.to(torch_device) + if str(weight_2.device) != torch_device: + weight_2 = weight_2.to(torch_device) if operation == "same": assert torch.allclose(weight_1, weight_2) else: diff --git 
a/src/accelerate/test_utils/scripts/test_ops.py b/src/accelerate/test_utils/scripts/test_ops.py index e70d0fe504f..f8f535d7b25 100644 --- a/src/accelerate/test_utils/scripts/test_ops.py +++ b/src/accelerate/test_utils/scripts/test_ops.py @@ -54,8 +54,9 @@ def test_gather_non_contigous(state): # Skip this test because the 'is_contiguous' function of XLA tensor always returns True. if state.distributed_type == DistributedType.XLA: return - # Create a non-contiguous tensor - tensor = torch.arange(12).view(4, 3).t().to(state.device) + + # Create a non-contiguous tensor (enforce non-contiguity after device memory allocation) + tensor = torch.arange(12, device=state.device).view(4, 3).t() assert not tensor.is_contiguous() # Shouldn't error out _ = gather(tensor) diff --git a/src/accelerate/test_utils/scripts/test_script.py b/src/accelerate/test_utils/scripts/test_script.py index b5b8fec1b6e..71891965cf6 100644 --- a/src/accelerate/test_utils/scripts/test_script.py +++ b/src/accelerate/test_utils/scripts/test_script.py @@ -35,12 +35,10 @@ gather, is_bf16_available, is_datasets_available, + is_fp16_available, + is_hpu_available, is_ipex_available, - is_mlu_available, - is_musa_available, - is_npu_available, is_pytest_available, - is_sdaa_available, is_xpu_available, set_seed, synchronize_rng_states, @@ -53,6 +51,13 @@ else: from accelerate.test_utils import RegressionModel +if is_hpu_available(): + ATOL = 1e-3 + RTOL = 1e-3 +else: + ATOL = 1e-6 + RTOL = 1e-6 + def generate_baseline_dataloader(train_set, generator, batch_size, use_seedable_sampler=False): "Creates a dataloader that can also use the `SeedableRandomSampler`" @@ -195,7 +200,6 @@ def dl_preparation_check(): result.append(gather(batch)) result = torch.cat(result) - print(state.process_index, result, type(dl)) assert torch.equal(result.cpu(), torch.arange(0, length).long()), "Wrong non-shuffled dataloader result." 
dl = DataLoader(range(length), batch_size=8) @@ -474,8 +478,20 @@ def training_check(use_seedable_sampler=False): optimizer.step() model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on CPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on CPU or distributed training." + torch.testing.assert_close( + old_model.a, + model.a, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) + torch.testing.assert_close( + old_model.b, + model.b, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) accelerator.print("Training yielded the same results on one CPU or distributed setup with no batch split.") @@ -499,42 +515,24 @@ def training_check(use_seedable_sampler=False): optimizer.step() model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on CPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on CPU or distributed training." 
- - accelerator.print("Training yielded the same results on one CPU or distributes setup with batch split.") - - if ( - torch.cuda.is_available() - or is_npu_available() - or is_mlu_available() - or is_musa_available() - or is_sdaa_available() - ): - # Mostly a test that FP16 doesn't crash as the operation inside the model is not converted to FP16 - print("FP16 training check.") - AcceleratorState._reset_state() - dataloader_config = DataLoaderConfiguration(use_seedable_sampler=use_seedable_sampler) - accelerator = Accelerator(mixed_precision="fp16", dataloader_config=dataloader_config) - train_dl = generate_baseline_dataloader(train_set, generator, batch_size, use_seedable_sampler) - model = RegressionModel() - optimizer = torch.optim.SGD(model.parameters(), lr=0.1) - - train_dl, model, optimizer = accelerator.prepare(train_dl, model, optimizer) - set_seed(42) - generator.manual_seed(42) - for _ in range(3): - for batch in train_dl: - model.zero_grad() - output = model(batch["x"]) - loss = torch.nn.functional.mse_loss(output, batch["y"]) - accelerator.backward(loss) - optimizer.step() + torch.testing.assert_close( + old_model.a, + model.a, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) + torch.testing.assert_close( + old_model.b, + model.b, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) - model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on CPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on CPU or distributed training." 
+ accelerator.print("Training yielded the same results on one CPU or distributed setup with batch split.") + # FP32 wrapper check if torch.cuda.is_available(): # Mostly a test that model.forward will have autocast when running unwrap_model(model, keep_fp32_wrapper=True) print("Keep fp32 wrapper check.") @@ -550,7 +548,7 @@ def training_check(use_seedable_sampler=False): input_tensor = torch.Tensor([1, 2]).to(dtype=torch.float16, device=accelerator.device) output = model_with_fp32_wrapper(input_tensor) - # BF16 support is only for CPU + TPU, and some GPU + # BF16 support if is_bf16_available(): # Mostly a test that BF16 doesn't crash as the operation inside the model is not converted to BF16 print("BF16 training check.") @@ -573,15 +571,28 @@ def training_check(use_seedable_sampler=False): optimizer.step() model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on CPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on CPU or distributed training." 
+ torch.testing.assert_close( + old_model.a, + model.a, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) + torch.testing.assert_close( + old_model.b, + model.b, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) - # IPEX support is only for CPU - if is_ipex_available(): - print("ipex BF16 training check.") + # FP16 support (HPU fp16 model seems to be off by 10% from the CPU, which is a lot of numerical error) + if is_fp16_available() and not is_hpu_available(): + # Mostly a test that FP16 doesn't crash as the operation inside the model is not converted to FP16 + print("FP16 training check.") AcceleratorState._reset_state() dataloader_config = DataLoaderConfiguration(use_seedable_sampler=use_seedable_sampler) - accelerator = Accelerator(mixed_precision="bf16", cpu=True, dataloader_config=dataloader_config) + accelerator = Accelerator(mixed_precision="fp16", dataloader_config=dataloader_config) train_dl = generate_baseline_dataloader(train_set, generator, batch_size, use_seedable_sampler) model = RegressionModel() optimizer = torch.optim.SGD(model.parameters(), lr=0.1) @@ -598,15 +609,27 @@ def training_check(use_seedable_sampler=False): optimizer.step() model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on CPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on CPU or distributed training." 
+ torch.testing.assert_close( + old_model.a, + model.a, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) + torch.testing.assert_close( + old_model.b, + model.b, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) - # XPU support is only for XPU - if is_xpu_available(): - print("xpu BF16 training check.") + # IPEX support is only for CPU + if is_ipex_available(): + print("ipex BF16 training check.") AcceleratorState._reset_state() dataloader_config = DataLoaderConfiguration(use_seedable_sampler=use_seedable_sampler) - accelerator = Accelerator(mixed_precision="bf16", cpu=False, dataloader_config=dataloader_config) + accelerator = Accelerator(mixed_precision="bf16", cpu=True, dataloader_config=dataloader_config) train_dl = generate_baseline_dataloader(train_set, generator, batch_size, use_seedable_sampler) model = RegressionModel() optimizer = torch.optim.SGD(model.parameters(), lr=0.1) @@ -623,8 +646,20 @@ def training_check(use_seedable_sampler=False): optimizer.step() model = accelerator.unwrap_model(model).cpu() - assert torch.allclose(old_model.a, model.a), "Did not obtain the same model on XPU or distributed training." - assert torch.allclose(old_model.b, model.b), "Did not obtain the same model on XPU or distributed training." 
+ torch.testing.assert_close( + old_model.a, + model.a, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) + torch.testing.assert_close( + old_model.b, + model.b, + atol=ATOL, + rtol=RTOL, + msg=lambda msg: f"Did not obtain the same model on CPU or distributed training.\n{msg}", + ) def test_split_between_processes_dataset(datasets_Dataset): @@ -694,7 +729,7 @@ def test_split_between_processes_nested_dict(): assert results["a"] == data_copy["a"][4:] elif state.process_index == 3: # We return a list each time - assert results["a"] == data_copy["a"][-2:], f'Expected: {data_copy["a"][-2]}, Actual: {results["a"]}' + assert results["a"] == data_copy["a"][-2:], f"Expected: {data_copy['a'][-2]}, Actual: {results['a']}" if state.process_index == 0: assert results["b"] == data_copy["b"][: 8 // state.num_processes] elif state.num_processes == 2: @@ -704,7 +739,7 @@ def test_split_between_processes_nested_dict(): if state.process_index == 0: assert torch.allclose( results["c"], data_copy["c"][: 8 // state.num_processes] - ), f"Did not obtain expected values on process 0, expected `{data['c'][:8 // state.num_processes]}`, received: {results['c']}" + ), f"Did not obtain expected values on process 0, expected `{data['c'][: 8 // state.num_processes]}`, received: {results['c']}" elif state.num_processes == 2: assert torch.allclose( results["c"], data_copy["c"][4:] @@ -723,9 +758,10 @@ def test_split_between_processes_tensor(): data = torch.tensor([[0, 1, 2, 3], [4, 5, 6, 7]]).to(state.device) with state.split_between_processes(data) as results: if state.process_index == 0: - assert torch.allclose(results, torch.tensor([0, 1, 2, 3]).to(state.device)) + expected = torch.tensor([[0, 1, 2, 3]]).to(state.device) else: - assert torch.allclose(results, torch.tensor([4, 5, 6, 7]).to(state.device)) + expected = torch.tensor([[4, 5, 6, 7]]).to(state.device) + torch.testing.assert_close(results, expected) 
state.wait_for_everyone() diff --git a/src/accelerate/test_utils/scripts/test_sync.py b/src/accelerate/test_utils/scripts/test_sync.py index e936362184c..02ce0cb743c 100644 --- a/src/accelerate/test_utils/scripts/test_sync.py +++ b/src/accelerate/test_utils/scripts/test_sync.py @@ -283,7 +283,7 @@ def test_gradient_accumulation_with_opt_and_scheduler( # Learning rates should be the same assert ( opt.param_groups[0]["lr"] == ddp_opt.param_groups[0]["lr"] - ), f'Learning rates found in each optimizer did not align\nopt: {opt.param_groups[0]["lr"]}\nDDP opt: {ddp_opt.param_groups[0]["lr"]}\n' + ), f"Learning rates found in each optimizer did not align\nopt: {opt.param_groups[0]['lr']}\nDDP opt: {ddp_opt.param_groups[0]['lr']}\n" did_step = (((iteration + 1) % 2) == 0) or ((iteration + 1) == len(dataloader)) if accelerator.num_processes > 1: check_model_parameters( @@ -346,6 +346,7 @@ def main(): DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, DistributedType.MULTI_CPU, + DistributedType.MULTI_HPU, ): if state.local_process_index == 0: print("**Test Distributed `no_sync` context manager**") @@ -359,6 +360,7 @@ def main(): DistributedType.MULTI_MLU, DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, + DistributedType.MULTI_HPU, ): for split_batch in [True, False]: for dispatch_batches in [True, False]: @@ -383,6 +385,7 @@ def main(): DistributedType.MULTI_MLU, DistributedType.MULTI_SDAA, DistributedType.MULTI_MUSA, + DistributedType.MULTI_HPU, ): for split_batch in [True, False]: for dispatch_batches in [True, False]: diff --git a/src/accelerate/test_utils/testing.py b/src/accelerate/test_utils/testing.py index 2e80d3ea7cb..6bed0f8978c 100644 --- a/src/accelerate/test_utils/testing.py +++ b/src/accelerate/test_utils/testing.py @@ -27,12 +27,14 @@ from typing import List, Union from unittest import mock +import pytest import torch import accelerate -from ..state import AcceleratorState, PartialState +from ..state import AcceleratorState from ..utils import ( 
+ check_cuda_fp8_capability, gather, is_bnb_available, is_clearml_available, @@ -41,6 +43,10 @@ is_datasets_available, is_deepspeed_available, is_dvclive_available, + is_fp8_available, + is_fp16_available, + is_habana_gaudi1, + is_hpu_available, is_import_timer_available, is_mlu_available, is_mps_available, @@ -85,6 +91,8 @@ def get_backend(): return "npu", torch.npu.device_count(), torch.npu.memory_allocated elif is_xpu_available(): return "xpu", torch.xpu.device_count(), torch.xpu.memory_allocated + elif is_hpu_available(): + return "hpu", torch.hpu.device_count(), torch.hpu.memory_allocated else: return "cpu", 1, lambda: 0 @@ -169,6 +177,16 @@ def require_cuda(test_case): return unittest.skipUnless(is_cuda_available() and not is_torch_xla_available(), "test requires a GPU")(test_case) +def require_cuda_or_hpu(test_case): + """ + Decorator marking a test that requires CUDA or HPU. These tests are skipped when there are no GPU available or when + TorchXLA is available. + """ + return unittest.skipUnless( + (is_cuda_available() and not is_torch_xla_available()) or is_hpu_available(), "test requires a GPU or HPU" + )(test_case) + + def require_xpu(test_case): """ Decorator marking a test that requires XPU. These tests are skipped when there are no XPU available. @@ -193,6 +211,39 @@ def require_non_xpu(test_case): return unittest.skipUnless(torch_device != "xpu", "test requires a non-XPU")(test_case) +def require_non_hpu(test_case): + """ + Decorator marking a test that should be skipped for HPU. + """ + return unittest.skipUnless(torch_device != "hpu", "test requires a non-HPU")(test_case) + + +def require_fp16(test_case): + """ + Decorator marking a test that requires FP16. These tests are skipped when FP16 is not supported. + """ + + return unittest.skipUnless(is_fp16_available(), "test requires FP16 support")(test_case) + + +def require_fp8(test_case): + """ + Decorator marking a test that requires FP8. These tests are skipped when FP8 is not supported. 
+ """ + + # is_fp8_available only checks for libraries + # ideally it should check for device capability as well + fp8_is_available = is_fp8_available() + + if torch.cuda.is_available() and not check_cuda_fp8_capability(): + fp8_is_available = False + + if is_hpu_available() and is_habana_gaudi1(): + fp8_is_available = False + + return unittest.skipUnless(fp8_is_available, "test requires FP8 support")(test_case) + + def require_mlu(test_case): """ Decorator marking a test that requires MLU. These tests are skipped when there are no MLU available. @@ -301,9 +352,9 @@ def require_single_device(test_case): Decorator marking a test that requires a single device. These tests are skipped when there is no hardware accelerator available or number of devices is more than one. """ - return unittest.skipUnless(torch_device != "cpu" and device_count == 1, "test requires a hardware accelerator")( - test_case - ) + return unittest.skipUnless( + torch_device != "cpu" and device_count == 1, "test requires a single device accelerator" + )(test_case) def require_single_gpu(test_case): @@ -415,9 +466,10 @@ def require_pandas(test_case): def require_pippy(test_case): """ - Decorator marking a test that requires pippy installed. These tests are skipped when pippy isn't installed + Decorator marking a test that requires pippy installed. These tests are skipped when pippy isn't installed It is + also checked if the test is running on a Gaudi1 device which doesn't support pippy. """ - return unittest.skipUnless(is_pippy_available(), "test requires pippy")(test_case) + return unittest.skipUnless(is_pippy_available() and not is_habana_gaudi1(), "test requires pippy")(test_case) def require_import_timer(test_case): @@ -471,6 +523,18 @@ def require_torchdata_stateful_dataloader(test_case): )(test_case) +def run_first(test_case): + """ + Decorator marking a test with order(1). When pytest-order plugin is installed, tests marked with this decorator are + garanteed to run first. 
+ + This is especially useful in some test settings like on a Gaudi instance where a Gaudi device can only be used by a + single process at a time. So we make sure all tests that run in a subprocess are launched first, to avoid device + allocation conflicts. + """ + return pytest.mark.order(1)(test_case) + + class TempDirTestCase(unittest.TestCase): """ A TestCase class that keeps a single `tempfile.TemporaryDirectory` open for the duration of the class, wipes its @@ -514,8 +578,7 @@ class AccelerateTestCase(unittest.TestCase): def tearDown(self): super().tearDown() # Reset the state of the AcceleratorState singleton. - AcceleratorState._reset_state() - PartialState._reset_state() + AcceleratorState._reset_state(True) class MockingTestCase(unittest.TestCase): diff --git a/src/accelerate/utils/__init__.py b/src/accelerate/utils/__init__.py index 46a5660868e..51298d03140 100644 --- a/src/accelerate/utils/__init__.py +++ b/src/accelerate/utils/__init__.py @@ -66,8 +66,8 @@ ) from .environment import ( are_libraries_initialized, + check_cuda_fp8_capability, check_cuda_p2p_ib_support, - check_fp8_capability, clear_environment, convert_dict_to_env_variables, get_cpu_distributed_information, @@ -98,6 +98,9 @@ is_deepspeed_available, is_dvclive_available, is_fp8_available, + is_fp16_available, + is_habana_gaudi1, + is_hpu_available, is_import_timer_available, is_ipex_available, is_lomo_available, diff --git a/src/accelerate/utils/constants.py b/src/accelerate/utils/constants.py index a9d840c896d..63f518355ba 100644 --- a/src/accelerate/utils/constants.py +++ b/src/accelerate/utils/constants.py @@ -86,4 +86,5 @@ "MULTI_MUSA", "MULTI_XPU", "MULTI_CPU", + "MULTI_HPU", ] diff --git a/src/accelerate/utils/dataclasses.py b/src/accelerate/utils/dataclasses.py index 0c3d7c438ad..d3e09c309e7 100644 --- a/src/accelerate/utils/dataclasses.py +++ b/src/accelerate/utils/dataclasses.py @@ -41,6 +41,7 @@ from .environment import parse_flag_from_env, str_to_bool from .imports import ( 
is_cuda_available, + is_hpu_available, is_mlu_available, is_msamp_available, is_musa_available, @@ -428,7 +429,7 @@ def __post_init__(self): # Literal -ProfilerActivity = Literal["cpu", "xpu", "mtia", "cuda"] +ProfilerActivity = Literal["cpu", "xpu", "mtia", "cuda", "hpu"] @dataclass @@ -456,7 +457,8 @@ class ProfileKwargs(KwargsHandler): Args: activities (`List[str]`, *optional*, default to `None`): - The list of activity groups to use in profiling. Must be one of `"cpu"`, `"xpu"`, `"mtia"`, or `"cuda"`. + The list of activity groups to use in profiling. Must be one of `"cpu"`, `"xpu"`, `"mtia"`, "hpu" or + `"cuda"`. schedule_option (`Dict[str, int]`, *optional*, default to `None`): The schedule option to use for the profiler. Available keys are `wait`, `warmup`, `active`, `repeat` and `skip_first`. The profiler will skip the first `skip_first` steps, then wait for `wait` steps, then do the @@ -506,11 +508,16 @@ def _get_profiler_activity(self, activity: ProfilerActivity) -> torch.profiler.P "cuda": torch.profiler.ProfilerActivity.CUDA, } + if is_hpu_available(): + profiler_activity_map["hpu"] = torch.profiler.ProfilerActivity.HPU + if is_torch_version(">=", XPU_PROFILING_AVAILABLE_PYTORCH_VERSION): - profiler_activity_map["xpu"] = torch.profiler.ProfilerActivity.XPU + if torch.xpu.is_available(): + profiler_activity_map["xpu"] = torch.profiler.ProfilerActivity.XPU if is_torch_version(">=", MITA_PROFILING_AVAILABLE_PYTORCH_VERSION): - profiler_activity_map["mtia"] = torch.profiler.ProfilerActivity.MTIA + if torch.mtia.is_available(): + profiler_activity_map["mtia"] = torch.profiler.ProfilerActivity.MTIA if activity not in profiler_activity_map: raise ValueError(f"Invalid profiler activity: {activity}. Must be one of {list(profiler_activity_map)}.") @@ -556,6 +563,7 @@ class DistributedType(str, enum.Enum): - **MULTI_MUSA** -- Distributed on multiple MUSAs. - **MULTI_NPU** -- Distributed on multiple NPUs. - **MULTI_XPU** -- Distributed on multiple XPUs. 
+ - **MULTI_HPU** -- Distributed on multiple HPUs. - **DEEPSPEED** -- Using DeepSpeed. - **XLA** -- Using TorchXLA. """ @@ -574,6 +582,7 @@ class DistributedType(str, enum.Enum): TP = "TP" XLA = "XLA" MEGATRON_LM = "MEGATRON_LM" + MULTI_HPU = "MULTI_HPU" class SageMakerDistributedType(str, enum.Enum): @@ -656,6 +665,7 @@ class DynamoBackend(str, BaseEnum): - **IPEX** -- Uses IPEX for inference on CPU. Inference only. [Read more](https://github.com/intel/intel-extension-for-pytorch). - **TVM** -- Uses Apach TVM for inference optimizations. [Read more](https://tvm.apache.org/) + - **HPU_BACKEND** -- Uses HPU backend for inference optimizations. """ @@ -675,6 +685,7 @@ class DynamoBackend(str, BaseEnum): TORCHXLA_TRACE_ONCE = "TORCHXLA_TRACE_ONCE" IPEX = "IPEX" TVM = "TVM" + HPU_BACKEND = "HPU_BACKEND" class LoggerType(BaseEnum): @@ -724,6 +735,7 @@ class RNGType(BaseEnum): NPU = "npu" XLA = "xla" XPU = "xpu" + HPU = "hpu" GENERATOR = "generator" @@ -1725,9 +1737,11 @@ def __post_init__(self): device = torch.cuda.current_device() elif is_xpu_available(): device = torch.xpu.current_device() + elif is_hpu_available(): + device = torch.hpu.current_device() else: raise RuntimeError( - "There are currently no available devices found, must be one of 'XPU', 'CUDA', or 'NPU'." + "There are currently no available devices found, must be one of 'XPU', 'CUDA', 'MLU', 'NPU', 'MUSA', or 'HPU'." 
) # Create a function that will be used to initialize the parameters of the model # when using `sync_module_states` @@ -1870,8 +1884,14 @@ def __post_init__(self): ) from torch.distributed.device_mesh import init_device_mesh + # support for other devices has to be investigated + if is_hpu_available(init_hccl=True): + device = "hpu" + else: + device = "cuda" + mesh_dim_name = "tp" - device = "cuda" # support for other devices has to be investigated + self.torch_device_mesh = init_device_mesh(device, (self.tp_size,), mesh_dim_names=(mesh_dim_name,)) diff --git a/src/accelerate/utils/environment.py b/src/accelerate/utils/environment.py index 1f64a144333..78c4863cf36 100644 --- a/src/accelerate/utils/environment.py +++ b/src/accelerate/utils/environment.py @@ -172,14 +172,26 @@ def check_cuda_p2p_ib_support(): return True -def check_fp8_capability(): +@lru_cache +def check_cuda_fp8_capability(): """ - Checks if all the current GPUs available support FP8. + Checks if the current GPU available supports FP8. - Notably must initialize `torch.cuda` to check. + Notably might initialize `torch.cuda` to check. 
""" - cuda_device_capacity = torch.cuda.get_device_capability() - return cuda_device_capacity >= (8, 9) + + try: + # try to get the compute capability from nvidia-smi + output = subprocess.check_output( + [_nvidia_smi(), "--query-gpu=compute_capability", "--format=csv,noheader"], universal_newlines=True + ) + output = output.strip() + # we take the first GPU's compute capability + compute_capability = tuple(map(int, output.split(os.linesep)[0].split("."))) + except Exception: + compute_capability = torch.cuda.get_device_capability() + + return compute_capability >= (8, 9) @dataclass diff --git a/src/accelerate/utils/imports.py b/src/accelerate/utils/imports.py index 12b79e75b01..f24f6988a89 100644 --- a/src/accelerate/utils/imports.py +++ b/src/accelerate/utils/imports.py @@ -102,17 +102,16 @@ def is_schedulefree_available(): def is_transformer_engine_available(): - return _is_package_available("transformer_engine", "transformer-engine") + if is_hpu_available(): + return _is_package_available("intel_transformer_engine", "intel-transformer-engine") + else: + return _is_package_available("transformer_engine", "transformer-engine") def is_lomo_available(): return _is_package_available("lomo_optim") -def is_fp8_available(): - return is_msamp_available() or is_transformer_engine_available() or is_torchao_available() - - def is_cuda_available(): """ Checks if `cuda` is available via an `nvml-based` check which won't trigger the drivers and leave cuda @@ -171,6 +170,19 @@ def is_bf16_available(ignore_tpu=False): return True +def is_fp16_available(): + "Checks if fp16 is supported" + if is_habana_gaudi1(): + return False + + return True + + +def is_fp8_available(): + "Checks if fp8 is supported" + return is_msamp_available() or is_transformer_engine_available() or is_torchao_available() + + def is_4bit_bnb_available(): package_exists = _is_package_available("bitsandbytes") if package_exists: @@ -404,6 +416,33 @@ def is_sdaa_available(check_device=False): return 
hasattr(torch, "sdaa") and torch.sdaa.is_available() +@lru_cache +def is_hpu_available(init_hccl=False): + "Checks if `torch.hpu` is installed and potentially if a HPU is in the environment" + if ( + importlib.util.find_spec("habana_frameworks") is None + or importlib.util.find_spec("habana_frameworks.torch") is None + ): + return False + + import habana_frameworks.torch # noqa: F401 + + if init_hccl: + import habana_frameworks.torch.distributed.hccl as hccl # noqa: F401 + + return hasattr(torch, "hpu") and torch.hpu.is_available() + + +def is_habana_gaudi1(): + if is_hpu_available(): + import habana_frameworks.torch.utils.experimental as htexp # noqa: F401 + + if htexp._get_device_type() == htexp.synDeviceType.synDeviceGaudi: + return True + + return False + + @lru_cache def is_xpu_available(check_device=False): """ diff --git a/src/accelerate/utils/launch.py b/src/accelerate/utils/launch.py index a7497d73781..65dbd9ab12c 100644 --- a/src/accelerate/utils/launch.py +++ b/src/accelerate/utils/launch.py @@ -27,6 +27,7 @@ DynamoBackend, PrecisionType, is_fp8_available, + is_hpu_available, is_ipex_available, is_mlu_available, is_musa_available, @@ -85,10 +86,9 @@ def setup_fp8_env(args: argparse.Namespace, current_env: Dict[str, str]): value = getattr(args, arg) if value is not None: if arg == "fp8_override_linear_precision": - values = value.strip("()").split(",") - current_env[prefix + "FP8_OVERRIDE_FPROP"] = values[0].strip() - current_env[prefix + "FP8_OVERRIDE_DGRAD"] = values[1].strip() - current_env[prefix + "FP8_OVERRIDE_WGRAD"] = values[2].strip() + current_env[prefix + "FP8_OVERRIDE_FPROP"] = value[0] + current_env[prefix + "FP8_OVERRIDE_DGRAD"] = value[1] + current_env[prefix + "FP8_OVERRIDE_WGRAD"] = value[2] else: current_env[f"{prefix}{arg.upper()}"] = str(getattr(args, arg)) return current_env @@ -142,6 +142,8 @@ def prepare_simple_launcher_cmd_env(args: argparse.Namespace) -> Tuple[List[str] current_env["MUSA_VISIBLE_DEVICES"] = args.gpu_ids elif 
is_npu_available(): current_env["ASCEND_RT_VISIBLE_DEVICES"] = args.gpu_ids + elif is_hpu_available(): + current_env["HABANA_VISIBLE_MODULES"] = args.gpu_ids else: current_env["CUDA_VISIBLE_DEVICES"] = args.gpu_ids if args.num_machines > 1: @@ -246,6 +248,8 @@ def prepare_multi_gpu_env(args: argparse.Namespace) -> Dict[str, str]: current_env["MUSA_VISIBLE_DEVICES"] = gpu_ids elif is_npu_available(): current_env["ASCEND_RT_VISIBLE_DEVICES"] = gpu_ids + elif is_hpu_available(): + current_env["HABANA_VISIBLE_MODULES"] = gpu_ids else: current_env["CUDA_VISIBLE_DEVICES"] = gpu_ids mixed_precision = args.mixed_precision.lower() @@ -414,6 +418,8 @@ def prepare_deepspeed_cmd_env(args: argparse.Namespace) -> Tuple[List[str], Dict current_env["MUSA_VISIBLE_DEVICES"] = gpu_ids elif is_npu_available(): current_env["ASCEND_RT_VISIBLE_DEVICES"] = gpu_ids + elif is_hpu_available(): + current_env["HABANA_VISIBLE_MODULES"] = gpu_ids else: current_env["CUDA_VISIBLE_DEVICES"] = gpu_ids try: diff --git a/src/accelerate/utils/memory.py b/src/accelerate/utils/memory.py index 6b3807013cf..ca3b88f5049 100644 --- a/src/accelerate/utils/memory.py +++ b/src/accelerate/utils/memory.py @@ -28,6 +28,7 @@ from .imports import ( is_cuda_available, + is_hpu_available, is_ipex_available, is_mlu_available, is_mps_available, @@ -61,6 +62,9 @@ def clear_device_cache(garbage_collection=False): torch.mps.empty_cache() elif is_cuda_available(): torch.cuda.empty_cache() + elif is_hpu_available(): + # torch.hpu.empty_cache() # not available on hpu as it reserves all device memory for the current process + pass def release_memory(*objects): @@ -106,6 +110,7 @@ def should_reduce_batch_size(exception: Exception) -> bool: "XPU out of memory.", # XPU OOM "cuDNN error: CUDNN_STATUS_NOT_SUPPORTED.", # CUDNN SNAFU "DefaultCPUAllocator: can't allocate memory", # CPU OOM + "FATAL ERROR :: MODULE:PT_DEVMEM Allocation failed", # HPU OOM ] if isinstance(exception, RuntimeError) and len(exception.args) == 1: return 
any(err in exception.args[0] for err in _statements) diff --git a/src/accelerate/utils/modeling.py b/src/accelerate/utils/modeling.py index e647a1850de..fb747b48c1c 100644 --- a/src/accelerate/utils/modeling.py +++ b/src/accelerate/utils/modeling.py @@ -33,6 +33,7 @@ from .constants import SAFE_WEIGHTS_NAME, WEIGHTS_NAME from .dataclasses import AutocastKwargs, CustomDtype, DistributedType from .imports import ( + is_hpu_available, is_mlu_available, is_mps_available, is_musa_available, @@ -92,15 +93,15 @@ def check_device_same(first_device, second_device): if first_device.type != second_device.type: return False - if first_device.type == "cuda" and first_device.index is None: + if first_device.type != "cpu" and first_device.index is None: # In case the first_device is a cuda device and have # the index attribute set to `None`, default it to `0` - first_device = torch.device("cuda", index=0) + first_device = torch.device(first_device.type, index=0) - if second_device.type == "cuda" and second_device.index is None: + if second_device.type != "cpu" and second_device.index is None: # In case the second_device is a cuda device and have # the index attribute set to `None`, default it to `0` - second_device = torch.device("cuda", index=0) + second_device = torch.device(second_device.type, index=0) return first_device == second_device @@ -200,8 +201,8 @@ def id_tensor_storage(tensor: torch.Tensor) -> Tuple[torch.device, int, int]: storage_ptr = tensor.untyped_storage().data_ptr() storage_size = tensor.untyped_storage().nbytes() except Exception: - # Fallback for torch==1.10 try: + # Fallback for torch==1.10 storage_ptr = tensor.storage().data_ptr() storage_size = tensor.storage().size() * _SIZE[tensor.dtype] except NotImplementedError: @@ -322,6 +323,8 @@ def set_module_tensor_to_device( device = f"musa:{device}" elif is_xpu_available(): device = f"xpu:{device}" + elif is_hpu_available(): + device = "hpu" if "xpu" in str(device) and not is_xpu_available(): raise 
ValueError(f'{device} is not available, you should use device="cpu" instead') if value is None: @@ -792,6 +795,14 @@ def get_max_memory(max_memory: Optional[Dict[Union[int, str], Union[int, str]]] except Exception: logger.info(f"Device {i} seems unavailable, Proceeding to check subsequent devices.") continue + elif is_hpu_available(): + for i in range(torch.hpu.device_count()): + try: + _ = torch.tensor(0, device=torch.device("hpu", i)) + max_memory[i] = torch.hpu.mem_get_info(i)[0] + except Exception: + logger.info(f"Device {i} seems unavailable, Proceeding to check subsequent devices.") + continue else: for i in range(torch.cuda.device_count()): try: @@ -826,6 +837,8 @@ def get_max_memory(max_memory: Optional[Dict[Union[int, str], Union[int, str]]] num_devices = torch.musa.device_count() elif is_xpu_available(): num_devices = torch.xpu.device_count() + elif is_hpu_available(): + num_devices = torch.hpu.device_count() else: num_devices = torch.cuda.device_count() for device in gpu_devices: @@ -957,6 +970,8 @@ def get_balanced_memory( expected_device_type = "musa" elif is_xpu_available(): expected_device_type = "xpu" + elif is_hpu_available(): + expected_device_type = "hpu" else: expected_device_type = "cuda" num_devices = len([d for d in max_memory if torch.device(d).type == expected_device_type and max_memory[d] > 0]) @@ -1639,6 +1654,8 @@ def load_state_dict(checkpoint_file, device_map=None): target_device = f"xpu:{device}" elif is_npu_available(): target_device = f"npu:{device}" + elif is_hpu_available(): + target_device = "hpu" return safe_load_file(checkpoint_file, device=target_device) @@ -1675,6 +1692,8 @@ def load_state_dict(checkpoint_file, device_map=None): target_device = f"xpu:{device}" elif is_npu_available(): target_device = f"npu:{device}" + elif is_hpu_available(): + target_device = "hpu" with safe_open(checkpoint_file, framework="pt", device=target_device) as f: for key in device_weights[device]: @@ -2021,6 +2040,7 @@ def 
get_mixed_precision_context_manager(native_amp: bool = False, autocast_kwarg DistributedType.MULTI_MUSA, DistributedType.MULTI_NPU, DistributedType.MULTI_XPU, + DistributedType.MULTI_HPU, DistributedType.FSDP, DistributedType.XLA, ]: @@ -2058,6 +2078,8 @@ def get_grad_scaler(distributed_type: DistributedType = None, **kwargs): return torch.musa.amp.GradScaler(**kwargs) elif is_npu_available(): return torch.npu.amp.GradScaler(**kwargs) + elif is_hpu_available(): + return torch.amp.GradScaler("hpu", **kwargs) elif is_xpu_available(): return torch.amp.GradScaler("xpu", **kwargs) else: diff --git a/src/accelerate/utils/random.py b/src/accelerate/utils/random.py index 656c117d9fe..4f4ca517b00 100644 --- a/src/accelerate/utils/random.py +++ b/src/accelerate/utils/random.py @@ -22,6 +22,7 @@ from .constants import CUDA_DISTRIBUTED_TYPES from .dataclasses import DistributedType, RNGType from .imports import ( + is_hpu_available, is_mlu_available, is_musa_available, is_npu_available, @@ -62,6 +63,8 @@ def set_seed(seed: int, device_specific: bool = False, deterministic: bool = Fal torch.sdaa.manual_seed_all(seed) elif is_musa_available(): torch.musa.manual_seed_all(seed) + elif is_hpu_available(): + torch.hpu.manual_seed_all(seed) else: torch.cuda.manual_seed_all(seed) # ^^ safe to call this function even if cuda is not available @@ -96,6 +99,9 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona elif rng_type == RNGType.XPU: assert is_xpu_available(), "Can't synchronize XPU seeds on an environment without XPUs." rng_state = torch.xpu.get_rng_state() + elif rng_type == RNGType.HPU: + assert is_hpu_available(), "Can't synchronize HPU seeds on an environment without HPUs." + rng_state = torch.hpu.get_rng_state() elif rng_type == RNGType.GENERATOR: assert generator is not None, "Need a generator to synchronize its seed." 
rng_state = generator.get_state() @@ -114,6 +120,7 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona or state.distributed_type == DistributedType.MULTI_MUSA or state.distributed_type == DistributedType.MULTI_NPU or state.distributed_type == DistributedType.MULTI_XPU + or state.distributed_type == DistributedType.MULTI_HPU ): rng_state = rng_state.to(state.device) torch.distributed.broadcast(rng_state, 0) @@ -136,6 +143,8 @@ def synchronize_rng_state(rng_type: Optional[RNGType] = None, generator: Optiona torch.musa.set_rng_state(rng_state) elif rng_type == RNGType.XPU: torch.xpu.set_rng_state(rng_state) + elif rng_type == RNGType.HPU: + torch.hpu.set_rng_state(rng_state) elif rng_type == RNGType.XLA: xm.set_rng_state(rng_state.item()) elif rng_type == RNGType.GENERATOR: diff --git a/src/accelerate/utils/transformer_engine.py b/src/accelerate/utils/transformer_engine.py index 0899c463de4..53159a010eb 100644 --- a/src/accelerate/utils/transformer_engine.py +++ b/src/accelerate/utils/transformer_engine.py @@ -16,7 +16,7 @@ import torch.nn as nn -from .imports import is_fp8_available +from .imports import is_fp8_available, is_hpu_available from .operations import GatheredParameters @@ -29,7 +29,11 @@ def convert_model(model, to_transformer_engine=True, _convert_linear=True, _conv """ if not is_fp8_available(): raise ImportError("Using `convert_model` requires transformer_engine to be installed.") - import transformer_engine.pytorch as te + + if is_hpu_available(): + import intel_transformer_engine as te + else: + import transformer_engine.pytorch as te for name, module in model.named_children(): if isinstance(module, nn.Linear) and to_transformer_engine and _convert_linear: @@ -88,11 +92,20 @@ def has_transformer_engine_layers(model): """ if not is_fp8_available(): raise ImportError("Using `has_transformer_engine_layers` requires transformer_engine to be installed.") - import transformer_engine.pytorch as te + + if is_hpu_available(): 
+ import intel_transformer_engine as te + + module_cls_to_check = te.Linear + else: + import transformer_engine.pytorch as te + + module_cls_to_check = (te.LayerNorm, te.Linear, te.TransformerLayer) for m in model.modules(): - if isinstance(m, (te.LayerNorm, te.Linear, te.TransformerLayer)): + if isinstance(m, module_cls_to_check): return True + return False @@ -103,7 +116,11 @@ def contextual_fp8_autocast(model_forward, fp8_recipe, use_during_eval=False): """ if not is_fp8_available(): raise ImportError("Using `contextual_fp8_autocast` requires transformer_engine to be installed.") - from transformer_engine.pytorch import fp8_autocast + + if is_hpu_available(): + from intel_transformer_engine import fp8_autocast + else: + from transformer_engine.pytorch import fp8_autocast def forward(self, *args, **kwargs): enabled = use_during_eval or self.training @@ -122,7 +139,11 @@ def apply_fp8_autowrap(model, fp8_recipe_handler): """ if not is_fp8_available(): raise ImportError("Using `apply_fp8_autowrap` requires transformer_engine to be installed.") - import transformer_engine.common.recipe as te_recipe + + if is_hpu_available(): + import intel_transformer_engine.recipe as te_recipe + else: + import transformer_engine.common.recipe as te_recipe kwargs = fp8_recipe_handler.to_kwargs() if fp8_recipe_handler is not None else {} if "fp8_format" in kwargs: diff --git a/tests/deepspeed/test_deepspeed.py b/tests/deepspeed/test_deepspeed.py index 044f67f5727..b4ec28deaaa 100644 --- a/tests/deepspeed/test_deepspeed.py +++ b/tests/deepspeed/test_deepspeed.py @@ -23,9 +23,6 @@ from parameterized import parameterized from torch.utils.data import BatchSampler, DataLoader, RandomSampler, SequentialSampler from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, get_scheduler -from transformers.testing_utils import mockenv_context -from transformers.trainer_utils import set_seed -from transformers.utils import is_torch_bf16_available from accelerate.accelerator import 
Accelerator from accelerate.scheduler import AcceleratedScheduler @@ -36,13 +33,15 @@ execute_subprocess_async, path_in_accelerate_package, require_deepspeed, + require_fp16, require_huggingface_suite, require_multi_device, require_non_cpu, + run_first, slow, ) from accelerate.test_utils.training import RegressionDataset, RegressionModel -from accelerate.utils import patch_environment +from accelerate.utils import is_bf16_available, is_fp16_available, patch_environment, set_seed from accelerate.utils.dataclasses import DeepSpeedPlugin from accelerate.utils.deepspeed import ( DeepSpeedEngineWrapper, @@ -80,10 +79,12 @@ optims = [CUSTOM_OPTIMIZER, DS_OPTIMIZER] schedulers = [CUSTOM_SCHEDULER, DS_SCHEDULER] model_types = [NO_CONFIG, CONFIG_WITH_NO_HIDDEN_SIZE, CONFIG_WITH_HIDDEN_SIZE, CONFIG_WITH_HIDDEN_SIZES] -if is_torch_bf16_available(): - dtypes = [FP16, BF16] -else: - dtypes = [FP16] + +dtypes = [] +if is_bf16_available(): + dtypes.append(BF16) +if is_fp16_available(): + dtypes.append(FP16) def parameterized_custom_name_func(func, param_num, param): @@ -242,7 +243,7 @@ def test_deepspeed_plugin(self, stage): deepspeed_plugin.deepspeed_config_process(**kwargs) assert "`optimizer.params.lr` not found in kwargs." 
in str(cm.exception) - @parameterized.expand([FP16, BF16], name_func=parameterized_custom_name_func) + @parameterized.expand(dtypes, name_func=parameterized_custom_name_func) def test_accelerate_state_deepspeed(self, dtype): AcceleratorState._reset_state(True) deepspeed_plugin = DeepSpeedPlugin( @@ -254,7 +255,7 @@ def test_accelerate_state_deepspeed(self, dtype): zero3_save_16bit_model=True, zero3_init_flag=True, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): state = Accelerator(mixed_precision=dtype, deepspeed_plugin=deepspeed_plugin).state assert state.deepspeed_plugin.deepspeed_config[dtype]["enabled"] @@ -269,13 +270,14 @@ def test_init_zero3(self): zero3_init_flag=True, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin) # noqa: F841 from transformers.integrations import is_deepspeed_zero3_enabled assert is_deepspeed_zero3_enabled() @parameterized.expand(optim_scheduler_params, name_func=parameterized_custom_name_func) + @require_fp16 def test_prepare_deepspeed(self, optim_type, scheduler_type): # 1. Testing with one of the ZeRO Stages is enough to test the `_prepare_deepspeed` function. # Here we test using ZeRO Stage 2 with FP16 enabled. 
@@ -307,7 +309,7 @@ def test_prepare_deepspeed(self, optim_type, scheduler_type): zero3_save_16bit_model=False, zero3_init_flag=False, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin) train_set = RegressionDataset(length=80) @@ -365,7 +367,7 @@ def test_prepare_deepspeed(self, optim_type, scheduler_type): elif optim_type == DS_OPTIMIZER and scheduler_type == DS_SCHEDULER: # Test DeepSpeed optimizer + DeepSpeed scheduler deepspeed_plugin = DeepSpeedPlugin(hf_ds_config=self.ds_config_file[ZERO2]) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision="fp16") train_set = RegressionDataset(length=80) eval_set = RegressionDataset(length=20) @@ -422,7 +424,7 @@ def test_prepare_deepspeed(self, optim_type, scheduler_type): elif optim_type == CUSTOM_OPTIMIZER and scheduler_type == DS_SCHEDULER: # Test custom optimizer + DeepSpeed scheduler deepspeed_plugin = DeepSpeedPlugin(hf_ds_config=self.ds_config_file[ZERO2]) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision="fp16") train_set = RegressionDataset(length=80) eval_set = RegressionDataset(length=20) @@ -455,7 +457,7 @@ def test_prepare_deepspeed(self, optim_type, scheduler_type): elif optim_type == DS_OPTIMIZER and scheduler_type is CUSTOM_SCHEDULER: # Test deepspeed optimizer + custom scheduler deepspeed_plugin = DeepSpeedPlugin(hf_ds_config=self.ds_config_file[ZERO2]) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision="fp16") train_set = RegressionDataset(length=80) eval_set = RegressionDataset(length=20) @@ -522,7 +524,7 @@ def test_dataloader_with_batch_sampler(self): 
zero3_save_16bit_model=False, zero3_init_flag=False, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin) train_set = RegressionDataset(length=80) @@ -553,6 +555,7 @@ def test_dataloader_with_batch_sampler(self): in str(cm.exception) ) + @require_fp16 def test_save_checkpoints(self): deepspeed_plugin = DeepSpeedPlugin( hf_ds_config=self.ds_config_file[ZERO3], @@ -574,7 +577,7 @@ def test_save_checkpoints(self): "zero_optimization.stage3_gather_16bit_weights_on_model_save": False, } - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision="fp16") kwargs["train_batch_size"] = ( kwargs["train_micro_batch_size_per_gpu"] @@ -612,7 +615,7 @@ def test_autofill_dsconfig(self): del deepspeed_plugin.deepspeed_config["bf16"] del deepspeed_plugin.deepspeed_config["fp16"] - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin) train_set = RegressionDataset(length=80) eval_set = RegressionDataset(length=20) @@ -643,6 +646,7 @@ def test_autofill_dsconfig(self): assert not config["zero_optimization"]["stage3_gather_16bit_weights_on_model_save"] @parameterized.expand(model_types, name_func=parameterized_custom_name_func) + @require_fp16 def test_autofill_comm_buffers_dsconfig(self, model_type): deepspeed_plugin = DeepSpeedPlugin( hf_ds_config=self.ds_config_file[ZERO3], @@ -652,7 +656,7 @@ def test_autofill_comm_buffers_dsconfig(self, model_type): del deepspeed_plugin.deepspeed_config["fp16"] del deepspeed_plugin.deepspeed_config["optimizer"] del deepspeed_plugin.deepspeed_config["scheduler"] - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin) train_set = 
RegressionDataset(length=80) eval_set = RegressionDataset(length=20) @@ -698,7 +702,7 @@ def test_autofill_comm_buffers_dsconfig(self, model_type): assert zero_opt["stage3_prefetch_bucket_size"] == int((0.9 * hidden_size) * hidden_size) assert zero_opt["stage3_param_persistence_threshold"] == (10 * hidden_size) - @parameterized.expand([FP16, BF16], name_func=parameterized_custom_name_func) + @parameterized.expand(dtypes, name_func=parameterized_custom_name_func) def test_autofill_dsconfig_from_ds_plugin(self, dtype): ds_config = self.ds_config_dict["zero3"] if dtype == BF16: @@ -724,7 +728,7 @@ def test_autofill_dsconfig_from_ds_plugin(self, dtype): zero3_save_16bit_model=True, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision=dtype) config = accelerator.state.deepspeed_plugin.deepspeed_config assert config["gradient_clipping"] == 1.0 @@ -737,7 +741,7 @@ def test_autofill_dsconfig_from_ds_plugin(self, dtype): AcceleratorState._reset_state(True) diff_dtype = "bf16" if dtype == "fp16" else "fp16" - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): with self.assertRaises(ValueError) as cm: accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision=diff_dtype) assert ( @@ -748,7 +752,7 @@ def test_autofill_dsconfig_from_ds_plugin(self, dtype): # base case of passing in `gradient_accumulation_steps` to `DeepSpeedPlugin` AcceleratorState._reset_state(True) deepspeed_plugin = DeepSpeedPlugin(zero_stage=2, gradient_accumulation_steps=4) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin, mixed_precision=dtype) deepspeed_plugin = accelerator.state.deepspeed_plugin assert deepspeed_plugin.deepspeed_config["gradient_accumulation_steps"] == 4 @@ -764,7 +768,7 @@ def test_autofill_dsconfig_from_ds_plugin(self, dtype): 
offload_param_device="cpu", zero3_save_16bit_model=True, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator( deepspeed_plugin=deepspeed_plugin, mixed_precision=dtype, gradient_accumulation_steps=8 ) @@ -787,7 +791,7 @@ def test_ds_config_assertions(self): "gradient_accumulation_steps,gradient_clipping,zero_stage,offload_optimizer_device,offload_param_device,zero3_save_16bit_model,mixed_precision" ) - with mockenv_context(**ambiguous_env): + with patch_environment(**ambiguous_env): with self.assertRaises(ValueError) as cm: deepspeed_plugin = DeepSpeedPlugin( hf_ds_config=self.ds_config_file[ZERO3], @@ -830,7 +834,7 @@ def test_ds_zero3_no_init_autofill(self): hf_ds_config=ds_config, zero3_init_flag=False, ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): _ = Accelerator(deepspeed_plugin=deepspeed_plugin) _ = AutoModelForCausalLM.from_pretrained("gpt2") @@ -842,6 +846,7 @@ def test_ds_config(self, stage): ) assert deepspeed_plugin.zero_stage == int(stage.replace("zero", "")) + @require_fp16 def test_prepare_deepspeed_prepare_moe(self): if compare_versions("transformers", "<", "4.40") and compare_versions("deepspeed", "<", "0.14"): return @@ -855,7 +860,7 @@ def test_prepare_deepspeed_prepare_moe(self): zero3_save_16bit_model=True, transformer_moe_cls_names="Qwen2MoeSparseMoeBlock", ) - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(mixed_precision="fp16", deepspeed_plugin=deepspeed_plugin) accelerator.state.deepspeed_plugin.deepspeed_config["train_micro_batch_size_per_gpu"] = 1 model = AutoModelForCausalLM.from_pretrained(QWEN_MOE) @@ -866,6 +871,8 @@ def test_prepare_deepspeed_prepare_moe(self): if isinstance(module, Qwen2MoeSparseMoeBlock): assert hasattr(module, "_z3_leaf") and module._z3_leaf + @run_first + @require_fp16 def test_basic_run(self): test_file_path = path_in_accelerate_package("test_utils", 
"scripts", "external_deps", "test_performance.py") with tempfile.TemporaryDirectory() as dirpath: @@ -890,9 +897,10 @@ def test_basic_run(self): execute_subprocess_async(cmd) +@slow +@run_first @require_deepspeed @require_multi_device -@slow class DeepSpeedIntegrationTest(TempDirTestCase): test_scripts_folder = path_in_accelerate_package("test_utils", "scripts", "external_deps") @@ -922,6 +930,7 @@ def setUp(self): self.n_train = 160 self.n_val = 160 + @require_fp16 def test_performance(self): self.test_file_path = self.test_scripts_folder / "test_performance.py" cmd = [ @@ -966,6 +975,7 @@ def test_performance(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd_stage) + @require_fp16 def test_checkpointing(self): self.test_file_path = self.test_scripts_folder / "test_checkpointing.py" cmd = [ @@ -1020,6 +1030,7 @@ def test_checkpointing(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd_stage) + @require_fp16 def test_peak_memory_usage(self): if compare_versions("deepspeed", ">", "0.12.6"): self.skipTest( diff --git a/tests/deepspeed/test_deepspeed_multiple_model.py b/tests/deepspeed/test_deepspeed_multiple_model.py index bddfb3c4b52..bfa7a3e00c6 100644 --- a/tests/deepspeed/test_deepspeed_multiple_model.py +++ b/tests/deepspeed/test_deepspeed_multiple_model.py @@ -29,6 +29,7 @@ require_huggingface_suite, require_multi_device, require_non_cpu, + run_first, slow, ) from accelerate.test_utils.training import RegressionDataset @@ -171,11 +172,12 @@ def test_prepare_multiple_models_zero3_inference(self): assert accelerator.deepspeed_engine_wrapped.engine is model1 + @run_first @require_huggingface_suite @require_multi_device @slow def test_train_multiple_models(self): self.test_file_path = self.test_scripts_folder / "test_ds_multiple_model.py" - args = ["--num_processes=2", "--num_machines=1", "--main_process_port=0", str(self.test_file_path)] + args = ["--num_processes=2", "--num_machines=1", 
str(self.test_file_path)] args = self.parser.parse_args(args) launch_command(args) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 083d32bfc26..657f6bf3559 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -18,8 +18,6 @@ import torch from transformers import AutoModel -from transformers.testing_utils import mockenv_context -from transformers.trainer_utils import set_seed from accelerate.accelerator import Accelerator from accelerate.state import AcceleratorState @@ -29,12 +27,14 @@ execute_subprocess_async, get_launch_command, path_in_accelerate_package, + require_fp16, require_multi_device, require_non_cpu, require_non_torch_xla, + run_first, slow, ) -from accelerate.utils import patch_environment +from accelerate.utils import is_bf16_available, is_fp16_available, is_hpu_available, patch_environment, set_seed from accelerate.utils.constants import ( FSDP_AUTO_WRAP_POLICY, FSDP_BACKWARD_PREFETCH, @@ -47,11 +47,17 @@ set_seed(42) + BERT_BASE_CASED = "bert-base-cased" LLAMA_TESTING = "hf-internal-testing/tiny-random-LlamaForCausalLM" FP16 = "fp16" BF16 = "bf16" -dtypes = [FP16, BF16] + +dtypes = [] +if is_fp16_available(): + dtypes.append(FP16) +if is_bf16_available(): + dtypes.append(BF16) @require_non_cpu @@ -77,7 +83,7 @@ def test_sharding_strategy(self): for i, strategy in enumerate(FSDP_SHARDING_STRATEGY): env = self.fsdp_env.copy() env["FSDP_SHARDING_STRATEGY"] = f"{i + 1}" - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() assert fsdp_plugin.sharding_strategy == ShardingStrategy(i + 1) fsdp_plugin = FullyShardedDataParallelPlugin(sharding_strategy=ShardingStrategy(i + 1)) @@ -87,7 +93,7 @@ def test_sharding_strategy(self): for i, strategy in enumerate(FSDP_SHARDING_STRATEGY): env = self.fsdp_env.copy() env["FSDP_SHARDING_STRATEGY"] = strategy - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() assert 
fsdp_plugin.sharding_strategy == ShardingStrategy(i + 1) fsdp_plugin = FullyShardedDataParallelPlugin(sharding_strategy=strategy) @@ -100,7 +106,7 @@ def test_backward_prefetch(self): expected_value = None if prefetch_policy == "NO_PREFETCH" else BackwardPrefetch(i + 1) env = self.fsdp_env.copy() env["FSDP_BACKWARD_PREFETCH"] = prefetch_policy - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() assert ( fsdp_plugin.backward_prefetch == expected_value @@ -121,7 +127,7 @@ def test_state_dict_type(self): for i, state_dict_type in enumerate(FSDP_STATE_DICT_TYPE): env = self.fsdp_env.copy() env["FSDP_STATE_DICT_TYPE"] = state_dict_type - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() assert fsdp_plugin.state_dict_type == StateDictType(i + 1) if state_dict_type == "FULL_STATE_DICT": @@ -158,7 +164,7 @@ def test_auto_wrap_policy(self): env["FSDP_MIN_NUM_PARAMS"] = "2000" min_num_params = 2000 # First test via env - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() fsdp_plugin.set_auto_wrap_policy(model) if policy == "NO_WRAP": @@ -181,7 +187,7 @@ def test_auto_wrap_policy(self): env = self.fsdp_env.copy() env["FSDP_AUTO_WRAP_POLICY"] = "TRANSFORMER_BASED_WRAP" env["FSDP_TRANSFORMER_CLS_TO_WRAP"] = "T5Layer" - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() with self.assertRaises(Exception) as cm: fsdp_plugin.set_auto_wrap_policy(model) @@ -198,7 +204,7 @@ def test_auto_wrap_policy(self): env = self.fsdp_env.copy() env["FSDP_AUTO_WRAP_POLICY"] = "SIZE_BASED_WRAP" env["FSDP_MIN_NUM_PARAMS"] = "0" - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() fsdp_plugin.set_auto_wrap_policy(model) assert fsdp_plugin.auto_wrap_policy is None @@ -217,7 +223,7 @@ def 
test_mixed_precision(self): for mp_dtype in dtypes: env = self.fsdp_env.copy() env["ACCELERATE_MIXED_PRECISION"] = mp_dtype - with mockenv_context(**env): + with patch_environment(**env): accelerator = Accelerator() if mp_dtype == "fp16": dtype = torch.float16 @@ -234,7 +240,7 @@ def test_mixed_precision(self): mixed_precision_policy={"param_dtype": dtype, "reduce_dtype": dtype, "buffer_dtype": dtype} ) assert plugin.mixed_precision_policy == mp_policy - with mockenv_context(**self.dist_env): + with patch_environment(**self.dist_env): accelerator = Accelerator(fsdp_plugin=plugin) assert accelerator.state.fsdp_plugin.mixed_precision_policy == mp_policy AcceleratorState._reset_state(True) @@ -252,7 +258,7 @@ def test_mixed_precision_buffer_autocast_override(self): env = self.fsdp_env.copy() env["ACCELERATE_MIXED_PRECISION"] = mp_dtype - with mockenv_context(**env): + with patch_environment(**env): accelerator = Accelerator() accelerator.state.fsdp_plugin.set_mixed_precision(dtype, buffer_autocast=True, override=True) assert accelerator.state.fsdp_plugin.mixed_precision_policy == mp_policy @@ -268,7 +274,7 @@ def test_cpu_offload(self): for flag in [True, False]: env = self.fsdp_env.copy() env["FSDP_OFFLOAD_PARAMS"] = str(flag).lower() - with mockenv_context(**env): + with patch_environment(**env): fsdp_plugin = FullyShardedDataParallelPlugin() assert fsdp_plugin.cpu_offload == CPUOffload(offload_params=flag) @@ -286,6 +292,7 @@ def test_cpu_ram_efficient_loading(self): assert os.environ.get("FSDP_CPU_RAM_EFFICIENT_LOADING") == "False" +@run_first # Skip this test when TorchXLA is available because accelerate.launch does not support TorchXLA FSDP. 
@require_non_torch_xla @require_multi_device @@ -295,7 +302,7 @@ class FSDPIntegrationTest(TempDirTestCase): def setUp(self): super().setUp() - self.performance_lower_bound = 0.82 + self.performance_lower_bound = 0.70 if is_hpu_available() else 0.82 self.performance_configs = [ "fsdp_shard_grad_op_transformer_based_wrap", "fsdp_full_shard_transformer_based_wrap", @@ -311,6 +318,7 @@ def setUp(self): self.n_train = 160 self.n_val = 160 + @require_fp16 def test_performance(self): self.test_file_path = self.test_scripts_folder / "test_performance.py" cmd = get_launch_command(num_processes=2, num_machines=1, machine_rank=0, use_fsdp=True) @@ -346,9 +354,11 @@ def test_performance(self): f"--performance_lower_bound={self.performance_lower_bound}", ] ) + with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd_config) + @require_fp16 def test_checkpointing(self): self.test_file_path = self.test_scripts_folder / "test_checkpointing.py" cmd = get_launch_command( @@ -394,6 +404,7 @@ def test_checkpointing(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd_config) + @require_fp16 def test_peak_memory_usage(self): self.test_file_path = self.test_scripts_folder / "test_peak_memory_usage.py" cmd = get_launch_command(num_processes=2, num_machines=1, machine_rank=0) diff --git a/tests/test_accelerator.py b/tests/test_accelerator.py index 63afe18c4ca..d8cc638b0d9 100644 --- a/tests/test_accelerator.py +++ b/tests/test_accelerator.py @@ -31,9 +31,12 @@ from accelerate.test_utils import ( require_bnb, require_cuda_or_xpu, + require_fp8, + require_fp16, require_huggingface_suite, require_multi_device, require_non_cpu, + require_non_hpu, require_transformer_engine, slow, torch_device, @@ -173,7 +176,7 @@ def test_accelerator_state_after_reset(self): def test_accelerator_can_be_reinstantiated(self): _ = Accelerator() assert PartialState._shared_state["_cpu"] is False - assert PartialState._shared_state["device"].type in ["cuda", "mps", "npu", 
"xpu", "xla"] + assert PartialState._shared_state["device"].type in ["cuda", "mps", "npu", "xpu", "xla", "hpu"] with self.assertRaises(ValueError): _ = Accelerator(cpu=True) @@ -215,6 +218,7 @@ def test_prepared_objects_are_referenced(self): assert prepared_train_dl in accelerator._dataloaders assert prepared_valid_dl in accelerator._dataloaders + @require_non_hpu # hpu does not support empty_cache def test_free_memory_dereferences_prepared_components(self): accelerator = Accelerator() # Free up refs with empty_cache() and gc.collect() @@ -238,6 +242,7 @@ def test_free_memory_dereferences_prepared_components(self): assert len(accelerator._optimizers) == 0 assert len(accelerator._schedulers) == 0 assert len(accelerator._dataloaders) == 0 + # The less-than comes *specifically* from CUDA CPU things/won't be present on CPU builds assert free_cpu_ram_after <= free_cpu_ram_before @@ -497,6 +502,7 @@ def test_accelerator_bnb_cpu_error(self): model = accelerator.prepare(model) @require_non_torch_xla + @require_non_hpu # bnb is not supported on HPU @slow @require_bnb @require_multi_device @@ -532,9 +538,8 @@ def test_accelerator_bnb_multi_device(self): with self.assertRaises(ValueError): _ = accelerator.prepare(model) - PartialState._reset_state() - @require_non_torch_xla + @require_non_hpu # bnb is not supported on HPU @slow @require_bnb @require_multi_device @@ -566,6 +571,7 @@ def test_accelerator_cpu_flag_prepare(self): accelerator = Accelerator(cpu=True) _ = accelerator.prepare(sgd) + @require_fp8 @require_transformer_engine def test_can_unwrap_model_te(self): model, optimizer, *_ = create_components() @@ -582,6 +588,7 @@ def test_can_unwrap_model_te(self): model_loaded = pickle.loads(pickle.dumps(model)) model_loaded(inputs) + @require_fp16 @require_non_cpu def test_can_unwrap_model_fp16(self): # test for a regression introduced in #872 diff --git a/tests/test_big_modeling.py b/tests/test_big_modeling.py index 40006998945..e4c131dcc9b 100644 --- 
a/tests/test_big_modeling.py +++ b/tests/test_big_modeling.py @@ -40,16 +40,24 @@ require_multi_device, require_multi_gpu, require_non_cpu, + require_non_hpu, require_non_torch_xla, slow, torch_device, ) -from accelerate.utils import offload_state_dict +from accelerate.utils import is_hpu_available, offload_state_dict logger = logging.getLogger(__name__) torch_device = f"{torch_device}:0" if torch_device != "cpu" else "cpu" +if is_hpu_available(): + ATOL = 1e-4 + RTOL = 1e-4 +else: + ATOL = 1e-5 + RTOL = 1e-5 + class ModelForTest(nn.Module): def __init__(self): @@ -199,14 +207,14 @@ def test_cpu_offload(self): cpu_offload(model, execution_device=device) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) # Clean up for next test. remove_hook_from_submodules(model) cpu_offload(model, execution_device=device, offload_buffers=True) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) def test_cpu_offload_with_unused_submodules(self): model = ModelWithUnusedSubModulesForTest() @@ -217,7 +225,7 @@ def test_cpu_offload_with_unused_submodules(self): cpu_offload(model, execution_device=device, preload_module_classes=["ModuleWithUnusedSubModules"]) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) # Clean up for next test. 
remove_hook_from_submodules(model) @@ -229,7 +237,7 @@ def test_cpu_offload_with_unused_submodules(self): preload_module_classes=["ModuleWithUnusedSubModules"], ) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @slow @require_non_cpu @@ -252,7 +260,7 @@ def test_disk_offload(self): with TemporaryDirectory() as tmp_dir: disk_offload(model, tmp_dir, execution_device=device) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) # Clean up for next test. remove_hook_from_submodules(model) @@ -260,7 +268,7 @@ def test_disk_offload(self): with TemporaryDirectory() as tmp_dir: disk_offload(model, tmp_dir, execution_device=device, offload_buffers=True) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) def test_disk_offload_with_unused_submodules(self): model = ModelWithUnusedSubModulesForTest() @@ -274,7 +282,7 @@ def test_disk_offload_with_unused_submodules(self): model, tmp_dir, execution_device=device, preload_module_classes=["ModuleWithUnusedSubModules"] ) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) # Clean up for next test. 
remove_hook_from_submodules(model) @@ -288,7 +296,7 @@ def test_disk_offload_with_unused_submodules(self): preload_module_classes=["ModuleWithUnusedSubModules"], ) output = model(x) - assert torch.allclose(expected, output.cpu(), 1e-4, 1e-5), f"Expected: {expected}, Actual: {output.cpu()}" + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @slow @require_non_cpu @@ -325,8 +333,8 @@ def test_dispatch_model_and_remove_hook(self): cm.records[0].message, ) output_bis = model(x.to(torch_device)) - assert torch.allclose(expected, output.cpu(), atol=1e-5) - assert torch.allclose(expected, output_bis.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) + torch.testing.assert_close(expected, output_bis.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model(self): @@ -339,7 +347,7 @@ def test_dispatch_model(self): with TemporaryDirectory() as tmp_dir: dispatch_model(model, device_map, offload_dir=tmp_dir) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model_with_non_persistent_buffers(self): @@ -351,7 +359,7 @@ def test_dispatch_model_with_non_persistent_buffers(self): with TemporaryDirectory() as tmp_dir: dispatch_model(model, device_map, offload_dir=tmp_dir, offload_buffers=True) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model_tied_weights(self): @@ -412,7 +420,7 @@ def test_dispatch_model_tied_weights_memory(self): with torch.no_grad(): output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_cuda def test_dispatch_model_tied_weights_memory_with_nested_offload_cpu(self): @@ -491,7 +499,7 @@ def forward(self, x): 
except Exception as e: raise e - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) torch.cuda.empty_cache() @@ -587,7 +595,7 @@ def forward(self, x): except Exception as e: raise e - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) torch.cuda.empty_cache() @@ -608,9 +616,11 @@ def forward(self, x): assert (free_memory_bytes_after_infer - free_memory_bytes_after_dispatch) * 1e-6 < 130 + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_dispatch_model_multi_devices(self): model = BiggerModelForTest() + device_map = {"linear1": "cpu", "linear2": "disk", "batchnorm": "cpu", "linear3": 0, "linear4": 1} x = torch.randn(2, 3) @@ -619,7 +629,7 @@ def test_dispatch_model_multi_devices(self): with TemporaryDirectory() as tmp_dir: dispatch_model(model, device_map, offload_dir=tmp_dir) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model_copy(self): @@ -638,7 +648,7 @@ def test_dispatch_model_copy(self): assert original_model.id == original_output_id assert copied_model.id == copied_output_id assert copied_model.linear1.forward is not original_model.linear1.forward - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model_move_offloaded_model(self): @@ -649,6 +659,7 @@ def test_dispatch_model_move_offloaded_model(self): with self.assertRaises(RuntimeError): model.to(0) + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_dispatch_model_move_model_warning(self): model = ModelForTest() @@ -664,6 +675,7 @@ def test_dispatch_model_move_model_warning(self): model(x) @slow + 
@require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_dispatch_model_gpt2_on_two_devices(self): tokenizer = AutoTokenizer.from_pretrained("gpt2") @@ -718,11 +730,13 @@ def test_dispatch_model_with_unused_submodules(self): model, device_map, offload_dir=tmp_dir, preload_module_classes=["ModuleWithUnusedSubModules"] ) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_dispatch_model_with_unused_submodules_multi_device(self): model = ModelWithUnusedSubModulesForTest() + device_map = {"linear1": "cpu", "linear2": "disk", "batchnorm": "cpu", "linear3": 0, "linear4": 1} x = torch.randn(2, 3) @@ -733,7 +747,7 @@ def test_dispatch_model_with_unused_submodules_multi_device(self): model, device_map, offload_dir=tmp_dir, preload_module_classes=["ModuleWithUnusedSubModules"] ) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_dispatch_model_force_hooks(self): @@ -745,7 +759,7 @@ def test_dispatch_model_force_hooks(self): dispatch_model(model, device_map, force_hooks=True) output = model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_load_checkpoint_and_dispatch(self): @@ -767,11 +781,13 @@ def test_load_checkpoint_and_dispatch(self): assert new_model.linear2.weight.device == torch.device(torch_device) output = new_model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_load_checkpoint_and_dispatch_multi_device(self): model 
= BiggerModelForTest() + device_map = {"linear1": "cpu", "linear2": "cpu", "batchnorm": 0, "linear3": 0, "linear4": 1} x = torch.randn(2, 3) @@ -791,7 +807,7 @@ def test_load_checkpoint_and_dispatch_multi_device(self): assert new_model.linear4.weight.device == torch.device(torch_device.replace(":0", ":1")) output = new_model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_load_checkpoint_and_dispatch_with_unused_submodules(self): @@ -817,11 +833,13 @@ def test_load_checkpoint_and_dispatch_with_unused_submodules(self): assert new_model.linear4.linear.weight.device == torch.device(torch_device) output = new_model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_load_checkpoint_and_dispatch_multi_device_with_unused_submodules(self): model = ModelWithUnusedSubModulesForTest() + device_map = {"linear1": "cpu", "linear2": "cpu", "batchnorm": 0, "linear3": 0, "linear4": 1} x = torch.randn(2, 3) @@ -843,7 +861,7 @@ def test_load_checkpoint_and_dispatch_multi_device_with_unused_submodules(self): assert new_model.linear4.linear.weight.device == torch.device(torch_device.replace(":0", ":1")) output = new_model(x) - assert torch.allclose(expected, output.cpu(), atol=1e-5) + torch.testing.assert_close(expected, output.cpu(), atol=ATOL, rtol=RTOL) @require_non_cpu def test_cpu_offload_with_hook(self): @@ -875,9 +893,10 @@ def test_cpu_offload_with_hook(self): hook2.offload() assert model2.weight.device == torch.device("cpu") - @require_non_torch_xla @slow @require_bnb + @require_non_hpu # bnb is not supported on hpu + @require_non_torch_xla @require_multi_device def test_dispatch_model_bnb(self): """Tests that `dispatch_model` quantizes int8 layers""" diff --git a/tests/test_cli.py 
b/tests/test_cli.py index 28945a23515..503c2605fc9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -28,9 +28,11 @@ capture_call_output, path_in_accelerate_package, require_multi_device, + require_non_hpu, require_timm, require_transformers, run_command, + run_first, ) from accelerate.utils import patch_environment from accelerate.utils.launch import prepare_simple_launcher_cmd_env @@ -64,6 +66,7 @@ def tearDownClass(cls): if cls.changed_path.is_file(): cls.changed_path.rename(cls.config_path) + @run_first def test_no_config(self): args = ["--monitor_interval", "0.1", str(self.test_file_path)] if torch.cuda.is_available() and (torch.cuda.device_count() > 1): @@ -71,6 +74,7 @@ def test_no_config(self): args = self.parser.parse_args(["--monitor_interval", "0.1", str(self.test_file_path)]) launch_command(args) + @run_first def test_config_compatibility(self): invalid_configs = ["fp8", "invalid", "mpi", "sagemaker"] for config in sorted(self.test_config_path.glob("**/*.yaml")): @@ -80,6 +84,7 @@ def test_config_compatibility(self): args = self.parser.parse_args(["--config_file", str(config), str(self.test_file_path)]) launch_command(args) + @run_first def test_invalid_keys(self): config_path = self.test_config_path / "invalid_keys.yaml" with self.assertRaises( @@ -89,10 +94,13 @@ def test_invalid_keys(self): args = self.parser.parse_args(["--config_file", str(config_path), str(self.test_file_path)]) launch_command(args) + @run_first def test_accelerate_test(self): args = accelerate_test_cmd.test_command_parser().parse_args([]) accelerate_test_cmd.test_command(args) + @run_first + @require_non_hpu @require_multi_device def test_notebook_launcher(self): """ @@ -206,7 +214,7 @@ def test_duplicate_entities(self): args = self.parser.parse_args(["test.py"]) for arg in args.__dict__: if "_" in arg: - bad_arg = f'--{arg.replace("_", "-")}' + bad_arg = f"--{arg.replace('_', '-')}" # Need an exception for `num-processes` since it's in the docstring if bad_arg == 
"--num-processes": assert help_return.count(bad_arg) == 1, f"Found {bad_arg} in `accelerate launch -h`" diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py index 4763b9f6cf8..9378cdbb673 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -13,7 +13,6 @@ # limitations under the License. import random -import unittest import weakref import pytest @@ -34,7 +33,7 @@ skip_first_batches, ) from accelerate.state import GradientState -from accelerate.test_utils.testing import require_torchdata_stateful_dataloader +from accelerate.test_utils.testing import AccelerateTestCase, require_torchdata_stateful_dataloader from accelerate.utils import is_torchdata_stateful_dataloader_available @@ -96,7 +95,7 @@ def set_epoch(self, epoch): self.epoch = epoch -class DataLoaderTester(unittest.TestCase): +class DataLoaderTester(AccelerateTestCase): def check_batch_sampler_shards(self, batch_sampler, expected, split_batches=False, even_batches=True): batch_sampler_shards = [ BatchSamplerShard(batch_sampler, 2, i, split_batches=split_batches, even_batches=even_batches) @@ -529,7 +528,7 @@ def __call__(self, *args, **kwds): assert gradient_state_ref() is None -class StatefulDataLoaderTester(unittest.TestCase): +class StatefulDataLoaderTester(AccelerateTestCase): @require_torchdata_stateful_dataloader def test_skip_data_loader(self): dataloader = SkipDataLoader(list(range(16)), batch_size=4, skip_batches=2, use_stateful_dataloader=True) diff --git a/tests/test_examples.py b/tests/test_examples.py index b3d207e01c0..18c246dedce 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -19,7 +19,7 @@ import tempfile import unittest from pathlib import Path -from unittest import mock, skip +from unittest import mock import torch @@ -27,14 +27,16 @@ from accelerate.test_utils.testing import ( TempDirTestCase, get_launch_command, + is_hpu_available, + require_fp16, require_huggingface_suite, require_multi_device, - require_multi_gpu, require_non_xpu, 
require_pippy, require_schedulefree, require_trackers, run_command, + run_first, slow, ) from accelerate.utils import write_basic_config @@ -147,6 +149,7 @@ def test_cv_examples(self): @mock.patch.dict(os.environ, {"TESTING_MOCKED_DATALOADERS": "1"}) @require_huggingface_suite +@run_first class FeatureExamplesTests(TempDirTestCase): clear_on_setup = False @@ -197,10 +200,13 @@ def test_load_states_by_steps(self): --resume_from_checkpoint {self.tmpdir / "step_2"} """.split() output = run_command(self.launch_args + testargs, return_stdout=True) - if torch.cuda.is_available(): + if is_hpu_available(): + num_processes = torch.hpu.device_count() + elif torch.cuda.is_available(): num_processes = torch.cuda.device_count() else: num_processes = 1 + if num_processes > 1: assert "epoch 0:" not in output assert "epoch 1:" in output @@ -266,34 +272,34 @@ def test_profiler(self): testargs = ["examples/by_feature/profiler.py"] run_command(self.launch_args + testargs) + @require_fp16 @require_multi_device def test_ddp_comm_hook(self): testargs = ["examples/by_feature/ddp_comm_hook.py", "--ddp_comm_hook", "fp16"] run_command(self.launch_args + testargs) - @skip( - reason="stable-diffusion-v1-5 is no longer available. Potentially `Comfy-Org/stable-diffusion-v1-5-archive` once diffusers support is added." 
- ) + @require_fp16 @require_multi_device def test_distributed_inference_examples_stable_diffusion(self): testargs = ["examples/inference/distributed/stable_diffusion.py"] run_command(self.launch_args + testargs) + @require_fp16 @require_multi_device def test_distributed_inference_examples_phi2(self): testargs = ["examples/inference/distributed/phi2.py"] run_command(self.launch_args + testargs) - @require_non_xpu @require_pippy - @require_multi_gpu + @require_non_xpu + @require_multi_device def test_pippy_examples_bert(self): testargs = ["examples/inference/pippy/bert.py"] run_command(self.launch_args + testargs) - @require_non_xpu @require_pippy - @require_multi_gpu + @require_non_xpu + @require_multi_device def test_pippy_examples_gpt2(self): testargs = ["examples/inference/pippy/gpt2.py"] run_command(self.launch_args + testargs) diff --git a/tests/test_fp8.py b/tests/test_fp8.py index 7e3814c35f2..66645fd8559 100644 --- a/tests/test_fp8.py +++ b/tests/test_fp8.py @@ -23,10 +23,13 @@ from accelerate.test_utils import ( get_launch_command, require_cuda, + require_cuda_or_hpu, require_huggingface_suite, + require_multi_device, require_multi_gpu, require_torchao, require_transformer_engine, + run_first, ) from accelerate.test_utils.testing import require_deepspeed, run_command from accelerate.utils import ( @@ -71,22 +74,23 @@ def can_convert_ao_model(): assert has_ao_layers(model) +@run_first +@require_cuda_or_hpu @require_transformer_engine class TestTransformerEngine(unittest.TestCase): - @require_cuda def test_can_prepare_model_single_gpu(self): command = get_launch_command(num_processes=1, monitor_interval=0.1) command += ["-m", "tests.test_fp8"] run_command(command) - @require_multi_gpu + @require_multi_device def test_can_prepare_model_multi_gpu(self): command = get_launch_command(num_processes=2, monitor_interval=0.1) command += ["-m", "tests.test_fp8"] run_command(command) @require_deepspeed - @require_multi_gpu + @require_multi_device def 
test_can_prepare_model_multigpu_deepspeed(self): for zero_stage in [1, 2, 3]: os.environ["ZERO_STAGE"] = str(zero_stage) diff --git a/tests/test_grad_sync.py b/tests/test_grad_sync.py index 26b00333c69..c89f6e3a74d 100644 --- a/tests/test_grad_sync.py +++ b/tests/test_grad_sync.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest - from accelerate import debug_launcher from accelerate.test_utils import ( DEFAULT_LAUNCH_COMMAND, @@ -23,12 +21,14 @@ require_cpu, require_multi_device, require_non_cpu, + run_first, test_sync, ) +from accelerate.test_utils.testing import AccelerateTestCase from accelerate.utils import patch_environment -class SyncScheduler(unittest.TestCase): +class SyncScheduler(AccelerateTestCase): test_file_path = path_in_accelerate_package("test_utils", "scripts", "test_sync.py") @require_cpu @@ -43,6 +43,7 @@ def test_gradient_sync_cpu_multi(self): def test_gradient_sync_gpu(self): test_sync.main() + @run_first @require_multi_device def test_gradient_sync_gpu_multi(self): print(f"Found {device_count} devices.") diff --git a/tests/test_hooks.py b/tests/test_hooks.py index 13e83b7d9a7..306d18edf52 100644 --- a/tests/test_hooks.py +++ b/tests/test_hooks.py @@ -28,7 +28,7 @@ remove_hook_from_module, remove_hook_from_submodules, ) -from accelerate.test_utils import require_multi_device, torch_device +from accelerate.test_utils import require_multi_device, require_non_hpu, torch_device torch_device = f"{torch_device}:0" if torch_device != "cpu" else "cpu" @@ -153,6 +153,7 @@ def test_no_grad_in_hook(self): output1 = test_model(x) assert not output1.requires_grad + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_align_devices_as_model_parallelism(self): model = ModelForTest() diff --git a/tests/test_kwargs_handlers.py b/tests/test_kwargs_handlers.py index d0415dab4bb..a6239af6df4 100644 --- a/tests/test_kwargs_handlers.py 
+++ b/tests/test_kwargs_handlers.py @@ -14,7 +14,6 @@ import inspect import os -import unittest from dataclasses import dataclass import torch @@ -25,11 +24,19 @@ DEFAULT_LAUNCH_COMMAND, execute_subprocess_async, path_in_accelerate_package, + require_fp16, require_multi_device, require_non_cpu, + run_first, +) +from accelerate.test_utils.testing import AccelerateTestCase, slow +from accelerate.utils import ( + AutocastKwargs, + KwargsHandler, + ProfileKwargs, + TorchDynamoPlugin, + clear_environment, ) -from accelerate.test_utils.testing import slow -from accelerate.utils import AutocastKwargs, KwargsHandler, ProfileKwargs, TorchDynamoPlugin, clear_environment from accelerate.utils.dataclasses import DistributedType @@ -40,7 +47,7 @@ class MockClass(KwargsHandler): c: float = 3.0 -class KwargsHandlerTester(unittest.TestCase): +class KwargsHandlerTester(AccelerateTestCase): def test_kwargs_handler(self): # If no defaults are changed, `to_kwargs` returns an empty dict. assert MockClass().to_kwargs() == {} @@ -48,6 +55,7 @@ def test_kwargs_handler(self): assert MockClass(a=2, b=True).to_kwargs() == {"a": 2, "b": True} assert MockClass(a=2, c=2.25).to_kwargs() == {"a": 2, "c": 2.25} + @require_fp16 @require_non_cpu def test_grad_scaler_kwargs(self): # If no defaults are changed, `to_kwargs` returns an empty dict. 
@@ -66,11 +74,13 @@ def test_grad_scaler_kwargs(self): assert scaler._growth_interval == 2000 assert scaler._enabled is True + @run_first @require_multi_device def test_ddp_kwargs(self): cmd = DEFAULT_LAUNCH_COMMAND + [inspect.getfile(self.__class__)] execute_subprocess_async(cmd) + @require_fp16 @require_non_cpu def test_autocast_kwargs(self): kwargs = AutocastKwargs(enabled=False) @@ -153,6 +163,7 @@ def test_torch_dynamo_plugin(self): assert dynamo_plugin_kwargs == {"backend": "aot_ts_nvfuser", "mode": "reduce-overhead"} assert os.environ.get(prefix + "BACKEND") != "aot_ts_nvfuser" + @run_first @require_multi_device def test_ddp_comm_hook(self): cmd = DEFAULT_LAUNCH_COMMAND + [path_in_accelerate_package("test_utils", "scripts", "test_ddp_comm_hook.py")] diff --git a/tests/test_logging.py b/tests/test_logging.py index a91c609ddc0..f1f6948da98 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -19,6 +19,7 @@ from accelerate import Accelerator from accelerate.logging import get_logger +from accelerate.state import AcceleratorState def current_lineno() -> int: @@ -41,7 +42,9 @@ def log(self, level, msg, *args, **kwargs): @pytest.fixture(scope="module") def accelerator(): - return Accelerator() + accelerator = Accelerator() + yield accelerator + AcceleratorState._reset_state(True) @pytest.mark.usefixtures("accelerator") diff --git a/tests/test_memory_utils.py b/tests/test_memory_utils.py index 4b8f74f19c0..4f5ca529a81 100644 --- a/tests/test_memory_utils.py +++ b/tests/test_memory_utils.py @@ -16,7 +16,12 @@ from torch import nn -from accelerate.test_utils import memory_allocated_func, require_non_cpu, require_non_torch_xla, torch_device +from accelerate.test_utils import ( + memory_allocated_func, + require_non_cpu, + require_non_torch_xla, + torch_device, +) from accelerate.utils.memory import find_executable_batch_size, release_memory @@ -35,6 +40,15 @@ def forward(self, x): return self.linear2(self.batchnorm(self.linear1(x))) +class 
BigModelForTest(ModelForTest): + def __init__(self): + super().__init__() + self.linear3 = nn.Linear(5, 1000) + + def forward(self, x): + return self.linear3(super().forward(x)) + + class MemoryTest(unittest.TestCase): def test_memory_implicit(self): batch_sizes = [] @@ -108,7 +122,14 @@ def mock_training_loop_function(batch_size): @require_non_torch_xla def test_release_memory(self): starting_memory = memory_allocated_func() - model = ModelForTest() + + if torch_device.startswith("hpu"): + # hpu has a minimum memory allocation that cannot be released, + # we need to surpass it by using a bigger model (>5767296 bytes) + model = BigModelForTest() + else: + model = ModelForTest() + model.to(torch_device) assert memory_allocated_func() > starting_memory model = release_memory(model) diff --git a/tests/test_metrics.py b/tests/test_metrics.py index d953e2edee7..c42acdb52ae 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -27,6 +27,7 @@ require_huggingface_suite, require_multi_device, require_single_device, + run_first, ) from accelerate.utils import patch_environment @@ -53,6 +54,7 @@ def test_metric_cpu_multi(self): def test_metric_accelerator(self): self.test_metrics.main() + @run_first @require_multi_device def test_metric_accelerator_multi(self): print(f"Found {device_count} devices.") diff --git a/tests/test_modeling_utils.py b/tests/test_modeling_utils.py index e436917414d..724a59f83d6 100644 --- a/tests/test_modeling_utils.py +++ b/tests/test_modeling_utils.py @@ -31,6 +31,7 @@ require_huggingface_suite, require_multi_device, require_non_cpu, + require_non_hpu, torch_device, ) from accelerate.utils.modeling import ( @@ -181,6 +182,7 @@ def test_set_module_tensor_to_meta_and_gpu(self): model = ModelForTest().to(torch_device) self.check_set_module_tensor_for_device(model, torch_device, "meta") + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_set_module_tensor_between_gpus(self): model = 
ModelForTest().to(torch_device) @@ -447,6 +449,7 @@ def test_load_checkpoint_in_model_disk_offload(self): assert model.batchnorm.running_mean.device == torch.device("meta") assert model.linear2.weight.device == torch.device("cpu") + @require_non_hpu # hpu does not support device indexing "hpu:1" @require_multi_device def test_load_checkpoint_in_model_two_gpu(self): device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": 1} diff --git a/tests/test_multigpu.py b/tests/test_multigpu.py index ccb72c80364..bda1178d12b 100644 --- a/tests/test_multigpu.py +++ b/tests/test_multigpu.py @@ -28,14 +28,15 @@ path_in_accelerate_package, require_huggingface_suite, require_multi_device, - require_multi_gpu, + require_non_hpu, require_non_torch_xla, require_non_xpu, require_pippy, require_torchvision, + run_first, torch_device, ) -from accelerate.utils import patch_environment +from accelerate.utils import is_hpu_available, patch_environment class MultiDeviceTester(unittest.TestCase): @@ -45,6 +46,7 @@ class MultiDeviceTester(unittest.TestCase): pippy_file_path = path_in_accelerate_package("test_utils", "scripts", "external_deps", "test_pippy.py") merge_weights_file_path = path_in_accelerate_package("test_utils", "scripts", "test_merge_weights.py") + @run_first @require_multi_device def test_multi_device(self): print(f"Found {device_count} devices.") @@ -52,6 +54,7 @@ def test_multi_device(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd) + @run_first @require_multi_device def test_multi_device_ops(self): print(f"Found {device_count} devices.") @@ -59,6 +62,7 @@ def test_multi_device_ops(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd) + @run_first @require_multi_device def test_pad_across_processes(self): print(f"Found {device_count} devices.") @@ -66,13 +70,18 @@ def test_pad_across_processes(self): with patch_environment(omp_num_threads=1): execute_subprocess_async(cmd) + @run_first + @require_non_hpu # Synapse 
detected a device critical error that requires a restart @require_multi_device def test_multi_device_merge_fsdp_weights(self): print(f"Found {device_count} devices.") cmd = DEFAULT_LAUNCH_COMMAND + [self.merge_weights_file_path] - with patch_environment(omp_num_threads=1): + + env_kwargs = dict(omp_num_threads=1) + with patch_environment(**env_kwargs): execute_subprocess_async(cmd) + @run_first @require_non_torch_xla @require_multi_device def test_distributed_data_loop(self): @@ -82,6 +91,7 @@ def test_distributed_data_loop(self): """ print(f"Found {device_count} devices, using 2 devices only") cmd = get_launch_command(num_processes=2) + [self.data_loop_file_path] + env_kwargs = dict(omp_num_threads=1) if torch_device == "xpu": env_kwargs.update(ze_affinity_mask="0,1") @@ -93,13 +103,15 @@ def test_distributed_data_loop(self): env_kwargs.update(sdaa_visible_devices="0,1") else: env_kwargs.update(cuda_visible_devices="0,1") + with patch_environment(**env_kwargs): execute_subprocess_async(cmd) - @require_non_xpu - @require_multi_gpu + @run_first @require_pippy + @require_non_xpu @require_torchvision + @require_multi_device @require_huggingface_suite def test_pippy(self): """ @@ -121,12 +133,13 @@ def test_pippy(self): tensor1 = accelerator.pad_across_processes(tensor) if tensor1.shape[0] != accelerator.state.num_processes + 1: error_msg += f"Found shape {tensor1.shape} but should have {accelerator.state.num_processes + 1} at dim 0." - if not torch.equal(tensor1[: accelerator.state.process_index + 2], tensor): + index = accelerator.state.process_index + 2 + if not torch.equal(tensor1[:index], tensor): error_msg += "Tensors have different values." - if not torch.all(tensor1[accelerator.state.process_index + 2 :] == 0): + if not torch.all(tensor1[index:] == 0): error_msg += "Padding was not done with the right value (0)." 
- tensor2 = accelerator.pad_across_processes(tensor, pad_first=True) + tensor2 = accelerator.pad_across_processes(tensor.clone(), pad_first=True) if tensor2.shape[0] != accelerator.state.num_processes + 1: error_msg += f"Found shape {tensor2.shape} but should have {accelerator.state.num_processes + 1} at dim 0." index = accelerator.state.num_processes - accelerator.state.process_index - 1 @@ -152,7 +165,11 @@ def __init__(self): def forward(self, x): return self.linear2(self.batchnorm(self.linear1(x))) - device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": 1} + if is_hpu_available(): + device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": 0} + else: + device_map = {"linear1": 0, "batchnorm": "cpu", "linear2": 1} + model = ModelForTest() dispatch_model(model, device_map=device_map) with assert_exception(ValueError, "You can't train a model that has been loaded with"): diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index 4f74f5ded44..8bb324f0ec5 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -13,17 +13,16 @@ # limitations under the License. 
import pickle -import unittest import torch from accelerate import Accelerator -from accelerate.state import AcceleratorState -from accelerate.test_utils import require_cpu, require_non_cpu +from accelerate.test_utils import require_cpu, require_fp16, require_non_cpu +from accelerate.test_utils.testing import AccelerateTestCase @require_cpu -class CPUOptimizerTester(unittest.TestCase): +class CPUOptimizerTester(AccelerateTestCase): def test_accelerated_optimizer_pickling(self): model = torch.nn.Linear(10, 10) optimizer = torch.optim.SGD(model.parameters(), 0.1) @@ -33,11 +32,11 @@ def test_accelerated_optimizer_pickling(self): pickle.loads(pickle.dumps(optimizer)) except Exception as e: self.fail(f"Accelerated optimizer pickling failed with {e}") - AcceleratorState._reset_state() +@require_fp16 @require_non_cpu -class OptimizerTester(unittest.TestCase): +class OptimizerTester(AccelerateTestCase): def test_accelerated_optimizer_step_was_skipped(self): model = torch.nn.Linear(5, 5) optimizer = torch.optim.SGD(model.parameters(), 0.1) @@ -81,5 +80,3 @@ def test_accelerated_optimizer_step_was_skipped(self): optimizer.step() assert optimizer.step_was_skipped is False - - AcceleratorState._reset_state() diff --git a/tests/test_quantization.py b/tests/test_quantization.py index 602118fa08d..9a0a6839fa7 100644 --- a/tests/test_quantization.py +++ b/tests/test_quantization.py @@ -28,6 +28,7 @@ require_non_torch_xla, slow, ) +from accelerate.test_utils.testing import AccelerateTestCase from accelerate.utils.bnb import load_and_quantize_model from accelerate.utils.dataclasses import BnbQuantizationConfig from accelerate.utils.memory import clear_device_cache @@ -44,7 +45,7 @@ def test_BnbQuantizationConfig(self): @require_cuda_or_xpu @require_bnb @require_huggingface_suite -class MixedInt8EmptyModelTest(unittest.TestCase): +class MixedInt8EmptyModelTest(AccelerateTestCase): # We keep the constants inside the init function and model loading inside setUp function # We need to 
test on relatively large models (aka >1b parameters otherwise the quantiztion may not work as expected) diff --git a/tests/test_state_checkpointing.py b/tests/test_state_checkpointing.py index ed50eb7e7fb..160e0a25483 100644 --- a/tests/test_state_checkpointing.py +++ b/tests/test_state_checkpointing.py @@ -18,7 +18,6 @@ import random import shutil import tempfile -import unittest import uuid from contextlib import contextmanager @@ -34,8 +33,10 @@ execute_subprocess_async, require_non_cpu, require_non_torch_xla, + run_first, ) -from accelerate.utils import DistributedType, ProjectConfiguration, set_seed +from accelerate.test_utils.testing import AccelerateTestCase +from accelerate.utils import DistributedType, ProjectConfiguration, patch_environment, set_seed logger = logging.getLogger(__name__) @@ -94,7 +95,7 @@ def parameterized_custom_name_func(func, param_num, param): @parameterized_class(("use_safetensors",), [[True], [False]], class_name_func=parameterized_custom_name_func) -class CheckpointTest(unittest.TestCase): +class CheckpointTest(AccelerateTestCase): def check_adam_state(self, state1, state2, distributed_type): # For DistributedType.XLA, the `accelerator.save_state` function calls `xm._maybe_convert_to_cpu` before saving. # As a result, all tuple values are converted to lists. Therefore, we need to convert them back here. 
@@ -375,18 +376,15 @@ def test_checkpoint_deletion(self): assert os.path.exists(os.path.join(tmpdir, "checkpoints", "checkpoint_9")) assert os.path.exists(os.path.join(tmpdir, "checkpoints", "checkpoint_10")) + @run_first @require_non_cpu @require_non_torch_xla def test_map_location(self): cmd = DEFAULT_LAUNCH_COMMAND + [inspect.getfile(self.__class__)] - execute_subprocess_async( - cmd, - env={ - **os.environ, - "USE_SAFETENSORS": str(self.use_safetensors), - "OMP_NUM_THREADS": "1", - }, - ) + + env_kwargs = dict(use_safe_tensors=str(self.use_safetensors), omp_num_threads="1") + with patch_environment(**env_kwargs): + execute_subprocess_async(cmd) if __name__ == "__main__": diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index a1125a9262a..fc7fb54e2e8 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -22,15 +22,17 @@ require_non_torch_xla, require_tp, require_transformers, + run_first, slow, ) from accelerate.utils import patch_environment @require_non_torch_xla -@require_tp @require_multi_device @require_transformers +@require_tp +@run_first @slow class TPIntegrationTest(TempDirTestCase): test_scripts_folder = path_in_accelerate_package("test_utils", "scripts", "external_deps")