diff --git a/capabilities-services-client/pom.xml b/capabilities-services-client/pom.xml index 7f1643b..39f7412 100644 --- a/capabilities-services-client/pom.xml +++ b/capabilities-services-client/pom.xml @@ -20,6 +20,11 @@ ${jakarta.ws.rs-api.version} + + com.fasterxml.jackson.core + jackson-databind + + ai.wanaku.sdk capabilities-common diff --git a/capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java b/capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java index cbf8ddb..1a36df0 100644 --- a/capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java +++ b/capabilities-services-client/src/main/java/ai/wanaku/capabilities/sdk/services/ServicesHttpClient.java @@ -9,6 +9,9 @@ import ai.wanaku.capabilities.sdk.api.types.ResourceReference; import ai.wanaku.capabilities.sdk.api.types.ToolReference; import ai.wanaku.capabilities.sdk.api.types.WanakuResponse; +import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionEvent; +import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionRequest; +import ai.wanaku.capabilities.sdk.api.types.execution.CodeExecutionResponse; import ai.wanaku.capabilities.sdk.api.types.io.ResourcePayload; import ai.wanaku.capabilities.sdk.api.types.io.ToolPayload; import ai.wanaku.capabilities.sdk.common.config.ServiceConfig; @@ -18,8 +21,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.function.Consumer; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -458,4 +466,155 @@ public void removeDataStore(String id) { public void removeDataStoresByName(String name) { executeDelete("/api/v1/data-store/remove?name=" + name); } + + // ==================== Code Execution Engine API Methods ==================== + + /** + * Submits code for execution to the Wanaku Code Execution Engine. + *

+ * This method submits a code execution request to the specified engine type and language. + * The response contains a task ID and stream URL that can be used to monitor the execution + * progress via Server-Sent Events (SSE). + *

+ * Example usage: + *

{@code
+     * CodeExecutionRequest request = new CodeExecutionRequest("System.out.println(\"Hello\");");
+     * WanakuResponse response = client.executeCode("jvm", "java", request);
+     * String taskId = response.getData().taskId();
+     * String streamUrl = response.getData().streamUrl();
+     * }
+ * + * @param engineType The type of execution engine (e.g., "jvm", "interpreted"). + * @param language The programming language (e.g., "java", "groovy", "xml"). + * @param request The code execution request containing the code and execution parameters. + * @return The response containing the task ID and stream URL wrapped in WanakuResponse. + * @throws WanakuException If an error occurs during the request. + * @since 1.0.0 + */ + public WanakuResponse executeCode( + String engineType, String language, CodeExecutionRequest request) { + // Validate the request before sending + request.validate(); + + String path = String.format("/api/v2/code-execution-engine/%s/%s", engineType, language); + return executePost(path, request, new TypeReference<>() {}); + } + + /** + * Streams code execution events from the SSE endpoint. + *

+ * This method connects to the Server-Sent Events (SSE) stream endpoint for a specific + * code execution task and consumes events as they arrive. The consumer callback is + * invoked for each event received from the stream. + *

+ * The stream will continue until the execution completes (COMPLETED, FAILED, TIMEOUT, + * or CANCELLED event) or an error occurs. + *

+ * Example usage: + *

{@code
+     * client.streamCodeExecutionEvents("jvm", "java", taskId, event -> {
+     *     switch (event.getEventType()) {
+     *         case STARTED -> System.out.println("Execution started");
+     *         case OUTPUT -> System.out.print(event.getOutput());
+     *         case ERROR -> System.err.print(event.getError());
+     *         case COMPLETED -> System.out.println("Exit code: " + event.getExitCode());
+     *         case FAILED -> System.err.println("Execution failed: " + event.getMessage());
+     *     }
+     * });
+     * }
+ * + * @param engineType The type of execution engine (e.g., "jvm", "interpreted"). + * @param language The programming language (e.g., "java", "groovy", "xml"). + * @param taskId The UUID of the execution task. + * @param eventConsumer The consumer callback to handle each event. + * @throws WanakuException If an error occurs during streaming. + * @since 1.0.0 + */ + public void streamCodeExecutionEvents( + String engineType, String language, String taskId, Consumer eventConsumer) { + String path = String.format("/api/v2/code-execution-engine/%s/%s/%s", engineType, language, taskId); + URI uri = URI.create(this.baseUrl + path); + + HttpRequest request = HttpRequest.newBuilder() + .uri(uri) + .header("Accept", "text/event-stream") + .header("Authorization", serviceAuthenticator.toHeaderValue()) + .GET() + .build(); + + try { + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream()); + + if (response.statusCode() >= 200 && response.statusCode() < 300) { + parseSSEStream(response.body(), eventConsumer); + } else { + throw new WanakuWebException( + "Failed to connect to SSE stream: HTTP " + response.statusCode(), + response.statusCode()); + } + } catch (IOException e) { + throw new WanakuException("I/O error while streaming events", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WanakuException("Event streaming interrupted", e); + } + } + + /** + * Parses an SSE stream and invokes the consumer for each event. + * + * @param inputStream The input stream containing SSE data. + * @param eventConsumer The consumer callback to handle each event. + * @throws IOException If an I/O error occurs while reading the stream. + */ + private void parseSSEStream(InputStream inputStream, Consumer eventConsumer) + throws IOException { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + String line; + StringBuilder dataBuilder = new StringBuilder(); + + while ((line = reader.readLine()) != null) { + // SSE format: lines starting with "data:" contain the JSON payload + if (line.startsWith("data:")) { + String data = line.substring(5).trim(); // Remove "data:" prefix + dataBuilder.append(data); + } else if (line.isEmpty() && dataBuilder.length() > 0) { + // Empty line signals end of an event + String jsonData = dataBuilder.toString(); + dataBuilder.setLength(0); // Clear for next event + + try { + CodeExecutionEvent event = objectMapper.readValue(jsonData, CodeExecutionEvent.class); + eventConsumer.accept(event); + + // Stop streaming if we received a terminal event + if (isTerminalEvent(event)) { + LOG.debug("Received terminal event: {}", event.getEventType()); + break; + } + } catch (JsonProcessingException e) { + LOG.warn("Failed to parse SSE event data: {}", jsonData, e); + // Continue processing other events even if one fails to parse + } + } + } + } + } + + /** + * Checks if an event is a terminal event (indicates end of execution). + * + * @param event The code execution event to check. + * @return true if the event is terminal, false otherwise. + */ + private boolean isTerminalEvent(CodeExecutionEvent event) { + if (event.getEventType() == null) { + return false; + } + return switch (event.getEventType()) { + case COMPLETED, FAILED, TIMEOUT, CANCELLED -> true; + default -> false; + }; + } }