
Unable to stream chat using OciGenAiStreamingChatModel #378

@ranjkkum

Description


I'm implementing a method for streaming chat responses. The method runs without errors, but instead of emitting partial responses as they arrive, it returns the full response only once generation is complete. Could you review my implementation and help me identify what is preventing the response from streaming as expected?

pom.xml

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-spring-boot-starter</artifactId>
    <version>1.5.0-beta11</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-community-oci-genai</artifactId>
    <version>1.5.0-beta11</version>
</dependency>

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-reactor</artifactId>
    <version>1.5.0-beta11</version>
</dependency>

StreamingAssistant.java

@SystemMessage("You are an assistant for deployment queries. For general questions, use RAG to retrieve relevant information. When invoking tools (e.g., listComputeInstances), do not retrieve or augment with RAG sources; rely solely on the tool's output for accuracy and speed.")
public interface StreamingAssistant {

	@UserMessage("{{it}}")
	TokenStream chat(String text);
}

@Bean
public OciGenAiStreamingChatModel getStreamingChatModel() {
    log.info("Creating streaming chat model with modelName: {}, compartmentId: {}, region: {}",
            config.getModel().getName(), config.getCompartmentId(), config.getRegion());
    return OciGenAiStreamingChatModel.builder()
            .modelName(config.getModel().getName())
            .compartmentId(config.getCompartmentId())
            .authProvider(authenticationDetailsProvider)
            .region(Region.valueOf(config.getRegion()))
            .build();
}

private StreamingAssistant getOrCreateStreamingAssistant(String sessionId) {
    return streamingAssistantCache.computeIfAbsent(sessionId, id -> {
        log.debug("Creating new assistant for sessionId: {}", id);
        MessageWindowChatMemory memory = createChatMemory(id);
        return AiServices.builder(StreamingAssistant.class)
                .streamingChatModel(getStreamingChatModel())
                .tools(assistantToolProvider.getTools())
                .chatMemory(memory)
                .build();
    });
}

Approach 1:

public Flux<String> chatStream(AssistantChatRequest chatRequest) throws JsonProcessingException {
    Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

    StreamingAssistant streamingAssistant = getOrCreateStreamingAssistant(chatRequest.getSessionId());
    streamingAssistant.chat(chatRequest.getPrompt())
            .onPartialResponse(sink::tryEmitNext)
            .onCompleteResponse(aiMessageResponse -> sink.tryEmitComplete())
            .onError(sink::tryEmitError)
            .start();
    return sink.asFlux();
}
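For reference, the sink hand-off in Approach 1 can only relay whatever granularity the upstream callback delivers: if onPartialResponse fires per token, the Flux streams per token, but if the integration invokes it once with the full text, no downstream plumbing can recover streaming. A library-free sketch of the same producer/consumer shape (plain java.util.concurrent, no langchain4j or Reactor, all names mine) illustrates this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class StreamRelay {
    private static final String DONE = "\u0000DONE"; // completion sentinel

    // Producer pushes partial tokens; consumer drains them as they arrive.
    static List<String> relay(List<String> partials) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (String p : partials) {
                queue.add(p);      // analogous to sink.tryEmitNext(p)
            }
            queue.add(DONE);       // analogous to sink.tryEmitComplete()
        });
        producer.start();

        List<String> received = new ArrayList<>();
        String token;
        while (!(token = queue.take()).equals(DONE)) {
            received.add(token);   // each emitted element arrives individually
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // If the callback fires token-by-token, the consumer sees token-by-token...
        System.out.println(relay(List.of("Hel", "lo", "!")));
        // ...but if it fires once with the full text, only one chunk can ever arrive:
        System.out.println(relay(List.of("Hello!")));
    }
}
```

In other words, if Approach 1 produces a single emission, the buffering is happening before the sink, not in it.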

Approach 2:

@Override
public Flux<String> chatStream(AssistantChatRequest chatRequest) throws JsonProcessingException {
    return Flux.create(sink -> {
        var model = getStreamingChatModel();
        model.chat(chatRequest.getPrompt(), new StreamingChatResponseHandler() {
            @Override
            public void onPartialResponse(String s) {
                JSONObject jsonObject = new JSONObject();
                try {
                    jsonObject.put("message", s);
                } catch (JSONException e) {
                    throw new RuntimeException(e);
                }
                sink.next(jsonObject.toString());
            }

            @Override
            public void onCompleteResponse(ChatResponse chatResponse) {
                log.info("onCompleteResponse: {}", chatResponse);
                sink.complete();
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("onError", throwable);
                sink.error(throwable);
            }
        });
    }, FluxSink.OverflowStrategy.BUFFER);
}
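One way to tell whether the buffering happens in the model integration or in the web layer is to timestamp each onPartialResponse invocation: real token streaming shows spread-out arrivals, while a buffered response lands as one burst. A small dependency-free recorder sketch (class and method names are mine, not part of any library) that could be called from the handler above:

```java
import java.util.ArrayList;
import java.util.List;

public class PartialTimer {
    private final List<Long> arrivals = new ArrayList<>();

    // Call this from onPartialResponse(...) to record when each chunk lands.
    void onChunk() {
        arrivals.add(System.nanoTime());
    }

    // True when all inter-chunk gaps are under thresholdMillis, which suggests
    // the integration delivered the full text in a single burst.
    boolean arrivedAsSingleBurst(long thresholdMillis) {
        for (int i = 1; i < arrivals.size(); i++) {
            long gapMs = (arrivals.get(i) - arrivals.get(i - 1)) / 1_000_000;
            if (gapMs > thresholdMillis) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        PartialTimer t = new PartialTimer();
        t.onChunk();
        Thread.sleep(50);  // simulate a genuinely streamed second chunk
        t.onChunk();
        System.out.println(t.arrivedAsSingleBurst(10));
    }
}
```

If the gaps between partials are all near zero while the total response takes seconds, the delay is upstream of the handler.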

AssistantChatController.java

@PostMapping(value = RESOURCE_IDENTIFIER + "/actions/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "Stream chat with Database Tools Chat AI", description = "Stream chat with Database Tools Chat AI")
public Flux<String> chatStream(@RequestBody AssistantChatRequest chatRequest) throws JsonProcessingException {
    return assistantChatService.chatStream(chatRequest);
}

Metadata


    Labels

P2 (High priority), P3 (Medium priority), bug (Something isn't working), theme: model (Issues/PRs related to model)
