Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions kv_cache_manager/data_storage/data_storage_backend.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "kv_cache_manager/common/error_code.h"
#include "kv_cache_manager/data_storage/common_define.h"
Expand All @@ -13,6 +15,23 @@

namespace kv_cache_manager {

// Block keys belonging to a single spec inside a CreateBlocksRequest.
//
// Backends visible in this change turn each entry of `block_keys` into a key
// string of the form "<instance_id>/<spec_name>/<hex(block_key)>" and record
// `spec_size` as the "size" parameter of every URI produced for this spec.
struct SpecBlockKeys {
    // Name of the spec, e.g. "tp0" or "Linear_TP0"; becomes a component of the key string.
    std::string spec_name;
    // Size per key for this spec. Defaults to 0 so that a default-constructed
    // object never publishes an indeterminate value as the "size" parameter.
    int64_t spec_size = 0;
    // Raw key values; one create-result entry is produced per element, in order.
    std::vector<int64_t> block_keys;
    // Mapping to the caller's keys array indices.
    // NOTE(review): presumably parallel to `block_keys` (same length/order) — confirm with callers.
    std::vector<size_t> original_key_indices;
};

// Input bundle for the Create() family of calls. It is a struct rather than a
// parameter list so that future features (e.g., lifecycle groups) can be added
// without touching every signature.
struct CreateBlocksRequest {
    // Identifier of the requesting instance; backends in this change use it as
    // the leading component of every generated key string.
    std::string instance_id{};
    // One entry per spec. Create() results are returned in this same order.
    std::vector<SpecBlockKeys> spec_block_keys{};
};

// Create result for ONE spec: one (ErrorCode, DataStorageUri) pair per block key
// of that spec, in the same order as SpecBlockKeys::block_keys.
// Create() returns a std::vector<SpecCreateResult> whose elements are aligned
// 1:1 with CreateBlocksRequest::spec_block_keys.
using SpecCreateResult = std::vector<std::pair<ErrorCode, DataStorageUri>>;

class DataStorageBackend {
public:
DataStorageBackend() = delete;
Expand Down Expand Up @@ -42,10 +61,9 @@ class DataStorageBackend {
}
virtual ErrorCode DoOpen(const StorageConfig &config, const std::string &trace_id) = 0;
virtual ErrorCode Close() = 0;
virtual std::vector<std::pair<ErrorCode, DataStorageUri>> Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) = 0;
// Produces storage URIs for every block key described by `request`.
//
// request:  instance id plus per-spec block keys.
// trace_id: trace identifier of the calling request.
// cb:       optional callback; the implementations visible in this change
//           invoke it synchronously (when non-empty) before returning.
//
// Returns one SpecCreateResult per entry of request.spec_block_keys, in the
// same order; each SpecCreateResult holds one (ErrorCode, DataStorageUri) pair
// per block key of that spec.
virtual std::vector<SpecCreateResult> Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) = 0;
virtual std::vector<ErrorCode>
Delete(const std::vector<DataStorageUri> &storage_uris, const std::string &trace_id, std::function<void()> cb) = 0;
virtual std::vector<bool> Exist(const std::vector<DataStorageUri> &storage_uris) = 0;
Expand Down
29 changes: 17 additions & 12 deletions kv_cache_manager/data_storage/data_storage_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,10 @@ std::shared_ptr<DataStorageBackend> DataStorageManager::CreateStorageBackend(con
}
}

