Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,23 @@ public Mono<Void> notifyClients(String method, Object params) {
.then();
}

// FIXME: This javadoc makes claims about using isClosing flag but it's not
// actually
// doing that.

/**
* Sends a JSON-RPC notification to a specific client session through its SSE
* connection.
*
* <p>
* The method:
* <ul>
* <li>Looks up the session by the given session ID</li>
* <li>Returns an empty Mono if the session is not found</li>
* <li>Sends the notification to the specific session if found</li>
* </ul>
* @param sessionId The ID of the target client session
* @param method The JSON-RPC method to send to the client
* @param params The method parameters to send to the client
* @return A Mono that completes when the notification has been sent, or empty if the
* session is not found
*/
@Override
public Mono<Void> notifyClient(String sessionId, String method, Object params) {
return Mono.defer(() -> {
Expand Down Expand Up @@ -333,6 +346,7 @@ private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
.requireNonNull(this.sessionFactory, "sessionFactory must be set before handling connections")
.create(sessionTransport);
String sessionId = session.getId();
sessionTransport.setSessionId(sessionId);

logger.debug("Created new SSE connection for session: {}", sessionId);
this.sessions.put(sessionId, session);
Expand Down Expand Up @@ -442,10 +456,16 @@ private class WebFluxMcpSessionTransport implements McpServerTransport {

private final FluxSink<ServerSentEvent<?>> sink;

private @Nullable String sessionId;

WebFluxMcpSessionTransport(FluxSink<ServerSentEvent<?>> sink) {
this.sink = sink;
}

void setSessionId(String sessionId) {
this.sessionId = sessionId;
}

@Override
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
return Mono.fromSupplier(() -> {
Expand All @@ -462,8 +482,8 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
.build();
this.sink.next(event);
}).doOnError(e -> {
// TODO log with sessionid
Throwable exception = Exceptions.unwrap(e);
logger.error("Error sending message to session {}: {}", this.sessionId, exception.getMessage());
this.sink.error(exception);
}).then();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,28 @@ public ChatClientResponse adviseCall(ChatClientRequest chatClientRequest, CallAd
@SuppressWarnings("null")
private SchemaValidation validateOutputSchema(ChatClientResponse chatClientResponse) {

if (chatClientResponse.chatResponse() == null || chatClientResponse.chatResponse().getResult() == null
|| chatClientResponse.chatResponse().getResult().getOutput() == null
|| chatClientResponse.chatResponse().getResult().getOutput().getText() == null) {
if (chatClientResponse.chatResponse() == null || chatClientResponse.chatResponse().getResults() == null
|| chatClientResponse.chatResponse().getResults().isEmpty()) {

logger.warn("ChatClientResponse is missing required json output for validation.");
return SchemaValidation.failed("Missing required json output for validation.");
}

// TODO: should we consider validation for multiple results?
String json = chatClientResponse.chatResponse().getResult().getOutput().getText();
for (var result : chatClientResponse.chatResponse().getResults()) {
if (result == null || result.getOutput() == null || result.getOutput().getText() == null) {
logger.warn("ChatClientResponse result is missing required json output for validation.");
return SchemaValidation.failed("Missing required json output for validation.");
}

logger.debug("Validating JSON output against schema. Attempts left: {}", this.maxRepeatAttempts);

logger.debug("Validating JSON output against schema. Attempts left: {}", this.maxRepeatAttempts);
SchemaValidation validation = validateJsonText(result.getOutput().getText());
if (!validation.success()) {
return validation;
}
}

return validateJsonText(json);
return SchemaValidation.passed();
}

private SchemaValidation validateJsonText(String json) {
Expand Down