
Commit ce41e54

[releases/2026/0][Legacy] Remove disconnected requests from execution queue (#3968) (#3973)
1 parent a7da8df · commit ce41e54

File tree: 6 files changed, +50 / -18 lines

src/llm/language_model/legacy/legacy_executor.cpp

Lines changed: 15 additions & 9 deletions
@@ -32,16 +32,22 @@ size_t LegacyExecutor::requestsQueueSize() {
 
 void LegacyExecutor::processRequest() {
     OVMS_PROFILE_FUNCTION();
-    SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
-    try {
-        requests.front()->results = pipe->generate(requests.front()->inputIds, requests.front()->generationConfigBuilder->getConfig(), requests.front()->textStreamer);
-    } catch (std::exception& e) {
-        requests.front()->success = false;
-        SPDLOG_LOGGER_ERROR(llm_executor_logger, "LLM pipeline generation failed: {}.", e.what());
+    auto& requestExecutionContext = requests.front();
+    if (requestExecutionContext->clientDisconnected) {
+        requestExecutionContext->success = false;
+        SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
+    } else {
+        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
+        try {
+            requestExecutionContext->results = pipe->generate(requestExecutionContext->inputIds, requestExecutionContext->generationConfigBuilder->getConfig(), requestExecutionContext->textStreamer);
+        } catch (std::exception& e) {
+            requestExecutionContext->success = false;
+            SPDLOG_LOGGER_ERROR(llm_executor_logger, "LLM pipeline generation failed: {}.", e.what());
+        }
+        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
     }
-    SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
-    requests.front()->readySignal.set_value();
-    requests.front()->executionInProgress.notify_one();
+    requestExecutionContext->readySignal.set_value();
+    requestExecutionContext->executionInProgress.notify_one();
     std::unique_lock<std::mutex> lock(queueMutex);
     requests.pop();
 }
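
Note: the executor change above boils down to a single guard. Before running the potentially long generate() call, the worker checks a clientDisconnected flag that the transport layer sets when the peer goes away, and in the disconnected case it only marks the request as failed. Crucially, readySignal and executionInProgress are still signalled in both branches, so whoever scheduled the request is never left waiting. Below is a minimal, self-contained sketch of that pattern; the Request struct, the queue globals, and the generate() stand-in are illustrative assumptions, not the OVMS types.

    // Minimal sketch of the skip-on-disconnect worker pattern (illustrative, not OVMS code).
    #include <atomic>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <string>

    struct Request {
        std::string prompt;
        std::string result;
        bool success{true};
        std::atomic<bool> clientDisconnected{false};  // set by a disconnection callback
        std::promise<void> readySignal;               // fulfilled whether or not we generated
    };

    std::queue<std::shared_ptr<Request>> requests;
    std::mutex queueMutex;

    // Stand-in for pipe->generate(); assume it is expensive.
    std::string generate(const std::string& prompt) { return "echo: " + prompt; }

    void processRequest() {
        auto& ctx = requests.front();
        if (ctx->clientDisconnected) {
            // Nobody is waiting for the answer; skip the expensive call but still mark failure.
            ctx->success = false;
        } else {
            try {
                ctx->result = generate(ctx->prompt);
            } catch (const std::exception&) {
                ctx->success = false;
            }
        }
        // Always signal readiness so any waiter that is still alive gets released.
        ctx->readySignal.set_value();
        std::unique_lock<std::mutex> lock(queueMutex);
        requests.pop();
    }

    int main() {
        auto queued = std::make_shared<Request>();
        queued->prompt = "hello";
        queued->clientDisconnected = true;  // simulate the client going away while queued
        requests.push(queued);
        processRequest();
        std::cout << "success=" << queued->success << "\n";  // prints success=0
    }

Built with any C++17 compiler, this prints success=0: the disconnected request is drained from the queue and its waiter released without ever touching the pipeline.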

src/llm/language_model/legacy/servable.cpp

Lines changed: 6 additions & 0 deletions
@@ -127,6 +127,12 @@ absl::Status LegacyServable::prepareInputs(std::shared_ptr<GenAiServableExecutio
 
 absl::Status LegacyServable::scheduleExecution(std::shared_ptr<GenAiServableExecutionContext>& executionContext) {
     auto legacyExecutionContext = std::static_pointer_cast<LegacyServableExecutionContext>(executionContext);
+    std::weak_ptr<LegacyServableExecutionContext> weakContext = legacyExecutionContext;
+    legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
+        if (auto context = weakContext.lock()) {
+            context->clientDisconnected = true;
+        }
+    });
     if (legacyExecutionContext->payload.client->isDisconnected()) {
         return absl::CancelledError();
     }
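
Note: scheduleExecution now registers a disconnection callback on the client before the existing isDisconnected() check. The callback captures the execution context through a std::weak_ptr, so it neither keeps the context alive after the request completes nor dereferences a dead object if the transport fires the callback late. A rough sketch of why that capture choice matters is below; Client, Context, and registerDisconnectionCallback here are simplified stand-ins for the real interfaces, not the OVMS API.

    // Sketch of the weak_ptr-capturing disconnection callback (illustrative stand-ins).
    #include <atomic>
    #include <functional>
    #include <iostream>
    #include <memory>

    struct Context {
        std::atomic<bool> clientDisconnected{false};
    };

    struct Client {
        std::function<void()> onDisconnect;
        void registerDisconnectionCallback(std::function<void()> cb) { onDisconnect = std::move(cb); }
        void disconnect() { if (onDisconnect) onDisconnect(); }
    };

    int main() {
        Client client;
        auto context = std::make_shared<Context>();

        // Capture a weak_ptr: the callback does not extend the context's lifetime,
        // and firing after the context is gone is a safe no-op.
        std::weak_ptr<Context> weakContext = context;
        client.registerDisconnectionCallback([weakContext]() {
            if (auto ctx = weakContext.lock()) {
                ctx->clientDisconnected = true;
            }
        });

        client.disconnect();
        std::cout << "flag after disconnect: " << context->clientDisconnected << "\n";  // prints 1

        context.reset();      // request finished, context destroyed
        client.disconnect();  // late callback: lock() fails, nothing happens
    }

Had the lambda captured the shared_ptr instead, every disconnected-then-forgotten request would stay allocated for as long as the client object holds the callback.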

src/llm/language_model/legacy/servable.hpp

Lines changed: 4 additions & 1 deletion
@@ -33,7 +33,10 @@ struct LegacyServableExecutionContext : public GenAiServableExecutionContext {
     std::condition_variable executionInProgress;
     // Workaround needed to pass generation config to the executor that requires it
     ov::genai::GenerationConfig baseGenerationConfig;
-    bool success = true;
+    bool success{true};
+
+    // Disconnection handling
+    std::atomic<bool> clientDisconnected{false};
 };
 
 struct LegacyServableProperties : public GenAiServableProperties {
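
Note: the header adds the flag itself. It is written from the transport side (inside the disconnection callback) and read from the executor thread without taking queueMutex, so a plain bool would be a data race; std::atomic<bool> makes the cross-thread handoff well defined. A tiny sketch of that access pattern, with hypothetical thread names:

    // The flag is written by one thread and read by another with no lock held,
    // so it must be std::atomic to avoid a data race.
    #include <atomic>
    #include <thread>

    int main() {
        std::atomic<bool> clientDisconnected{false};

        std::thread transport([&] { clientDisconnected = true; });   // disconnection callback
        std::thread executor([&] {
            if (clientDisconnected) { /* skip generation */ }        // worker check
        });

        transport.join();
        executor.join();
    }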

src/llm/visual_language_model/legacy/legacy_executor.cpp

Lines changed: 15 additions & 7 deletions
@@ -33,14 +33,22 @@ size_t VisualLanguageModelLegacyExecutor::requestsQueueSize() {
 
 void VisualLanguageModelLegacyExecutor::processRequest() {
     OVMS_PROFILE_FUNCTION();
-    try {
-        requests.front()->results = pipe->generate(requests.front()->inputText, requests.front()->inputImages, requests.front()->generationConfigBuilder->getConfig(), requests.front()->textStreamer);
-    } catch (std::exception& e) {
-        requests.front()->success = false;
-        SPDLOG_LOGGER_ERROR(llm_executor_logger, "VLM pipeline generation failed: {}.", e.what());
+    auto& requestExecutionContext = requests.front();
+    if (requestExecutionContext->clientDisconnected) {
+        requestExecutionContext->success = false;
+        SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
+    } else {
+        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
+        try {
+            requestExecutionContext->results = pipe->generate(requestExecutionContext->inputText, requestExecutionContext->inputImages, requestExecutionContext->generationConfigBuilder->getConfig(), requestExecutionContext->textStreamer);
+        } catch (std::exception& e) {
+            requestExecutionContext->success = false;
+            SPDLOG_LOGGER_ERROR(llm_executor_logger, "VLM pipeline generation failed: {}.", e.what());
+        }
+        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
     }
-    requests.front()->readySignal.set_value();
-    requests.front()->executionInProgress.notify_one();
+    requestExecutionContext->readySignal.set_value();
+    requestExecutionContext->executionInProgress.notify_one();
     std::unique_lock<std::mutex> lock(queueMutex);
     requests.pop();
 }

src/llm/visual_language_model/legacy/servable.cpp

Lines changed: 6 additions & 0 deletions
@@ -120,6 +120,12 @@ absl::Status VisualLanguageModelLegacyServable::parseRequest(std::shared_ptr<Gen
 
 absl::Status VisualLanguageModelLegacyServable::scheduleExecution(std::shared_ptr<GenAiServableExecutionContext>& executionContext) {
     auto legacyExecutionContext = std::static_pointer_cast<VisualLanguageModelLegacyServableExecutionContext>(executionContext);
+    std::weak_ptr<VisualLanguageModelLegacyServableExecutionContext> weakContext = legacyExecutionContext;
+    legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
+        if (auto context = weakContext.lock()) {
+            context->clientDisconnected = true;
+        }
+    });
     if (legacyExecutionContext->payload.client->isDisconnected()) {
         return absl::CancelledError();
     }

src/llm/visual_language_model/legacy/servable.hpp

Lines changed: 4 additions & 1 deletion
@@ -36,7 +36,10 @@ struct VisualLanguageModelLegacyServableExecutionContext : public GenAiServableE
     std::string inputText;
     // Workaround needed to pass generation config to the executor that requires it
    ov::genai::GenerationConfig baseGenerationConfig;
-    bool success = true;
+    bool success{true};
+
+    // Disconnection handling
+    std::atomic<bool> clientDisconnected{false};
 };
 
 struct VisualLanguageModelLegacyServableProperties : public GenAiServableProperties {
