Skip to content

Commit e42011a

Browse files
committed
init
1 parent 22d1ec7 commit e42011a

File tree

4 files changed

+24
-4
lines changed

4 files changed

+24
-4
lines changed

src/llm/language_model/legacy/servable.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,12 @@ absl::Status LegacyServable::parseRequest(std::shared_ptr<GenAiServableExecution
8383

8484
if (legacyExecutionContext->apiHandler->isStream()) {
8585
legacyExecutionContext->lastStreamerCallbackOutput = ""; // initialize with empty string
86-
auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput](std::string text) {
86+
auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
8787
SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
88+
if (clientDisconnected.load()) {
89+
executionInProgress.notify_one();
90+
return ov::genai::StreamingStatus::CANCEL;
91+
}
8892
{
8993
std::lock_guard<std::mutex> lock(mutex);
9094
lastStreamerCallbackOutput += text;
@@ -130,10 +134,11 @@ absl::Status LegacyServable::scheduleExecution(std::shared_ptr<GenAiServableExec
130134
std::weak_ptr<LegacyServableExecutionContext> weakContext = legacyExecutionContext;
131135
legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
132136
if (auto context = weakContext.lock()) {
133-
context->clientDisconnected = true;
137+
context->signalDisconnection();
134138
}
135139
});
136140
if (legacyExecutionContext->payload.client->isDisconnected()) {
141+
legacyExecutionContext->signalDisconnection();
137142
return absl::CancelledError();
138143
}
139144
properties->legacyExecutor->addRequest(legacyExecutionContext);

src/llm/language_model/legacy/servable.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ struct LegacyServableExecutionContext : public GenAiServableExecutionContext {
3737

3838
// Disconnection handling
3939
std::atomic<bool> clientDisconnected{false};
40+
41+
void signalDisconnection() {
42+
clientDisconnected = true;
43+
executionInProgress.notify_all();
44+
}
4045
};
4146

4247
struct LegacyServableProperties : public GenAiServableProperties {

src/llm/visual_language_model/legacy/servable.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,12 @@ absl::Status VisualLanguageModelLegacyServable::parseRequest(std::shared_ptr<Gen
9292

9393
if (legacyExecutionContext->apiHandler->isStream()) {
9494
legacyExecutionContext->lastStreamerCallbackOutput = ""; // initialize with empty string
95-
auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput](std::string text) {
95+
auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
9696
SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
97+
if (clientDisconnected.load()) {
98+
executionInProgress.notify_one();
99+
return ov::genai::StreamingStatus::CANCEL;
100+
}
97101
{
98102
std::lock_guard<std::mutex> lock(mutex);
99103
lastStreamerCallbackOutput += text;
@@ -123,10 +127,11 @@ absl::Status VisualLanguageModelLegacyServable::scheduleExecution(std::shared_pt
123127
std::weak_ptr<VisualLanguageModelLegacyServableExecutionContext> weakContext = legacyExecutionContext;
124128
legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
125129
if (auto context = weakContext.lock()) {
126-
context->clientDisconnected = true;
130+
context->signalDisconnection();
127131
}
128132
});
129133
if (legacyExecutionContext->payload.client->isDisconnected()) {
134+
legacyExecutionContext->signalDisconnection();
130135
return absl::CancelledError();
131136
}
132137
properties->legacyExecutor->addRequest(legacyExecutionContext);

src/llm/visual_language_model/legacy/servable.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ struct VisualLanguageModelLegacyServableExecutionContext : public GenAiServableE
4040

4141
// Disconnection handling
4242
std::atomic<bool> clientDisconnected{false};
43+
44+
void signalDisconnection() {
45+
clientDisconnected = true;
46+
executionInProgress.notify_all();
47+
}
4348
};
4449

4550
struct VisualLanguageModelLegacyServableProperties : public GenAiServableProperties {

0 commit comments

Comments
 (0)