bella-openai4j 库完整支持 OpenAI Response API,包括流式(SSE)和非流式两种模式。
<dependency>
<groupId>top.bella</groupId>
<artifactId>openai-service</artifactId>
<version>${bella-openai.version}</version>
</dependency>获取最新版本号:
- 访问 Maven 中央仓库:https://repo1.maven.org/maven2/top/bella/openai-service/
- 选择最新的版本目录即可获取当前可用的最新版本号
import com.theokanning.openai.service.OpenAiService;
// 从环境变量读取 API Key
OpenAiService service = new OpenAiService();
// 或者直接指定 API Key
OpenAiService service = new OpenAiService("your-api-key");
// 自定义超时时间
OpenAiService service = new OpenAiService(Duration.ofSeconds(60));import com.theokanning.openai.response.*;
// 创建请求
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Hello, how are you?"))
.build();
// 发送请求并获取完整响应
Response response = service.createResponse(request);
// 获取响应内容
System.out.println("Response ID: " + response.getId());
System.out.println("Status: " + response.getStatus());
System.out.println("Output: " + response.getOutput());CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("What is 2+2?"))
.instructions("You are a helpful math teacher. Be concise.")
.temperature(0.7)
.maxOutputTokens(100)
.build();
Response response = service.createResponse(request);// 第一轮对话
CreateResponseRequest request1 = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("My name is Alice"))
.build();
Response response1 = service.createResponse(request1);
// 第二轮对话 - 引用前一个 response
String previousId = response1.getId();
CreateResponseRequest request2 = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("What is my name?"))
.previousResponseId(previousId)
.build();
Response response2 = service.createResponse(request2);// 第一轮对话
CreateResponseRequest request1 = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("My name is Alice"))
.build();
Response response1 = service.createResponse(request1);
// 第二轮对话 - 引用前一个 response
String conversationId = response1.getConversation().getStringValue();
CreateResponseRequest request2 = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("What is my name?"))
.conversation(ConversationValue.of(conversationId))
.build();
Response response2 = service.createResponse(request2);String responseId = "resp_xxxxx";
Response response = service.getResponse(responseId);CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Hello"))
.metadata(Map.of(
"user_id", "user123",
"session", "session456"
))
.build();
Response response = service.createResponse(request);import io.reactivex.Flowable;
import com.theokanning.openai.service.response_stream.ResponseSSE;
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Write a poem about coding"))
.stream(true) // 开启流式模式
.build();
// 获取流式响应
Flowable<ResponseSSE> stream = service.createResponseStream(request);
// 订阅并处理事件
stream.subscribe(
sse -> {
System.out.println("Event type: " + sse.getType());
BaseStreamEvent event = sse.getEvent();
// 处理事件
},
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Stream completed")
);ResponseStreamManager 提供了更高级的流式处理能力,包括事件分发、文本累积等。
import com.theokanning.openai.service.response_stream.*;
import com.theokanning.openai.response.stream.*;
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Explain quantum computing"))
.stream(true)
.build();
// 创建事件处理器
ResponseEventHandler handler = new ResponseEventHandler() {
@Override
public void onResponseCreated(ResponseCreatedEvent event) {
System.out.println("Response created");
}
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
// 实时输出文本增量
System.out.print(event.getDelta());
}
@Override
public void onResponseCompleted(ResponseCompletedEvent event) {
System.out.println("\n\nResponse completed!");
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
};
// 启动异步流管理器
ResponseStreamManager manager = ResponseStreamManager.start(
service.createResponseStream(request),
handler
);
// 等待完成
manager.waitForCompletion();
// 获取累积的文本
String fullText = manager.getAccumulatedText().orElse("");
System.out.println("Final text: " + fullText);// 同步模式会阻塞直到流处理完成
ResponseStreamManager manager = ResponseStreamManager.syncStart(
service.createResponseStream(request),
handler
);
// 执行到这里时,流已经完全处理完毕
System.out.println("Stream completed synchronously");
String fullText = manager.getAccumulatedText().orElse("");如果只需要获取文本输出:
StringBuilder output = new StringBuilder();
ResponseStreamManager.start(
service.createResponseStream(request),
new ResponseEventHandler() {
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
output.append(event.getDelta());
}
}
).waitForCompletion();
System.out.println("Complete output: " + output.toString());import com.theokanning.openai.response.tool.definition.*;
// 定义工具
ToolDefinition weatherTool = ToolDefinition.builder()
.type("function")
.function(FunctionDefinition.builder()
.name("get_weather")
.description("Get the current weather")
.parameters(/* ... */)
.build())
.build();
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("What's the weather in Tokyo?"))
.tools(List.of(weatherTool))
.build();
Response response = service.createResponse(request);CreateResponseRequest request = CreateResponseRequest.builder()
.model("o1-preview")
.input(InputValue.of("Solve this complex problem..."))
.reasoning(CreateResponseRequest.ReasoningConfig.builder()
.effort("high")
.summary("detailed")
.build())
.build();
// 流式处理推理输出
ResponseStreamManager.start(
service.createResponseStream(request),
new ResponseEventHandler() {
@Override
public void onReasoningTextDelta(ReasoningTextDeltaEvent event) {
System.out.print("[Reasoning] " + event.getDelta());
}
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
System.out.print("[Output] " + event.getDelta());
}
}
).waitForCompletion();import com.theokanning.openai.completion.chat.ChatResponseFormat;
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Generate a JSON user profile"))
.text(CreateResponseRequest.TextConfig.builder()
.format(ChatResponseFormat.JSON_OBJECT)
.verbosity("detailed")
.build())
.build();
Response response = service.createResponse(request);import com.theokanning.openai.assistants.run.TruncationStrategy;
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Very long input..."))
.truncation("auto") // 或 "disabled"
.maxOutputTokens(1000)
.build();Response API 支持丰富的流式事件类型。
| 事件类型 | 描述 | Handler 方法 |
|---|---|---|
response.created |
Response 创建 | onResponseCreated() |
response.in_progress |
Response 处理中 | onResponseInProgress() |
response.completed |
Response 完成 | onResponseCompleted() |
response.failed |
Response 失败 | onResponseFailed() |
response.output_item.added |
输出项添加 | onOutputItemAdded() |
response.output_text.delta |
文本增量 | onOutputTextDelta() |
response.output_text.done |
文本完成 | onOutputTextDone() |
response.function_call_arguments.delta |
函数参数增量 | onFunctionCallArgumentsDelta() |
response.reasoning_text.delta |
推理文本增量 | onReasoningTextDelta() |
ResponseEventHandler handler = new ResponseEventHandler() {
@Override
public void onResponseCreated(ResponseCreatedEvent event) {
System.out.println("=== Response Created ===");
}
@Override
public void onOutputItemAdded(OutputItemAddedEvent event) {
System.out.println("New output item added");
}
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
System.out.print(event.getDelta());
}
@Override
public void onFunctionCallArgumentsDelta(FunctionCallArgumentsDeltaEvent event) {
System.out.println("Function args: " + event.getDelta());
}
@Override
public void onReasoningTextDelta(ReasoningTextDeltaEvent event) {
System.out.println("[Reasoning] " + event.getDelta());
}
@Override
public void onFileSearchSearching(FileSearchSearchingEvent event) {
System.out.println("Searching files...");
}
@Override
public void onWebSearchSearching(WebSearchSearchingEvent event) {
System.out.println("Searching web...");
}
@Override
public void onCodeInterpreterInterpreting(CodeInterpreterInterpretingEvent event) {
System.out.println("Executing code...");
}
@Override
public void onResponseCompleted(ResponseCompletedEvent event) {
System.out.println("\n=== Response Completed ===");
}
@Override
public void onResponseFailed(ResponseFailedEvent event) {
System.err.println("Response failed!");
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
};OpenAiService service = new OpenAiService();
CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Tell me a joke"))
.build();
Response response = service.createResponse(request);
System.out.println(response.getOutputText());CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Write a short story"))
.stream(true)
.build();
ResponseStreamManager.start(
service.createResponseStream(request),
new ResponseEventHandler() {
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
System.out.print(event.getDelta());
System.out.flush();
try {
Thread.sleep(50); // 模拟打字机效果
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
).waitForCompletion();String conversationId = "conv-" + UUID.randomUUID();
String previousResponseId = null;
// 对话循环
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("You: ");
String userInput = scanner.nextLine();
if ("exit".equalsIgnoreCase(userInput)) {
break;
}
CreateResponseRequest.CreateResponseRequestBuilder builder =
CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of(userInput))
.conversation(ConversationValue.of(conversationId))
.stream(true);
if (previousResponseId != null) {
builder.previousResponseId(previousResponseId);
}
System.out.print("AI: ");
StringBuilder response = new StringBuilder();
ResponseStreamManager manager = ResponseStreamManager.start(
service.createResponseStream(builder.build()),
new ResponseEventHandler() {
@Override
public void onOutputTextDelta(OutputTextDeltaEvent event) {
String delta = event.getDelta();
System.out.print(delta);
response.append(delta);
}
@Override
public void onResponseCreated(ResponseCreatedEvent event) {
previousResponseId = event.getResponse().getId();
}
}
);
manager.waitForCompletion();
System.out.println("\n");
}CreateResponseRequest request = CreateResponseRequest.builder()
.model("gpt-4o-mini")
.input(InputValue.of("Hello"))
.stream(true)
.build();
try {
ResponseStreamManager manager = ResponseStreamManager.start(
service.createResponseStream(request),
new ResponseEventHandler() {
@Override
public void onError(Throwable error) {
if (error instanceof OpenAiHttpException) {
OpenAiHttpException httpError = (OpenAiHttpException) error;
System.err.println("API Error: " + httpError.getMessage());
System.err.println("Status: " + httpError.statusCode);
} else {
System.err.println("Error: " + error.getMessage());
}
}
}
);
manager.waitForCompletion();
} catch (Exception e) {
System.err.println("Failed to start stream: " + e.getMessage());
}-
流式结束标记: Response API 不使用
[DONE]标记,而是通过response.completed、response.failed或response.incomplete事件表示结束。 -
超时设置: 对于长时间运行的请求,建议设置较长的超时时间:
OpenAiService service = new OpenAiService(Duration.ofMinutes(5));
-
资源管理:
ResponseStreamManager提供shutDown()方法用于提前终止流:ResponseStreamManager manager = ResponseStreamManager.start(stream, handler); // ... 某些条件下提前终止 manager.shutDown();
-
线程安全:
ResponseStreamManager内部使用同步集合,可以安全地在多线程环境中使用。 -
错误重试: 对于网络错误,建议实现重试逻辑:
int maxRetries = 3; for (int i = 0; i < maxRetries; i++) { try { Response response = service.createResponse(request); break; } catch (OpenAiHttpException e) { if (i == maxRetries - 1) throw e; Thread.sleep(1000 * (i + 1)); } }