Skip to content

Commit 3583b5c

Browse files
mltheuserMalte Heuser
andauthored
KG-616 fix streaming + tool call issue (#1262)
Related to [KG-616](https://youtrack.jetbrains.com/issue/KG-616) ## Motivation and Context This PR addresses critical issues encountered with SSE streaming for both Google and OpenRouter clients, ensuring robust functionality and better error handling. 1. **Google Client Fix:** * **Problem:** The [GoogleLLMClient](cci:2://file:///Users/ku76uh/Developer/jetbrains/fork/koog/prompt/prompt-executor/prompt-executor-clients/prompt-executor-google-client/src/commonMain/kotlin/ai/koog/prompt/executor/clients/google/GoogleLLMClient.kt:98:0-797:1) was ignoring the `tools` parameter during streaming requests ([executeStreaming](cci:1://file:///Users/ku76uh/Developer/jetbrains/fork/koog/prompt/prompt-executor/prompt-executor-model/src/commonMain/kotlin/ai/koog/prompt/executor/model/PromptExecutor.kt:36:4-48:24)), explicitly passing `emptyList()` instead. This broke tool usage for streaming calls. * **Solution:** We corrected this to specificially pass the `tools` parameter to [createGoogleRequest](cci:1://file:///Users/ku76uh/Developer/jetbrains/fork/koog/prompt/prompt-executor/prompt-executor-clients/prompt-executor-google-client/src/commonMain/kotlin/ai/koog/prompt/executor/clients/google/GoogleLLMClient.kt:277:4-446:5). 2. **OpenRouter Integration Test Fix:** * **Problem:** Integration tests for OpenRouter streaming were failing with `Socket is not connected` errors. This was traced back to the default `Apache5` HTTP client engine used in tests, which has known connection pooling issues with OpenRouter's non-standard Keep-Alive comments (e.g., `: OPENROUTER PROCESSING`). * **Solution:** We switched the integration test runtime dependency from `ktor-client-apache5` to `ktor-client-cio`. The CIO (Coroutine-based I/O) engine handles these SSE keep-alive comments correctly, resolving the socket closure issues. 3. **Enhanced SSE Error Handling:** * **Problem:** When a `SSEClientException` occurred (e.g., due to a 400 Bad Request or streaming interruption), the error handling logic in [KtorKoogHttpClient](cci:2://file:///Users/ku76uh/Developer/jetbrains/fork/koog/http-client/http-client-ktor/src/commonMain/kotlin/ai/koog/http/client/ktor/KtorKoogHttpClient.kt:49:0-191:1) would attempt to read `e.response?.bodyAsText()` directly. If the response body was already consumed or unavailable (common in streaming errors), this would throw a `DoubleReceiveException`, masking the original error. * **Solution:** We wrapped the `bodyAsText()` call in a `try-catch` block. This allows us to successfully capture the error body for handshake failures (like 400 Bad Request) while gracefully handling cases where the body is unavailable (falling back to `null`), ensuring the original `SSEClientException` is always propagated with maximum available context. ## Breaking Changes None. These are internal fixes and test configuration updates. --- #### Type of the changes - [ ] New feature (non-breaking change which adds functionality) - [x] Bug fix (non-breaking change which fixes an issue) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [x] Tests improvement - [ ] Refactoring #### Checklist - [x] The pull request has a description of the proposed change - [x] I read the [Contributing Guidelines](https://github.com/JetBrains/koog/blob/main/CONTRIBUTING.md) before opening the pull request - [x] The pull request uses **`develop`** as the base branch - [x] Tests for the changes have been added - [x] All new and existing tests passed ##### Additional steps for pull requests adding a new feature - [x] An issue describing the proposed change exists - [x] The pull request includes a link to the issue - [ ] The change was discussed and approved in the issue - [ ] Docs have been added / updated --------- Co-authored-by: Malte Heuser <[email protected]>
1 parent 9072fea commit 3583b5c

File tree

6 files changed

+76
-4
lines changed

6 files changed

+76
-4
lines changed

http-client/http-client-ktor/src/commonMain/kotlin/ai/koog/http/client/ktor/KtorKoogHttpClient.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,17 @@ public class KtorKoogHttpClient internal constructor(
161161
}
162162
}
163163
} catch (e: SSEClientException) {
164+
val errorBody = try {
165+
e.response?.bodyAsText()
166+
} catch (ignored: Exception) {
167+
logger.debug(ignored) { "Unable to read SSE error response body (may already be consumed)" }
168+
null
169+
}
164170
throw KoogHttpClientException(
165171
clientName = clientName,
166172
statusCode = e.response?.status?.value,
167-
errorBody = e.response?.bodyAsText(),
173+
errorBody = errorBody,
174+
message = e.message,
168175
cause = e
169176
)
170177
} catch (e: CancellationException) {

integration-tests/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ kotlin {
1717
implementation(libs.testcontainers)
1818
implementation(libs.ktor.server.netty)
1919
implementation(kotlin("test-junit5"))
20-
runtimeOnly(libs.ktor.client.apache5)
20+
runtimeOnly(libs.ktor.client.cio)
2121
runtimeOnly(libs.slf4j.simple)
2222
}
2323
}

