257 changes: 137 additions & 120 deletions src/praisonai-agents/praisonaiagents/llm/llm.py
@@ -746,6 +746,7 @@ def get_response(
)
if delta.content:
live.update(display_generating(response_text, current_time))

else:
# Non-verbose streaming
for chunk in litellm.completion(
@@ -759,9 +760,12 @@
):
if chunk and chunk.choices and chunk.choices[0].delta:
delta = chunk.choices[0].delta
response_text, tool_calls = self._process_stream_delta(
delta, response_text, tool_calls, formatted_tools
)
if delta.content:
response_text += delta.content

# Capture tool calls from streaming chunks if provider supports it
if formatted_tools and self._supports_streaming_tools():
tool_calls = self._process_tool_calls_from_stream(delta, tool_calls)

response_text = response_text.strip()

@@ -802,20 +806,7 @@ def get_response(
# Handle tool calls - Sequential tool calling logic
if tool_calls and execute_tool_fn:
# Convert tool_calls to a serializable format for all providers
serializable_tool_calls = []
for tc in tool_calls:
if isinstance(tc, dict):
serializable_tool_calls.append(tc) # Already a dict
else:
# Convert object to dict
serializable_tool_calls.append({
"id": tc.id,
"type": getattr(tc, 'type', "function"),
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
})
serializable_tool_calls = self._serialize_tool_calls(tool_calls)
messages.append({
"role": "assistant",
"content": response_text,
@@ -826,20 +817,8 @@ def get_response(
tool_results = [] # Store all tool results
for tool_call in tool_calls:
# Handle both object and dict access patterns
if isinstance(tool_call, dict):
is_ollama = self._is_ollama_provider()
function_name, arguments, tool_call_id = self._parse_tool_call_arguments(tool_call, is_ollama)
else:
# Handle object-style tool calls
try:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
tool_call_id = tool_call.id
except (json.JSONDecodeError, AttributeError) as e:
logging.error(f"Error parsing object-style tool call: {e}")
function_name = "unknown_function"
arguments = {}
tool_call_id = f"tool_{id(tool_call)}"
is_ollama = self._is_ollama_provider()
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama)

logging.debug(f"[TOOL_EXEC_DEBUG] About to execute tool {function_name} with args: {arguments}")
tool_result = execute_tool_fn(function_name, arguments)
@@ -1367,6 +1346,7 @@ async def get_response_async(
if delta.content:
print("\033[K", end="\r")
print(f"Generating... {time.time() - start_time:.1f}s", end="\r")

else:
# Non-verbose streaming
async for chunk in await litellm.acompletion(
@@ -1380,9 +1360,12 @@
):
if chunk and chunk.choices and chunk.choices[0].delta:
delta = chunk.choices[0].delta
response_text, tool_calls = self._process_stream_delta(
delta, response_text, tool_calls, formatted_tools
)
if delta.content:
response_text += delta.content

# Capture tool calls from streaming chunks if provider supports it
if formatted_tools and self._supports_streaming_tools():
tool_calls = self._process_tool_calls_from_stream(delta, tool_calls)

response_text = response_text.strip()

@@ -1417,20 +1400,7 @@ async def get_response_async(

if tool_calls:
# Convert tool_calls to a serializable format for all providers
serializable_tool_calls = []
for tc in tool_calls:
if isinstance(tc, dict):
serializable_tool_calls.append(tc) # Already a dict
else:
# Convert object to dict
serializable_tool_calls.append({
"id": tc.id,
"type": getattr(tc, 'type', "function"),
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
})
serializable_tool_calls = self._serialize_tool_calls(tool_calls)
messages.append({
"role": "assistant",
"content": response_text,
@@ -1440,20 +1410,8 @@
tool_results = [] # Store all tool results
for tool_call in tool_calls:
# Handle both object and dict access patterns
if isinstance(tool_call, dict):
is_ollama = self._is_ollama_provider()
function_name, arguments, tool_call_id = self._parse_tool_call_arguments(tool_call, is_ollama)
else:
# Handle object-style tool calls
try:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
tool_call_id = tool_call.id
except (json.JSONDecodeError, AttributeError) as e:
logging.error(f"Error parsing object-style tool call: {e}")
function_name = "unknown_function"
arguments = {}
tool_call_id = f"tool_{id(tool_call)}"
is_ollama = self._is_ollama_provider()
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama)

tool_result = await execute_tool_fn(function_name, arguments)
tool_results.append(tool_result) # Store the result
@@ -1899,6 +1857,90 @@ def _build_completion_params(self, **override_params) -> Dict[str, Any]:

return params

def _prepare_response_logging(self, temperature: float, stream: bool, verbose: bool, markdown: bool, **kwargs) -> Optional[Dict[str, Any]]:
"""Prepare debug logging information for response methods"""
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
debug_info = {
"model": self.model,
"timeout": self.timeout,
"temperature": temperature,
"top_p": self.top_p,
"n": self.n,
"max_tokens": self.max_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
"stream": stream,
"verbose": verbose,
"markdown": markdown,
"kwargs": str(kwargs)
}
return debug_info
return None

def _process_streaming_chunk(self, chunk) -> Optional[str]:
"""Extract content from a streaming chunk"""
if chunk and chunk.choices and chunk.choices[0].delta.content:
return chunk.choices[0].delta.content
return None

def _process_tool_calls_from_stream(self, delta, tool_calls: List[Dict]) -> List[Dict]:
"""Process tool calls from streaming delta chunks.

This handles the accumulation of tool call data from streaming chunks,
building up the complete tool call information incrementally.
"""
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tc in delta.tool_calls:
if tc.index >= len(tool_calls):
tool_calls.append({
"id": tc.id,
"type": "function",
"function": {"name": "", "arguments": ""}
})
if tc.function.name:
tool_calls[tc.index]["function"]["name"] = tc.function.name
if tc.function.arguments:
tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
return tool_calls
Comment on lines +1892 to +1904

Contributor review comment (severity: medium):

This function modifies the tool_calls list in-place, which is a side effect. Consider creating a copy of the list at the beginning of the function to avoid mutating the original argument.

        processed_tool_calls = tool_calls[:]
        if hasattr(delta, 'tool_calls') and delta.tool_calls:
            for tc in delta.tool_calls:
                if tc.index >= len(processed_tool_calls):
                    processed_tool_calls.append({
                        "id": tc.id,
                        "type": "function",
                        "function": {"name": "", "arguments": ""}
                    })
                if tc.function.name:
                    processed_tool_calls[tc.index]["function"]["name"] = tc.function.name
                if tc.function.arguments:
                    processed_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
        return processed_tool_calls
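
For context, a minimal sketch of how the accumulator behaves when fed simulated streaming deltas; the SimpleNamespace objects below are hypothetical stand-ins for litellm chunk deltas, not real provider types:

    from types import SimpleNamespace

    def make_delta(index, tc_id=None, name=None, args=None):
        # Hypothetical test double for a streaming delta carrying one tool-call fragment.
        fn = SimpleNamespace(name=name, arguments=args)
        return SimpleNamespace(tool_calls=[SimpleNamespace(index=index, id=tc_id, function=fn)])

    tool_calls = []
    # The first chunk carries the id and function name; later chunks stream argument fragments.
    for delta in (
        make_delta(0, tc_id="call_1", name="get_weather", args='{"city": '),
        make_delta(0, args='"Paris"}'),
    ):
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"id": tc.id, "type": "function",
                                   "function": {"name": "", "arguments": ""}})
            if tc.function.name:
                tool_calls[tc.index]["function"]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    assert tool_calls[0]["function"] == {"name": "get_weather", "arguments": '{"city": "Paris"}'}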


def _serialize_tool_calls(self, tool_calls) -> List[Dict]:
"""Convert tool calls to a serializable format for all providers."""
serializable_tool_calls = []
for tc in tool_calls:
if isinstance(tc, dict):
serializable_tool_calls.append(tc) # Already a dict
else:
# Convert object to dict
serializable_tool_calls.append({
"id": tc.id,
"type": getattr(tc, 'type', "function"),
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
})
return serializable_tool_calls
Comment on lines +1908 to +1922

Contributor review comment (severity: medium):

This loop can be simplified into a more concise list comprehension for improved readability.

        return [
            tc if isinstance(tc, dict) else {
                "id": tc.id,
                "type": getattr(tc, 'type', "function"),
                "function": {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
            }
            for tc in tool_calls
        ]
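
Either form yields the same wire shape. A quick sketch of the expected behavior for both input shapes, assuming llm is an instance of this class; the object call is a hypothetical SimpleNamespace stand-in for a provider tool-call object:

    from types import SimpleNamespace

    dict_call = {"id": "call_1", "type": "function",
                 "function": {"name": "lookup", "arguments": "{}"}}
    obj_call = SimpleNamespace(id="call_2",
                               function=SimpleNamespace(name="search", arguments='{"q": "llm"}'))

    serialized = llm._serialize_tool_calls([dict_call, obj_call])
    # dict_call passes through unchanged; obj_call is converted to the same dict shape,
    # with "type" defaulting to "function" since the stand-in defines no type attribute.
    assert serialized[0] is dict_call
    assert serialized[1]["function"]["name"] == "search"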


def _extract_tool_call_info(self, tool_call, is_ollama: bool = False) -> tuple:
"""Extract function name, arguments, and tool_call_id from a tool call.

Handles both dict and object formats for tool calls.
"""
if isinstance(tool_call, dict):
return self._parse_tool_call_arguments(tool_call, is_ollama)
else:
# Handle object-style tool calls
try:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
tool_call_id = tool_call.id
except (json.JSONDecodeError, AttributeError) as e:
logging.error(f"Error parsing object-style tool call: {e}")
function_name = "unknown_function"
arguments = {}
tool_call_id = f"tool_{id(tool_call)}"
return function_name, arguments, tool_call_id
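
Note: a minimal sketch of the two input shapes this helper normalizes; the object-style call is a hypothetical SimpleNamespace stand-in, while real objects come from the provider SDK:

    import json
    from types import SimpleNamespace

    # Object-style tool call, as returned by OpenAI-compatible providers.
    obj_call = SimpleNamespace(
        id="call_abc",
        function=SimpleNamespace(name="get_time", arguments='{"tz": "UTC"}'),
    )
    # Dict-style tool call, as accumulated from streaming chunks.
    dict_call = {"id": "call_def", "type": "function",
                 "function": {"name": "get_time", "arguments": '{"tz": "UTC"}'}}

    # Both shapes reduce to the same (name, arguments, id) triple:
    name = obj_call.function.name                   # "get_time"
    args = json.loads(obj_call.function.arguments)  # {"tz": "UTC"}
    call_id = obj_call.id                           # "call_abc"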

# Response without tool calls
def response(
self,
Expand Down Expand Up @@ -1946,42 +1988,29 @@ def response(
)

# Get response from LiteLLM
response_text = ""
completion_params = self._build_completion_params(
messages=messages,
temperature=temperature,
stream=stream,
**kwargs
)

if stream:
response_text = ""
if verbose:
with Live(display_generating("", start_time), console=console or self.console, refresh_per_second=4) as live:
for chunk in litellm.completion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=True,
**kwargs
)
):
if chunk and chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
for chunk in litellm.completion(**completion_params):
content = self._process_streaming_chunk(chunk)
if content:
response_text += content
live.update(display_generating(response_text, start_time))
else:
for chunk in litellm.completion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=True,
**kwargs
)
):
if chunk and chunk.choices and chunk.choices[0].delta.content:
response_text += chunk.choices[0].delta.content
for chunk in litellm.completion(**completion_params):
content = self._process_streaming_chunk(chunk)
if content:
response_text += content
else:
response = litellm.completion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=False,
**kwargs
)
)
response = litellm.completion(**completion_params)
response_text = response.choices[0].message.content.strip()
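
Note: the refactor above builds completion_params once and shares it across the verbose, non-verbose, and non-streaming paths. A hypothetical condensed sketch of the same pattern, using the helpers introduced in this diff:

    import litellm

    def run(llm, messages, temperature=0.2, stream=True, **kwargs):
        # Build the kwargs once; every branch reuses the same dict.
        params = llm._build_completion_params(
            messages=messages, temperature=temperature, stream=stream, **kwargs
        )
        if stream:
            text = ""
            for chunk in litellm.completion(**params):
                content = llm._process_streaming_chunk(chunk)
                if content:
                    text += content
            return text.strip()
        return litellm.completion(**params).choices[0].message.content.strip()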

if verbose:
@@ -2022,6 +2051,7 @@ async def aresponse(

logger.debug("Using asynchronous response function")


# Log all self values when in debug mode
self._log_llm_config(
'Async response method',
@@ -2046,42 +2076,29 @@
)

# Get response from LiteLLM
response_text = ""
completion_params = self._build_completion_params(
messages=messages,
temperature=temperature,
stream=stream,
**kwargs
)

if stream:
response_text = ""
if verbose:
with Live(display_generating("", start_time), console=console or self.console, refresh_per_second=4) as live:
async for chunk in await litellm.acompletion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=True,
**kwargs
)
):
if chunk and chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
async for chunk in await litellm.acompletion(**completion_params):
content = self._process_streaming_chunk(chunk)
if content:
response_text += content
live.update(display_generating(response_text, start_time))
else:
async for chunk in await litellm.acompletion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=True,
**kwargs
)
):
if chunk and chunk.choices and chunk.choices[0].delta.content:
response_text += chunk.choices[0].delta.content
async for chunk in await litellm.acompletion(**completion_params):
content = self._process_streaming_chunk(chunk)
if content:
response_text += content
else:
response = await litellm.acompletion(
**self._build_completion_params(
messages=messages,
temperature=temperature,
stream=False,
**kwargs
)
)
response = await litellm.acompletion(**completion_params)
response_text = response.choices[0].message.content.strip()

if verbose: