Skip to content

Commit f047634

Browse files
committed
Add code execution engine API methods to ServicesHttpClient
- Added executeCode() method to submit code for execution - POST to /api/v2/code-execution-engine/{engineType}/{language} - Validates CodeExecutionRequest before submission - Returns WanakuResponse<CodeExecutionResponse> with task ID and stream URL - Added streamCodeExecutionEvents() method to consume SSE stream - GET from /api/v2/code-execution-engine/{engineType}/{language}/{taskId} - Connects to Server-Sent Events (SSE) endpoint - Parses SSE format (data: {json}\n\n) - Deserializes JSON to CodeExecutionEvent objects - Uses Consumer<CodeExecutionEvent> callback pattern - Automatically stops on terminal events (COMPLETED, FAILED, TIMEOUT, CANCELLED) - Handles errors gracefully with logging - Added jackson-databind dependency for JSON handling - Added imports for CodeExecutionEvent, CodeExecutionRequest, CodeExecutionResponse The new methods enable the SDK to submit code execution requests to Wanaku and monitor execution progress in real-time via SSE streams. Resolves #46
1 parent b1b5319 commit f047634

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed

capabilities-services-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
<version>${jakarta.ws.rs-api.version}</version>
2121
</dependency>
2222

23+
<dependency>
24+
<groupId>com.fasterxml.jackson.core</groupId>
25+
<artifactId>jackson-databind</artifactId>
26+
</dependency>
27+
2328
<dependency>
2429
<groupId>ai.wanaku.sdk</groupId>
2530
<artifactId>capabilities-common</artifactId>

capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import ai.wanaku.capabilities.sdk.api.types.ResourceReference;
1010
import ai.wanaku.capabilities.sdk.api.types.ToolReference;
1111
import ai.wanaku.capabilities.sdk.api.types.WanakuResponse;
12+
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionEvent;
13+
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionRequest;
14+
import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionResponse;
1215
import ai.wanaku.capabilities.sdk.api.types.io.ResourcePayload;
1316
import ai.wanaku.capabilities.sdk.api.types.io.ToolPayload;
1417
import ai.wanaku.capabilities.sdk.common.config.ServiceConfig;
@@ -18,8 +21,13 @@
1821
import com.fasterxml.jackson.core.JsonProcessingException;
1922
import com.fasterxml.jackson.core.type.TypeReference;
2023
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import java.io.BufferedReader;
2125
import java.io.IOException;
26+
import java.io.InputStreamReader;
27+
import java.io.InputStream;
28+
import java.nio.charset.StandardCharsets;
2229
import java.util.List;
30+
import java.util.function.Consumer;
2331
import java.net.URI;
2432
import java.net.http.HttpClient;
2533
import java.net.http.HttpRequest;
@@ -458,4 +466,155 @@ public void removeDataStore(String id) {
458466
public void removeDataStoresByName(String name) {
459467
executeDelete("/api/v1/data-store/remove?name=" + name);
460468
}
469+
470+
// ==================== Code Execution Engine API Methods ====================
471+
472+
/**
473+
* Submits code for execution to the Wanaku Code Execution Engine.
474+
* <p>
475+
* This method submits a code execution request to the specified engine type and language.
476+
* The response contains a task ID and stream URL that can be used to monitor the execution
477+
* progress via Server-Sent Events (SSE).
478+
* <p>
479+
* Example usage:
480+
* <pre>{@code
481+
* CodeExecutionRequest request = new CodeExecutionRequest("System.out.println(\"Hello\");");
482+
* WanakuResponse<CodeExecutionResponse> response = client.executeCode("jvm", "java", request);
483+
* String taskId = response.getData().taskId();
484+
* String streamUrl = response.getData().streamUrl();
485+
* }</pre>
486+
*
487+
* @param engineType The type of execution engine (e.g., "jvm", "interpreted").
488+
* @param language The programming language (e.g., "java", "groovy", "xml").
489+
* @param request The code execution request containing the code and execution parameters.
490+
* @return The response containing the task ID and stream URL wrapped in WanakuResponse.
491+
* @throws WanakuException If an error occurs during the request.
492+
* @since 1.0.0
493+
*/
494+
public WanakuResponse<CodeExecutionResponse> executeCode(
495+
String engineType, String language, CodeExecutionRequest request) {
496+
// Validate the request before sending
497+
request.validate();
498+
499+
String path = String.format("/api/v2/code-execution-engine/%s/%s", engineType, language);
500+
return executePost(path, request, new TypeReference<>() {});
501+
}
502+
503+
/**
504+
* Streams code execution events from the SSE endpoint.
505+
* <p>
506+
* This method connects to the Server-Sent Events (SSE) stream endpoint for a specific
507+
* code execution task and consumes events as they arrive. The consumer callback is
508+
* invoked for each event received from the stream.
509+
* <p>
510+
* The stream will continue until the execution completes (COMPLETED, FAILED, TIMEOUT,
511+
* or CANCELLED event) or an error occurs.
512+
* <p>
513+
* Example usage:
514+
* <pre>{@code
515+
* client.streamCodeExecutionEvents("jvm", "java", taskId, event -> {
516+
* switch (event.getEventType()) {
517+
* case STARTED -> System.out.println("Execution started");
518+
* case OUTPUT -> System.out.print(event.getOutput());
519+
* case ERROR -> System.err.print(event.getError());
520+
* case COMPLETED -> System.out.println("Exit code: " + event.getExitCode());
521+
* case FAILED -> System.err.println("Execution failed: " + event.getMessage());
522+
* }
523+
* });
524+
* }</pre>
525+
*
526+
* @param engineType The type of execution engine (e.g., "jvm", "interpreted").
527+
* @param language The programming language (e.g., "java", "groovy", "xml").
528+
* @param taskId The UUID of the execution task.
529+
* @param eventConsumer The consumer callback to handle each event.
530+
* @throws WanakuException If an error occurs during streaming.
531+
* @since 1.0.0
532+
*/
533+
public void streamCodeExecutionEvents(
534+
String engineType, String language, String taskId, Consumer<CodeExecutionEvent> eventConsumer) {
535+
String path = String.format("/api/v2/code-execution-engine/%s/%s/%s", engineType, language, taskId);
536+
URI uri = URI.create(this.baseUrl + path);
537+
538+
HttpRequest request = HttpRequest.newBuilder()
539+
.uri(uri)
540+
.header("Accept", "text/event-stream")
541+
.header("Authorization", serviceAuthenticator.toHeaderValue())
542+
.GET()
543+
.build();
544+
545+
try {
546+
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
547+
548+
if (response.statusCode() >= 200 && response.statusCode() < 300) {
549+
parseSSEStream(response.body(), eventConsumer);
550+
} else {
551+
throw new WanakuWebException(
552+
"Failed to connect to SSE stream: HTTP " + response.statusCode(),
553+
response.statusCode());
554+
}
555+
} catch (IOException e) {
556+
throw new WanakuException("I/O error while streaming events", e);
557+
} catch (InterruptedException e) {
558+
Thread.currentThread().interrupt();
559+
throw new WanakuException("Event streaming interrupted", e);
560+
}
561+
}
562+
563+
/**
564+
* Parses an SSE stream and invokes the consumer for each event.
565+
*
566+
* @param inputStream The input stream containing SSE data.
567+
* @param eventConsumer The consumer callback to handle each event.
568+
* @throws IOException If an I/O error occurs while reading the stream.
569+
*/
570+
private void parseSSEStream(InputStream inputStream, Consumer<CodeExecutionEvent> eventConsumer)
571+
throws IOException {
572+
try (BufferedReader reader = new BufferedReader(
573+
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
574+
String line;
575+
StringBuilder dataBuilder = new StringBuilder();
576+
577+
while ((line = reader.readLine()) != null) {
578+
// SSE format: lines starting with "data:" contain the JSON payload
579+
if (line.startsWith("data:")) {
580+
String data = line.substring(5).trim(); // Remove "data:" prefix
581+
dataBuilder.append(data);
582+
} else if (line.isEmpty() && dataBuilder.length() > 0) {
583+
// Empty line signals end of an event
584+
String jsonData = dataBuilder.toString();
585+
dataBuilder.setLength(0); // Clear for next event
586+
587+
try {
588+
CodeExecutionEvent event = objectMapper.readValue(jsonData, CodeExecutionEvent.class);
589+
eventConsumer.accept(event);
590+
591+
// Stop streaming if we received a terminal event
592+
if (isTerminalEvent(event)) {
593+
LOG.debug("Received terminal event: {}", event.getEventType());
594+
break;
595+
}
596+
} catch (JsonProcessingException e) {
597+
LOG.warn("Failed to parse SSE event data: {}", jsonData, e);
598+
// Continue processing other events even if one fails to parse
599+
}
600+
}
601+
}
602+
}
603+
}
604+
605+
/**
606+
* Checks if an event is a terminal event (indicates end of execution).
607+
*
608+
* @param event The code execution event to check.
609+
* @return true if the event is terminal, false otherwise.
610+
*/
611+
private boolean isTerminalEvent(CodeExecutionEvent event) {
612+
if (event.getEventType() == null) {
613+
return false;
614+
}
615+
return switch (event.getEventType()) {
616+
case COMPLETED, FAILED, TIMEOUT, CANCELLED -> true;
617+
default -> false;
618+
};
619+
}
461620
}

0 commit comments

Comments
 (0)