[Distributed] Extend QuantizationModifier to support distributed activation calibration #2391
Etelis wants to merge 9 commits into vllm-project:main
Conversation
👋 Hi! Thank you for contributing to llm-compressor. Please add the ready label when the PR is ready for review. Note: this is required to complete the testing suite, so please only add the label once the PR is code complete and local testing has been performed.
This pull request has merge conflicts that must be resolved before it can be merged.
Code Review
The pull request introduces distributed support for the QuantizationModifier, enabling weight calibration and activation observer synchronization across multiple GPUs. This is a significant improvement for scaling quantization to large models. The implementation uses a greedy bin-packing algorithm for load balancing weight calibration, which is a solid choice. However, the current approach to synchronization involves a large number of individual collective communication calls (all-reduces and broadcasts) within loops, which will likely become a performance bottleneck due to network latency. Additionally, there are a few issues with device indexing in multi-node environments that should be addressed to ensure robustness.
```python
for _, module in match_named_modules(model, self.resolved_targets, self.ignore):
    for base_name in ("input", "output", "q", "k", "v"):
        observer = getattr(module, f"{base_name}_observer", None)
        if observer is None:
            continue

        # all-reduce accumulated min/max across ranks
        if (
            hasattr(observer, "past_min_vals")
            and observer.past_min_vals is not None
        ):
            observer.past_min_vals = all_reduce_min(observer.past_min_vals)
        if (
            hasattr(observer, "past_max_vals")
            and observer.past_max_vals is not None
        ):
            observer.past_max_vals = all_reduce_max(observer.past_max_vals)

        # all-reduce global min/max (TENSOR_GROUP strategy)
        if (
            hasattr(observer, "past_global_min_vals")
            and observer.past_global_min_vals is not None
        ):
            observer.past_global_min_vals = all_reduce_min(
                observer.past_global_min_vals
            )
        if (
            hasattr(observer, "past_global_max_vals")
            and observer.past_global_max_vals is not None
        ):
            observer.past_global_max_vals = all_reduce_max(
                observer.past_global_max_vals
            )

        recompute_qparams_from_observer(module, base_name)
```
The current implementation of _sync_activation_observers performs multiple all_reduce operations per module inside a nested loop. For a typical transformer model, this can result in hundreds or even thousands of small collective communication calls. In distributed settings, the latency overhead of many small calls is much higher than a single large call.
Consider aggregating all tensors that need reduction into a single list, concatenating them into one or two large buffers (e.g., one for MIN and one for MAX), performing a single all_reduce on each buffer, and then unpacking the results back into the observers. This will significantly improve performance on high-latency networks.
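The suggested pack/reduce/unpack batching can be sketched as follows. This is a hypothetical helper (not code from the PR), assuming all stats share a dtype; it is a no-op reduction when `torch.distributed` is not initialized:

```python
import torch
import torch.distributed as dist


def packed_all_reduce(tensors, op):
    """Reduce a list of tensors with one collective call instead of N.

    Illustrative sketch: flatten everything into a single contiguous
    buffer, all-reduce once, then scatter the results back into the
    original tensors in place. Assumes matching dtypes across tensors.
    """
    if not tensors:
        return
    flat = torch.cat([t.reshape(-1) for t in tensors])
    if dist.is_available() and dist.is_initialized():
        dist.all_reduce(flat, op=op)  # single collective for the whole batch
    offset = 0
    for t in tensors:
        n = t.numel()
        # copy the reduced slice back into the observer's tensor
        t.copy_(flat[offset:offset + n].view_as(t))
        offset += n
```

In a real run, the caller would collect every `past_min_vals` into one list (reduced with `ReduceOp.MIN`) and every `past_max_vals` into another (reduced with `ReduceOp.MAX`), paying the latency cost twice instead of hundreds of times.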
```python
return

# NCCL requires each rank to use its own GPU
device = torch.device(f"cuda:{dist.get_rank()}")
```
dist.get_rank() returns the global rank of the process. In multi-node environments, this rank will exceed the number of GPUs available on a single node (e.g., rank 8 on the second node of an 8-GPU cluster). Using the global rank as a CUDA device index will result in an 'invalid device ordinal' error. Use torch.cuda.current_device() instead to ensure the correct local GPU is targeted.
```diff
-device = torch.device(f"cuda:{dist.get_rank()}")
+device = torch.device(torch.cuda.current_device())
```
```python
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
```
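The same global-rank pitfall applies here: `torch.cuda.set_device(rank)` indexes past the node's GPUs on a second node. A hedged sketch of the fix, assuming the launcher is `torchrun` (which sets `LOCAL_RANK`); the helper names are illustrative, not from the PR:

```python
import os

import torch
import torch.distributed as dist


def get_local_rank() -> int:
    """Node-local rank as set by torchrun; defaults to 0 for
    single-process runs. (Illustrative helper.)"""
    return int(os.environ.get("LOCAL_RANK", "0"))


def init_nccl():
    """Bind each process to its node-local GPU rather than its global
    rank, so rank 8 on a second 8-GPU node maps to cuda:0, not the
    invalid ordinal cuda:8. (Sketch of the reviewer's suggestion.)"""
    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(get_local_rank())
    return dist.get_rank(), dist.get_world_size()
```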
Add shared utility functions for multi-GPU weight calibration and activation observer synchronization. All functions are no-ops when torch.distributed is not initialized. Signed-off-by: Itay Etelis <itay.etelis@ibm.com>
Add a helper function to recompute scale and zero_point from an observer's accumulated min/max after DDP all-reduce synchronization. Signed-off-by: Itay Etelis <itay.etelis@ibm.com>
Refactor QuantizationModifier.on_start to support distributed weight calibration. Each rank calibrates a subset of modules (assigned by greedy bin-packing on weight size) and broadcasts results to all ranks. Signed-off-by: Itay Etelis <itay.etelis@ibm.com>
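The greedy bin-packing assignment mentioned in this commit can be sketched in a few lines. This is illustrative only (and, per the later review discussion, the partitioning was ultimately dropped from the PR):

```python
def partition_by_weight_size(sizes: dict, world_size: int) -> list:
    """Greedy bin-packing sketch: assign each module to the rank with the
    smallest accumulated weight so far. `sizes` maps module name -> weight
    numel. (Hypothetical helper mirroring the commit description.)
    """
    loads = [0] * world_size
    assignment = [[] for _ in range(world_size)]
    # largest-first greedy keeps the per-rank totals balanced
    for name, size in sorted(sizes.items(), key=lambda kv: -kv[1]):
        rank = loads.index(min(loads))
        assignment[rank].append(name)
        loads[rank] += size
    return assignment
```

Each rank would then calibrate only `assignment[dist.get_rank()]` and broadcast its results to the others.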
Use each rank's own GPU device for NCCL broadcast instead of the module's execution device, which may be CPU or shared across ranks when the model is not GPU-resident. Signed-off-by: Itay Etelis <itay.etelis@ibm.com>
```diff
@@ -0,0 +1,194 @@
+"""
+Distributed utilities for multi-GPU (DDP) calibration and optimization.
```
Consolidate with src/llmcompressor/utils/dist.py
Done. Deleted distributed.py entirely — all functions were either for weight partitioning (removed) or all_reduce wrappers (inlined into Observer.synchronize()). No functions needed to move to dist.py.
```python
# fuse global_scales (all ranks, idempotent)
for module in model.modules():
    update_fused_layer_weight_global_scales(module)
```
This part should be tested with NVFP4. The problem is that there may be cases where submodules which need to be fused are assigned to different ranks. This is why I suggested breaking this out for @GOavi101 to focus on.
My suggestion is to do the weight calibration independently on each rank for now, and focus on the activation calibration.
Done. Weight calibration now runs identically on every rank — no partitioning or broadcasting. PR focuses exclusively on activation observer synchronization.
```python
return tensor

device = tensor.device
if device.type == "cpu":
```
Why does the tensor need to be moved to the gpu?
I also don't see observer values being on the cpu as a common case, I'm wondering when you ever saw this?
Removed. The CPU-to-GPU movement and the all_reduce wrapper are both gone — Observer.synchronize() now calls dist.all_reduce directly on the observer tensors (which are always on GPU in practice).
```python
update_offload_parameter(module, param_name, tensor)


def all_reduce_min(tensor: torch.Tensor) -> torch.Tensor:
```
I think the code would be a bit clearer if we didn't break this function out, and instead just used dist.all_reduce directly.
Done. Removed all_reduce_min/all_reduce_max wrappers. Observer.synchronize() calls dist.all_reduce directly with the appropriate ReduceOp.
```python
# all-reduce accumulated min/max across ranks
if (
    hasattr(observer, "past_min_vals")
```
Instead of having many hasattr calls, which is not very maintainable, consider implementing a synchronize() method on the observers directly. I also think that you need to synchronize global scales.
In addition, having many synchronization ops has lots of runtime cost. Consider implementing synchronize() with a return value of the comms, similar to the GPTQ implementation
Done. Added synchronize() on the Observer base class that returns a list of dist.Work handles (matching the GPTQ async pattern). All all_reduce ops are batched and waited on once via wait_for_comms. Also added recompute_global_scale() and recompute_qparams() to encapsulate the recomputation from accumulated state. Memoryless observers (no past_* attributes) return empty list/None automatically via getattr with defaults.
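The async pattern described in this reply can be sketched as below. `synchronize_observer` and `wait_for_comms` here are illustrative stand-ins for the `Observer.synchronize()` method and the GPTQ-style wait helper, not the PR's actual code:

```python
import torch
import torch.distributed as dist


def synchronize_observer(observer) -> list:
    """Issue one non-blocking all_reduce per accumulated stat and return
    the dist.Work handles so the caller can batch the waits. Returns []
    when torch.distributed is not initialized, or when the observer is
    memoryless (no past_* attributes). (Sketch mirroring the thread.)
    """
    comms = []
    if not (dist.is_available() and dist.is_initialized()):
        return comms
    for attr, op in [
        ("past_min_vals", dist.ReduceOp.MIN),
        ("past_max_vals", dist.ReduceOp.MAX),
        ("past_global_min_vals", dist.ReduceOp.MIN),
        ("past_global_max_vals", dist.ReduceOp.MAX),
    ]:
        val = getattr(observer, attr, None)
        if val is not None:
            comms.append(dist.all_reduce(val, op=op, async_op=True))
    return comms


def wait_for_comms(comms: list) -> None:
    """Block once on all outstanding collectives."""
    for work in comms:
        work.wait()
```

The caller collects the handles from every observer first, then calls `wait_for_comms` once, so the network round-trips overlap instead of serializing.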
```python
if not self.ended_:
    self.on_end(state, None)


def _sync_activation_observers(self, model):
```
I think that you'll want to implement this method on the QuantizationMixin, that way DDP activation calibration logic can be shared with other modifiers like GPTQModifier
Done. sync_activation_observers() is now on QuantizationMixin, so both QuantizationModifier and GPTQModifier can use it.
```python
calibrate_activations(module, value_states, base_name="v")


def recompute_qparams_from_observer(module: Module, base_name: str):
```
Is this function not redundant with call_observer?
Not directly redundant — call_observer needs a tensor value to run the forward pass through the observer, while recompute_qparams_from_observer recomputes scale/zp from already-accumulated past_min_vals/past_max_vals state (needed after DDP sync). Removed recompute_qparams_from_observer and moved the logic into Observer.recompute_qparams() and Observer.recompute_global_scale() methods instead.
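The distinction matters because recomputation needs no new activation tensor. A minimal sketch of recomputing affine qparams from already-accumulated min/max, assuming standard asymmetric int8 quantization (a simplification of what `Observer.recompute_qparams()` would do, not the library's implementation):

```python
import torch


def qparams_from_minmax(min_vals: torch.Tensor, max_vals: torch.Tensor,
                        qmin: int = -128, qmax: int = 127):
    """Recompute scale/zero_point from accumulated min/max stats after a
    DDP all-reduce, with no forward pass required. (Illustrative.)"""
    # ensure zero is representable in the quantized range
    min_vals = torch.clamp(min_vals, max=0.0)
    max_vals = torch.clamp(max_vals, min=0.0)
    scale = (max_vals - min_vals) / float(qmax - qmin)
    # avoid zero scales for constant channels
    scale = torch.clamp(scale, min=torch.finfo(torch.float32).eps)
    zero_point = torch.round(qmin - min_vals / scale).to(torch.int32)
    return scale, zero_point
```

Because every rank holds identical min/max after the all-reduce, every rank derives identical scale/zp without any further communication.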
```python
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
```
Please use init_distributed util
Done. Example now uses init_dist() from compressed_tensors.offload, along with load_offloaded_model(), get_rank_partition(), and dispatch_model() — matching the existing llama3_ddp_example.py pattern.
Remove distributed weight calibration (partition, broadcast, rank assignment) and focus exclusively on activation observer synchronization. Key changes:
- Add synchronize(), recompute_qparams(), recompute_global_scale() to Observer base class for a clean DDP interface
- Move sync_activation_observers() to QuantizationMixin for reuse by both QuantizationModifier and GPTQModifier
- Batch all async all_reduce ops and wait once, matching the GPTQ pattern
- Delete distributed.py (consolidated into Observer methods + dist.py)
- Remove recompute_qparams_from_observer from calibration.py
- Align example with existing DDP patterns (init_dist, get_rank_partition)
- Update unit and multi-GPU tests for new observer-based sync

Signed-off-by: Itay Etelis <itay.etelis@ibm.com>
|
All review comments have been addressed. Runtime & eval results have been added to the PR description (Llama-3-8B, W8A8 static activations, 256 samples, A100-80GB).
kylesayrs left a comment:
I think this code looks really great, and the benchmarks look great as well. A couple notes from me:
- The fact that the 4x DDP setup does not increase perplexity gives me confidence that syncing once per epoch (rather than once per batch) is good enough, nice work.
- From your speedup benchmarks, it seems like repeating work (calculate_q/gparams) across ranks is not too much of a cost. That seems to match expectations as well, nice work.

I'll make sure this code gets merged as part of the next LLM Compressor release.
```python
for attr, op in [
    ("past_min_vals", dist.ReduceOp.MIN),
    ("past_max_vals", dist.ReduceOp.MAX),
    ("past_global_min_vals", dist.ReduceOp.MIN),
    ("past_global_max_vals", dist.ReduceOp.MAX),
]:
```
I agree that I think this approach is more elegant than reimplementing for each subclass
Don't we need to do this for every subclass? This strategy only makes sense for a single observer: static_minmax. For memoryless_minmax and memoryless_mse it also works, I guess, because they don't have any of those values. For minmax (moving average) and mse (moving average) it makes no sense; you probably need to average across ranks, though it wouldn't be hard in theory to do.
Yeah let's just average for now
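Averaging a moving-average statistic across ranks can be sketched as a SUM all-reduce followed by division by the world size (illustrative helper mirroring the "just average for now" suggestion, not the merged code; a no-op in single-process runs):

```python
import torch
import torch.distributed as dist


def all_reduce_mean(tensor: torch.Tensor) -> torch.Tensor:
    """Replace each rank's moving-average min/max with the mean across
    ranks: SUM-reduce, then divide by world size in place."""
    if dist.is_available() and dist.is_initialized():
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        tensor /= dist.get_world_size()
    return tensor
```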
```python
self.on_start(state, None)

if event.type_ == EventType.SEQUENTIAL_EPOCH_END:
    QuantizationMixin.sync_activation_observers(self, state.model)
```
I think we'll need to add these to GPTQ and AWQ, right?
```python
]:
    val = getattr(self, attr, None)
    if val is not None:
        comms.append(dist.all_reduce(val, op=op, async_op=True))
```
Think we need the fp8 trick here from GPTQ base.py
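I haven't verified the exact GPTQ implementation, but the usual workaround for dtypes that collectives can't reduce directly (such as the float8 variants) is to upcast before the reduction and cast back afterward. A guess at the idea, not the actual base.py code:

```python
import torch
import torch.distributed as dist


def all_reduce_any_dtype(tensor: torch.Tensor, op) -> torch.Tensor:
    """Reduce in float32 and cast the result back to the original dtype,
    since NCCL cannot reduce some dtypes natively. (Hypothetical sketch
    of the 'fp8 trick' referenced above; no-op without an initialized
    process group.)"""
    orig_dtype = tensor.dtype
    buf = tensor.to(torch.float32)
    if dist.is_available() and dist.is_initialized():
        dist.all_reduce(buf, op=op)
    return buf.to(orig_dtype)
```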
Closes #2220

Adds DDP support to `QuantizationModifier` for activation observer synchronization across multiple GPUs during calibration.

At `SEQUENTIAL_EPOCH_END` and `CALIBRATION_EPOCH_END`, activation observer min/max values are all-reduced across ranks. Scale/zp are then recomputed from the global statistics so all ranks have identical quantization parameters.

Changes
- Add `synchronize()`, `recompute_qparams()`, `recompute_global_scale()` to the `Observer` base class
- Move `sync_activation_observers()` to `QuantizationMixin` (shared by `QuantizationModifier` and `GPTQModifier`)
- Batch async `dist.all_reduce` operations and wait once, matching the GPTQ DDP pattern
- Remove `recompute_qparams_from_observer` from `calibration.py` (now encapsulated in Observer methods)
- Align example with existing DDP patterns (`init_dist`, `get_rank_partition`)

Runtime & Evaluation Results

Model: `Meta-Llama-3-8B-Instruct`, W8A8 (static input activations), 256 calibration samples

Test plan
- `pytest tests/llmcompressor/utils/test_distributed.py` (8 tests)
- `torchrun --nproc_per_node=2 -m pytest tests/llmcompressor/modifiers/quantization/test_quantization_ddp.py` (2 tests)