Enhance memory management for CUDA and NCCL #571
base: main
Conversation
Implements comprehensive CUDA memory management and NCCL error handling to resolve Out of Memory (OOM) crashes during 70B model evaluation and distributed checkpoint operations.
Walkthrough
Adds environment defaults for CUDA/NCCL, introduces a GPU memory check/cleanup helper, integrates memory hygiene and OOM/NCCL handling across evaluation, gradient application, and checkpoint save/load, and adds detailed memory logging. Minor non-semantic reformatting is included. All changes are confined to neurons/validator.py.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant V as Validator
participant GPU as CUDA Device
participant Log as Logger
rect rgba(230,245,255,0.5)
note over V,GPU: Baseline Evaluation Gate
V->>GPU: _check_memory_and_cleanup("baseline")
alt sufficient memory
V->>V: run baseline eval
else insufficient
V->>Log: warn "skip baseline"
V-->>V: continue without baseline
end
end
loop For each UID
rect rgba(240,255,230,0.5)
V->>GPU: _check_memory_and_cleanup("uid")
alt sufficient memory
V->>V: evaluate UID
alt CUDA OOM during eval
V->>GPU: empty_cache + GC
V->>V: retry once
alt retry success
V->>Log: info "retry succeeded"
else retry OOM
V->>Log: warn "skip UID"
end
end
V->>GPU: post-step cleanup (empty_cache + GC)
else insufficient
V->>Log: warn "skip UID"
end
end
end
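The retry-once path in the diagram above (cleanup, then a single retry after a CUDA OOM) can be sketched roughly as follows; the wrapper name and call shape are illustrative, not the validator's actual API.

# Minimal sketch of the retry-once-on-OOM pattern; evaluate_fn stands in
# for the real evaluation call.
import gc

import torch


def evaluate_with_oom_retry(evaluate_fn, *args, **kwargs):
    """Run evaluate_fn; on a CUDA OOM, clean up and retry exactly once."""
    try:
        return evaluate_fn(*args, **kwargs)
    except torch.cuda.OutOfMemoryError:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return evaluate_fn(*args, **kwargs)  # a second OOM propagates to the caller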
sequenceDiagram
autonumber
participant V as Validator
participant FS as Filesystem/Checkpoint
participant GPU as CUDA Device
participant Log as Logger
rect rgba(255,245,230,0.5)
note over V,FS: Save FSDP Checkpoint
V->>GPU: _check_memory_and_cleanup("save")
alt sufficient
V->>FS: save local shard
alt save/upload OOM/NCCL
V->>GPU: cleanup (empty_cache + GC)
V->>Log: warn "skip checkpoint"
else success
V->>Log: info "checkpoint saved"
end
else insufficient
V->>Log: warn "skip checkpoint (mem)"
end
end
rect rgba(255,250,230,0.5)
note over V,FS: Load Checkpoint
V->>GPU: cleanup + _check_memory_and_cleanup("load")
alt sufficient
V->>FS: load checkpoint
alt load OOM/NCCL
V->>GPU: aggressive cleanup
V->>FS: retry load once
alt retry fails
V->>Log: warn "continue without checkpoint"
else retry success
V->>Log: info "loaded"
end
end
else insufficient
V->>Log: error "insufficient memory"
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
neurons/validator.py (1)
1697-1698: Guard unconditional CUDA cache calls.
torch.cuda.empty_cache() on CPU-only builds raises. Wrap with torch.cuda.is_available() everywhere it's unguarded. Example:
- torch.cuda.empty_cache()
+ if torch.cuda.is_available():
+     torch.cuda.empty_cache()
Apply at the listed locations.
Also applies to: 1811-1812, 1958-1959, 2351-2352, 2407-2408, 3158-3159, 3211-3212
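For illustration, the guarded pattern could be factored into a small helper; the name safe_cuda_cleanup is hypothetical and not part of the PR.

# Illustrative helper (not from the PR): cleanup that is a no-op on CPU-only builds.
import gc

import torch


def safe_cuda_cleanup(synchronize: bool = False) -> None:
    """Collect garbage and release cached CUDA blocks only when CUDA is available."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        if synchronize:
            torch.cuda.synchronize()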
🧹 Nitpick comments (1)
neurons/validator.py (1)
161-166: Minor: avoid repeated inline gc imports.
Import gc once at module scope and reuse it to reduce import churn.
Also applies to: 177-182
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
neurons/validator.py (17 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
neurons/validator.py (4)
src/tplr/logging.py (2)
log_with_context (207-223), log_with_context (290-309)
src/tplr/sharded_sampler.py (1)
set_window_uid (74-79)
neurons/trainer.py (1)
evaluate_model (340-392)
src/tplr/dcp_checkpoint.py (3)
save_local_async (222-268), upload (271-404), download_and_load (657-680)
🪛 GitHub Actions: CI
neurons/validator.py
[error] 69-69: I001 Import block is un-sorted or un-formatted.
🔇 Additional comments (2)
neurons/validator.py (2)
3394-3405: Checkpoint load robustness LGTM. Pre-clear, memory check, and single retry with aggressive cleanup are appropriate.
Also applies to: 3409-3442
2642-2645: Vectorized inactivity penalty LGTM. Cleaner and efficient in-place scaling for positive scores only.
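For readers outside the diff, the vectorized pattern being praised looks roughly like the sketch below; the tensor names and the 0.75 factor are assumptions, not the PR's exact values.

# Illustrative sketch: in-place scaling applied only to positive scores of inactive peers.
import torch

scores = torch.tensor([0.8, -0.2, 0.0, 0.5])
inactive = torch.tensor([True, True, False, True])

penalty = 0.75                      # hypothetical per-window decay factor
mask = inactive & (scores > 0)      # only inactive peers with positive scores
scores[mask] *= penalty             # single vectorized, in-place update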
# Set CUDA memory allocator configuration to prevent fragmentation
import os
if not os.environ.get('PYTORCH_CUDA_ALLOC_CONF'):
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# Set NCCL environment variables for better memory management and debugging
if not os.environ.get('NCCL_DEBUG'):
    os.environ['NCCL_DEBUG'] = 'WARN'  # Change to INFO for more verbose debugging
if not os.environ.get('NCCL_IB_DISABLE'):
    os.environ['NCCL_IB_DISABLE'] = '1'  # Disable InfiniBand to use Ethernet
if not os.environ.get('NCCL_SOCKET_IFNAME'):
    os.environ['NCCL_SOCKET_IFNAME'] = 'eth0'  # Use ethernet interface
if not os.environ.get('NCCL_P2P_DISABLE'):
    os.environ['NCCL_P2P_DISABLE'] = '1'  # Disable P2P to reduce memory pressure
🛠️ Refactor suggestion
Fix env var ordering, avoid hard-coded NCCL defaults, and remove duplicate import (CI I001).
- PYTORCH_CUDA_ALLOC_CONF and NCCL vars must be set before any CUDA use; here they’re set after torch.cuda.manual_seed_all and cudnn toggles, so they may be ignored and/or ineffective.
- Forcing NCCL_IB_DISABLE=1, NCCL_P2P_DISABLE=1, and NCCL_SOCKET_IFNAME=eth0 by default can severely degrade performance or break networking on many setups.
- Duplicate import os here triggers the CI isort error.
Action:
- Move this env configuration above the first Torch/CUDA usage (ideally before importing torch).
- Don’t force IB/P2P/interface defaults globally; make them opt-in via config/env.
- Drop the duplicate import to fix CI.
Apply this minimal diff to fix the CI failure now (removes the duplicate import); follow-up placement change is shown below:
-# Set CUDA memory allocator configuration to prevent fragmentation
-import os
-if not os.environ.get('PYTORCH_CUDA_ALLOC_CONF'):
+# Set CUDA memory allocator configuration to prevent fragmentation
+if not os.environ.get('PYTORCH_CUDA_ALLOC_CONF'):
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
Recommended relocation (outside this hunk): place the env setup immediately after the stdlib imports and before any torch import/usage:
# --- stdlib imports ---
import os
# ... other stdlib imports ...
# Must run BEFORE importing torch / using CUDA
os.environ.setdefault('PYTORCH_CUDA_ALLOC_CONF', 'expandable_segments:True')
# Make NCCL tweaks opt‑in to avoid breaking fast paths by default
if os.environ.get('TPLR_ENABLE_NCCL_TWEAKS') == '1':
os.environ.setdefault('NCCL_DEBUG', 'WARN')
# Only set these if you explicitly opt‑in; otherwise leave system defaults
if os.environ.get('TPLR_DISABLE_NCCL_IB') == '1':
os.environ.setdefault('NCCL_IB_DISABLE', '1')
if os.environ.get('TPLR_DISABLE_NCCL_P2P') == '1':
os.environ.setdefault('NCCL_P2P_DISABLE', '1')
# Don’t hard‑code an interface; allow override via TPLR_NCCL_SOCKET_IFNAME
if os.environ.get('TPLR_NCCL_SOCKET_IFNAME'):
os.environ.setdefault('NCCL_SOCKET_IFNAME', os.environ['TPLR_NCCL_SOCKET_IFNAME'])
# Now import torch / use CUDA
import torch
🧰 Tools
🪛 GitHub Actions: CI
[error] 69-69: I001 Import block is un-sorted or un-formatted.
🤖 Prompt for AI Agents
In neurons/validator.py around lines 68-82 there is a duplicate import os and
environment variable setup for PYTORCH_CUDA_ALLOC_CONF and NCCL that is placed
after CUDA/torch usage and force-sets NCCL defaults; remove the duplicate import
to resolve the CI isort error, and make the NCCL tweaks opt-in and non-invasive
by using os.environ.setdefault for PYTORCH_CUDA_ALLOC_CONF and gating
NCCL_IB_DISABLE, NCCL_P2P_DISABLE and NCCL_SOCKET_IFNAME behind explicit opt-in
flags (e.g. TPLR_ENABLE_NCCL_TWEAKS and per-feature flags), and plan to relocate
this entire env setup to run before any torch import/usage (immediately after
stdlib imports) in a follow-up commit.
def _check_memory_and_cleanup(self, context: str = "unknown") -> bool:
    """Check GPU memory usage and perform cleanup if needed.
    Returns True if memory is available, False if critically low.
    """
    if not torch.cuda.is_available():
        return True

    # Get memory info
    memory_allocated = torch.cuda.memory_allocated()
    memory_reserved = torch.cuda.memory_reserved()
    memory_total = torch.cuda.get_device_properties(0).total_memory

    # Calculate usage percentages
    allocated_pct = (memory_allocated / memory_total) * 100
    reserved_pct = (memory_reserved / memory_total) * 100

    # Log memory usage
    if self.is_master:
        tplr.log_with_context(
            level="debug",
            message=f"Memory check ({context}) - Allocated: {allocated_pct:.1f}% ({memory_allocated/1024**3:.2f}GB), "
            f"Reserved: {reserved_pct:.1f}% ({memory_reserved/1024**3:.2f}GB)",
            sync_window=self.sync_window,
            current_window=self.current_window,
        )

    # If allocated memory is over 85%, perform cleanup
    if allocated_pct > 85.0:
        if self.is_master:
            tplr.log_with_context(
                level="warning",
                message=f"High memory usage detected ({allocated_pct:.1f}%), performing cleanup",
                sync_window=self.sync_window,
                current_window=self.current_window,
            )
        torch.cuda.empty_cache()
        import gc
        gc.collect()
        torch.cuda.synchronize()

        # Check again after cleanup
        new_allocated = torch.cuda.memory_allocated()
        new_allocated_pct = (new_allocated / memory_total) * 100

        # If still critically low after cleanup, return False
        if new_allocated_pct > 90.0:
            if self.is_master:
                tplr.log_with_context(
                    level="error",
                    message=f"Critical memory usage even after cleanup ({new_allocated_pct:.1f}%)",
                    sync_window=self.sync_window,
                    current_window=self.current_window,
                )
            return False

    return True
Memory percentages computed against the wrong device (device 0).
On multi‑GPU, memory_allocated() uses the current device, but get_device_properties(0) is always GPU 0. This skews the percentage and decisions.
Apply:
- # Get memory info
- memory_allocated = torch.cuda.memory_allocated()
- memory_reserved = torch.cuda.memory_reserved()
- memory_total = torch.cuda.get_device_properties(0).total_memory
+ # Get memory info on the current device
+ dev = torch.cuda.current_device()
+ memory_allocated = torch.cuda.memory_allocated(device=dev)
+ memory_reserved = torch.cuda.memory_reserved(device=dev)
+ memory_total = torch.cuda.get_device_properties(dev).total_memory
Optional: consider thresholding on reserved_pct as well, and making thresholds tunable via hparams.
📝 Committable suggestion
def _check_memory_and_cleanup(self, context: str = "unknown") -> bool:
    """Check GPU memory usage and perform cleanup if needed.
    Returns True if memory is available, False if critically low.
    """
    if not torch.cuda.is_available():
        return True

    # Get memory info on the current device
    dev = torch.cuda.current_device()
    memory_allocated = torch.cuda.memory_allocated(device=dev)
    memory_reserved = torch.cuda.memory_reserved(device=dev)
    memory_total = torch.cuda.get_device_properties(dev).total_memory

    # Calculate usage percentages
    allocated_pct = (memory_allocated / memory_total) * 100
    reserved_pct = (memory_reserved / memory_total) * 100

    # Log memory usage
    if self.is_master:
        tplr.log_with_context(
            level="debug",
            message=f"Memory check ({context}) - Allocated: {allocated_pct:.1f}% ({memory_allocated/1024**3:.2f}GB), "
            f"Reserved: {reserved_pct:.1f}% ({memory_reserved/1024**3:.2f}GB)",
            sync_window=self.sync_window,
            current_window=self.current_window,
        )

    # If allocated memory is over 85%, perform cleanup
    if allocated_pct > 85.0:
        if self.is_master:
            tplr.log_with_context(
                level="warning",
                message=f"High memory usage detected ({allocated_pct:.1f}%), performing cleanup",
                sync_window=self.sync_window,
                current_window=self.current_window,
            )
        torch.cuda.empty_cache()
        import gc
        gc.collect()
        torch.cuda.synchronize()

        # Check again after cleanup
        new_allocated = torch.cuda.memory_allocated(device=dev)
        new_allocated_pct = (new_allocated / memory_total) * 100

        # If still critically low after cleanup, return False
        if new_allocated_pct > 90.0:
            if self.is_master:
                tplr.log_with_context(
                    level="error",
                    message=f"Critical memory usage even after cleanup ({new_allocated_pct:.1f}%)",
                    sync_window=self.sync_window,
                    current_window=self.current_window,
                )
            return False

    return True
🤖 Prompt for AI Agents
In neurons/validator.py around lines 184-240, the code calculates GPU memory
percentages against device 0 (get_device_properties(0)) while
torch.cuda.memory_allocated() and memory_reserved() use the current CUDA device,
producing incorrect percentages on multi-GPU. Fix by querying the actual current
device (e.g., torch.cuda.current_device() or torch.cuda.device_index()) and use
that device id for get_device_properties(...); recompute memory_total from that
device so allocated_pct and reserved_pct are correct. Also make the cleanup
thresholds (allocated and optional reserved) configurable via validator
hparams/params and, if desired, add a reserved_pct check mirroring the
allocated_pct logic.
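If the optional follow-up (reserved-memory thresholding and hparams-tunable limits) were adopted, the check could look roughly like this; the hparams attribute names are hypothetical.

# Hypothetical sketch: thresholds read from hparams plus a reserved-memory check;
# mem_cleanup_pct and mem_critical_pct are illustrative attribute names.
import gc

import torch


def memory_ok(hparams, context: str = "unknown") -> bool:
    if not torch.cuda.is_available():
        return True
    dev = torch.cuda.current_device()
    total = torch.cuda.get_device_properties(dev).total_memory
    allocated_pct = torch.cuda.memory_allocated(dev) / total * 100
    reserved_pct = torch.cuda.memory_reserved(dev) / total * 100
    cleanup_at = getattr(hparams, "mem_cleanup_pct", 85.0)
    critical_at = getattr(hparams, "mem_critical_pct", 90.0)
    if allocated_pct > cleanup_at or reserved_pct > cleanup_at:
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.synchronize()
        allocated_pct = torch.cuda.memory_allocated(dev) / total * 100
    return allocated_pct <= critical_at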
# Check memory before baseline evaluation
if not self._check_memory_and_cleanup("before_baseline_eval"):
    if self.is_master:
        tplr.log_with_context(
            level="critical",
            message="Insufficient memory for baseline evaluation. Skipping evaluation window.",
            sync_window=self.sync_window,
            current_window=self.current_window,
        )
    continue

try:
Avoid rank divergence: baseline memory check must be a consensus, not per‑rank early-continue.
A single rank hitting continue here will desynchronize later barriers and can deadlock. Use an all‑rank gate and keep ranks aligned.
- # Check memory before baseline evaluation
- if not self._check_memory_and_cleanup("before_baseline_eval"):
- if self.is_master:
- tplr.log_with_context(
- level="critical",
- message="Insufficient memory for baseline evaluation. Skipping evaluation window.",
- sync_window=self.sync_window,
- current_window=self.current_window,
- )
- continue
+ # Check memory before baseline evaluation (global consensus)
+ mem_ok_local = self._check_memory_and_cleanup("before_baseline_eval")
+ mem_ok_global = dist_helper.all_ok(mem_ok_local, self.device, tag="mem_ok_baseline")
+ if not mem_ok_global:
+ if self.is_master:
+ tplr.log_with_context(
+ level="critical",
+ message="Insufficient memory for baseline evaluation on at least one rank. Skipping this window.",
+ sync_window=self.sync_window,
+ current_window=self.current_window,
+ )
+ dist_helper.safe_barrier("bail_baseline_eval", self.local_rank)
+ self.global_step += 1
+ continue
📝 Committable suggestion
# Check memory before baseline evaluation (global consensus)
mem_ok_local = self._check_memory_and_cleanup("before_baseline_eval")
mem_ok_global = dist_helper.all_ok(mem_ok_local, self.device, tag="mem_ok_baseline")
if not mem_ok_global:
    if self.is_master:
        tplr.log_with_context(
            level="critical",
            message="Insufficient memory for baseline evaluation on at least one rank. Skipping this window.",
            sync_window=self.sync_window,
            current_window=self.current_window,
        )
    dist_helper.safe_barrier("bail_baseline_eval", self.local_rank)
    self.global_step += 1
    continue

try:
🤖 Prompt for AI Agents
In neurons/validator.py around lines 1318 to 1329, the memory-check currently
does per-rank early-continue which can desynchronize ranks and deadlock; change
it to perform a collective consensus (e.g., all-reduce/any-reduce or broadcast a
boolean) so every rank knows if any rank failed the memory check, then have all
ranks skip the baseline evaluation in lockstep; only the master should log the
message once when the consensus indicates insufficient memory; ensure existing
barrier/synchronization points remain intact and do not exit early on a
single-rank result.
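The suggested diff assumes a dist_helper.all_ok helper; if no such helper exists, an equivalent all-rank boolean consensus can be built directly on torch.distributed, roughly as sketched here.

# Rough sketch of an all-rank boolean consensus; assumes the process group is already
# initialized and, with the NCCL backend, that `device` is this rank's CUDA device.
import torch
import torch.distributed as dist


def all_ranks_ok(local_ok: bool, device: torch.device) -> bool:
    """Return True only if every rank reports local_ok=True."""
    if not (dist.is_available() and dist.is_initialized()):
        return local_ok
    flag = torch.tensor([1 if local_ok else 0], device=device)
    dist.all_reduce(flag, op=dist.ReduceOp.MIN)  # any 0 pulls the result to 0 everywhere
    return bool(flag.item())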
# Check memory before each UID evaluation
if not self._check_memory_and_cleanup(f"before_uid_{eval_uid}"):
    if self.is_master:
        tplr.log_with_context(
            level="warning",
            message=f"Insufficient memory for UID {eval_uid} evaluation. Skipping this UID.",
            sync_window=self.sync_window,
            current_window=self.current_window,
            eval_uid=eval_uid,
        )
    continue
Per‑UID memory precheck must also be global and still hit the end‑of‑iter barrier.
Early continue on a subset of ranks will skip the per‑iteration "end_eval_iter" barrier and can hang others.
- # Check memory before each UID evaluation
- if not self._check_memory_and_cleanup(f"before_uid_{eval_uid}"):
- if self.is_master:
- tplr.log_with_context(
- level="warning",
- message=f"Insufficient memory for UID {eval_uid} evaluation. Skipping this UID.",
- sync_window=self.sync_window,
- current_window=self.current_window,
- eval_uid=eval_uid,
- )
- continue
+ # Check memory before each UID evaluation (global consensus)
+ mem_ok_local = self._check_memory_and_cleanup(f"before_uid_{eval_uid}")
+ mem_ok_global = dist_helper.all_ok(mem_ok_local, self.device, tag=f"mem_ok_uid_{eval_uid}")
+ if not mem_ok_global:
+ if self.is_master:
+ tplr.log_with_context(
+ level="warning",
+ message=f"Insufficient memory on at least one rank for UID {eval_uid}. Skipping UID.",
+ sync_window=self.sync_window,
+ current_window=self.current_window,
+ eval_uid=eval_uid,
+ )
+ # Mark attempted so counters remain consistent
+ uids_attempted_this_window.add(eval_uid)
+ self.peers_last_eval_window[eval_uid] = self.sync_window
+ # Align with the per‑iteration barrier then skip
+ dist_helper.safe_barrier("end_eval_iter", self.local_rank)
+ continue
📝 Committable suggestion
# Check memory before each UID evaluation (global consensus)
mem_ok_local = self._check_memory_and_cleanup(f"before_uid_{eval_uid}")
mem_ok_global = dist_helper.all_ok(mem_ok_local, self.device, tag=f"mem_ok_uid_{eval_uid}")
if not mem_ok_global:
    if self.is_master:
        tplr.log_with_context(
            level="warning",
            message=f"Insufficient memory on at least one rank for UID {eval_uid}. Skipping UID.",
            sync_window=self.sync_window,
            current_window=self.current_window,
            eval_uid=eval_uid,
        )
    # Mark attempted so counters remain consistent
    uids_attempted_this_window.add(eval_uid)
    self.peers_last_eval_window[eval_uid] = self.sync_window
    # Align with the per-iteration barrier then skip
    dist_helper.safe_barrier("end_eval_iter", self.local_rank)
    continue
🤖 Prompt for AI Agents
In neurons/validator.py around lines 1421-1432, the per-UID memory check
currently does a local early continue which can cause some ranks to skip the
per-iteration "end_eval_iter" barrier and hang others; instead, compute a local
boolean (local_ok = self._check_memory_and_cleanup(...)), perform a global
reduction (e.g., all-reduce AND across ranks) to produce global_ok, log the
warning only on master when global_ok is False, then ensure all ranks hit the
end-of-iteration barrier before skipping that UID (i.e., if not global_ok then
call the barrier and continue), so no rank leaves the barrier early.
try:
    # Check memory before FSDP checkpoint save
    self._check_memory_and_cleanup(operation="FSDP_checkpoint_save")

    handle = await self.ckpt.save_local_async(
        model=self.model,
        window=self.sync_window,
        sync_window=self.sync_window,
        topology="FSDP",
    )

    # Schedule an upload that will wait for the save to finish, then upload in background
    await self.ckpt.upload(
        window=self.sync_window,
        background=True,
        delete_local_on_success=True,
        wait_for=handle,
    )
    # Schedule an upload that will wait for the save to finish, then upload in background
    await self.ckpt.upload(
        window=self.sync_window,
        background=True,
        delete_local_on_success=True,
        wait_for=handle,
    )

    # Clean up after checkpoint save
    torch.cuda.empty_cache()

except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
    if "CUDA out of memory" in str(e) or "NCCL" in str(e):
        tplr.logger.warning(f"CUDA/NCCL error during FSDP checkpoint save at step {self.global_step}: {e}")
        # Force memory cleanup and continue
        torch.cuda.empty_cache()
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        tplr.logger.info("Continuing without checkpoint save due to memory/NCCL constraints")
    else:
        raise
Wrong parameter name in _check_memory_and_cleanup and skip logic not honored.
- _check_memory_and_cleanup takes context, not operation → this will raise TypeError.
- If memory is critically low, we should skip the save path to avoid failures.
Apply:
- # Check memory before FSDP checkpoint save
- self._check_memory_and_cleanup(operation="FSDP_checkpoint_save")
-
- handle = await self.ckpt.save_local_async(
- model=self.model,
- window=self.sync_window,
- sync_window=self.sync_window,
- topology="FSDP",
- )
+ # Check memory before FSDP checkpoint save
+ mem_ok = self._check_memory_and_cleanup(context="FSDP_checkpoint_save")
+ if not mem_ok:
+ tplr.logger.warning("Skipping FSDP checkpoint save due to low GPU memory.")
+ else:
+ handle = await self.ckpt.save_local_async(
+ model=self.model,
+ window=self.sync_window,
+ sync_window=self.sync_window,
+ topology="FSDP",
+ )
- # Schedule an upload that will wait for the save to finish, then upload in background
- await self.ckpt.upload(
- window=self.sync_window,
- background=True,
- delete_local_on_success=True,
- wait_for=handle,
- )
+ # Schedule an upload that will wait for the save to finish, then upload in background
+ await self.ckpt.upload(
+ window=self.sync_window,
+ background=True,
+ delete_local_on_success=True,
+ wait_for=handle,
+ )
- # Clean up after checkpoint save
- torch.cuda.empty_cache()
+ # Clean up after checkpoint save
+ if torch.cuda.is_available():
+         torch.cuda.empty_cache()
📝 Committable suggestion
try:
    # Check memory before FSDP checkpoint save
    mem_ok = self._check_memory_and_cleanup(context="FSDP_checkpoint_save")
    if not mem_ok:
        tplr.logger.warning("Skipping FSDP checkpoint save due to low GPU memory.")
    else:
        handle = await self.ckpt.save_local_async(
            model=self.model,
            window=self.sync_window,
            sync_window=self.sync_window,
            topology="FSDP",
        )

        # Schedule an upload that will wait for the save to finish, then upload in background
        await self.ckpt.upload(
            window=self.sync_window,
            background=True,
            delete_local_on_success=True,
            wait_for=handle,
        )

    # Clean up after checkpoint save
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
    if "CUDA out of memory" in str(e) or "NCCL" in str(e):
        tplr.logger.warning(f"CUDA/NCCL error during FSDP checkpoint save at step {self.global_step}: {e}")
        # Force memory cleanup and continue
        torch.cuda.empty_cache()
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        tplr.logger.info("Continuing without checkpoint save due to memory/NCCL constraints")
    else:
        raise
Description
Implements comprehensive CUDA memory management and NCCL error handling to resolve Out of Memory (OOM) crashes during 70B model evaluation and distributed checkpoint operations.
Related Issue(s)
CUDA OOM during FSDP all-gather operations in evaluate_model calls
NCCL "unhandled cuda error" failures during distributed checkpoint loading
Memory fragmentation causing gradual memory exhaustion across training windows
Type of Change
Branch Naming
Commit Messages
Code Quality
Testing
Documentation
If this is a breaking change
Screenshots/Examples
Additional Notes
Summary by CodeRabbit