Feature Request: Distributed Model Streamer for Multi-GPU/Multi-Node Loading
Problem
The current model streamer implementation is highly effective for single-process model loading and is already integrated with vLLM's sharded model loader. However, vLLM's default loader behavior for distributed inference (e.g., using torch.distributed for tensor parallelism) is to have each process load the entire model from storage independently, after which the model is sharded in memory. This can be inefficient, especially when reading from remote storage like S3, as it doesn't leverage the aggregate bandwidth of multiple nodes and creates a significant bottleneck where each node redundantly downloads data it will discard.
We need a mechanism to coordinate the reading workload across all processes in the distributed group from the beginning, avoiding these redundant downloads.
Solution
We propose the introduction of a DistributedFileStreamer that orchestrates model loading across multiple workers. This new component would sit between the SafetensorsStreamer and the underlying FileStreamer and would be responsible for partitioning the read workload and broadcasting the data to all participating GPUs.
The primary proposed architecture is a Multiple Readers Mode:
- The read workload is partitioned among all participating processes.
- Each rank is responsible for reading a specific subset of the model's tensors from storage using its local FileStreamer.
- Processes then broadcast their tensors to the rest of the group in a round-robin or "turn-based" fashion.
This approach is preferred because it can leverage the aggregate bandwidth of multiple nodes, which should reduce overall model loading time. It is also a more scalable design and is potentially better suited for technologies like GPUDirect Storage (GDS), where each GPU can read its partition directly. The main challenges are synchronizing the processes and avoiding workload imbalance when tensor sizes vary widely.
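One way the `partition_tensors` step from the pseudocode later in this issue could limit imbalance is a greedy "largest tensor first, to the least-loaded rank" assignment by byte size. This is a minimal sketch under that assumption: the proposal does not prescribe a partitioning algorithm, and the signature below (a dict of global index to size in bytes) is hypothetical.

```python
import heapq
from typing import Dict, List, Tuple


def partition_tensors(
    tensor_sizes: Dict[int, int], world_size: int
) -> Tuple[List[List[int]], List[int]]:
    """Hypothetical helper: assign each tensor (global index -> bytes) to one rank.

    Greedy longest-processing-time assignment: the largest remaining tensor goes
    to the rank with the fewest bytes so far. Ties are broken by rank and by
    global index, so every rank computes the identical result independently.
    """
    if world_size < 1:
        raise ValueError("world_size must be >= 1")
    # Min-heap of (bytes assigned so far, rank).
    loads = [(0, rank) for rank in range(world_size)]
    heapq.heapify(loads)
    reading_lists: List[List[int]] = [[] for _ in range(world_size)]
    # Largest tensors first; equal sizes ordered by global index for determinism.
    for index, size in sorted(tensor_sizes.items(), key=lambda kv: (-kv[1], kv[0])):
        load, rank = heapq.heappop(loads)
        reading_lists[rank].append(index)
        heapq.heappush(loads, (load + size, rank))
    num_tensors_per_rank = [len(lst) for lst in reading_lists]
    return reading_lists, num_tensors_per_rank
```

For sizes `{0: 100, 1: 60, 2: 50, 3: 40, 4: 10}` on two ranks this yields `[[0, 3], [1, 2, 4]]` (140 vs. 120 bytes), so the per-rank tensor counts differ even though the byte loads are close, which is the case the turn-based broadcast must handle.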
Alternative Approach: Single Reader Mode
A simpler alternative would be a "Single Reader Mode," where one designated process (e.g., rank=0) handles all I/O from the storage backend and then broadcasts each tensor to the other ranks. While easier to implement, this approach is fundamentally limited by the network bandwidth of that single node and does not scale, which is why the Multiple Readers mode is the primary proposal for a high-performance solution.
Proposed API and Implementation Changes
To support this, several components would need to be updated:
- SafetensorsStreamer:
  - Should accept a device parameter to specify the target device for the final tensor (cpu or cuda).
  - vLLM is already aware of the device type.
  - Checks torch.distributed.is_initialized() to select the DistributedFileStreamer, falling back to the FileStreamer if the process group is not initialized.
  - Receives a "buffer" tensor from get_chunks() of the DistributedFileStreamer (or FileStreamer) and reshapes it into the final tensor according to the metadata.
- FileStreamer:
  - Should be updated to yield a torch.Tensor directly, instead of a NumPy ndarray, to support both cpu and cuda device types.
  - stream_files(device_type): initializes the process, partitions the workload and starts reading from storage to cpu.
  - get_chunks(): when yielding the next tensor for the cuda device type, copies the cpu tensor to the cuda device.
- DistributedFileStreamer (new component):
  - stream_files(device_type): initializes the process, partitions the workload and starts reading from storage to cpu.
  - get_chunks(): the iterator that yields all the tensors. This method will contain the core broadcasting logic.
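The reshaping step in the SafetensorsStreamer only needs the flat byte buffer plus the tensor's safetensors metadata (dtype string and shape). With PyTorch this would roughly be `buffer.view(dtype).reshape(shape)` on a `torch.uint8` buffer; the dependency-free sketch below shows the same zero-copy reinterpretation with Python's `memoryview`. The function name and the reduced dtype table are hypothetical, and it assumes a little-endian host, since safetensors stores data little-endian and `memoryview.cast` uses native byte order.

```python
import struct
import sys
from math import prod
from typing import Sequence

# Hypothetical subset of safetensors dtype strings -> native struct format codes.
# BF16/F16 are left out because memoryview cannot reinterpret bytes as them.
_FORMATS = {"F64": "d", "F32": "f", "I64": "q", "I32": "i", "I16": "h", "I8": "b", "U8": "B"}


def buffer_to_tensor_view(buf: bytes, dtype: str, shape: Sequence[int]) -> memoryview:
    """Reinterpret a flat byte buffer as a typed, shaped view without copying."""
    if sys.byteorder != "little":
        # safetensors data is little-endian; memoryview.cast uses native byte order.
        raise RuntimeError("this sketch assumes a little-endian host")
    fmt = _FORMATS[dtype]
    expected = prod(shape) * struct.calcsize(fmt)
    if len(buf) != expected:
        raise ValueError(f"buffer has {len(buf)} bytes, metadata implies {expected}")
    flat = memoryview(buf).cast("B")  # 1-D unsigned-byte view of the buffer
    if not shape:
        return flat.cast(fmt)  # 0-d tensors come back as a 1-element view
    return flat.cast(fmt, shape=list(shape))
```

Checking the byte count against the metadata before casting catches a mismatched index/metadata pair early, which matters once buffers arrive from other ranks.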
Broadcasting Mechanism (get_chunks in Multi-Reader Mode)
Since torch.distributed.broadcast only transfers tensor data, and every receiver must already hold a tensor of matching shape and dtype, a mechanism to communicate the metadata (shape, dtype) is required before each broadcast.
As part of partitioning the entire workload, the DistributedFileStreamer will create a unique index per tensor, together with a map of the unique index to the metadata of the tensor. This index will be broadcast before the tensor data, to enable each process to build the corresponding tensor.
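The global index can be built from the safetensors headers alone: each file starts with an 8-byte little-endian header length, followed by a JSON header mapping tensor names to `dtype`, `shape` and `data_offsets` (relative to the end of the header), plus an optional `__metadata__` entry. This is a minimal sketch that assigns indices in a deterministic order (file path, then tensor name) so every rank computes the same map; `TensorEntry`, `build_global_index` and the sort order are hypothetical choices, not part of the proposal.

```python
import json
import struct
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class TensorEntry:
    path: str
    name: str
    dtype: str              # safetensors dtype string, e.g. "F32"
    shape: Tuple[int, ...]  # final shape, used later by the SafetensorsStreamer
    start: int              # absolute byte offset of the tensor data in the file
    end: int                # exclusive end offset


def build_global_index(paths: List[str]) -> Dict[int, TensorEntry]:
    """Map a global tensor index to its file location and metadata."""
    entries: List[TensorEntry] = []
    for path in sorted(paths):
        with open(path, "rb") as f:
            (header_len,) = struct.unpack("<Q", f.read(8))
            header = json.loads(f.read(header_len))
        data_start = 8 + header_len  # data_offsets are relative to this position
        for name in sorted(header):
            if name == "__metadata__":
                continue
            info = header[name]
            begin, end = info["data_offsets"]
            entries.append(TensorEntry(path, name, info["dtype"], tuple(info["shape"]),
                                       data_start + begin, data_start + end))
    return dict(enumerate(entries))
```

Because the map depends only on the file contents and a fixed ordering, each rank can build it locally and only the integer index has to travel with each broadcast.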
Because each process reads whole tensors, ranks may be assigned different numbers of tensors. The broadcasting round robin takes this into account.
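A `create_broadcast_plan` matching the example in the pseudocode (`[3, 2]` becomes `[0, 1, 0, 1, 0]`) can be a plain round robin that skips ranks once their reading lists are exhausted. This is a sketch of one possible implementation; the only hard requirement is that every rank derives the identical plan from the same counts.

```python
from typing import List


def create_broadcast_plan(num_tensors_per_rank: List[int]) -> List[int]:
    """Round-robin over ranks, skipping any rank with no tensors left to send."""
    if any(n < 0 for n in num_tensors_per_rank):
        raise ValueError("tensor counts must be non-negative")
    remaining = list(num_tensors_per_rank)
    plan: List[int] = []
    while any(n > 0 for n in remaining):
        for rank in range(len(remaining)):
            if remaining[rank] > 0:
                plan.append(rank)  # this rank broadcasts in the next turn
                remaining[rank] -= 1
    return plan
```

Single Reader Mode falls out as the degenerate case `[N, 0, ..., 0]`, where the plan is simply N turns of rank 0.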
Note that the DistributedFileStreamer and the FileStreamer return one-dimensional buffer tensors. These are reshaped into the final tensors by the SafetensorsStreamer.
A proposed "broadcasting in turns" workflow is outlined in the pseudocode below:
```python
# Pseudocode: parse_all_safetensors_files_globally(), partition_tensors(),
# create_broadcast_plan(), get_global_tensor_index() and file_streamer are placeholders.
import os

import torch
import torch.distributed as dist

# --- stream_files() setup phase, run on each rank ---
my_rank = dist.get_rank()
world_size = dist.get_world_size()
# Index the GPU by the node-local rank: in multi-node runs the global rank can exceed
# the number of GPUs on a node. Assumption: LOCAL_RANK is set by the launcher (e.g. torchrun).
local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device(f"cuda:{local_rank}")

# Each rank gets the full list of tensor metadata, mapping a global index to each tensor's properties.
# The metadata describes the flat buffer: its shape is one-dimensional and its dtype is torch.int64
# (not the final shape and dtype).
tensor_metadata_map = parse_all_safetensors_files_globally()
# Each rank independently calculates the same global reading plan.
reading_lists_per_rank, num_tensors_per_rank = partition_tensors(tensor_metadata_map, world_size)
# Create a deterministic broadcast schedule that all ranks agree on.
# This handles unbalanced workloads by building an ordered list of which rank broadcasts when.
# Example: if num_tensors_per_rank is [3, 2], the plan is [0, 1, 0, 1, 0].
broadcast_plan = create_broadcast_plan(num_tensors_per_rank)
# Each rank reads only its own list of tensors.
my_reading_list = reading_lists_per_rank[my_rank]
file_streamer.stream_files(my_reading_list)
# Each rank also knows the total number of tensors to expect.
num_total_tensors = len(tensor_metadata_map)


# --- get_chunks() iterator, run on each rank ---
def get_chunks():
    # One iteration per tensor broadcast across the cluster. A rank that has no more
    # tensors to read keeps participating as a receiver, because the plan no longer names it.
    for counter in range(num_total_tensors):
        # The pre-computed global plan says which rank broadcasts this tensor.
        current_broadcasting_rank = broadcast_plan[counter]
        if current_broadcasting_rank == my_rank:
            # My turn to read and broadcast.
            # The plan ensures this rank has a tensor to read at this point in its sequence.
            file_tensor_index, cpu_tensor = file_streamer.get_next_tensor()
            gpu_tensor = cpu_tensor.to(device)
            # Look up the global index in the mapping generated in the stream_files() stage.
            global_tensor_index = get_global_tensor_index(file_tensor_index)
            # 1. Broadcast the METADATA (the global index of the tensor we are about to send).
            #    This tells receivers which tensor is coming so they can prepare the correct buffer.
            metadata_tensor = torch.tensor([global_tensor_index], dtype=torch.int64, device=device)
            dist.broadcast(metadata_tensor, src=my_rank)
            # 2. Broadcast the actual TENSOR DATA.
            dist.broadcast(gpu_tensor, src=my_rank)
            tensor_to_yield = gpu_tensor
        else:
            # Not my turn: receive.
            # 1. Receive the METADATA (the global index of the incoming tensor).
            metadata_tensor = torch.empty(1, dtype=torch.int64, device=device)
            dist.broadcast(metadata_tensor, src=current_broadcasting_rank)
            received_global_tensor_index = metadata_tensor.item()
            # 2. Look up the buffer shape and dtype (not the final tensor yet; that is handled
            #    later in the SafetensorsStreamer) and allocate the receive buffer.
            shape, dtype = tensor_metadata_map[received_global_tensor_index]
            tensor_to_yield = torch.empty(shape, dtype=dtype, device=device)
            # 3. Receive the tensor data into the buffer.
            dist.broadcast(tensor_to_yield, src=current_broadcasting_rank)
        yield tensor_to_yield
```
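Because every `dist.broadcast` is a collective, the turn-based loop only works if all ranks walk the same `broadcast_plan`. The single-process simulation below checks that property without GPUs: threads stand in for ranks, a reusable `threading.Barrier` stands in for the collective, and strings stand in for tensors. `FakeGroup`, `run_rank` and `simulate` are hypothetical test scaffolding, not part of `torch.distributed`.

```python
import threading
from typing import Dict, List


class FakeGroup:
    """Emulates a broadcast collective among world_size threads."""

    def __init__(self, world_size: int) -> None:
        # Timeout so a protocol bug surfaces as BrokenBarrierError instead of a hang.
        self._barrier = threading.Barrier(world_size, timeout=10)
        self._slot = None

    def broadcast(self, value, src: int, rank: int):
        if rank == src:
            self._slot = value
        self._barrier.wait()  # the source has published its value
        received = self._slot
        self._barrier.wait()  # every rank has read it before the slot is reused
        return received


def run_rank(rank, group, plan, reading_list, data, out):
    """Turn-based get_chunks() loop for one rank; records global index -> payload."""
    my_tensors = iter(reading_list)  # this rank's own tensors, in reading order
    received: Dict[int, str] = {}
    for src in plan:
        if src == rank:
            index = next(my_tensors)
            group.broadcast(index, src, rank)  # metadata (global index) first
            payload = group.broadcast(data[index], src, rank)  # then the data
        else:
            index = group.broadcast(None, src, rank)
            payload = group.broadcast(None, src, rank)
        received[index] = payload
    out[rank] = received


def simulate(reading_lists: List[List[int]], plan: List[int], data: Dict[int, str]):
    group = FakeGroup(len(reading_lists))
    out: Dict[int, Dict[int, str]] = {}
    threads = [
        threading.Thread(target=run_rank, args=(r, group, plan, lst, data, out))
        for r, lst in enumerate(reading_lists)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out
```

With reading lists `[[0, 3], [1, 2, 4]]` and plan `[0, 1, 0, 1, 1]`, both ranks end up holding all five payloads. Giving one rank a different plan makes the barrier time out, mirroring the hang or error that mismatched collectives cause in `torch.distributed`.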
Additional Considerations:
- GPU Memory: The distributed approach requires at least one additional buffer in GPU memory to receive broadcast data; the streamer should validate that there is enough free GPU memory.
- CPU Memory: When using the Gloo backend, the distributed approach requires at least one additional buffer in CPU memory to receive broadcast data before it can be copied to the GPU, in addition to the CPU memory buffer allocated by the SafetensorsStreamer.
- Storage vs. Network Trade-off: A third approach is for each node to read the entire model from storage independently. This trades storage read bandwidth against inter-node communication bandwidth, and it may be preferable when inter-node communication is slow but each node has a fast link to storage.
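For the GPU memory check, one simple rule, assuming at most one receive buffer is live at a time, is that free memory must cover the largest single buffer plus some headroom for allocator overhead. The helper below is hypothetical and works on plain integers; in PyTorch the free-byte count could come from `torch.cuda.mem_get_info()`, which returns `(free, total)` in bytes for the current device.

```python
from typing import Iterable


def check_receive_buffer_fits(
    buffer_sizes: Iterable[int], free_bytes: int, headroom_fraction: float = 0.1
) -> int:
    """Return the bytes reserved for one receive buffer, or raise if they do not fit.

    Assumption: only one receive buffer is live at a time, so it must hold the
    largest tensor; headroom_fraction adds slack for allocator overhead.
    """
    largest = max(buffer_sizes, default=0)
    required = int(largest * (1 + headroom_fraction))
    if required > free_bytes:
        raise MemoryError(f"need {required} bytes for the receive buffer, only {free_bytes} free")
    return required
```

If receive buffers are instead allocated per tensor and kept alive by the caller, the requirement grows accordingly, so this check is a lower bound rather than a guarantee.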
This proposal outlines a path toward a more efficient, distributed model loading mechanism. We would appreciate feedback on the proposed design.