integration-tests/src/jvmTest/kotlin/ai/koog/integration/tests/executor/ExecutorIntegrationTestBase.kt

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
8989
import org.junit.jupiter.api.BeforeAll
9090
import java.nio.file.Path
9191
import java.nio.file.Paths
92-
import java.util.Base64
92+
import java.util.*
9393
import kotlin.io.path.pathString
9494
import kotlin.io.path.readBytes
9595
import kotlin.io.path.readText
@@ -1119,6 +1119,59 @@ abstract class ExecutorIntegrationTestBase {
11191119
answer.shouldContain("20")
11201120
}
11211121
}
1122+
1123+
open fun integration_testExecuteStreamingWithTools(model: LLModel) = runTest(timeout = 300.seconds) {
1124+
Models.assumeAvailable(model.provider)
1125+
assumeTrue(model.capabilities.contains(LLMCapability.Tools), "Model $model does not support tools")
1126+
1127+
val executor = getExecutor(model)
1128+
1129+
val prompt = Prompt.build("test-streaming", LLMParams(toolChoice = ToolChoice.Required)) {
1130+
system("You are a helpful assistant.")
1131+
user("Count three times five")
1132+
}
1133+
1134+
withRetry(times = 3, testName = "integration_testExecuteStreamingWithTools[${model.id}]") {
1135+
with(StringBuilder()) {
1136+
val endMessages = mutableListOf<StreamFrame.End>()
1137+
val toolMessages = mutableListOf<StreamFrame.ToolCall>()
1138+
1139+
executor.executeStreamAndCollect(
1140+
prompt = prompt,
1141+
model = model,
1142+
tools = listOf(SimpleCalculatorTool.descriptor),
1143+
appendable = this,
1144+
endMessages = endMessages,
1145+
toolMessages = toolMessages
1146+
)
1147+
1148+
toolMessages.shouldNotBeEmpty()
1149+
withClue("Expected calculator tool call but got: [$toolMessages]") {
1150+
toolMessages.any {
1151+
it.name == SimpleCalculatorTool.name &&
1152+
it.content.contains(CalculatorOperation.MULTIPLY.name, ignoreCase = true)
1153+
} shouldBe true
1154+
}
1155+
}
1156+
}
1157+
}
1158+
1159+
private suspend fun PromptExecutor.executeStreamAndCollect(
1160+
prompt: Prompt,
1161+
model: LLModel,
1162+
tools: List<ToolDescriptor>,
1163+
appendable: StringBuilder,
1164+
endMessages: MutableList<StreamFrame.End>,
1165+
toolMessages: MutableList<StreamFrame.ToolCall>
1166+
) {
1167+
executeStreaming(prompt, model, tools).collect { frame ->
1168+
when (frame) {
1169+
is StreamFrame.Append -> appendable.append(frame.text)
1170+
is StreamFrame.ToolCall -> toolMessages.add(frame)
1171+
is StreamFrame.End -> endMessages.add(frame)
1172+
}
1173+
}
1174+
}
11221175
}
11231176

11241177
private suspend fun PromptExecutor.executeStreamAndCollect(

integration-tests/src/jvmTest/kotlin/ai/koog/integration/tests/executor/MultipleLLMPromptExecutorIntegrationTest.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ class MultipleLLMPromptExecutorIntegrationTest : ExecutorIntegrationTestBase() {
121121
super.integration_testExecuteStreamingWithTools(model)
122122
}
123123

124+
@ParameterizedTest
125+
@MethodSource("allCompletionModels")
126+
override fun integration_testExecuteStreamingWithTools(model: LLModel) {
127+
super.integration_testExecuteStreamingWithTools(model)
128+
}
129+
124130
@ParameterizedTest
125131
@MethodSource("allCompletionModels")
126132
override fun integration_testToolWithRequiredParams(model: LLModel) {

integration-tests/src/jvmTest/kotlin/ai/koog/integration/tests/executor/SingleLLMPromptExecutorIntegrationTest.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ class SingleLLMPromptExecutorIntegrationTest : ExecutorIntegrationTestBase() {
115115
super.integration_testExecuteStreamingWithTools(model)
116116
}
117117

118+
@ParameterizedTest
119+
@MethodSource("allCompletionModels")
120+
override fun integration_testExecuteStreamingWithTools(model: LLModel) {
121+
super.integration_testExecuteStreamingWithTools(model)
122+
}
123+
118124
@ParameterizedTest
119125
@MethodSource("allCompletionModels")
120126
override fun integration_testToolWithRequiredParams(model: LLModel) {

prompt/prompt-executor/prompt-executor-clients/prompt-executor-google-client/src/commonMain/kotlin/ai/koog/prompt/executor/clients/google/GoogleLLMClient.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public open class GoogleLLMClient(
172172
"Model ${model.id} does not support chat completions"
173173
}
174174

175-
val request = createGoogleRequest(prompt, model, emptyList())
175+
val request = createGoogleRequest(prompt, model, tools)
176176

177177
try {
178178
httpClient.sse(

0 commit comments

Comments
 (0)