Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions capabilities-services-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
<version>${jakarta.ws.rs-api.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>ai.wanaku.sdk</groupId>
<artifactId>capabilities-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import ai.wanaku.capabilities.sdk.api.types.ResourceReference;
import ai.wanaku.capabilities.sdk.api.types.ToolReference;
import ai.wanaku.capabilities.sdk.api.types.WanakuResponse;
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionEvent;
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionRequest;
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionResponse;
import ai.wanaku.capabilities.sdk.api.types.io.ResourcePayload;
import ai.wanaku.capabilities.sdk.api.types.io.ToolPayload;
import ai.wanaku.capabilities.sdk.common.config.ServiceConfig;
Expand All @@ -18,8 +21,13 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand Down Expand Up @@ -458,4 +466,155 @@ public void removeDataStore(String id) {
public void removeDataStoresByName(String name) {
executeDelete("/api/v1/data-store/remove?name=" + name);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Handle a final SSE event that is not terminated by an empty line at end-of-stream.

Because events are only emitted on an empty line, an SSE stream that ends immediately after event data (without a trailing blank line) will drop the final event in dataBuilder. After the while loop, consider checking dataBuilder.length() > 0 and emitting one last event to handle this case.


// ==================== Code Execution Engine API Methods ====================

/**
* Submits code for execution to the Wanaku Code Execution Engine.
* <p>
* This method submits a code execution request to the specified engine type and language.
* The response contains a task ID and stream URL that can be used to monitor the execution
* progress via Server-Sent Events (SSE).
* <p>
* Example usage:
* <pre>{@code
* CodeExecutionRequest request = new CodeExecutionRequest("System.out.println(\"Hello\");");
* WanakuResponse<CodeExecutionResponse> response = client.executeCode("jvm", "java", request);
* String taskId = response.getData().taskId();
* String streamUrl = response.getData().streamUrl();
* }</pre>
*
* @param engineType The type of execution engine (e.g., "jvm", "interpreted").
* @param language The programming language (e.g., "java", "groovy", "xml").
* @param request The code execution request containing the code and execution parameters.
* @return The response containing the task ID and stream URL wrapped in WanakuResponse.
* @throws WanakuException If an error occurs during the request.
* @since 1.0.0
*/
public WanakuResponse<CodeExecutionResponse> executeCode(
String engineType, String language, CodeExecutionRequest request) {
// Validate the request before sending
request.validate();

String path = String.format("/api/v2/code-execution-engine/%s/%s", engineType, language);
return executePost(path, request, new TypeReference<>() {});
}

/**
* Streams code execution events from the SSE endpoint.
* <p>
* This method connects to the Server-Sent Events (SSE) stream endpoint for a specific
* code execution task and consumes events as they arrive. The consumer callback is
* invoked for each event received from the stream.
* <p>
* The stream will continue until the execution completes (COMPLETED, FAILED, TIMEOUT,
* or CANCELLED event) or an error occurs.
* <p>
* Example usage:
* <pre>{@code
* client.streamCodeExecutionEvents("jvm", "java", taskId, event -> {
* switch (event.getEventType()) {
* case STARTED -> System.out.println("Execution started");
* case OUTPUT -> System.out.print(event.getOutput());
* case ERROR -> System.err.print(event.getError());
* case COMPLETED -> System.out.println("Exit code: " + event.getExitCode());
* case FAILED -> System.err.println("Execution failed: " + event.getMessage());
* }
* });
* }</pre>
*
* @param engineType The type of execution engine (e.g., "jvm", "interpreted").
* @param language The programming language (e.g., "java", "groovy", "xml").
* @param taskId The UUID of the execution task.
* @param eventConsumer The consumer callback to handle each event.
* @throws WanakuException If an error occurs during streaming.
* @since 1.0.0
*/
public void streamCodeExecutionEvents(
String engineType, String language, String taskId, Consumer<CodeExecutionEvent> eventConsumer) {
String path = String.format("/api/v2/code-execution-engine/%s/%s/%s", engineType, language, taskId);
URI uri = URI.create(this.baseUrl + path);

HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.header("Accept", "text/event-stream")
.header("Authorization", serviceAuthenticator.toHeaderValue())
.GET()
.build();

try {
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());

if (response.statusCode() >= 200 && response.statusCode() < 300) {
parseSSEStream(response.body(), eventConsumer);
} else {
throw new WanakuWebException(
"Failed to connect to SSE stream: HTTP " + response.statusCode(),
response.statusCode());
}
} catch (IOException e) {
throw new WanakuException("I/O error while streaming events", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WanakuException("Event streaming interrupted", e);
}
}

/**
* Parses an SSE stream and invokes the consumer for each event.
*
* @param inputStream The input stream containing SSE data.
* @param eventConsumer The consumer callback to handle each event.
* @throws IOException If an I/O error occurs while reading the stream.
*/
private void parseSSEStream(InputStream inputStream, Consumer<CodeExecutionEvent> eventConsumer)
throws IOException {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
StringBuilder dataBuilder = new StringBuilder();

while ((line = reader.readLine()) != null) {
// SSE format: lines starting with "data:" contain the JSON payload
if (line.startsWith("data:")) {
String data = line.substring(5).trim(); // Remove "data:" prefix
dataBuilder.append(data);
} else if (line.isEmpty() && dataBuilder.length() > 0) {
// Empty line signals end of an event
String jsonData = dataBuilder.toString();
dataBuilder.setLength(0); // Clear for next event

try {
CodeExecutionEvent event = objectMapper.readValue(jsonData, CodeExecutionEvent.class);
eventConsumer.accept(event);

// Stop streaming if we received a terminal event
if (isTerminalEvent(event)) {
LOG.debug("Received terminal event: {}", event.getEventType());
break;
}
} catch (JsonProcessingException e) {
LOG.warn("Failed to parse SSE event data: {}", jsonData, e);
// Continue processing other events even if one fails to parse
}
}
}
}
}

/**
* Checks if an event is a terminal event (indicates end of execution).
*
* @param event The code execution event to check.
* @return true if the event is terminal, false otherwise.
*/
private boolean isTerminalEvent(CodeExecutionEvent event) {
if (event.getEventType() == null) {
return false;
}
return switch (event.getEventType()) {
case COMPLETED, FAILED, TIMEOUT, CANCELLED -> true;
default -> false;
};
}
}