std::vector<std::pair<ErrorCode, DataStorageUri>> DataStorageManager::Create(RequestContext *request_context,
const std::string &unique_name,
const std::vector<std::string> &keys,
size_t size_per_key,
std::function<void()> cb) {
std::vector<SpecCreateResult> DataStorageManager::Create(RequestContext *request_context,
const std::string &unique_name,
const CreateBlocksRequest &request,
std::function<void()> cb) {
SPAN_TRACER(request_context);
std::shared_lock<std::shared_mutex> lock(rw_lock_);
const std::string &trace_id = request_context->trace_id();
Expand All @@ -190,18 +189,24 @@ std::vector<std::pair<ErrorCode, DataStorageUri>> DataStorageManager::Create(Req
auto storage_backend = iter->second;
const auto dsmc = storage_backend->GetMetricsCollector();
KVCM_METRICS_COLLECTOR_CHRONO_MARK_BEGIN(dsmc, DataStorageCreate);
std::vector<std::pair<ErrorCode, DataStorageUri>> create_result =
storage_backend->Create(keys, size_per_key, trace_id, cb);
auto create_result = storage_backend->Create(request, trace_id, cb);
KVCM_METRICS_COLLECTOR_CHRONO_MARK_END(dsmc, DataStorageCreate);
KVCM_METRICS_COLLECTOR_SET_METRICS(dsmc, data_storage, create_keys_qps, keys.size());
// Count total keys from all specs for metrics
size_t total_keys = 0;
for (const auto &spec_block : request.spec_block_keys) {
total_keys += spec_block.block_keys.size();
}
KVCM_METRICS_COLLECTOR_SET_METRICS(dsmc, data_storage, create_keys_qps, total_keys);
if (request_context) {
request_context->GetMetricsCollectorsVehicle().AddMetricsCollector(dsmc);
}
std::for_each(create_result.begin(), create_result.end(), [&unique_name](auto &pair) {
if (pair.first == EC_OK) {
pair.second.SetHostName(unique_name);
for (auto &spec_result : create_result) {
for (auto &[ec, uri] : spec_result) {
if (ec == EC_OK) {
uri.SetHostName(unique_name);
}
}
});
}
return create_result;
}

Expand Down
9 changes: 4 additions & 5 deletions kv_cache_manager/data_storage/data_storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ class DataStorageManager {
ErrorCode UnRegisterStorage(const std::string &name);
ErrorCode DoCleanup();

std::vector<std::pair<ErrorCode, DataStorageUri>> Create(RequestContext *request_context,
const std::string &unique_name,
const std::vector<std::string> &keys,
size_t size_per_key,
std::function<void()> cb);
// Forwards `request` to a registered storage backend's Create() and returns its
// per-spec results, after setting the host name of every EC_OK URI to
// `unique_name`. Also records create timing and the total number of block keys
// (summed over all specs) on the backend's metrics collector.
//
// request_context: supplies the trace id.
//                  NOTE(review): the implementation reads
//                  request_context->trace_id() before its later
//                  `if (request_context)` check, so a null pointer is not
//                  actually tolerated — confirm callers always pass non-null.
// unique_name:     presumably the name the backend was registered under (the
//                  lookup itself is not visible in this view) — confirm.
// request:         per-spec block keys to create.
// cb:              passed through to the backend's Create().
std::vector<SpecCreateResult> Create(RequestContext *request_context,
const std::string &unique_name,
const CreateBlocksRequest &request,
std::function<void()> cb);

std::vector<ErrorCode> Delete(RequestContext *request_context,
const std::string &unique_name,
Expand Down
76 changes: 47 additions & 29 deletions kv_cache_manager/data_storage/hf3fs_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,41 +54,59 @@ ErrorCode Hf3fsBackend::Close() {
return EC_OK;
};

std::vector<std::pair<ErrorCode, DataStorageUri>> Hf3fsBackend::Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<std::pair<ErrorCode, DataStorageUri>> result;
std::vector<std::vector<std::string>> batches;
std::vector<SpecCreateResult> Hf3fsBackend::Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<SpecCreateResult> result;
int32_t batch_size = spec_.key_count_per_file();
batch_size = batch_size <= 0 ? 1 : batch_size;
size_t total_key_count = keys.size();
for (size_t start = 0; start < total_key_count; start += batch_size) {
size_t end = std::min(start + batch_size, total_key_count);
batches.emplace_back(keys.begin() + start, keys.begin() + end);
}
for (auto &batch : batches) {
DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
if (batch.size() > 1) {
std::string combine_key = StringUtil::Join(batch, "|");
std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42));
storage_uri.SetPath(base_path_ / (batch[0] + "_" + hash_str));
} else {
storage_uri.SetPath(base_path_ / batch[0]);
}
storage_uri.SetParam("size", std::to_string(size_per_key));
for (size_t j = 0; j < batch.size(); ++j) {
if (batch_size > 1) {
storage_uri.SetParam("blkid", std::to_string(j));

// Process each spec independently - never mix keys from different specs in a batch
for (const auto &spec_block : request.spec_block_keys) {
SpecCreateResult spec_result;
const auto &block_keys = spec_block.block_keys;
size_t total_key_count = block_keys.size();

// Batch within this spec only
for (size_t start = 0; start < total_key_count; start += batch_size) {
size_t end = std::min(start + batch_size, total_key_count);

// Build block key strings for this batch
std::vector<std::string> batch_keys;
batch_keys.reserve(end - start);
for (size_t i = start; i < end; ++i) {
std::string block_key = request.instance_id + "/" + spec_block.spec_name + "/" +
StringUtil::Uint64ToHex(block_keys[i]);
batch_keys.push_back(std::move(block_key));
}
result.push_back({EC_OK, storage_uri});
if (spec_.touch_file_when_create()) {
Hf3fsDataStorageItem item = Hf3fsDataStorageItem::FromUri(storage_uri);
TouchFile(item.file_path);

// Generate URI for this batch
DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
if (batch_keys.size() > 1) {
std::string combine_key = StringUtil::Join(batch_keys, "|");
std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42));
storage_uri.SetPath(base_path_ / (batch_keys[0] + "_" + hash_str));
} else {
storage_uri.SetPath(base_path_ / batch_keys[0]);
}
storage_uri.SetParam("size", std::to_string(spec_block.spec_size));

// Add result entry for each key in the batch
for (size_t j = 0; j < batch_keys.size(); ++j) {
if (batch_size > 1) {
storage_uri.SetParam("blkid", std::to_string(j));
}
spec_result.push_back({EC_OK, storage_uri});
if (spec_.touch_file_when_create()) {
Hf3fsDataStorageItem item = Hf3fsDataStorageItem::FromUri(storage_uri);
TouchFile(item.file_path);
}
}
}
result.push_back(std::move(spec_result));
}

if (cb) {
cb();
}
Expand Down
7 changes: 3 additions & 4 deletions kv_cache_manager/data_storage/hf3fs_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ class Hf3fsBackend : public DataStorageBackend {
ErrorCode DoOpen(const StorageConfig &storage_config, const std::string &trace_id) override;
ErrorCode Close() override;

std::vector<std::pair<ErrorCode, DataStorageUri>> Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) override;
// Builds URIs for the block keys in `request`, one spec at a time; keys from
// different specs are never placed in the same batch.
//
// Within a spec, keys are grouped into batches of spec_.key_count_per_file()
// (values <= 0 are treated as 1). All keys of a batch share one path under
// base_path_: the key string itself for a single-key batch, or
// "<first key>_<hash of the '|'-joined keys>" otherwise. When the configured
// batch size is > 1, each entry additionally carries a "blkid" parameter with
// its index inside the batch. If spec_.touch_file_when_create() is set, the
// file is touched for each entry. `cb` is invoked if non-empty.
// NOTE(review): every visible entry is reported as EC_OK; the end of the
// implementation is not visible in this view.
std::vector<SpecCreateResult> Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) override;
std::vector<ErrorCode> Delete(const std::vector<DataStorageUri> &storage_uris,
const std::string &trace_id,
std::function<void()> cb) override;
Expand Down
36 changes: 23 additions & 13 deletions kv_cache_manager/data_storage/mooncake_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <utility>

#include "kv_cache_manager/common/logger.h"
#include "kv_cache_manager/common/string_util.h"
#include "kv_cache_manager/metrics/metrics_registry.h"

namespace kv_cache_manager {
Expand Down Expand Up @@ -80,23 +81,32 @@ ErrorCode MooncakeBackend::Close() {
return EC_OK;
};

std::vector<std::pair<ErrorCode, DataStorageUri>> MooncakeBackend::Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<std::pair<ErrorCode, DataStorageUri>> result;
for (int i = 0; i < keys.size(); i++) {
DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
storage_uri.SetPath("/");
storage_uri.SetParam("key", keys[i]);
storage_uri.SetParam("size", std::to_string(size_per_key));
result.push_back({EC_OK, storage_uri});
std::vector<SpecCreateResult> MooncakeBackend::Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<SpecCreateResult> result;

// Process all specs and keys - Mooncake doesn't batch, create one URI per key
for (const auto &spec_block : request.spec_block_keys) {
SpecCreateResult spec_result;
for (size_t i = 0; i < spec_block.block_keys.size(); i++) {
// Build the key string: instance_id/spec_name/hex(key)
std::string key_str = request.instance_id + "/" + spec_block.spec_name + "/" +
StringUtil::Uint64ToHex(spec_block.block_keys[i]);

DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
storage_uri.SetPath("/");
storage_uri.SetParam("key", key_str);
storage_uri.SetParam("size", std::to_string(spec_block.spec_size));
spec_result.push_back({EC_OK, storage_uri});
}
result.push_back(std::move(spec_result));
}

if (cb) {
cb();
}
// not supported yet
return result;
}
std::vector<ErrorCode> MooncakeBackend::Delete(const std::vector<DataStorageUri> &storage_uris,
Expand Down
7 changes: 3 additions & 4 deletions kv_cache_manager/data_storage/mooncake_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ class MooncakeBackend : public DataStorageBackend {

ErrorCode Close() override;

std::vector<std::pair<ErrorCode, DataStorageUri>> Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) override;
// Builds one URI per block key for every spec in `request` (no batching).
// Each URI has path "/", a "key" parameter of the form
// "<instance_id>/<spec_name>/<hex(block_key)>" and a "size" parameter equal to
// the spec's spec_size; every entry is reported as EC_OK. `cb` is invoked
// before returning if it is non-empty.
std::vector<SpecCreateResult> Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) override;
std::vector<ErrorCode> Delete(const std::vector<DataStorageUri> &storage_uris,
const std::string &trace_id,
std::function<void()> cb) override;
Expand Down
70 changes: 44 additions & 26 deletions kv_cache_manager/data_storage/nfs_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,55 @@ ErrorCode NfsBackend::Close() {
return EC_OK;
};

std::vector<std::pair<ErrorCode, DataStorageUri>> NfsBackend::Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<std::pair<ErrorCode, DataStorageUri>> result;
std::vector<std::vector<std::string>> batches;
std::vector<SpecCreateResult> NfsBackend::Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) {
std::vector<SpecCreateResult> result;
int32_t batch_size = spec_.key_count_per_file();
batch_size = batch_size <= 0 ? 1 : batch_size;
size_t total_key_count = keys.size();
for (size_t start = 0; start < total_key_count; start += batch_size) {
size_t end = std::min(start + batch_size, total_key_count);
batches.emplace_back(keys.begin() + start, keys.begin() + end);
}
for (auto &batch : batches) {
DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
if (batch.size() > 1) {
std::string combine_key = StringUtil::Join(batch, "|");
std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42));
storage_uri.SetPath(spec_.root_path() + batch[0] + "_" + hash_str);
} else {
storage_uri.SetPath(spec_.root_path() + batch[0]);
}
storage_uri.SetParam("size", std::to_string(size_per_key));
for (size_t j = 0; j < batch.size(); ++j) {
if (batch_size > 1) {
storage_uri.SetParam("blkid", std::to_string(j));

// Process each spec independently - never mix keys from different specs in a batch
for (const auto &spec_block : request.spec_block_keys) {
SpecCreateResult spec_result;
const auto &block_keys = spec_block.block_keys;
size_t total_key_count = block_keys.size();

// Batch within this spec only
for (size_t start = 0; start < total_key_count; start += batch_size) {
size_t end = std::min(start + batch_size, total_key_count);

// Build block key strings for this batch
std::vector<std::string> batch_keys;
batch_keys.reserve(end - start);
for (size_t i = start; i < end; ++i) {
std::string block_key = request.instance_id + "/" + spec_block.spec_name + "/" +
StringUtil::Uint64ToHex(block_keys[i]);
batch_keys.push_back(std::move(block_key));
}

// Generate URI for this batch
DataStorageUri storage_uri;
storage_uri.SetProtocol(ToString(GetType()));
if (batch_keys.size() > 1) {
std::string combine_key = StringUtil::Join(batch_keys, "|");
std::string hash_str = StringUtil::Uint64ToHex(Hash64(combine_key.c_str(), combine_key.size(), 42));
storage_uri.SetPath(spec_.root_path() + batch_keys[0] + "_" + hash_str);
} else {
storage_uri.SetPath(spec_.root_path() + batch_keys[0]);
}
storage_uri.SetParam("size", std::to_string(spec_block.spec_size));

// Add result entry for each key in the batch
for (size_t j = 0; j < batch_keys.size(); ++j) {
if (batch_size > 1) {
storage_uri.SetParam("blkid", std::to_string(j));
}
spec_result.push_back({EC_OK, storage_uri});
}
result.push_back({EC_OK, storage_uri});
}
result.push_back(std::move(spec_result));
}

if (cb) {
cb();
}
Expand Down
7 changes: 3 additions & 4 deletions kv_cache_manager/data_storage/nfs_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ class NfsBackend : public DataStorageBackend {
ErrorCode DoOpen(const StorageConfig &storage_config, const std::string &trace_id) override;
ErrorCode Close() override;

std::vector<std::pair<ErrorCode, DataStorageUri>> Create(const std::vector<std::string> &keys,
size_t size_per_key,
const std::string &trace_id,
std::function<void()> cb) override;
// Builds URIs for the block keys in `request`, one spec at a time; keys from
// different specs are never placed in the same batch.
//
// Within a spec, keys are grouped into batches of spec_.key_count_per_file()
// (values <= 0 are treated as 1). All keys of a batch share one path formed by
// appending to spec_.root_path(): the key string itself for a single-key
// batch, or "<first key>_<hash of the '|'-joined keys>" otherwise. When the
// configured batch size is > 1, each entry additionally carries a "blkid"
// parameter with its index inside the batch. `cb` is invoked if non-empty.
// NOTE(review): every visible entry is reported as EC_OK; the end of the
// implementation is not visible in this view.
std::vector<SpecCreateResult> Create(const CreateBlocksRequest &request,
const std::string &trace_id,
std::function<void()> cb) override;
std::vector<ErrorCode> Delete(const std::vector<DataStorageUri> &storage_uris,
const std::string &trace_id,
std::function<void()> cb) override;
Expand Down
Loading
Loading