Implement code execution engine API in ServicesHttpClient#51
Merged
orpiske merged 1 commit intowanaku-ai:mainfrom Jan 24, 2026
Merged
Implement code execution engine API in ServicesHttpClient#51orpiske merged 1 commit intowanaku-ai:mainfrom
orpiske merged 1 commit intowanaku-ai:mainfrom
Conversation
- Added executeCode() method to submit code for execution
- POST to /api/v2/code-execution-engine/{engineType}/{language}
- Validates CodeExecutionRequest before submission
- Returns WanakuResponse<CodeExecutionResponse> with task ID and stream URL
- Added streamCodeExecutionEvents() method to consume SSE stream
- GET from /api/v2/code-execution-engine/{engineType}/{language}/{taskId}
- Connects to Server-Sent Events (SSE) endpoint
- Parses SSE format (data: {json}\n\n)
- Deserializes JSON to CodeExecutionEvent objects
- Uses Consumer<CodeExecutionEvent> callback pattern
- Automatically stops on terminal events (COMPLETED, FAILED, TIMEOUT, CANCELLED)
- Handles errors gracefully with logging
- Added jackson-databind dependency for JSON handling
- Added imports for CodeExecutionEvent, CodeExecutionRequest, CodeExecutionResponse
The new methods enable the SDK to submit code execution requests to Wanaku
and monitor execution progress in real-time via SSE streams.
Resolves wanaku-ai#46
Reviewer's GuideExtends ServicesHttpClient with a client-side API for the new /api/v2/code-execution-engine endpoints, including request submission and SSE-based event streaming, plus supporting JSON parsing via Jackson. Sequence diagram for executeCode and SSE streaming with streamCodeExecutionEventssequenceDiagram
actor Client
participant ServicesHttpClient
participant HttpClient
participant WanakuCodeExecutionEngine as CodeExecutionEngineAPI
participant ObjectMapper
Client->>ServicesHttpClient: executeCode(engineType, language, codeExecutionRequest)
ServicesHttpClient->>CodeExecutionRequest: validate()
CodeExecutionRequest-->>ServicesHttpClient: validationOk
ServicesHttpClient->>ServicesHttpClient: build path /api/v2/code-execution-engine/{engineType}/{language}
ServicesHttpClient->>WanakuCodeExecutionEngine: HTTP POST path with CodeExecutionRequest
WanakuCodeExecutionEngine-->>ServicesHttpClient: HTTP 2xx with CodeExecutionResponse JSON
ServicesHttpClient->>ObjectMapper: deserialize CodeExecutionResponse
ObjectMapper-->>ServicesHttpClient: CodeExecutionResponse
ServicesHttpClient-->>Client: WanakuResponse<CodeExecutionResponse>
Client->>ServicesHttpClient: streamCodeExecutionEvents(engineType, language, taskId, eventConsumer)
ServicesHttpClient->>ServicesHttpClient: build path /api/v2/code-execution-engine/{engineType}/{language}/{taskId}
ServicesHttpClient->>HttpClient: send(HttpRequest text/event-stream)
HttpClient->>WanakuCodeExecutionEngine: HTTP GET SSE
WanakuCodeExecutionEngine-->>HttpClient: HTTP 2xx with SSE InputStream
HttpClient-->>ServicesHttpClient: HttpResponse<InputStream>
ServicesHttpClient->>ServicesHttpClient: parseSSEStream(inputStream, eventConsumer)
loop For each SSE event
ServicesHttpClient->>ServicesHttpClient: read lines and assemble data
ServicesHttpClient->>ObjectMapper: readValue(jsonData, CodeExecutionEvent)
ObjectMapper-->>ServicesHttpClient: CodeExecutionEvent
ServicesHttpClient->>Client: eventConsumer.accept(CodeExecutionEvent)
ServicesHttpClient->>ServicesHttpClient: isTerminalEvent(CodeExecutionEvent)
alt Terminal event
ServicesHttpClient-->>ServicesHttpClient: break stream loop
else Non-terminal event
ServicesHttpClient-->>ServicesHttpClient: continue streaming
end
end
alt HTTP error status
WanakuCodeExecutionEngine-->>ServicesHttpClient: HTTP non-2xx
ServicesHttpClient-->>Client: throw WanakuWebException
end
alt IO or InterruptedException
ServicesHttpClient-->>Client: throw WanakuException
end
Updated class diagram for ServicesHttpClient code execution engine methodsclassDiagram
class ServicesHttpClient {
- String baseUrl
- HttpClient httpClient
- ObjectMapper objectMapper
- ServiceAuthenticator serviceAuthenticator
+ WanakuResponse~CodeExecutionResponse~ executeCode(String engineType, String language, CodeExecutionRequest request)
+ void streamCodeExecutionEvents(String engineType, String language, String taskId, Consumer~CodeExecutionEvent~ eventConsumer)
- void parseSSEStream(InputStream inputStream, Consumer~CodeExecutionEvent~ eventConsumer)
- boolean isTerminalEvent(CodeExecutionEvent event)
+ void removeDataStore(String id)
+ void removeDataStoresByName(String name)
+ WanakuResponse executePost(String path, Object body, TypeReference typeReference)
}
class CodeExecutionRequest {
- String code
- Map~String, Object~ parameters
+ void validate()
}
class CodeExecutionResponse {
+ String taskId()
+ String streamUrl()
}
class CodeExecutionEvent {
+ CodeExecutionEventType getEventType()
+ String getOutput()
+ String getError()
+ Integer getExitCode()
+ String getMessage()
}
class CodeExecutionEventType {
<<enumeration>>
STARTED
OUTPUT
ERROR
COMPLETED
FAILED
TIMEOUT
CANCELLED
}
class WanakuResponse~T~ {
+ T getData()
+ int getStatusCode()
+ String getMessage()
}
class WanakuException {
+ WanakuException(String message)
+ WanakuException(String message, Throwable cause)
}
class WanakuWebException {
+ WanakuWebException(String message, int statusCode)
}
class ServiceAuthenticator {
+ String toHeaderValue()
}
class HttpClient {
+ HttpResponse send(HttpRequest request, HttpResponse.BodyHandler bodyHandler)
}
class ObjectMapper {
+ <T> T readValue(String content, Class clazz)
}
class Consumer~T~ {
+ void accept(T value)
}
class InputStream
class HttpRequest
class HttpResponse~T~ {
+ int statusCode()
+ T body()
}
ServicesHttpClient --> CodeExecutionRequest
ServicesHttpClient --> CodeExecutionResponse
ServicesHttpClient --> CodeExecutionEvent
ServicesHttpClient --> WanakuResponse
ServicesHttpClient --> WanakuException
ServicesHttpClient --> WanakuWebException
ServicesHttpClient --> ServiceAuthenticator
ServicesHttpClient --> HttpClient
ServicesHttpClient --> ObjectMapper
ServicesHttpClient --> Consumer
CodeExecutionEvent --> CodeExecutionEventType
WanakuWebException --|> WanakuException
WanakuException --|> RuntimeException
HttpResponse --> InputStream
Consumer <|.. eventConsumer
WanakuResponse "1" --> "1" CodeExecutionResponse : wraps
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The SSE parsing logic currently concatenates multiple
data:lines without any delimiter, which can corrupt multi-line JSON payloads; consider appending a newline or space betweendata:segments to preserve valid JSON. - The SSE parser only looks at
data:lines and ignores other valid SSE fields (e.g.,event,id, comments), so if the backend ever uses those you may want to extendparseSSEStreamto handle them explicitly or at least skip them robustly. - For
streamCodeExecutionEvents, consider validatingengineType,language, andtaskId(e.g., non-null/non-blank) before building the URI to fail fast on invalid input rather than issuing malformed HTTP requests.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The SSE parsing logic currently concatenates multiple `data:` lines without any delimiter, which can corrupt multi-line JSON payloads; consider appending a newline or space between `data:` segments to preserve valid JSON.
- The SSE parser only looks at `data:` lines and ignores other valid SSE fields (e.g., `event`, `id`, comments), so if the backend ever uses those you may want to extend `parseSSEStream` to handle them explicitly or at least skip them robustly.
- For `streamCodeExecutionEvents`, consider validating `engineType`, `language`, and `taskId` (e.g., non-null/non-blank) before building the URI to fail fast on invalid input rather than issuing malformed HTTP requests.
## Individual Comments
### Comment 1
<location> `capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java:577-468` </location>
<code_context>
+ while ((line = reader.readLine()) != null) {
</code_context>
<issue_to_address>
**issue:** Handle a final SSE event that is not terminated by an empty line at end-of-stream.
Because events are only emitted on an empty line, an SSE stream that ends immediately after event data (without a trailing blank line) will drop the final event in `dataBuilder`. After the `while` loop, consider checking `dataBuilder.length() > 0` and emitting one last event to handle this case.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| @@ -458,4 +466,155 @@ public void removeDataStore(String id) { | |||
| public void removeDataStoresByName(String name) { | |||
| executeDelete("/api/v1/data-store/remove?name=" + name); | |||
| } | |||
There was a problem hiding this comment.
issue: Handle a final SSE event that is not terminated by an empty line at end-of-stream.
Because events are only emitted on an empty line, an SSE stream that ends immediately after event data (without a trailing blank line) will drop the final event in dataBuilder. After the while loop, consider checking dataBuilder.length() > 0 and emitting one last event to handle this case.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Enhanced ServicesHttpClient with methods to interact with the new code execution engine API (
/api/v2/code-execution-engine), enabling submission of code for execution and real-time monitoring via SSE streams.Changes
executeCode(): Submit code execution requests to Wanaku
/api/v2/code-execution-engine/{engineType}/{language}streamCodeExecutionEvents(): Monitor execution via Server-Sent Events
/api/v2/code-execution-engine/{engineType}/{language}/{taskId}Added
jackson-databinddependency for JSON serializationAdded imports for
CodeExecutionEvent,CodeExecutionRequest,CodeExecutionResponseTest Plan
Closes #46
Summary by Sourcery
Add client support in ServicesHttpClient for submitting code to the code execution engine and streaming execution events via SSE.
New Features:
Enhancements:
Build: