Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion litellm/litellm_core_utils/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ def chunk_creator(self, chunk: Any): # type: ignore # noqa: PLR0915
self.stream_options is not None
and self.stream_options["include_usage"] is True
):
return model_response
return self.strip_role_from_delta(model_response)
return
## CHECK FOR TOOL USE

Expand Down
14 changes: 9 additions & 5 deletions litellm/proxy/management_helpers/audit_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ def _build_audit_log_payload(
if request_data.updated_at is not None:
updated_at = request_data.updated_at.isoformat()

table_name_str: str = request_data.table_name.value if isinstance(request_data.table_name, LitellmTableNames) else str(request_data.table_name)
table_name_str: str = (
request_data.table_name.value
if isinstance(request_data.table_name, LitellmTableNames)
else str(request_data.table_name)
)

return StandardAuditLogPayload(
id=request_data.id,
Expand Down Expand Up @@ -89,7 +93,9 @@ async def _dispatch_audit_log_to_callbacks(

for callback in litellm.audit_log_callbacks:
try:
resolved: Optional[CustomLogger] = callback if isinstance(callback, CustomLogger) else None
resolved: Optional[CustomLogger] = (
callback if isinstance(callback, CustomLogger) else None
)
if isinstance(callback, str):
resolved = _resolve_audit_log_callback(callback)
if resolved is None:
Expand Down Expand Up @@ -138,9 +144,7 @@ async def create_object_audit_log(
return

_changed_by = (
litellm_changed_by
or user_api_key_dict.user_id
or litellm_proxy_admin_name
litellm_changed_by or user_api_key_dict.user_id or litellm_proxy_admin_name
)

await create_audit_log_for_update(
Expand Down
208 changes: 186 additions & 22 deletions tests/test_litellm/litellm_core_utils/test_streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,9 @@ async def test_vertex_streaming_bad_request_not_midstream(logging_obj: Logging):
from litellm.llms.vertex_ai.common_utils import VertexAIError

async def _raise_bad_request(**kwargs):
raise VertexAIError(status_code=400, message="invalid maxOutputTokens", headers=None)
raise VertexAIError(
status_code=400, message="invalid maxOutputTokens", headers=None
)

response = CustomStreamWrapper(
completion_stream=None,
Expand All @@ -788,7 +790,9 @@ async def _raise_bad_request(**kwargs):


@pytest.mark.asyncio
async def test_vertex_streaming_rate_limit_triggers_midstream_fallback(logging_obj: Logging):
async def test_vertex_streaming_rate_limit_triggers_midstream_fallback(
logging_obj: Logging,
):
"""Ensure Vertex 429 rate-limit errors raise MidStreamFallbackError, not RateLimitError.

Regression test for https://github.com/BerriAI/litellm/issues/20870
Expand All @@ -797,7 +801,9 @@ async def test_vertex_streaming_rate_limit_triggers_midstream_fallback(logging_o
from litellm.llms.vertex_ai.common_utils import VertexAIError

async def _raise_rate_limit(**kwargs):
raise VertexAIError(status_code=429, message="Resource exhausted.", headers=None)
raise VertexAIError(
status_code=429, message="Resource exhausted.", headers=None
)

response = CustomStreamWrapper(
completion_stream=None,
Expand Down Expand Up @@ -825,7 +831,9 @@ def test_sync_streaming_rate_limit_triggers_midstream_fallback(logging_obj: Logg
from litellm.llms.vertex_ai.common_utils import VertexAIError

def _raise_rate_limit(**kwargs):
raise VertexAIError(status_code=429, message="Resource exhausted.", headers=None)
raise VertexAIError(
status_code=429, message="Resource exhausted.", headers=None
)

response = CustomStreamWrapper(
completion_stream=None,
Expand All @@ -850,7 +858,9 @@ def test_sync_streaming_bad_request_not_midstream(logging_obj: Logging):
from litellm.llms.vertex_ai.common_utils import VertexAIError

def _raise_bad_request(**kwargs):
raise VertexAIError(status_code=400, message="invalid maxOutputTokens", headers=None)
raise VertexAIError(
status_code=400, message="invalid maxOutputTokens", headers=None
)

response = CustomStreamWrapper(
completion_stream=None,
Expand Down Expand Up @@ -1363,6 +1373,7 @@ def _build_chunks(pattern: list[str], N: int) -> list[ModelResponseStream]:
chunks.append(_make_chunk(p))
return chunks


_REPETITION_TEST_CASES = [
# Basic cases
pytest.param(
Expand Down Expand Up @@ -1419,7 +1430,14 @@ def _build_chunks(pattern: list[str], N: int) -> list[ModelResponseStream]:
id="last_chunk_different_no_raise",
),
pytest.param(
["same"] * (litellm.REPEATED_STREAMING_CHUNK_LIMIT // 2 + 1) + ["different_mid"] + ["same"] * (litellm.REPEATED_STREAMING_CHUNK_LIMIT - litellm.REPEATED_STREAMING_CHUNK_LIMIT // 2 + 1),
["same"] * (litellm.REPEATED_STREAMING_CHUNK_LIMIT // 2 + 1)
+ ["different_mid"]
+ ["same"]
* (
litellm.REPEATED_STREAMING_CHUNK_LIMIT
- litellm.REPEATED_STREAMING_CHUNK_LIMIT // 2
+ 1
),
False,
id="middle_chunk_different_no_raise",
),
Expand All @@ -1429,7 +1447,9 @@ def _build_chunks(pattern: list[str], N: int) -> list[ModelResponseStream]:
id="last_two_different_no_raise",
),
pytest.param(
["diff"] * litellm.REPEATED_STREAMING_CHUNK_LIMIT + ["same"] * litellm.REPEATED_STREAMING_CHUNK_LIMIT + ["diff"],
["diff"] * litellm.REPEATED_STREAMING_CHUNK_LIMIT
+ ["same"] * litellm.REPEATED_STREAMING_CHUNK_LIMIT
+ ["diff"],
True,
id="in_between_same_and_diff_raise",
),
Expand All @@ -1455,6 +1475,8 @@ def test_raise_on_model_repetition(
for chunk in chunks:
wrapper.chunks.append(chunk)
wrapper.raise_on_model_repetition()


def test_usage_chunk_after_finish_reason_updates_hidden_params(logging_obj):
"""
Test that provider-reported usage from a post-finish_reason chunk
Expand Down Expand Up @@ -1536,12 +1558,13 @@ def test_usage_chunk_after_finish_reason_updates_hidden_params(logging_obj):
last_chunk = collected[-1]
hidden_usage = last_chunk._hidden_params.get("usage")
assert hidden_usage is not None, "Expected usage in _hidden_params"
assert hidden_usage.prompt_tokens == 20, (
f"Expected prompt_tokens=20 from provider, got {hidden_usage.prompt_tokens}"
)
assert hidden_usage.completion_tokens == 135, (
f"Expected completion_tokens=135 from provider, got {hidden_usage.completion_tokens}"
)
assert (
hidden_usage.prompt_tokens == 20
), f"Expected prompt_tokens=20 from provider, got {hidden_usage.prompt_tokens}"
assert (
hidden_usage.completion_tokens == 135
), f"Expected completion_tokens=135 from provider, got {hidden_usage.completion_tokens}"


@pytest.mark.asyncio
async def test_custom_stream_wrapper_aclose():
Expand Down Expand Up @@ -1615,9 +1638,9 @@ def test_content_not_dropped_when_finish_reason_already_set(

result = initialized_custom_stream_wrapper.chunk_creator(chunk=content_chunk)

assert result is not None, (
"chunk_creator() returned None — content was dropped (issue #22098)"
)
assert (
result is not None
), "chunk_creator() returned None — content was dropped (issue #22098)"
assert result.choices[0].delta.content == "world!"


Expand Down Expand Up @@ -1669,14 +1692,14 @@ def test_tool_use_not_dropped_when_finish_reason_already_set(

result = initialized_custom_stream_wrapper.chunk_creator(chunk=tool_chunk)

assert result is not None, (
"chunk_creator() returned None — tool_use data was dropped"
)
assert (
result is not None
), "chunk_creator() returned None — tool_use data was dropped"

tool_calls = result.choices[0].delta.tool_calls
assert tool_calls is not None and len(tool_calls) > 0, (
"tool_calls should contain at least one tool call"
)
assert (
tool_calls is not None and len(tool_calls) > 0
), "tool_calls should contain at least one tool call"
assert tool_calls[0].id == "call_1"
assert tool_calls[0].function.name == "get_weather"

Expand Down Expand Up @@ -1826,3 +1849,144 @@ def __next__(self):
pass # expected clean termination
except RuntimeError as e:
pytest.fail(f"PEP 479 regression: StopIteration leaked as RuntimeError: {e}")


# Reproduction fixture for issue #24221 (Azure + stream_options.include_usage):
# Azure's very first streamed chunk carries prompt_filter_results and an empty
# choices list. That chunk used to bypass strip_role_from_delta, so no chunk
# in the stream ever surfaced role='assistant'.
def _azure_chunk(choices, usage=None):
    """Build one Azure-style streaming chunk sharing the common metadata."""
    return ModelResponseStream(
        id="chatcmpl-abc123",
        created=1742056047,
        model=None,
        object="chat.completion.chunk",
        system_fingerprint=None,
        choices=choices,
        usage=usage,
    )


azure_chunks_with_prompt_filter = [
    # 1) prompt_filter_results chunk: no choices, no usage
    _azure_chunk(choices=[]),
    # 2) first content chunk, carries role='assistant' with empty content
    _azure_chunk(
        choices=[
            StreamingChoices(
                finish_reason=None,
                index=0,
                delta=Delta(content="", role="assistant"),
                logprobs=None,
            )
        ]
    ),
    # 3) plain content delta
    _azure_chunk(
        choices=[
            StreamingChoices(
                finish_reason=None,
                index=0,
                delta=Delta(content="Hello"),
                logprobs=None,
            )
        ]
    ),
    # 4) finish_reason chunk with an empty delta
    _azure_chunk(
        choices=[
            StreamingChoices(
                finish_reason="stop",
                index=0,
                delta=Delta(),
                logprobs=None,
            )
        ]
    ),
    # 5) trailing usage-only chunk (emitted because include_usage=True)
    _azure_chunk(
        choices=[],
        usage=Usage(
            completion_tokens=10,
            prompt_tokens=20,
            total_tokens=30,
        ),
    ),
]


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_azure_streaming_role_with_include_usage(sync_mode: bool):
    """
    Test for issue #24221: Azure streaming with stream_options.include_usage=True
    should include role='assistant' in the first emitted chunk's delta.

    Azure sends an initial chunk with no choices (prompt_filter_results).
    When include_usage=True, this chunk is emitted rather than skipped.
    The fix ensures strip_role_from_delta is called on that chunk so
    role='assistant' appears exactly once, on the first emitted chunk —
    later content chunks must have the role stripped, matching OpenAI
    streaming semantics where only the opening delta carries the role.
    """
    completion_stream = ModelResponseListIterator(
        model_responses=azure_chunks_with_prompt_filter
    )

    response = CustomStreamWrapper(
        completion_stream=completion_stream,
        model="azure/gpt-4",
        custom_llm_provider="azure",
        logging_obj=Logging(
            model="azure/gpt-4",
            messages=[{"role": "user", "content": "Hey"}],
            stream=True,
            call_type="completion",
            start_time=time.time(),
            litellm_call_id="12345",
            function_id="1245",
        ),
        stream_options={"include_usage": True},
    )

    chunks = []
    if sync_mode:
        for chunk in response:
            chunks.append(chunk)
    else:
        async for chunk in response:
            chunks.append(chunk)

    assert len(chunks) > 0, "No chunks were yielded"

    # role='assistant' must be on the FIRST emitted chunk, not merely
    # somewhere in the stream — a weaker "any chunk has the role" check
    # would not catch a regression that attaches the role to the wrong
    # chunk (or to every chunk).
    first_chunk = chunks[0]
    assert (
        len(first_chunk.choices) > 0
        and getattr(first_chunk.choices[0].delta, "role", None) == "assistant"
    ), "Expected role='assistant' in the first chunk, got: " + str(
        first_chunk.choices[0].delta if first_chunk.choices else "no choices"
    )

    # Subsequent chunks must NOT repeat the role; strip_role_from_delta
    # should have removed it from the original Azure content chunk.
    for chunk in chunks[1:]:
        if chunk.choices:
            assert (
                getattr(chunk.choices[0].delta, "role", None) != "assistant"
            ), f"Unexpected role='assistant' in non-first chunk: {chunk.choices[0].delta}"
Comment on lines +1980 to +1992
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Consider a stronger assertion for role placement

The current assertion only verifies that at least one chunk in the entire stream carries role='assistant'. After the fix, the role is attached to the empty prompt_filter_results chunk (the first chunk emitted), while the actual first content chunk has its role stripped by strip_role_from_delta. A stricter test would also confirm that the role appears on the correct chunk (the first yielded chunk) and is absent from later content chunks, preventing a future regression where both chunks could accidentally carry the role:

# Verify role appears in exactly the first emitted chunk
assert len(chunks) > 0, "No chunks were yielded"
first_chunk = chunks[0]
assert (
    len(first_chunk.choices) > 0
    and getattr(first_chunk.choices[0].delta, "role", None) == "assistant"
), f"Expected role='assistant' in the first chunk, got: {first_chunk.choices[0].delta if first_chunk.choices else 'no choices'}"

# Verify subsequent chunks do NOT repeat the role
for chunk in chunks[1:]:
    if chunk.choices:
        assert getattr(chunk.choices[0].delta, "role", None) != "assistant", \
            f"Unexpected role='assistant' in non-first chunk: {chunk.choices[0].delta}"

Loading