6 changes: 3 additions & 3 deletions kv_connectors/llmd_fs_backend/README.md
@@ -2,7 +2,7 @@

## Overview

The llmd-fs-backend extends the native [vLLM Offloading Connector](#offloading-connector-docs) to support a file system backend.
The llmd-fs-backend extends the native [vLLM Offloading Connector](#offloading-connector-docs) to support file system and object store backends.
This backend provides a shared-storage offloading layer for vLLM. It moves KV-cache blocks between GPU and shared storage efficiently using:

- GPU block transfers using GPU DMA (default) or optional GPU-kernel-based copying using GPU SMs.
@@ -11,7 +11,7 @@ This backend provides a shared-storage offloading layer for vLLM. It moves KV-ca
- NUMA-aware CPU scheduling of worker threads
- Atomic file writes and reads

The fs connector (an offloading connector with a file system backend) is suitable for shared storage, as well as a local disk.
The fs connector is suitable for shared storage, as well as a local disk.

For architectural clarity, the fs backend is not responsible for cleanup. It is up to the storage system to manage this.
For simple setups, see the **Storage Cleanup** section.
@@ -66,7 +66,7 @@ pip install -e .
- `block_size`: number of tokens stored per file (must be a multiple of the GPU block size).
- `threads_per_gpu`: number of I/O threads per GPU
- `max_staging_memory_gb`: total staging memory limit
- `gds_mode`: GPUDirect Storage mode (default: `disabled`). See [GPUDirect Storage (GDS)](./docs/gds.md) for options, requirements, and verification.
- `backend`: `POSIX`, `OBJ`, or one of the GDS modes (default: `POSIX`). See [GPUDirect Storage (GDS)](./docs/gds.md) for the GDS options, requirements, and verification.
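The `block_size` granularity rule can be illustrated with a small validation helper (hypothetical, not part of the connector):

```python
def validate_block_size(block_size: int, gpu_block_size: int) -> int:
    """Check that the offloaded block size is a positive multiple of the GPU block size.

    Returns how many GPU blocks fit into one offloaded file.
    """
    if block_size <= 0 or block_size % gpu_block_size != 0:
        raise ValueError(
            f"block_size={block_size} must be a positive multiple of "
            f"the GPU block size ({gpu_block_size})"
        )
    return block_size // gpu_block_size

# Example: 256-token storage blocks with a 16-token GPU block size
print(validate_block_size(256, 16))  # 16 GPU blocks per file
```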

### Environment variables
- `STORAGE_LOG_LEVEL`: set the log level for both C++ and Python (`trace`, `debug`, `info`, `warn`, `error`). Default: `info`
24 changes: 12 additions & 12 deletions kv_connectors/llmd_fs_backend/docs/gds.md
@@ -31,15 +31,15 @@ If `libcufile.so` is not present at runtime, the connector falls back to CPU sta

## GDS modes

| `gds_mode` value | Read | Write |
| `backend` value | Read | Write |
|---|---|---|
| `disabled` (default) | CPU staging | CPU staging |
| `read_only` | GDS direct | CPU staging |
| `write_only` | CPU staging | GDS direct |
| `read_write` | GDS direct | GDS direct |
| `bb_read_only` | GDS + Bounce Buffer | CPU staging |
| `bb_write_only` | CPU staging | GDS + Bounce Buffer |
| `bb_read_write` | GDS + Bounce Buffer | GDS + Bounce Buffer |
| `POSIX` (default) | CPU staging | CPU staging |
| `GDS_READ` | GDS direct | CPU staging |
| `GDS_WRITE` | CPU staging | GDS direct |
| `GDS` | GDS direct | GDS direct |
| `GDS_BB_READ` | GDS + Bounce Buffer | CPU staging |
| `GDS_BB_WRITE` | CPU staging | GDS + Bounce Buffer |
| `GDS_BB` | GDS + Bounce Buffer | GDS + Bounce Buffer |

**Bounce Buffer (BB) modes** use GDS with an intermediate RDMA-registered GPU buffer
instead of registering each KV cache block directly. This is useful when the number of
@@ -50,7 +50,7 @@ extra GPU-to-GPU copy but avoiding per-block registration overhead.
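As a rough mental model only (plain Python, not the actual CUDA/GDS path), bounce-buffer mode registers one fixed staging region once and streams every block through it in chunks, instead of registering each block:

```python
def transfer_via_bounce_buffer(block: bytes, bb_size: int) -> bytes:
    """Copy a block through a fixed-size bounce buffer, chunk by chunk.

    Models the GDS bounce-buffer idea: only `bounce` is (notionally)
    RDMA-registered, once, and every KV block is staged through it.
    """
    bounce = bytearray(bb_size)  # registered once, reused for every block
    out = bytearray()
    for off in range(0, len(block), bb_size):
        chunk = block[off:off + bb_size]
        bounce[:len(chunk)] = chunk   # stands in for the GPU-to-GPU copy
        out += bounce[:len(chunk)]    # stands in for GDS I/O from the buffer
    return bytes(out)

data = bytes(range(10)) * 100       # 1000-byte "KV block"
assert transfer_via_bounce_buffer(data, 256) == data
```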

## Configuration

Add `gds_mode` to `kv_connector_extra_config` in your vLLM config:
Add `backend` to `kv_connector_extra_config` in your vLLM config:

```yaml
--kv-transfer-config '{
@@ -62,12 +62,12 @@ Add `gds_mode` to `kv_connector_extra_config` in your vLLM config:
"shared_storage_path": "/mnt/nvme/kv-cache/",
"block_size": 256,
"threads_per_gpu": "64",
"gds_mode": "read_write"
"backend": "GDS"
}
}'
```

If you are unsure which mode to use, start with `read_write` for NVMe local disks or `bb_read_write` for shared storage.
If you are unsure which backend to use, start with `GDS` for NVMe local disks or `GDS_BB` for shared storage.

## Verify GDS is active

@@ -132,7 +132,7 @@ ip addr show # note the IPs of those interfaces
### Filesystem does not support O_DIRECT

- NFS, FUSE-based mounts, and tmpfs do not support O_DIRECT
- Use `bb_read_write` (Bounce Buffer mode) as a fallback for these filesystems
- Use `GDS_BB` (Bounce Buffer mode) as a fallback for these filesystems
- For full GDS performance, use a local NVMe or NVMe-oF mount
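Whether a given mount accepts O_DIRECT can be probed directly; a quick sketch (hypothetical helper, Linux-specific — `os.O_DIRECT` does not exist on all platforms):

```python
import errno
import os
import tempfile

def supports_o_direct(directory: str) -> bool:
    """Probe whether files in `directory` can be opened with O_DIRECT."""
    if not hasattr(os, "O_DIRECT"):
        return False  # e.g. macOS has no O_DIRECT flag
    path = os.path.join(directory, ".odirect_probe")
    try:
        fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_DIRECT, 0o600)
    except OSError as exc:
        # EINVAL is the classic "filesystem does not support O_DIRECT" error
        return exc.errno != errno.EINVAL
    else:
        os.close(fd)
        return True
    finally:
        try:
            os.remove(path)
        except OSError:
            pass

# tmpfs mounts (often /tmp) typically report False here
print(supports_o_direct(tempfile.gettempdir()))
```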

### GDS falls back silently to CPU
43 changes: 43 additions & 0 deletions kv_connectors/llmd_fs_backend/docs/object_store.md
@@ -0,0 +1,43 @@
# Object Storage Guide

The object store backend enables caching KV data in an object store.

The object store support is built on top of NIXL and boto3.
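The lookup side of this path boils down to a per-key `head_object` existence check; a sketch against an in-memory stand-in for the S3 client (`FakeS3` and `block_exists` are illustrative names — the real backend uses a boto3 client, whose `head_object`/`put_object` calls have the same shape):

```python
class FakeS3:
    """Minimal stand-in for a boto3 S3 client: head_object raises on a miss."""
    def __init__(self):
        self.objects: dict[tuple[str, str], bytes] = {}

    def put_object(self, Bucket: str, Key: str, Body: bytes):
        self.objects[(Bucket, Key)] = Body

    def head_object(self, Bucket: str, Key: str):
        if (Bucket, Key) not in self.objects:
            raise KeyError(f"404: {Bucket}/{Key}")

def block_exists(s3, bucket: str, key: str) -> bool:
    """True if the KV block's object is present in the bucket."""
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except Exception:
        return False

s3 = FakeS3()
s3.put_object(Bucket="testing1", Key="blk-0001", Body=b"kv")
print(block_exists(s3, "testing1", "blk-0001"))  # True
print(block_exists(s3, "testing1", "blk-0002"))  # False
```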

## Requirements

- NVIDIA GPU with CUDA libraries installed (required by NIXL)
- NIXL library (installed automatically when building the Python wheel)
- boto3 library (installed automatically when building the Python wheel)
- an object store (S3, Ceph, Noobaa, MinIO, etc.)

## Build

```bash
pip install -e .
```

## Configuration

Add the object store configuration and specify the object store backend in `kv_connector_extra_config` in your vLLM config:

```yaml
--kv-transfer-config '{
  "kv_connector": "OffloadingConnector",
  "kv_role": "kv_both",
  "kv_connector_extra_config": {
    "spec_name": "SharedStorageOffloadingSpec",
    "spec_module_path": "llmd_fs_backend.spec",
    "shared_storage_path": "/mnt/nvme/kv-cache/",
    "block_size": 256,
    "threads_per_gpu": "64",
    "bucket": "testing1",
    "scheme": "https",
    "endpoint_override": "172.30.228.75:9000",
    "access_key": "minioadmin",
    "secret_key": "minioadmin",
    "ca_bundle": "/root/tls.crt",
    "backend": "OBJ"
  }
}'
```
76 changes: 76 additions & 0 deletions kv_connectors/llmd_fs_backend/llmd_fs_backend/factory.py
@@ -0,0 +1,76 @@
# Copyright 2025 The llm-d Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Factory for instantiating storage offload engine backends.

Backend names and their meanings:
"POSIX" - POSIX engine, GDS disabled
"POSIX_GDS_READ" - GDS for reads only
"POSIX_GDS_WRITE" - GDS for writes only
"POSIX_GDS" - GDS for reads and writes
"POSIX_BB_READ" - bounce-buffer GDS for reads only
"POSIX_BB_WRITE" - bounce-buffer GDS for writes only
"POSIX_BB" - bounce-buffer GDS for reads and writes
"OBJ" - NIXL S3 object store backend
"""

import storage_offload
from llmd_nixl.obj_backend import ObjBackend

# Maps backend name -> gds_mode string for C++ POSIX engine variants.
_POSIX_GDS_MODES = {
    "POSIX": "disabled",
    "GDS_READ": "read_only",
    "GDS_WRITE": "write_only",
    "GDS": "read_write",
    "GDS_BB_READ": "bb_read_only",
    "GDS_BB_WRITE": "bb_write_only",
    "GDS_BB": "bb_read_write",
}

# C++ POSIX backends that do not use a CPU staging buffer (full GDS path).
_POSIX_NO_STAGING = {"GDS", "GDS_BB"}


def posix_uses_no_staging(backend: str) -> bool:
    """Return True if the backend uses full GDS (no CPU staging buffer)."""
    return backend in _POSIX_NO_STAGING


def make_storage_engine(backend: str, **kwargs):
    """
    Instantiate the correct storage engine for the given backend name.

    Args:
        backend: See module docstring for valid values.
        **kwargs: Forwarded to the backend constructor.
    """
    if backend in _POSIX_GDS_MODES:
        return storage_offload.StorageOffloadEngine(
            kwargs["io_threads"],
            kwargs["gpu_blocks_per_file"],
            kwargs["tensors"],
            kwargs.get("read_preferring_workers", 1),
            _POSIX_GDS_MODES[backend],
        )

    _backends = {
        "OBJ": ObjBackend,
    }
    cls = _backends.get(backend)
    if cls is None:
        raise ValueError(
            f"Unknown backend {backend!r}. "
            f"Valid options: {list(_POSIX_GDS_MODES) + list(_backends)}"
        )
    return cls(**kwargs)
61 changes: 55 additions & 6 deletions kv_connectors/llmd_fs_backend/llmd_fs_backend/manager.py
@@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from collections.abc import Iterable

import os
import boto3
from botocore.exceptions import ClientError

from vllm.logger import init_logger
from vllm.v1.core.kv_cache_utils import BlockHash
from vllm.v1.kv_offload.abstract import (
@@ -34,8 +37,37 @@ class SharedStorageOffloadingManager(OffloadingManager):
    SharedStorageOffloadingManager manages KV offloading to a shared storage medium.
    """

    def __init__(self, file_mapper: FileMapper) -> None:
    LOOKUP_MODE_FILE = "file"
    LOOKUP_MODE_OBJECT_STORE = "object_store"
    LOOKUP_MODE_DICT = "dict"

    def __init__(
        self,
        file_mapper: FileMapper,
        bucket: str,
        endpoint_override: str,
        access_key: str,
        secret_key: str,
        lookup_mode: str = "file",
        scheme: str = "http",
        ca_bundle: str = "",
    ) -> None:
        self.file_mapper: FileMapper = file_mapper
        self.bucket = bucket
        self.lookup_mode = lookup_mode
        self._stored_keys: set[str] = set()

        if lookup_mode == self.LOOKUP_MODE_OBJECT_STORE:
            # boto3 uses a full URL (scheme + host:port); nixl takes host:port
            # separately via endpoint_override + scheme params.
            endpoint_url = f"{scheme}://{endpoint_override}"
            self.s3 = boto3.client(
                "s3",
                endpoint_url=endpoint_url,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                verify=ca_bundle if ca_bundle else True,
            )

    # ----------------------------------------------------------------------
    # Lookup
@@ -46,10 +78,24 @@ def lookup(self, block_hashes: Iterable[BlockHash]) -> int:
        """
        hit_count = 0
        for block_hash in block_hashes:
            file_path = self.file_mapper.get_file_name(block_hash)
            if not os.path.exists(file_path):
                break
            key = self.file_mapper.get_file_name(block_hash)
            if self.lookup_mode == self.LOOKUP_MODE_DICT:
                # Dict lookup is valid only for local, single-process use,
                # or for measuring the fastest possible lookup latency.
                if key not in self._stored_keys:
                    break
            elif self.lookup_mode == self.LOOKUP_MODE_OBJECT_STORE:
                try:
                    self.s3.head_object(Bucket=self.bucket, Key=key)
                except ClientError:
                    break
            elif self.lookup_mode == self.LOOKUP_MODE_FILE:
                if not os.path.exists(key):
                    break
            else:
                raise ValueError(f"Unknown lookup_mode: {self.lookup_mode!r}")
            hit_count += 1
        logger.debug("lookup: %d", hit_count)
        return hit_count
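Note that `lookup` returns the length of the leading run of hits, not the total hit count — once a block misses, later blocks are useless for prefix caching and are never checked. A standalone sketch of that semantics (`prefix_hits` is an illustrative name, not connector code):

```python
def prefix_hits(keys, stored) -> int:
    """Count consecutive hits from the start of `keys`; stop at the first miss."""
    hits = 0
    for key in keys:
        if key not in stored:
            break  # a gap invalidates everything after it
        hits += 1
    return hits

stored = {"a", "b", "d"}
# "c" misses, so "d" is never even checked despite being stored
print(prefix_hits(["a", "b", "c", "d"], stored))  # 2
```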

    # ----------------------------------------------------------------------
@@ -98,5 +144,8 @@ def prepare_store(
    def complete_store(self, block_hashes: Iterable[BlockHash], success: bool = True):
        """
        For shared storage, storing is stateless - no action needed.
        In dict lookup mode, record successfully stored keys.
        """
        pass
        if success and self.lookup_mode == self.LOOKUP_MODE_DICT:
            for block_hash in block_hashes:
                self._stored_keys.add(self.file_mapper.get_file_name(block_hash))
28 changes: 23 additions & 5 deletions kv_connectors/llmd_fs_backend/llmd_fs_backend/spec.py
Expand Up @@ -59,9 +59,6 @@ def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
"max_staging_memory_gb", DEFAULT_MAX_STAGING_MEMORY_GB
)
) # Max staging CPU buffer in GB
# GDS mode: disabled, read_only, write_only, read_write,
# bb_read_only, bb_write_only, bb_read_write
self.gds_mode = str(self.extra_config.get("gds_mode", "disabled"))

        self.offloaded_block_size = int(
            self.extra_config.get("block_size", DEFAULT_STORAGE_BLOCK_SIZE)
@@ -82,6 +79,8 @@ def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
            )
        )

        self.backend = self.extra_config.get("backend", "POSIX")

        parallel_config = vllm_config.parallel_config
        tp_size = parallel_config.tensor_parallel_size
        pp_size = parallel_config.pipeline_parallel_size
@@ -105,7 +104,20 @@ def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
    def get_manager(self) -> OffloadingManager:
        assert self.vllm_config.parallel_config.rank == 0, "Scheduler rank should be 0"
        if not self._manager:
            self._manager = SharedStorageOffloadingManager(file_mapper=self.file_mapper)
            self._manager = SharedStorageOffloadingManager(
                file_mapper=self.file_mapper,
                bucket=self.extra_config.get("bucket", ""),
                endpoint_override=self.extra_config.get("endpoint_override", ""),
                scheme=self.extra_config.get("scheme", "http"),
                access_key=self.extra_config.get("access_key", ""),
                secret_key=self.extra_config.get("secret_key", ""),
                lookup_mode=self.extra_config.get(
                    "lookup_mode",
                    SharedStorageOffloadingManager.LOOKUP_MODE_OBJECT_STORE
                    if self.backend == "OBJ"
                    else SharedStorageOffloadingManager.LOOKUP_MODE_FILE,
                ),
                ca_bundle=self.extra_config.get("ca_bundle", ""),
            )
        return self._manager

    def get_handlers(
@@ -122,7 +134,13 @@ def get_handlers(
            kv_caches=kv_caches,
            threads_per_gpu=self.threads_per_gpu,
            max_staging_memory_gb=self.max_staging_memory_gb,
            gds_mode=self.gds_mode,
            backend=self.backend,
            bucket=self.extra_config.get("bucket", ""),
            endpoint_override=self.extra_config.get("endpoint_override", ""),
            scheme=self.extra_config.get("scheme", "http"),
            access_key=self.extra_config.get("access_key", ""),
            secret_key=self.extra_config.get("secret_key", ""),
            ca_bundle=self.extra_config.get("ca_bundle", ""),
        )

        assert self._handlers is not None