Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions src/llm/language_model/continuous_batching/llm_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@

namespace ovms {
struct LLMExecutor {
bool isDynamicKVCache;
// For logging purposes we could have more information about graph and node here
std::mutex mutex;
std::condition_variable cv;
std::shared_ptr<ov::genai::ContinuousBatchingPipeline> pipe = nullptr;

// Takes shared ownership of the continuous-batching pipeline.
// isDynamicKVCacheSet: true when the KV cache grows on demand (cache_size == 0),
// which changes how cache usage is reported in printMetrics(). Defaults to
// false so existing call sites keep their previous behavior.
LLMExecutor(std::shared_ptr<ov::genai::ContinuousBatchingPipeline> pipe, bool isDynamicKVCacheSet = false) {
    this->pipe = std::move(pipe);
    this->isDynamicKVCache = isDynamicKVCacheSet;
}

bool hasRequests() {
Expand All @@ -59,12 +61,48 @@ struct LLMExecutor {
cv.notify_one();
}

// Builds a human-readable KV-cache status string for logging.
// A dynamic cache has no fixed capacity, so only its current size is
// meaningful; a fixed cache additionally reports utilization percentage
// (one decimal place), e.g. "42.0% of 1.0 GB".
std::string formatCacheInfo(float cacheUsage, size_t cacheBytes, bool isCacheDynamic) {
    if (isCacheDynamic) {
        return formatBytes(cacheBytes);
    }
    std::ostringstream text;
    text << std::fixed << std::setprecision(1) << cacheUsage << "% of " << formatBytes(cacheBytes);
    return text.str();
}

// Renders a byte count with the largest fitting binary unit and one
// decimal place (e.g. "1.5 KB", "3.0 GB"); counts below 1 KiB are
// printed as plain integers ("512 B").
std::string formatBytes(size_t bytes) {
    struct Unit {
        double size;
        const char* suffix;
    };
    static const Unit kUnits[] = {
        {1024.0 * 1024.0 * 1024.0 * 1024.0, " TB"},
        {1024.0 * 1024.0 * 1024.0, " GB"},
        {1024.0 * 1024.0, " MB"},
        {1024.0, " KB"},
    };

    std::ostringstream out;
    out << std::fixed << std::setprecision(1);
    for (const auto& unit : kUnits) {
        if (bytes >= unit.size) {
            out << (bytes / unit.size) << unit.suffix;
            return out.str();
        }
    }
    out << bytes << " B";
    return out.str();
}

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
// Logs current pipeline load (total and scheduled requests) plus KV-cache
// status; the cache string format depends on whether the cache is dynamic
// (see formatCacheInfo). The diff residue that duplicated this log call
// with the old "{:.1f}%" format is removed — one log line per invocation.
void printMetrics() {
    ov::genai::PipelineMetrics metrics = pipe->get_metrics();
    SPDLOG_LOGGER_INFO(llm_executor_logger, "All requests: {}; Scheduled requests: {}; Cache usage {};",
        metrics.requests, metrics.scheduled_requests, formatCacheInfo(metrics.cache_usage, metrics.kv_cache_usage_in_bytes, this->isDynamicKVCache));
}
};
#pragma GCC diagnostic pop
Expand Down Expand Up @@ -98,8 +136,8 @@ class LLMExecutorWrapper {
}

public:
// Constructs the executor (forwarding the dynamic-KV-cache flag, default
// false for backward compatibility with existing call sites) and starts
// the worker thread that drains pipeline requests.
LLMExecutorWrapper(std::shared_ptr<ov::genai::ContinuousBatchingPipeline> pipe, bool isDynamicKVCache = false) :
    llmExecutor(std::move(pipe), isDynamicKVCache) {
    llmExecutorThread = std::thread(LLMExecutorWrapper::run, &llmExecutor, &finishExecutorThread);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Status ContinuousBatchingServableInitializer::initialize(std::shared_ptr<GenAiSe
}
properties->maxModelLength = parseMaxModelLength(parsedModelsPath);

properties->llmExecutorWrapper = std::make_shared<LLMExecutorWrapper>(properties->pipeline);
properties->llmExecutorWrapper = std::make_shared<LLMExecutorWrapper>(properties->pipeline, properties->schedulerConfig.cache_size == 0);

return StatusCode::OK;
}
Expand Down