[FEATURE] Hopefully the model build will support custom httpClient #323

@nn200433

Is your feature request related to a problem? Please describe.

I hope the community models can accept a custom httpClientBuilder, as the official Ollama integration does, so that HTTP requests can be customized, for example to support interrupting streaming responses.

Describe the solution you'd like

Currently, when sending a question to a large language model (such as ChatGPT), I create an SseSession manager for each request and store the session ID and request ID in the thread context. Then, in the onPartialResponse callback, I use these IDs to check whether the request has been interrupted and decide whether to keep pushing the streamed response to the frontend.

While this method works for controlling interruption, it has several drawbacks:

  1. Memory contamination: LangChain4j manages conversation memory automatically, and we don’t have full control over when it saves context. If the stream is interrupted halfway, incomplete or irrelevant content might still be recorded.
  2. Not a real interruption: even if we stop streaming to the frontend, LangChain4j still receives the full response from the model provider, so nothing is actually cancelled.
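
The second drawback can be illustrated with a small, self-contained sketch. FilteringHandler is a hypothetical stand-in for the callback-based check described above, not a LangChain4j class, and a fixed list of tokens stands in for the model's stream. The handler stops forwarding after the interrupt, yet every token is still received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical handler that drops tokens after an interrupt instead of stopping the stream. */
final class FilteringHandler {

    private final AtomicBoolean interrupted = new AtomicBoolean(false);
    private final List<String> forwarded = new ArrayList<>();
    private int received = 0;

    void interrupt() {
        interrupted.set(true);
    }

    /** Mirrors the check in onPartialResponse: count every token, forward only while not interrupted. */
    void onPartialResponse(String token) {
        received++;
        if (!interrupted.get()) {
            forwarded.add(token);
        }
    }

    int received() {
        return received;
    }

    List<String> forwarded() {
        return forwarded;
    }
}

class FilteringHandlerDemo {
    public static void main(String[] args) {
        FilteringHandler handler = new FilteringHandler();
        List<String> modelOutput = List.of("The", "quick", "brown", "fox", "jumps");

        for (int i = 0; i < modelOutput.size(); i++) {
            if (i == 2) {
                handler.interrupt(); // the user presses "stop" after two tokens
            }
            handler.onPartialResponse(modelOutput.get(i));
        }

        System.out.println(handler.forwarded()); // [The, quick]
        System.out.println(handler.received());  // 5
    }
}
```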

Later, I discovered that LangChain4j allows customization of the HTTP client. Internally, its JDK-based client relies on this java.net.http.HttpClient method:

public abstract <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, BodyHandler<T> responseBodyHandler)

This gave me a cleaner solution:

  • Before making the request, I bind the request ID to the corresponding CompletableFuture task;
  • If the user chooses to interrupt the request, I simply cancel the async task. This stops the entire response stream cleanly, avoiding memory pollution and achieving true interruption behavior.
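
The two steps above can be sketched with plain java.util.concurrent types. RequestRegistry and its methods are hypothetical names, not LangChain4j API, and a manually created CompletableFuture stands in for the one returned by sendAsync. This is a minimal sketch of the binding and cancellation only, under those assumptions.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that maps a request ID to its in-flight future. */
final class RequestRegistry {

    private final Map<String, CompletableFuture<?>> active = new ConcurrentHashMap<>();

    /** Binds the ID to the future and unbinds it once the future completes. */
    <T> CompletableFuture<T> register(String requestId, CompletableFuture<T> future) {
        active.put(requestId, future);
        // remove(key, value) only unbinds this exact future, so a later
        // request that reuses the ID is not removed by mistake.
        future.whenComplete((result, error) -> active.remove(requestId, future));
        return future;
    }

    /** Cancels the future bound to the ID; returns false if none is active. */
    boolean cancel(String requestId) {
        CompletableFuture<?> future = active.get(requestId);
        return future != null && future.cancel(true);
    }

    boolean isActive(String requestId) {
        return active.containsKey(requestId);
    }
}

class RequestRegistryDemo {
    public static void main(String[] args) {
        RequestRegistry registry = new RequestRegistry();

        // Stands in for the future returned by HttpClient.sendAsync(...).
        CompletableFuture<String> response = new CompletableFuture<>();
        CompletableFuture<Void> handled = response.thenAccept(body -> { });
        registry.register("req-1", response);

        System.out.println(registry.cancel("req-1"));            // true
        System.out.println(response.isCancelled());              // true
        System.out.println(handled.isCompletedExceptionally());  // true
        System.out.println(registry.isActive("req-1"));          // false
    }
}
```

The sketch registers the upstream future on purpose: cancelling a CompletableFuture completes its dependent stages exceptionally, but cancelling a dependent stage (for example the result of thenAccept) leaves the upstream future untouched.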

@Slf4j
public class InterruptibleJdkHttpClient implements HttpClient {

    private final java.net.http.HttpClient delegate;
    private final Duration readTimeout;
    
    /** Active HTTP requests: the key is the requestId, the value is the CompletableFuture. */
    private static final Map<String, CompletableFuture<?>> ACTIVE_REQUESTS = new ConcurrentHashMap<>();

    public InterruptibleJdkHttpClient(InterruptibleJdkHttpClientBuilder builder) {
        java.net.http.HttpClient.Builder httpClientBuilder =
                getOrDefault(builder.httpClientBuilder(), java.net.http.HttpClient::newBuilder);
        if (builder.connectTimeout() != null) {
            httpClientBuilder.connectTimeout(builder.connectTimeout());
        }
        this.delegate = httpClientBuilder.build();
        this.readTimeout = builder.readTimeout();
    }

    public static InterruptibleJdkHttpClientBuilder builder() {
        return new InterruptibleJdkHttpClientBuilder();
    }

    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) throws HttpException {
        try {
            java.net.http.HttpRequest jdkRequest = toJdkRequest(request);

            java.net.http.HttpResponse<String> jdkResponse = delegate.send(jdkRequest, BodyHandlers.ofString());

            if (!isSuccessful(jdkResponse)) {
                throw new HttpException(jdkResponse.statusCode(), jdkResponse.body());
            }

            return fromJdkResponse(jdkResponse, jdkResponse.body());
        } catch (HttpTimeoutException e) {
            throw new TimeoutException(e);
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        // Read the request info from the ThreadLocal
        String requestId = ThreadLocalUtil.get(RagConstants.LOCAL_THREAD_REQUEST_ID);
        String conversationId = ThreadLocalUtil.get(RagConstants.LOCAL_THREAD_CONVERSATION_ID);

        log.debug("Starting streaming HTTP request: requestId={}, conversationId={}", requestId, conversationId);
        
        java.net.http.HttpRequest jdkRequest = toJdkRequest(request);

        CompletableFuture<Void> future = delegate.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
                .thenAccept(jdkResponse -> {
                    // Check again for an interruption before handling the response
                    if (isRequestInterrupted(requestId, conversationId)) {
                        log.info("Interruption detected before handling the HTTP response: requestId={}", requestId);
                        return;
                    }

                    if (!isSuccessful(jdkResponse)) {
                        HttpException exception = new HttpException(jdkResponse.statusCode(), readBody(jdkResponse));
                        ignoringExceptions(() -> listener.onError(exception));
                        return;
                    }

                    SuccessfulHttpResponse response = fromJdkResponse(jdkResponse, null);
                    ignoringExceptions(() -> listener.onOpen(response));

                    try (InputStream inputStream = jdkResponse.body()) {
                        // Parse the stream with a wrapped listener that supports interruption checks
                        parseWithInterruptCheck(inputStream, parser, listener, requestId, conversationId);
                        ignoringExceptions(listener::onClose);
                    } catch (IOException e) {
                        if (!isRequestInterrupted(requestId, conversationId)) {
                            throw new RuntimeException(e);
                        }
                        log.info("HTTP stream parsing was interrupted: requestId={}", requestId);
                    }
                })
                .exceptionally(throwable -> {
                    if (throwable instanceof CancellationException) {
                        log.info("HTTP request was cancelled: requestId={}", requestId);
                    } else if (throwable.getCause() instanceof HttpTimeoutException) {
                        ignoringExceptions(() -> listener.onError(new TimeoutException(throwable)));
                    } else {
                        ignoringExceptions(() -> listener.onError(throwable));
                    }
                    return null;
                })
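                .whenComplete((result, throwable) -> {
                    // Clean up once the request completes. ConcurrentHashMap rejects null keys,
                    // so skip the removal when no requestId was bound.
                    if (StrUtil.isNotBlank(requestId)) {
                        ACTIVE_REQUESTS.remove(requestId);
                    }
                    log.debug("HTTP request completed, cache cleared: requestId={}", requestId);
                });

        // If there is a requestId, cache this future so the request can be interrupted later
        if (StrUtil.isNotBlank(requestId)) {
            ACTIVE_REQUESTS.put(requestId, future);
            // The future may already have completed (and run its cleanup) before the put above
            if (future.isDone()) {
                ACTIVE_REQUESTS.remove(requestId, future);
            }
            log.debug("Cached HTTP request for interruption control: requestId={}", requestId);
        }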
    }

    private java.net.http.HttpRequest toJdkRequest(HttpRequest request) {
        java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder()
                .uri(URI.create(request.url()));

        request.headers().forEach((name, values) -> {
            if (values != null) {
                values.forEach(value -> builder.header(name, value));
            }
        });

        BodyPublisher bodyPublisher;
        if (request.body() != null) {
            bodyPublisher = BodyPublishers.ofString(request.body());
        } else {
            bodyPublisher = BodyPublishers.noBody();
        }
        builder.method(request.method().name(), bodyPublisher);

        if (readTimeout != null) {
            builder.timeout(readTimeout);
        }

        return builder.build();
    }

    private static SuccessfulHttpResponse fromJdkResponse(java.net.http.HttpResponse<?> response, String body) {
        return SuccessfulHttpResponse.builder()
                .statusCode(response.statusCode())
                .headers(response.headers().map())
                .body(body)
                .build();
    }

    private static boolean isSuccessful(java.net.http.HttpResponse<?> response) {
        int statusCode = response.statusCode();
        return statusCode >= 200 && statusCode < 300;
    }

    private static String readBody(java.net.http.HttpResponse<InputStream> response) {
        try (InputStream inputStream = response.body();
             BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            return reader.lines().collect(joining(System.lineSeparator()));
        } catch (IOException e) {
            return "Cannot read error response body: " + e.getMessage();
        }
    }
    
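    /**
     * Parses the stream, checking for interruption along the way.
     */
    private void parseWithInterruptCheck(InputStream inputStream, ServerSentEventParser parser,
                                         ServerSentEventListener listener, String requestId, String conversationId) {
        // Wrap the listener so the interruption state is checked before each event is handled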
        ServerSentEventListener wrappedListener = new ServerSentEventListener() {
            @Override
            public void onOpen(SuccessfulHttpResponse response) {
                if (!isRequestInterrupted(requestId, conversationId)) {
                    listener.onOpen(response);
                }
            }

            @Override
            public void onEvent(ServerSentEvent event) {
                if (!isRequestInterrupted(requestId, conversationId)) {
                    listener.onEvent(event);
                } else {
                    log.debug("Interruption detected, cancelling the HTTP request directly: requestId={}", requestId);
                    // Cancel the HTTP request directly, which is cleaner and more efficient
                    cancelCurrentRequest(requestId);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                if (!isRequestInterrupted(requestId, conversationId)) {
                    listener.onError(throwable);
                }
            }

            @Override
            public void onClose() {
                if (!isRequestInterrupted(requestId, conversationId)) {
                    listener.onClose();
                }
            }
        };
        
        parser.parse(inputStream, wrappedListener);
    }
    
    /**
     * Checks whether the request has been interrupted.
     */
    private boolean isRequestInterrupted(String requestId, String conversationId) {
        if (StrUtil.isBlank(requestId) || StrUtil.isBlank(conversationId)) {
            return false;
        }
        
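        // Check the conversation state manager
        boolean shouldContinue = ConversationStateManager.shouldContinueProcessing(conversationId, requestId);
        if (!shouldContinue) {
            log.debug("Conversation state manager signalled an interruption: requestId={}, conversationId={}", requestId, conversationId);
            return true;
        }

        // Check the ThreadLocal interrupt flag.
        // NOTE: the async callbacks run on an HttpClient executor thread, so a plain
        // ThreadLocal set on the calling thread may not be visible here.
        Boolean isInterrupted = ThreadLocalUtil.get(RagConstants.LOCAL_THREAD_INTERRUPT_FLAG);
        if (Boolean.TRUE.equals(isInterrupted)) {
            log.debug("ThreadLocal interrupt flag is true: requestId={}", requestId);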
            return true;
        }
        
        return false;
    }
    
    /**
     * Cancels the current HTTP request.
     */
    private void cancelCurrentRequest(String requestId) {
        if (StrUtil.isBlank(requestId)) {
            return;
        }
        
        CompletableFuture<?> future = ACTIVE_REQUESTS.get(requestId);
        if (future != null && !future.isDone()) {
            boolean cancelled = future.cancel(true);
            if (cancelled) {
                ACTIVE_REQUESTS.remove(requestId);
                log.info("HTTP request cancelled successfully: requestId={}", requestId);
            } else {
                log.debug("Request already completed, cannot cancel: requestId={}", requestId);
            }
        } else {
            log.debug("No active HTTP request found: requestId={}", requestId);
        }
    }

}
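
One caveat that may apply to the class above: with BodyHandlers.ofInputStream(), the future returned by sendAsync completes as soon as the response headers have been read, and cancelling a stage derived from it does not stop a callback that is already reading the body. A possible complement is to stop reading and close the body stream once an interrupt is seen. The sketch below shows that idea with standard library types only; AbortableLineReader is a hypothetical helper, and a ByteArrayInputStream stands in for the streamed response body.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical reader that stops consuming a streamed body once abort() is called. */
final class AbortableLineReader {

    private final AtomicBoolean aborted = new AtomicBoolean(false);

    void abort() {
        aborted.set(true);
    }

    /**
     * Delivers lines until EOF or abort and returns how many were delivered.
     * try-with-resources closes the underlying stream in both cases.
     */
    int readLines(InputStream body, Consumer<String> onLine) throws IOException {
        int delivered = 0;
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while (!aborted.get() && (line = reader.readLine()) != null) {
                onLine.accept(line);
                delivered++;
            }
        }
        return delivered;
    }
}

class AbortableLineReaderDemo {
    public static void main(String[] args) throws IOException {
        AtomicBoolean closed = new AtomicBoolean(false);
        byte[] bytes = "data: a\ndata: b\ndata: c\ndata: d\n".getBytes(StandardCharsets.UTF_8);
        // Records close() so the demo can show that the stream was released.
        InputStream body = new ByteArrayInputStream(bytes) {
            @Override
            public void close() throws IOException {
                closed.set(true);
                super.close();
            }
        };

        AbortableLineReader reader = new AbortableLineReader();
        List<String> seen = new ArrayList<>();
        int delivered = reader.readLines(body, line -> {
            seen.add(line);
            if (seen.size() == 2) {
                reader.abort(); // the user presses "stop" after two events
            }
        });

        System.out.println(delivered);    // 2
        System.out.println(seen);         // [data: a, data: b]
        System.out.println(closed.get()); // true
    }
}
```

For a real java.net.http response, closing the body InputStream before EOF is the documented way to release the resources of the exchange, so the remaining events are not consumed.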

Additional context

Functions in development

System Environment

JDK: 17
Spring Boot: 3.4.3
LangChain4j: 1.2.0
OS: Win 11

Labels

P3 (Medium priority), enhancement (New feature or request), theme: model (Issues/PRs related to model)
