Commit 47e811d

fix(llm_http_handler.py): fix fake streaming (#10061)
* fix(llm_http_handler.py): fix fake streaming; allows groq to work with llm_http_handler
* fix(groq.py): migrate groq to openai-like config; ensures json mode handling works correctly
1 parent c603680 commit 47e811d
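
For context, the caller-side pattern this commit targets looks roughly like the sketch below. It is illustrative only: the model name, prompt, and environment setup are assumptions, not taken from the commit.

import litellm

# Illustrative Groq JSON-mode request with streaming enabled
# (assumes GROQ_API_KEY is set; the model name is an example).
response = litellm.completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Return a JSON object with a 'city' key."}],
    response_format={"type": "json_object"},
    stream=True,
)

# If the Groq config flags this request for fake streaming, the handler sends a
# normal non-streaming request and replays the complete response as a single
# streamed chunk via MockResponseIterator.
for chunk in response:
    print(chunk.choices[0].delta.content)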

5 files changed: +157 -19 lines changed


litellm/llms/base_llm/base_model_iterator.py (+62 -2)

@@ -1,9 +1,16 @@
 import json
 from abc import abstractmethod
-from typing import Optional, Union
+from typing import List, Optional, Union, cast

 import litellm
-from litellm.types.utils import GenericStreamingChunk, ModelResponseStream
+from litellm.types.utils import (
+    Choices,
+    Delta,
+    GenericStreamingChunk,
+    ModelResponse,
+    ModelResponseStream,
+    StreamingChoices,
+)


 class BaseModelResponseIterator:
@@ -121,6 +128,59 @@ async def __anext__(self):
             raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")


+class MockResponseIterator:  # for returning ai21 streaming responses
+    def __init__(
+        self, model_response: ModelResponse, json_mode: Optional[bool] = False
+    ):
+        self.model_response = model_response
+        self.json_mode = json_mode
+        self.is_done = False
+
+    # Sync iterator
+    def __iter__(self):
+        return self
+
+    def _chunk_parser(self, chunk_data: ModelResponse) -> ModelResponseStream:
+        try:
+            streaming_choices: List[StreamingChoices] = []
+            for choice in chunk_data.choices:
+                streaming_choices.append(
+                    StreamingChoices(
+                        index=choice.index,
+                        delta=Delta(
+                            **cast(Choices, choice).message.model_dump(),
+                        ),
+                        finish_reason=choice.finish_reason,
+                    )
+                )
+            processed_chunk = ModelResponseStream(
+                id=chunk_data.id,
+                object="chat.completion",
+                created=chunk_data.created,
+                model=chunk_data.model,
+                choices=streaming_choices,
+            )
+            return processed_chunk
+        except Exception as e:
+            raise ValueError(f"Failed to decode chunk: {chunk_data}. Error: {e}")
+
+    def __next__(self):
+        if self.is_done:
+            raise StopIteration
+        self.is_done = True
+        return self._chunk_parser(self.model_response)
+
+    # Async iterator
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        if self.is_done:
+            raise StopAsyncIteration
+        self.is_done = True
+        return self._chunk_parser(self.model_response)
+
+
 class FakeStreamResponseIterator:
     def __init__(self, model_response, json_mode: Optional[bool] = False):
         self.model_response = model_response
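
To make the new iterator concrete, here is a small standalone sketch of how it behaves; the ModelResponse below is hand-built for illustration and is not part of the commit:

import litellm
from litellm.llms.base_llm.base_model_iterator import MockResponseIterator

# A complete (non-streamed) response, built by hand for illustration.
full_response = litellm.ModelResponse(
    model="groq/llama3-8b-8192",
    choices=[
        {
            "index": 0,
            "finish_reason": "stop",
            "message": {"role": "assistant", "content": '{"city": "Paris"}'},
        }
    ],
)

# The iterator replays the whole response as one ModelResponseStream chunk and
# then stops, which is what lets CustomStreamWrapper treat it like a real stream.
for chunk in MockResponseIterator(model_response=full_response, json_mode=True):
    print(chunk.choices[0].delta.content)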

litellm/llms/custom_httpx/llm_http_handler.py (+46 -6)

@@ -11,6 +11,7 @@
 from litellm.llms.base_llm.audio_transcription.transformation import (
     BaseAudioTranscriptionConfig,
 )
+from litellm.llms.base_llm.base_model_iterator import MockResponseIterator
 from litellm.llms.base_llm.chat.transformation import BaseConfig
 from litellm.llms.base_llm.embedding.transformation import BaseEmbeddingConfig
 from litellm.llms.base_llm.files.transformation import BaseFilesConfig
@@ -231,6 +232,7 @@ def completion(
     ):
         json_mode: bool = optional_params.pop("json_mode", False)
         extra_body: Optional[dict] = optional_params.pop("extra_body", None)
+        fake_stream = fake_stream or optional_params.pop("fake_stream", False)

         provider_config = ProviderConfigManager.get_provider_chat_config(
             model=model, provider=litellm.LlmProviders(custom_llm_provider)
@@ -317,6 +319,7 @@ def completion(
                     ),
                     litellm_params=litellm_params,
                     json_mode=json_mode,
+                    optional_params=optional_params,
                 )

             else:
@@ -378,6 +381,7 @@ def completion(
                     ),
                     litellm_params=litellm_params,
                     json_mode=json_mode,
+                    optional_params=optional_params,
                 )
             return CustomStreamWrapper(
                 completion_stream=completion_stream,
@@ -426,6 +430,7 @@ def make_sync_call(
         model: str,
         messages: list,
         logging_obj,
+        optional_params: dict,
         litellm_params: dict,
         timeout: Union[float, httpx.Timeout],
         fake_stream: bool = False,
@@ -457,11 +462,22 @@ def make_sync_call(
            )

        if fake_stream is True:
-            completion_stream = provider_config.get_model_response_iterator(
-                streaming_response=response.json(),
-                sync_stream=True,
+            model_response: (ModelResponse) = provider_config.transform_response(
+                model=model,
+                raw_response=response,
+                model_response=litellm.ModelResponse(),
+                logging_obj=logging_obj,
+                request_data=data,
+                messages=messages,
+                optional_params=optional_params,
+                litellm_params=litellm_params,
+                encoding=None,
                 json_mode=json_mode,
             )
+
+            completion_stream: Any = MockResponseIterator(
+                model_response=model_response, json_mode=json_mode
+            )
        else:
            completion_stream = provider_config.get_model_response_iterator(
                streaming_response=response.iter_lines(),
@@ -491,6 +507,7 @@ async def acompletion_stream_function(
        logging_obj: LiteLLMLoggingObj,
        data: dict,
        litellm_params: dict,
+        optional_params: dict,
        fake_stream: bool = False,
        client: Optional[AsyncHTTPHandler] = None,
        json_mode: Optional[bool] = None,
@@ -509,6 +526,7 @@ async def acompletion_stream_function(
        )

        completion_stream, _response_headers = await self.make_async_call_stream_helper(
+            model=model,
            custom_llm_provider=custom_llm_provider,
            provider_config=provider_config,
            api_base=api_base,
@@ -520,6 +538,8 @@ async def acompletion_stream_function(
            fake_stream=fake_stream,
            client=client,
            litellm_params=litellm_params,
+            optional_params=optional_params,
+            json_mode=json_mode,
        )
        streamwrapper = CustomStreamWrapper(
            completion_stream=completion_stream,
@@ -531,6 +551,7 @@

    async def make_async_call_stream_helper(
        self,
+        model: str,
        custom_llm_provider: str,
        provider_config: BaseConfig,
        api_base: str,
@@ -540,8 +561,10 @@ async def make_async_call_stream_helper(
        logging_obj: LiteLLMLoggingObj,
        timeout: Union[float, httpx.Timeout],
        litellm_params: dict,
+        optional_params: dict,
        fake_stream: bool = False,
        client: Optional[AsyncHTTPHandler] = None,
+        json_mode: Optional[bool] = None,
    ) -> Tuple[Any, httpx.Headers]:
        """
        Helper function for making an async call with stream.
@@ -572,8 +595,21 @@ async def make_async_call_stream_helper(
        )

        if fake_stream is True:
-            completion_stream = provider_config.get_model_response_iterator(
-                streaming_response=response.json(), sync_stream=False
+            model_response: (ModelResponse) = provider_config.transform_response(
+                model=model,
+                raw_response=response,
+                model_response=litellm.ModelResponse(),
+                logging_obj=logging_obj,
+                request_data=data,
+                messages=messages,
+                optional_params=optional_params,
+                litellm_params=litellm_params,
+                encoding=None,
+                json_mode=json_mode,
+            )
+
+            completion_stream: Any = MockResponseIterator(
+                model_response=model_response, json_mode=json_mode
             )
        else:
            completion_stream = provider_config.get_model_response_iterator(
@@ -598,8 +634,12 @@ def _add_stream_param_to_request_body(
        """
        Some providers like Bedrock invoke do not support the stream parameter in the request body, we only pass `stream` in the request body the provider supports it.
        """
+
        if fake_stream is True:
-            return data
+            # remove 'stream' from data
+            new_data = data.copy()
+            new_data.pop("stream", None)
+            return new_data
        if provider_config.supports_stream_param_in_request_body is True:
            data["stream"] = True
        return data
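
One detail from the last hunk worth noting: when fake streaming is active, _add_stream_param_to_request_body now strips `stream` from a copy of the request body, so the provider receives an ordinary non-streaming request and the caller's dict is left unmodified. A tiny sketch of that behavior with a hypothetical payload:

# Hypothetical request body assembled for a fake-stream request.
data = {
    "model": "llama3-8b-8192",
    "messages": [{"role": "user", "content": "hi"}],
    "stream": True,
}

new_data = data.copy()
new_data.pop("stream", None)

assert "stream" not in new_data  # provider sees a plain completion request
assert data["stream"] is True    # the original dict is not mutated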

litellm/llms/groq/chat/transformation.py (+8 -3)

@@ -14,10 +14,10 @@
     ChatCompletionToolParamFunctionChunk,
 )

-from ...openai.chat.gpt_transformation import OpenAIGPTConfig
+from ...openai_like.chat.transformation import OpenAILikeChatConfig


-class GroqChatConfig(OpenAIGPTConfig):
+class GroqChatConfig(OpenAILikeChatConfig):
     frequency_penalty: Optional[int] = None
     function_call: Optional[Union[str, dict]] = None
     functions: Optional[list] = None
@@ -132,8 +132,11 @@ def map_openai_params(
         optional_params: dict,
         model: str,
         drop_params: bool = False,
+        replace_max_completion_tokens_with_max_tokens: bool = False,  # groq supports max_completion_tokens
     ) -> dict:
         _response_format = non_default_params.get("response_format")
+        if self._should_fake_stream(non_default_params):
+            optional_params["fake_stream"] = True
         if _response_format is not None and isinstance(_response_format, dict):
             json_schema: Optional[dict] = None
             if "response_schema" in _response_format:
@@ -160,6 +163,8 @@
             non_default_params.pop(
                 "response_format", None
             )  # only remove if it's a json_schema - handled via using groq's tool calling params.
-        return super().map_openai_params(
+        optional_params = super().map_openai_params(
             non_default_params, optional_params, model, drop_params
         )
+
+        return optional_params
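
A hedged sketch of how the new flag surfaces to the handler: when _should_fake_stream (inherited from the shared OpenAI config) matches the request, `fake_stream` is written into optional_params, which llm_http_handler.py pops before building the request. The exact matching condition is not part of this diff, so the printed value below is only True when that check fires:

from litellm.llms.groq.chat.transformation import GroqChatConfig

config = GroqChatConfig()
optional_params = config.map_openai_params(
    non_default_params={
        "response_format": {"type": "json_object"},
        "stream": True,
    },
    optional_params={},
    model="llama-3.3-70b-versatile",
    drop_params=False,
)

# True only if _should_fake_stream() flagged the request; the handler then
# takes the MockResponseIterator path instead of opening an SSE stream.
print(optional_params.get("fake_stream"))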

litellm/llms/openai_like/chat/transformation.py (+38 -7)

@@ -7,7 +7,7 @@
 import httpx

 from litellm.secret_managers.main import get_secret_str
-from litellm.types.llms.openai import ChatCompletionAssistantMessage
+from litellm.types.llms.openai import AllMessageValues, ChatCompletionAssistantMessage
 from litellm.types.utils import ModelResponse

 from ...openai.chat.gpt_transformation import OpenAIGPTConfig
@@ -25,7 +25,6 @@ def _get_openai_compatible_provider_info(
         self,
         api_base: Optional[str],
         api_key: Optional[str],
-        model: Optional[str] = None,
     ) -> Tuple[Optional[str], Optional[str]]:
         api_base = api_base or get_secret_str("OPENAI_LIKE_API_BASE")  # type: ignore
         dynamic_api_key = (
@@ -74,8 +73,8 @@ def _transform_response(
         messages: List,
         print_verbose,
         encoding,
-        json_mode: bool,
-        custom_llm_provider: str,
+        json_mode: Optional[bool],
+        custom_llm_provider: Optional[str],
         base_model: Optional[str],
     ) -> ModelResponse:
         response_json = response.json()
@@ -97,14 +96,46 @@ def _transform_response(

         returned_response = ModelResponse(**response_json)

-        returned_response.model = (
-            custom_llm_provider + "/" + (returned_response.model or "")
-        )
+        if custom_llm_provider is not None:
+            returned_response.model = (
+                custom_llm_provider + "/" + (returned_response.model or "")
+            )

         if base_model is not None:
             returned_response._hidden_params["model"] = base_model
         return returned_response

+    def transform_response(
+        self,
+        model: str,
+        raw_response: httpx.Response,
+        model_response: ModelResponse,
+        logging_obj: LiteLLMLoggingObj,
+        request_data: dict,
+        messages: List[AllMessageValues],
+        optional_params: dict,
+        litellm_params: dict,
+        encoding: Any,
+        api_key: Optional[str] = None,
+        json_mode: Optional[bool] = None,
+    ) -> ModelResponse:
+        return OpenAILikeChatConfig._transform_response(
+            model=model,
+            response=raw_response,
+            model_response=model_response,
+            stream=optional_params.get("stream", False),
+            logging_obj=logging_obj,
+            optional_params=optional_params,
+            api_key=api_key,
+            data=request_data,
+            messages=messages,
+            print_verbose=None,
+            encoding=None,
+            json_mode=json_mode,
+            custom_llm_provider=None,
+            base_model=None,
+        )
+
     def map_openai_params(
         self,
         non_default_params: dict,
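
The new transform_response wrapper matters because it is the standard BaseConfig hook that llm_http_handler.py calls on the fake-stream path, and GroqChatConfig now inherits it through the migration above. A small sanity-check sketch of that wiring (not part of the commit):

from litellm.llms.groq.chat.transformation import GroqChatConfig
from litellm.llms.openai_like.chat.transformation import OpenAILikeChatConfig

# Groq now subclasses the OpenAI-like config, so it picks up the
# transform_response adapter used when fake_stream is set.
assert issubclass(GroqChatConfig, OpenAILikeChatConfig)
assert callable(getattr(GroqChatConfig, "transform_response"))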

litellm/types/utils.py (+3 -1)

@@ -1011,7 +1011,9 @@ class ModelResponseStream(ModelResponseBase):

     def __init__(
         self,
-        choices: Optional[List[Union[StreamingChoices, dict, BaseModel]]] = None,
+        choices: Optional[
+            Union[List[StreamingChoices], Union[StreamingChoices, dict, BaseModel]]
+        ] = None,
         id: Optional[str] = None,
         created: Optional[int] = None,
         provider_specific_fields: Optional[Dict[str, Any]] = None,
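
The widened `choices` annotation admits either a list of StreamingChoices, which is the shape MockResponseIterator builds above, or a single choice-like value. A minimal construction using the list form (content is illustrative):

from litellm.types.utils import Delta, ModelResponseStream, StreamingChoices

# A plain List[StreamingChoices], matching what MockResponseIterator passes.
chunk = ModelResponseStream(
    choices=[StreamingChoices(index=0, delta=Delta(content="hi"), finish_reason="stop")]
)
print(chunk.choices[0].delta.content)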
