diff --git a/camel/models/openai_model.py b/camel/models/openai_model.py index 5c24039a1a..1b9751ba88 100644 --- a/camel/models/openai_model.py +++ b/camel/models/openai_model.py @@ -11,9 +11,22 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= +import copy +import json import os import warnings -from typing import Any, Dict, List, Optional, Type, Union +from typing import ( + Any, + AsyncGenerator, + Dict, + Generator, + List, + Literal, + Optional, + Type, + Union, + cast, +) from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream from openai.lib.streaming.chat import ( @@ -26,6 +39,11 @@ from camel.logger import get_logger from camel.messages import OpenAIMessage from camel.models import BaseModelBackend +from camel.models.openai_responses_adapter import ( + aiter_response_events_to_chat_chunks, + iter_response_events_to_chat_chunks, + response_to_chat_completion, +) from camel.types import ( ChatCompletion, ChatCompletionChunk, @@ -35,6 +53,7 @@ BaseTokenCounter, OpenAITokenCounter, api_keys_required, + get_current_agent_session_id, is_langfuse_available, ) @@ -99,6 +118,9 @@ class OpenAIModel(BaseModelBackend): client instance. If provided, this client will be used instead of creating a new one. The client should implement the AsyncOpenAI client interface. (default: :obj:`None`) + api_mode (Literal["chat_completions", "responses"], optional): + OpenAI API mode to use. Supported values: + `"chat_completions"` (default) and `"responses"`. **kwargs (Any): Additional arguments to pass to the OpenAI client initialization. These can include parameters like 'organization', 'default_headers', 'http_client', etc. @@ -121,6 +143,9 @@ def __init__( max_retries: int = 3, client: Optional[Any] = None, async_client: Optional[Any] = None, + api_mode: Literal["chat_completions", "responses"] = ( + "chat_completions" + ), **kwargs: Any, ) -> None: if model_config_dict is None: @@ -128,6 +153,14 @@ def __init__( api_key = api_key or os.environ.get("OPENAI_API_KEY") url = url or os.environ.get("OPENAI_API_BASE_URL") timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180)) + if api_mode not in {"chat_completions", "responses"}: + raise ValueError( + "api_mode must be 'chat_completions' or 'responses', " + f"got: {api_mode}" + ) + self._api_mode = api_mode + self._responses_previous_response_id_by_session: Dict[str, str] = {} + self._responses_last_message_count_by_session: Dict[str, int] = {} # Store additional client args for later use self._max_retries = max_retries @@ -309,15 +342,34 @@ def _run( if response_format: if is_streaming: - # Use streaming parse for structured output + if self._api_mode == "responses": + return cast( + Stream[ChatCompletionChunk], + self._request_responses_stream( + messages, response_format, tools + ), + ) return self._request_stream_parse( messages, response_format, tools ) else: - # Use non-streaming parse for structured output + if self._api_mode == "responses": + return self._request_responses( + messages, response_format, tools + ) return self._request_parse(messages, response_format, tools) else: - result = self._request_chat_completion(messages, tools) + result: Union[ChatCompletion, Stream[ChatCompletionChunk]] + if self._api_mode == "responses": + if is_streaming: + result = cast( + Stream[ChatCompletionChunk], + self._request_responses_stream(messages, None, tools), + ) + else: + result = 
self._request_responses(messages, None, tools) + else: + result = self._request_chat_completion(messages, tools) return result @@ -362,20 +414,401 @@ async def _arun( if response_format: if is_streaming: - # Use streaming parse for structured output + if self._api_mode == "responses": + return cast( + AsyncStream[ChatCompletionChunk], + await self._arequest_responses_stream( + messages, response_format, tools + ), + ) return await self._arequest_stream_parse( messages, response_format, tools ) else: - # Use non-streaming parse for structured output + if self._api_mode == "responses": + return await self._arequest_responses( + messages, response_format, tools + ) return await self._arequest_parse( messages, response_format, tools ) else: - result = await self._arequest_chat_completion(messages, tools) + result: Union[ + ChatCompletion, + AsyncStream[ChatCompletionChunk], + ] + if self._api_mode == "responses": + if is_streaming: + result = cast( + AsyncStream[ChatCompletionChunk], + await self._arequest_responses_stream( + messages, None, tools + ), + ) + else: + result = await self._arequest_responses( + messages, None, tools + ) + else: + result = await self._arequest_chat_completion(messages, tools) return result + def _prepare_responses_request_config( + self, + tools: Optional[List[Dict[str, Any]]] = None, + response_format: Optional[Type[BaseModel]] = None, + stream: bool = False, + ) -> Dict[str, Any]: + request_config = self._prepare_request_config(tools) + request_config = self._sanitize_config(request_config) + + # Translate chat-completions style parameters to responses style. + max_tokens = request_config.pop("max_tokens", None) + if ( + max_tokens is not None + and "max_output_tokens" not in request_config + ): + request_config["max_output_tokens"] = max_tokens + + # `n` is unsupported in responses. Keep backward compatibility. + if request_config.get("n") not in (None, 1): + warnings.warn( + "OpenAI Responses API does not support `n`; " + "ignoring configured value.", + UserWarning, + ) + request_config.pop("n", None) + + request_config.pop("response_format", None) + request_config.pop("stream_options", None) + request_config["stream"] = stream + # previous_response_id chaining requires stored responses. + if request_config.get("store") is False: + warnings.warn( + "Overriding `store=False` to `store=True` because " + "`previous_response_id` chaining is enabled.", + UserWarning, + ) + request_config["store"] = True + + if request_config.get("tools"): + request_config["tools"] = self._normalize_tools_for_responses_api( + request_config["tools"] + ) + + if response_format is not None: + schema = copy.deepcopy(response_format.model_json_schema()) + self._enforce_object_additional_properties_false(schema) + request_config["text"] = { + "format": { + "type": "json_schema", + "name": response_format.__name__, + "schema": schema, + } + } + + return request_config + + @staticmethod + def _normalize_tools_for_responses_api( + tools: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: + r"""Convert chat-completions style function tools to Responses style. 
+ + Input: + {"type":"function","function":{"name":"foo",...}} + Output: + {"type":"function","name":"foo",...} + """ + normalized_tools: List[Dict[str, Any]] = [] + for tool in tools: + if ( + isinstance(tool, dict) + and tool.get("type") == "function" + and isinstance(tool.get("function"), dict) + ): + normalized_tools.append( + {"type": "function", **tool["function"]} + ) + else: + normalized_tools.append(tool) + return normalized_tools + + @staticmethod + def _enforce_object_additional_properties_false(schema: Any) -> None: + r"""Recursively enforce strict object schema for Responses API.""" + if isinstance(schema, dict): + if ( + schema.get("type") == "object" + and "additionalProperties" not in schema + ): + schema["additionalProperties"] = False + + for value in schema.values(): + OpenAIModel._enforce_object_additional_properties_false(value) + elif isinstance(schema, list): + for item in schema: + OpenAIModel._enforce_object_additional_properties_false(item) + + def _get_response_chain_session_key(self) -> str: + return get_current_agent_session_id() or "__default__" + + def _prepare_responses_input_and_chain( + self, + messages: List[OpenAIMessage], + ) -> Dict[str, Any]: + session_key = self._get_response_chain_session_key() + previous_response_id = ( + self._responses_previous_response_id_by_session.get(session_key) + ) + last_message_count = self._responses_last_message_count_by_session.get( + session_key, 0 + ) + + # If memory was reset/truncated, reset chain and send full context. + if len(messages) < last_message_count: + previous_response_id = None + last_message_count = 0 + self._responses_previous_response_id_by_session.pop( + session_key, None + ) + + if previous_response_id and last_message_count > 0: + delta_messages = messages[last_message_count:] + input_messages = ( + delta_messages if delta_messages else [messages[-1]] + ) + else: + input_messages = messages + + input_items = self._convert_messages_to_responses_input(input_messages) + + return { + "session_key": session_key, + "previous_response_id": previous_response_id, + "input_messages": input_items, + "message_count": len(messages), + } + + @staticmethod + def _convert_messages_to_responses_input( + messages: List[OpenAIMessage], + ) -> List[Dict[str, Any]]: + r"""Convert chat-completions style messages to Responses input items. 
+ + This specifically rewrites tool-calling history: + - assistant message with `tool_calls` -> one or more `function_call` + items (+ optional assistant message when content exists) + - tool message -> `function_call_output` item + """ + input_items: List[Dict[str, Any]] = [] + for message in messages: + role = message.get("role") + content = message.get("content", "") + + if role == "tool": + input_items.append( + { + "type": "function_call_output", + "call_id": message.get("tool_call_id", "null"), + "output": content + if isinstance(content, str) + else str(content), + } + ) + continue + + if role == "assistant" and message.get("tool_calls"): + if content not in (None, "", []): + input_items.append( + { + "role": "assistant", + "content": content, + } + ) + + tool_calls = message.get("tool_calls", []) + if not isinstance(tool_calls, list): + tool_calls = [tool_calls] + for tool_call in tool_calls: + function_data = ( + tool_call.get("function", {}) + if isinstance(tool_call, dict) + else {} + ) + if not isinstance(function_data, dict): + continue + arguments = function_data.get("arguments", "{}") + if not isinstance(arguments, str): + arguments = json.dumps(arguments, ensure_ascii=False) + input_items.append( + { + "type": "function_call", + "call_id": ( + tool_call.get("id", "null") + if isinstance(tool_call, dict) + else "null" + ), + "name": function_data.get("name", ""), + "arguments": arguments, + } + ) + continue + + input_items.append( + { + "role": role, + "content": content, + } + ) + + return input_items + + def _save_response_chain_state( + self, + session_key: str, + response_id: Optional[str], + message_count: int, + ) -> None: + if response_id: + self._responses_previous_response_id_by_session[session_key] = ( + response_id + ) + self._responses_last_message_count_by_session[session_key] = ( + message_count + ) + + def _request_responses( + self, + messages: List[OpenAIMessage], + response_format: Optional[Type[BaseModel]], + tools: Optional[List[Dict[str, Any]]] = None, + ) -> ChatCompletion: + chain_state = self._prepare_responses_input_and_chain(messages) + request_config = self._prepare_responses_request_config( + tools=tools, response_format=response_format, stream=False + ) + if chain_state["previous_response_id"]: + request_config["previous_response_id"] = chain_state[ + "previous_response_id" + ] + response = self._client.responses.create( + input=chain_state["input_messages"], + model=self.model_type, + **request_config, + ) + self._save_response_chain_state( + session_key=chain_state["session_key"], + response_id=getattr(response, "id", None) + if not isinstance(response, dict) + else response.get("id"), + message_count=chain_state["message_count"], + ) + return response_to_chat_completion( + response=response, + model=str(self.model_type), + response_format=response_format, + ) + + async def _arequest_responses( + self, + messages: List[OpenAIMessage], + response_format: Optional[Type[BaseModel]], + tools: Optional[List[Dict[str, Any]]] = None, + ) -> ChatCompletion: + chain_state = self._prepare_responses_input_and_chain(messages) + request_config = self._prepare_responses_request_config( + tools=tools, response_format=response_format, stream=False + ) + if chain_state["previous_response_id"]: + request_config["previous_response_id"] = chain_state[ + "previous_response_id" + ] + response = await self._async_client.responses.create( + input=chain_state["input_messages"], + model=self.model_type, + **request_config, + ) + self._save_response_chain_state( + 
session_key=chain_state["session_key"], + response_id=getattr(response, "id", None) + if not isinstance(response, dict) + else response.get("id"), + message_count=chain_state["message_count"], + ) + return response_to_chat_completion( + response=response, + model=str(self.model_type), + response_format=response_format, + ) + + def _request_responses_stream( + self, + messages: List[OpenAIMessage], + response_format: Optional[Type[BaseModel]], + tools: Optional[List[Dict[str, Any]]] = None, + ) -> Generator[ChatCompletionChunk, None, None]: + chain_state = self._prepare_responses_input_and_chain(messages) + request_config = self._prepare_responses_request_config( + tools=tools, response_format=response_format, stream=True + ) + if chain_state["previous_response_id"]: + request_config["previous_response_id"] = chain_state[ + "previous_response_id" + ] + event_stream = self._client.responses.create( + input=chain_state["input_messages"], + model=self.model_type, + **request_config, + ) + + def _on_response_completed(response_id: str) -> None: + self._save_response_chain_state( + session_key=chain_state["session_key"], + response_id=response_id, + message_count=chain_state["message_count"], + ) + + return iter_response_events_to_chat_chunks( + event_stream=event_stream, + model=str(self.model_type), + on_response_completed=_on_response_completed, + ) + + async def _arequest_responses_stream( + self, + messages: List[OpenAIMessage], + response_format: Optional[Type[BaseModel]], + tools: Optional[List[Dict[str, Any]]] = None, + ) -> AsyncGenerator[ChatCompletionChunk, None]: + chain_state = self._prepare_responses_input_and_chain(messages) + request_config = self._prepare_responses_request_config( + tools=tools, response_format=response_format, stream=True + ) + if chain_state["previous_response_id"]: + request_config["previous_response_id"] = chain_state[ + "previous_response_id" + ] + event_stream = await self._async_client.responses.create( + input=chain_state["input_messages"], + model=self.model_type, + **request_config, + ) + + def _on_response_completed(response_id: str) -> None: + self._save_response_chain_state( + session_key=chain_state["session_key"], + response_id=response_id, + message_count=chain_state["message_count"], + ) + + return aiter_response_events_to_chat_chunks( + event_stream=event_stream, + model=str(self.model_type), + on_response_completed=_on_response_completed, + ) + def _request_chat_completion( self, messages: List[OpenAIMessage], diff --git a/camel/models/openai_responses_adapter.py b/camel/models/openai_responses_adapter.py new file mode 100644 index 0000000000..b12e0685c9 --- /dev/null +++ b/camel/models/openai_responses_adapter.py @@ -0,0 +1,348 @@ +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. 
========= +import json +import time +from dataclasses import dataclass, field +from typing import ( + Any, + AsyncGenerator, + Callable, + Dict, + Generator, + List, + Optional, + Type, +) + +from pydantic import BaseModel + +from camel.types import ChatCompletion, ChatCompletionChunk + + +@dataclass +class _ResponsesStreamState: + has_tool_call: bool = False + has_finish_reason: bool = False + response_id: str = "" + usage: Optional[Dict[str, int]] = None + tool_idx_map: Dict[int, int] = field(default_factory=dict) + tool_meta_emitted: Dict[int, bool] = field(default_factory=dict) + tool_args_delta_seen: Dict[int, bool] = field(default_factory=dict) + + +def _get(value: Any, key: str, default: Any = None) -> Any: + if isinstance(value, dict): + return value.get(key, default) + return getattr(value, key, default) + + +def _usage_to_openai(usage: Any) -> Optional[Dict[str, int]]: + if not usage: + return None + input_tokens = int(_get(usage, "input_tokens", 0) or 0) + output_tokens = int(_get(usage, "output_tokens", 0) or 0) + total_tokens = int( + _get(usage, "total_tokens", input_tokens + output_tokens) + ) + return { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": total_tokens, + } + + +def _extract_text_from_message_item(item: Any) -> str: + parts = _get(item, "content", []) or [] + text_parts = [] + for part in parts: + if _get(part, "type") == "output_text": + text_parts.append(_get(part, "text", "") or "") + return "".join(text_parts) + + +def _extract_tool_call(item: Any) -> Dict[str, Any]: + return { + "id": _get(item, "call_id", "") or _get(item, "id", ""), + "type": "function", + "function": { + "name": _get(item, "name", ""), + "arguments": _get(item, "arguments", "") or "", + }, + } + + +def _build_chat_completion_chunk( + *, + chunk_id: str, + model: str, + delta: Dict[str, Any], + finish_reason: Optional[str] = None, + usage: Optional[Dict[str, int]] = None, +) -> ChatCompletionChunk: + return ChatCompletionChunk.construct( + id=chunk_id, + choices=[ + { + "index": 0, + "delta": delta, + "finish_reason": finish_reason, + } + ], + created=int(time.time()), + model=model, + object="chat.completion.chunk", + usage=usage, + ) + + +def _process_output_item_event( + *, + event_type: str, + event: Any, + state: _ResponsesStreamState, + model: str, +) -> List[ChatCompletionChunk]: + item = _get(event, "item") + if _get(item, "type") != "function_call": + return [] + + state.has_tool_call = True + out_idx = int(_get(event, "output_index", 0)) + mapped_idx = state.tool_idx_map.setdefault( + out_idx, len(state.tool_idx_map) + ) + + if event_type == "response.output_item.added": + tc = { + "index": mapped_idx, + "id": _get(item, "call_id", "") or _get(item, "id", ""), + "type": "function", + "function": { + "name": _get(item, "name", ""), + "arguments": "", + }, + } + state.tool_meta_emitted[out_idx] = True + chunk_id = _get(item, "id", state.response_id) + return [ + _build_chat_completion_chunk( + chunk_id=chunk_id, + model=model, + delta={"tool_calls": [tc]}, + ) + ] + + if state.tool_args_delta_seen.get(out_idx, False): + return [] + + tc = { + "index": mapped_idx, + "function": {"arguments": _get(item, "arguments", "") or ""}, + } + if not state.tool_meta_emitted.get(out_idx, False): + tc["id"] = _get(item, "call_id", "") or _get(item, "id", "") + tc["type"] = "function" + tc["function"]["name"] = _get(item, "name", "") + chunk_id = _get(item, "id", state.response_id) + return [ + _build_chat_completion_chunk( + chunk_id=chunk_id, + 
model=model, + delta={"tool_calls": [tc]}, + ) + ] + + +def _process_response_stream_event( + *, + event: Any, + state: _ResponsesStreamState, + model: str, + on_response_completed: Optional[Callable[[str], None]] = None, +) -> List[ChatCompletionChunk]: + event_type = _get(event, "type", "") + + if event_type in ("response.created", "response.in_progress"): + resp = _get(event, "response") + if resp: + state.response_id = _get(resp, "id", state.response_id) + return [] + + if event_type == "response.output_text.delta": + delta = _get(event, "delta", "") or "" + if not delta: + return [] + chunk_id = _get(event, "item_id", state.response_id) + return [ + _build_chat_completion_chunk( + chunk_id=chunk_id, + model=model, + delta={"content": delta}, + ) + ] + + if event_type in ( + "response.output_item.added", + "response.output_item.done", + ): + return _process_output_item_event( + event_type=event_type, + event=event, + state=state, + model=model, + ) + + if event_type in ( + "response.function_call_arguments.delta", + "response.output_item.function_call_arguments.delta", + ): + state.has_tool_call = True + out_idx = int(_get(event, "output_index", 0)) + mapped_idx = state.tool_idx_map.setdefault( + out_idx, len(state.tool_idx_map) + ) + state.tool_args_delta_seen[out_idx] = True + delta = _get(event, "delta", "") or "" + tc = { + "index": mapped_idx, + "function": {"arguments": delta}, + } + chunk_id = _get(event, "item_id", state.response_id) + return [ + _build_chat_completion_chunk( + chunk_id=chunk_id, + model=model, + delta={"tool_calls": [tc]}, + ) + ] + + if event_type == "response.completed": + resp = _get(event, "response") + if resp: + state.response_id = _get(resp, "id", state.response_id) + state.usage = _usage_to_openai(_get(resp, "usage")) + if on_response_completed is not None and state.response_id: + on_response_completed(state.response_id) + finish_reason = "tool_calls" if state.has_tool_call else "stop" + state.has_finish_reason = True + return [ + _build_chat_completion_chunk( + chunk_id=state.response_id, + model=model, + delta={}, + finish_reason=finish_reason, + usage=state.usage, + ) + ] + + return [] + + +def response_to_chat_completion( + response: Any, + model: str, + response_format: Optional[Type[BaseModel]] = None, +) -> ChatCompletion: + output_items = _get(response, "output", []) or [] + content = "" + tool_calls = [] + + for item in output_items: + item_type = _get(item, "type") + if item_type == "message": + content += _extract_text_from_message_item(item) + elif item_type == "function_call": + tool_calls.append(_extract_tool_call(item)) + + finish_reason = "tool_calls" if tool_calls else "stop" + message: Dict[str, Any] = {"role": "assistant", "content": content} + if tool_calls: + message["tool_calls"] = tool_calls + + if response_format is not None and content: + try: + message["parsed"] = response_format.model_validate_json(content) + except Exception: + try: + parsed_json = json.loads(content) + message["parsed"] = response_format.model_validate(parsed_json) + except Exception: + pass + + return ChatCompletion.construct( + id=_get(response, "id", f"chatcmpl-{int(time.time())}"), + choices=[ + { + "index": 0, + "message": message, + "finish_reason": finish_reason, + } + ], + created=int(_get(response, "created_at", time.time())), + model=model, + object="chat.completion", + usage=_usage_to_openai(_get(response, "usage")), + ) + + +def iter_response_events_to_chat_chunks( + event_stream: Any, + model: str, + on_response_completed: 
Optional[Callable[[str], None]] = None, +) -> Generator[ChatCompletionChunk, None, None]: + state = _ResponsesStreamState() + + for event in event_stream: + yield from _process_response_stream_event( + event=event, + state=state, + model=model, + on_response_completed=on_response_completed, + ) + + # Safety fallback for abnormal stream termination. + if not state.has_finish_reason: + yield _build_chat_completion_chunk( + chunk_id=state.response_id, + model=model, + delta={}, + finish_reason="stop", + usage=state.usage, + ) + + +async def aiter_response_events_to_chat_chunks( + event_stream: Any, + model: str, + on_response_completed: Optional[Callable[[str], None]] = None, +) -> AsyncGenerator[ChatCompletionChunk, None]: + state = _ResponsesStreamState() + + async for event in event_stream: + for chunk in _process_response_stream_event( + event=event, + state=state, + model=model, + on_response_completed=on_response_completed, + ): + yield chunk + + if not state.has_finish_reason: + yield _build_chat_completion_chunk( + chunk_id=state.response_id, + model=model, + delta={}, + finish_reason="stop", + usage=state.usage, + ) diff --git a/examples/models/openai_responses_api_example.py b/examples/models/openai_responses_api_example.py new file mode 100644 index 0000000000..fcb20df647 --- /dev/null +++ b/examples/models/openai_responses_api_example.py @@ -0,0 +1,192 @@ +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= + +""" +OpenAI Responses API simple examples in CAMEL. + +This example demonstrates: +1. Basic non-streaming chat +2. Tool calling +3. Streaming tool calling +4. Structured output +5. 
Streaming structured output + +Required environment variable: +export OPENAI_API_KEY="your_openai_api_key" +""" + +from pydantic import BaseModel, Field + +from camel.agents import ChatAgent +from camel.models import ModelFactory +from camel.toolkits import FunctionTool +from camel.types import ModelPlatformType, ModelType + + +def get_weather(city: str) -> str: + """A tiny tool function for demo purposes.""" + fake_weather = { + "beijing": "sunny, 28C", + "new york": "cloudy, 19C", + "san francisco": "foggy, 16C", + } + return fake_weather.get(city.lower(), f"unknown weather for {city}") + + +class TravelAdvice(BaseModel): + city: str = Field(description="City name") + clothing: str = Field(description="Recommended clothing") + reason: str = Field(description="Brief reasoning") + + +def create_responses_model( + stream: bool = False, + model_type: ModelType = ModelType.GPT_4_1_MINI, + temperature: float = 0.2, + tools: list | None = None, +): + model_config = { + "temperature": temperature, + "stream": stream, + } + if tools: + model_config["tools"] = tools + if stream: + model_config["stream_options"] = {"include_usage": True} + + return ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=model_type, + model_config_dict=model_config, + api_mode="responses", + ) + + +def example_basic_non_stream() -> None: + print("\n=== Example 1: Basic Non-Streaming ===") + model = create_responses_model(stream=False) + agent = ChatAgent( + system_message="You are a concise assistant.", + model=model, + ) + resp = agent.step("Say hello to CAMEL in one sentence.") + print(resp.msgs[0].content) + print("usage:", resp.info.get("usage")) + + +def example_tool_calling() -> None: + print("\n=== Example 2: Tool Calling ===") + weather_tool = FunctionTool(get_weather) + model = create_responses_model( + stream=False, + model_type=ModelType.GPT_4_1, + temperature=0.0, + tools=[weather_tool.get_openai_tool_schema()], + ) + agent = ChatAgent( + system_message="You are a weather assistant. Use tools when needed.", + model=model, + tools=[weather_tool], + ) + resp = agent.step("How is the weather in Beijing today?") + print(resp.msgs[0].content) + print("tool_calls:", resp.info.get("tool_calls")) + + +def example_streaming_tool_calling() -> None: + print("\n=== Example 3: Streaming Tool Calling ===") + weather_tool = FunctionTool(get_weather) + model = create_responses_model( + stream=True, + model_type=ModelType.GPT_4_1, + temperature=0.0, + tools=[weather_tool.get_openai_tool_schema()], + ) + agent = ChatAgent( + system_message="You are a weather assistant. Use tools when needed.", + model=model, + tools=[weather_tool], + stream_accumulate=False, + ) + stream_resp = agent.step("How is the weather in Beijing today?") + + full_text = "" + for chunk in stream_resp: + msg = chunk.msgs[0] + if msg.content: + full_text += msg.content + print(msg.content, end="", flush=True) + print() + print("final:", full_text) + print("tool_calls:", stream_resp.info.get("tool_calls")) + print("usage:", stream_resp.info.get("usage")) + + +def example_structured_output() -> None: + print("\n=== Example 4: Structured Output ===") + model = create_responses_model(stream=False) + agent = ChatAgent( + system_message="You are a travel assistant.", + model=model, + ) + resp = agent.step( + "I am going to New York in autumn. 
Give advice as JSON.", + response_format=TravelAdvice, + ) + print("raw:", resp.msgs[0].content) + print("parsed:", resp.msgs[0].parsed) + + +def example_streaming_structured_output() -> None: + print("\n=== Example 5: Streaming Structured Output ===") + model = create_responses_model( + stream=True, + model_type=ModelType.GPT_4_1_MINI, + temperature=0.0, + ) + agent = ChatAgent( + system_message=( + "You are a travel assistant and must return strict JSON matching " + "the schema." + ), + model=model, + stream_accumulate=False, + ) + + stream_resp = agent.step( + "I am going to New York in autumn. Give travel advice in JSON.", + response_format=TravelAdvice, + ) + full_text = "" + for chunk in stream_resp: + msg = chunk.msgs[0] + if msg.content: + full_text += msg.content + print(msg.content, end="", flush=True) + print() + print("raw:", full_text) + try: + parsed = TravelAdvice.model_validate_json(full_text) + print("parsed:", parsed) + except Exception as exc: + print("parsed error:", exc) + print("usage:", stream_resp.info.get("usage")) + + +if __name__ == "__main__": + example_basic_non_stream() + example_tool_calling() + example_streaming_tool_calling() + example_structured_output() + example_streaming_structured_output() diff --git a/test/agents/test_openai_responses_api_e2e.py b/test/agents/test_openai_responses_api_e2e.py new file mode 100644 index 0000000000..cdc18771e5 --- /dev/null +++ b/test/agents/test_openai_responses_api_e2e.py @@ -0,0 +1,196 @@ +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. 
========= + +import os +from typing import Any, Dict, List, Optional + +import pytest +from pydantic import BaseModel, Field + +from camel.agents import ChatAgent +from camel.models import ModelFactory +from camel.toolkits import FunctionTool +from camel.types import ModelPlatformType, ModelType + +pytestmark = pytest.mark.model_backend + +HAS_OPENAI_API_KEY = bool(os.getenv("OPENAI_API_KEY")) + + +def get_weather(city: str) -> str: + fake_weather = { + "beijing": "sunny, 28C", + "new york": "cloudy, 19C", + "san francisco": "foggy, 16C", + } + return fake_weather.get(city.lower(), f"unknown weather for {city}") + + +class TravelAdvice(BaseModel): + city: str = Field(description="City name") + clothing: str = Field(description="Recommended clothing") + reason: str = Field(description="Brief reasoning") + + +class WeatherTravelAdvice(BaseModel): + city: str = Field(description="City name") + weather: str = Field(description="Weather summary from tool result") + clothing: str = Field(description="Recommended clothing") + reason: str = Field(description="Brief reasoning") + + +def _create_responses_model( + *, + stream: bool = False, + temperature: float = 0.0, + tools: Optional[List[Dict[str, Any]]] = None, +): + model_config: Dict[str, Any] = { + "temperature": temperature, + "stream": stream, + } + if stream: + model_config["stream_options"] = {"include_usage": True} + if tools: + model_config["tools"] = tools + + return ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.GPT_4_1_MINI, + model_config_dict=model_config, + api_mode="responses", + ) + + +@pytest.mark.skipif( + not HAS_OPENAI_API_KEY, + reason="OPENAI_API_KEY is required for Responses API E2E tests", +) +def test_responses_tool_calling_non_stream(): + weather_tool = FunctionTool(get_weather) + model = _create_responses_model( + stream=False, + temperature=0.0, + tools=[weather_tool.get_openai_tool_schema()], + ) + agent = ChatAgent( + system_message="You are a weather assistant. Must use tool.", + model=model, + tools=[weather_tool], + ) + resp = agent.step("How is the weather in Beijing today?") + assert resp.info.get("tool_calls") + assert resp.msgs and resp.msgs[0].content + + +@pytest.mark.skipif( + not HAS_OPENAI_API_KEY, + reason="OPENAI_API_KEY is required for Responses API E2E tests", +) +def test_responses_structured_output_non_stream(): + model = _create_responses_model(stream=False, temperature=0.0) + agent = ChatAgent( + system_message="Return strict JSON matching schema.", + model=model, + ) + resp = agent.step( + "I am going to New York in autumn. Give travel advice as JSON.", + response_format=TravelAdvice, + ) + parsed = resp.msgs[0].parsed + assert parsed is not None + assert isinstance(parsed, TravelAdvice) + + +@pytest.mark.skipif( + not HAS_OPENAI_API_KEY, + reason="OPENAI_API_KEY is required for Responses API E2E tests", +) +def test_responses_tool_calling_and_structured_output_non_stream(): + weather_tool = FunctionTool(get_weather) + model = _create_responses_model( + stream=False, + temperature=0.0, + tools=[weather_tool.get_openai_tool_schema()], + ) + agent = ChatAgent( + system_message=( + "You are a travel assistant. Must call weather tool and return " + "strict JSON matching schema." + ), + model=model, + tools=[weather_tool], + ) + resp = agent.step( + "I am visiting Beijing tomorrow. 
Use tool for weather and return " + "structured travel advice JSON.", + response_format=WeatherTravelAdvice, + ) + parsed = resp.msgs[0].parsed + assert parsed is not None + assert isinstance(parsed, WeatherTravelAdvice) + assert resp.info.get("tool_calls") + + +@pytest.mark.skipif( + not HAS_OPENAI_API_KEY, + reason="OPENAI_API_KEY is required for Responses API E2E tests", +) +def test_responses_tool_calling_stream(): + weather_tool = FunctionTool(get_weather) + model = _create_responses_model( + stream=True, + temperature=0.0, + tools=[weather_tool.get_openai_tool_schema()], + ) + agent = ChatAgent( + system_message="You are a weather assistant. Must use tool.", + model=model, + tools=[weather_tool], + stream_accumulate=False, + ) + + stream_resp = agent.step("How is the weather in Beijing today?") + final_text = "" + for chunk in stream_resp: + if chunk.msgs and chunk.msgs[0].content: + final_text += chunk.msgs[0].content + + assert stream_resp.info.get("tool_calls") + assert isinstance(final_text, str) + + +@pytest.mark.skipif( + not HAS_OPENAI_API_KEY, + reason="OPENAI_API_KEY is required for Responses API E2E tests", +) +def test_responses_structured_output_stream(): + model = _create_responses_model(stream=True, temperature=0.0) + agent = ChatAgent( + system_message="Return strict JSON matching schema.", + model=model, + stream_accumulate=False, + ) + + stream_resp = agent.step( + "I am going to New York in autumn. Give travel advice as JSON.", + response_format=TravelAdvice, + ) + full_text = "" + for chunk in stream_resp: + if chunk.msgs and chunk.msgs[0].content: + full_text += chunk.msgs[0].content + + parsed = TravelAdvice.model_validate_json(full_text) + assert isinstance(parsed, TravelAdvice) diff --git a/test/models/test_openai_model.py b/test/models/test_openai_model.py index 1603db1113..cbd8e3bb4c 100644 --- a/test/models/test_openai_model.py +++ b/test/models/test_openai_model.py @@ -15,6 +15,7 @@ from unittest.mock import MagicMock, patch import pytest +from pydantic import BaseModel from camel.configs import ChatGPTConfig from camel.models import OpenAIModel @@ -115,3 +116,417 @@ def test_prompt_cache_key_not_passed_when_none(): # Verify that prompt_cache_key is NOT in the call kwargs call_kwargs = mock_client.chat.completions.create.call_args[1] assert "prompt_cache_key" not in call_kwargs + + +def test_openai_model_invalid_api_mode(): + with pytest.raises(ValueError, match="api_mode must be"): + OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="invalid_mode", + ) + + +def test_responses_mode_non_stream_response_mapping(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + responses_payload = { + "id": "resp_1", + "created_at": 1741294021, + "usage": { + "input_tokens": 10, + "output_tokens": 6, + "total_tokens": 16, + }, + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": "Hello from responses", + } + ], + } + ], + } + mock_client.responses.create.return_value = responses_payload + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="responses", + ) + messages = [{"role": "user", "content": "Hello"}] + response = model.run(messages) + + assert response.choices[0].message.content == "Hello from responses" + assert response.choices[0].finish_reason == "stop" + assert response.usage.prompt_tokens == 10 + assert response.usage.completion_tokens == 6 + assert response.usage.total_tokens == 16 + 
assert mock_client.responses.create.called + + +def test_responses_mode_stream_mapping(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + stream_events = [ + { + "type": "response.created", + "response": {"id": "resp_stream"}, + }, + { + "type": "response.output_text.delta", + "item_id": "msg_1", + "delta": "Hi", + }, + { + "type": "response.output_text.delta", + "item_id": "msg_1", + "delta": "!", + }, + { + "type": "response.completed", + "response": { + "id": "resp_stream", + "usage": { + "input_tokens": 3, + "output_tokens": 2, + "total_tokens": 5, + }, + }, + }, + ] + + mock_client.responses.create.return_value = stream_events + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + model_config_dict={"stream": True}, + api_mode="responses", + ) + messages = [{"role": "user", "content": "Hello"}] + chunks = list(model.run(messages)) + + assert len(chunks) >= 3 + assert chunks[0].choices[0].delta.content == "Hi" + assert chunks[1].choices[0].delta.content == "!" + assert chunks[-1].choices[0].finish_reason == "stop" + assert chunks[-1].usage.total_tokens == 5 + + +def test_responses_stream_tool_call_arguments_not_duplicated(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + stream_events = [ + { + "type": "response.created", + "response": {"id": "resp_stream"}, + }, + { + "type": "response.output_item.added", + "output_index": 0, + "item": { + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "get_weather", + "arguments": '{"city":"Beijing"}', + }, + }, + { + "type": "response.function_call_arguments.delta", + "output_index": 0, + "item_id": "fc_1", + "delta": '{"city":"Beijing"}', + }, + { + "type": "response.output_item.done", + "output_index": 0, + "item": { + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "get_weather", + "arguments": '{"city":"Beijing"}', + }, + }, + { + "type": "response.completed", + "response": { + "id": "resp_stream", + "usage": { + "input_tokens": 3, + "output_tokens": 2, + "total_tokens": 5, + }, + }, + }, + ] + mock_client.responses.create.return_value = stream_events + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + model_config_dict={"stream": True}, + api_mode="responses", + ) + chunks = list(model.run([{"role": "user", "content": "weather?"}])) + + arg_fragments = [] + for chunk in chunks: + if not chunk.choices: + continue + delta = chunk.choices[0].delta + if not delta or not getattr(delta, "tool_calls", None): + continue + for tc in delta.tool_calls: + if tc.function and tc.function.arguments: + arg_fragments.append(tc.function.arguments) + + assert "".join(arg_fragments) == '{"city":"Beijing"}' + assert chunks[-1].choices[0].finish_reason == "tool_calls" + + +def test_responses_mode_uses_previous_response_id_and_delta_input(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + first_response = { + "id": "resp_first", + "created_at": 1741294021, + "usage": { + "input_tokens": 10, + "output_tokens": 6, + "total_tokens": 16, + }, + "output": [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "First turn"}], + } + ], + } + second_response = { + "id": "resp_second", + "created_at": 1741294022, + "usage": { + "input_tokens": 11, + "output_tokens": 5, + "total_tokens": 16, + }, + 
"output": [ + { + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": "Second turn"} + ], + } + ], + } + mock_client.responses.create.side_effect = [ + first_response, + second_response, + ] + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="responses", + ) + + # First call sends full context + model.run( + [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + ] + ) + first_call_kwargs = mock_client.responses.create.call_args_list[ + 0 + ].kwargs + assert "previous_response_id" not in first_call_kwargs + assert len(first_call_kwargs["input"]) == 2 + + # Second call sends delta context + previous_response_id + model.run( + [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "First turn"}, + {"role": "user", "content": "Continue"}, + ] + ) + second_call_kwargs = mock_client.responses.create.call_args_list[ + 1 + ].kwargs + assert second_call_kwargs["previous_response_id"] == "resp_first" + assert len(second_call_kwargs["input"]) == 2 + assert second_call_kwargs["input"][-1]["content"] == "Continue" + + +def test_responses_mode_normalizes_function_tools_schema(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + mock_client.responses.create.return_value = { + "id": "resp_1", + "created_at": 1741294021, + "usage": { + "input_tokens": 1, + "output_tokens": 1, + "total_tokens": 2, + }, + "output": [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "ok"}], + } + ], + } + + chat_style_tool = { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get weather by city", + "parameters": { + "type": "object", + "properties": {"city": {"type": "string"}}, + "required": ["city"], + }, + }, + } + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="responses", + model_config_dict={"tools": [chat_style_tool]}, + ) + model.run([{"role": "user", "content": "weather?"}]) + + call_kwargs = mock_client.responses.create.call_args.kwargs + assert "tools" in call_kwargs + assert call_kwargs["tools"][0]["type"] == "function" + assert call_kwargs["tools"][0]["name"] == "get_weather" + assert "function" not in call_kwargs["tools"][0] + + +def test_responses_mode_structured_output_enforces_additional_properties(): + class Destination(BaseModel): + city: str + + class TravelAdvice(BaseModel): + destination: Destination + clothing: str + + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + mock_client.responses.create.return_value = { + "id": "resp_1", + "created_at": 1741294021, + "usage": { + "input_tokens": 1, + "output_tokens": 1, + "total_tokens": 2, + }, + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": '{"destination":{"city":"NYC"},' + '"clothing":"coat"}', + } + ], + } + ], + } + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="responses", + ) + model.run( + [{"role": "user", "content": "travel advice"}], + response_format=TravelAdvice, + ) + + schema = mock_client.responses.create.call_args.kwargs["text"][ + "format" + ]["schema"] + assert schema["additionalProperties"] is False + assert schema["$defs"]["Destination"]["additionalProperties"] is False + + +def 
test_responses_mode_converts_tool_call_history_to_input_items(): + with patch("camel.models.openai_model.OpenAI") as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + mock_client.responses.create.return_value = { + "id": "resp_1", + "created_at": 1741294021, + "usage": { + "input_tokens": 1, + "output_tokens": 1, + "total_tokens": 2, + }, + "output": [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "ok"}], + } + ], + } + + model = OpenAIModel( + model_type=ModelType.GPT_4O_MINI, + api_mode="responses", + ) + model.run( + [ + {"role": "user", "content": "How is weather?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_1", + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"city":"Beijing"}', + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_1", + "content": "sunny, 28C", + }, + ] + ) + + input_items = mock_client.responses.create.call_args.kwargs["input"] + assert input_items[1]["type"] == "function_call" + assert input_items[1]["call_id"] == "call_1" + assert input_items[1]["name"] == "get_weather" + assert input_items[2]["type"] == "function_call_output" + assert input_items[2]["call_id"] == "call_1" + assert "tool_calls" not in input_items[1]
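

The adapter helpers introduced above can also be exercised in isolation, without mocking the OpenAI client. The following is a minimal sketch (not part of the patch; the ids, text, and token counts are illustrative) that feeds dict payloads shaped like the mocks in test/models/test_openai_model.py through response_to_chat_completion and iter_response_events_to_chat_chunks, showing the Responses-to-Chat-Completions mapping contract directly:

from camel.models.openai_responses_adapter import (
    iter_response_events_to_chat_chunks,
    response_to_chat_completion,
)

# Non-streaming: a Responses API payload is mapped to a chat.completion object.
completion = response_to_chat_completion(
    response={
        "id": "resp_demo",
        "created_at": 1741294021,
        "usage": {"input_tokens": 10, "output_tokens": 6, "total_tokens": 16},
        "output": [
            {
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "Hello"}],
            }
        ],
    },
    model="gpt-4o-mini",
)
assert completion.choices[0].message.content == "Hello"
assert completion.choices[0].finish_reason == "stop"
assert completion.usage.total_tokens == 16

# Streaming: Responses events are mapped to chat.completion.chunk objects; the
# final chunk carries the finish_reason and usage translated to
# prompt/completion token fields.
events = [
    {"type": "response.created", "response": {"id": "resp_demo"}},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "Hi"},
    {
        "type": "response.completed",
        "response": {
            "id": "resp_demo",
            "usage": {"input_tokens": 3, "output_tokens": 1, "total_tokens": 4},
        },
    },
]
chunks = list(iter_response_events_to_chat_chunks(events, model="gpt-4o-mini"))
assert chunks[0].choices[0].delta.content == "Hi"
assert chunks[-1].choices[0].finish_reason == "stop"
assert chunks[-1].usage.total_tokens == 4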
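
For readers evaluating the change outside the agent stack, a short backend-level sketch (again not part of the patch; the prompt text is illustrative and OPENAI_API_KEY must be set) shows that api_mode="responses" preserves the existing ChatCompletion return shape, so callers of model.run() need no changes:

from camel.models import OpenAIModel
from camel.types import ModelType

# Same backend interface as chat-completions mode; only the transport differs.
model = OpenAIModel(
    model_type=ModelType.GPT_4O_MINI,
    api_mode="responses",
)
completion = model.run(
    [{"role": "user", "content": "Say hello in one short sentence."}]
)
# The Responses payload has already been mapped back to a ChatCompletion.
print(completion.choices[0].message.content)
print(completion.usage)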