Conversation
Summary of Changes

Hello @leonardmq, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the system's interaction with Claude models by enabling and improving reasoning capabilities and introducing a robust streaming mechanism. The changes allow for real-time processing of model outputs and ensure that reasoning steps are properly captured, particularly for models accessed via LiteLLM. This significantly improves the user experience for long-running model calls and provides more detailed insights into the model's thought process.
Walkthrough

Adds an async StreamingCompletion wrapper and threads an on_chunk StreamCallback through adapter layers to support litellm streaming; updates the litellm dependency, model metadata, and extensive tests to validate chunking, callback propagation, and final response assembly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant BaseAdapter as BaseAdapter
    participant LiteLlmAdapter as LiteLlmAdapter
    participant StreamingCompletion as StreamingCompletion
    participant litellm as litellm
    participant on_chunk as on_chunk Callback
    Client->>BaseAdapter: invoke(input, on_chunk=callback)
    BaseAdapter->>LiteLlmAdapter: _run(input, on_chunk=callback)
    LiteLlmAdapter->>LiteLlmAdapter: _run_model_turn(on_chunk=callback)
    LiteLlmAdapter->>StreamingCompletion: __aiter__() / create wrapper
    StreamingCompletion->>litellm: acompletion(..., stream=True)
    litellm-->>StreamingCompletion: async iterator of chunks
    loop For each chunk
        StreamingCompletion->>StreamingCompletion: collect chunk
        StreamingCompletion-->>LiteLlmAdapter: yield chunk
        LiteLlmAdapter->>on_chunk: await callback(chunk)
        on_chunk-->>LiteLlmAdapter: callback awaited
    end
    StreamingCompletion->>litellm: stream_chunk_builder(collected_chunks)
    litellm-->>StreamingCompletion: final assembled response
    StreamingCompletion-->>LiteLlmAdapter: expose .response
    LiteLlmAdapter-->>BaseAdapter: return RunOutput with final response
    BaseAdapter-->>Client: complete TaskRun
```
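The flow in the diagram can be condensed into a minimal runnable sketch. This is not the PR's actual code: `acompletion` and `chunk_builder` below are injected stand-ins for `litellm.acompletion` and `litellm.stream_chunk_builder`, so only the control flow is illustrated.

```python
import asyncio


class StreamingCompletion:
    """Sketch of the wrapper in the diagram: stream chunks, keep them,
    then assemble a final response once the stream is exhausted."""

    def __init__(self, acompletion, chunk_builder, **kwargs):
        self._acompletion = acompletion      # stand-in for litellm.acompletion
        self._chunk_builder = chunk_builder  # stand-in for litellm.stream_chunk_builder
        self._kwargs = kwargs
        self._chunks = []
        self._response = None

    async def __aiter__(self):
        stream = await self._acompletion(stream=True, **self._kwargs)
        async for chunk in stream:
            self._chunks.append(chunk)  # collect for final assembly
            yield chunk
        self._response = self._chunk_builder(self._chunks)

    @property
    def response(self):
        if self._response is None:
            raise RuntimeError("Stream has not been fully consumed yet.")
        return self._response


async def demo():
    async def fake_stream():
        for part in ["Hel", "lo"]:
            yield part

    async def fake_acompletion(stream, **kwargs):
        return fake_stream()

    sc = StreamingCompletion(fake_acompletion, "".join)
    seen = []
    async for chunk in sc:
        seen.append(chunk)  # this is where on_chunk would be awaited
    return seen, sc.response


seen, final = asyncio.run(demo())
print(seen, final)
```

Iterating the wrapper drives the underlying stream, so the adapter can await a user callback per chunk and still read `.response` afterwards.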
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
Code Review
This PR introduces streaming support for litellm completions and fixes an issue with Claude models missing reasoning. The changes are well-implemented, with a new StreamingCompletion wrapper to handle streaming logic cleanly. The on_chunk callback is plumbed through the adapter stack correctly. The fix for Claude models by adding allowed_openai_params for OpenRouter is a good, well-commented solution for a provider-specific issue. The addition of unit tests for the new streaming utility and extensive integration tests for various models demonstrates thoroughness. I've left one minor suggestion in a test helper to make it more robust. Overall, this is a great PR.
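The plumbing the review praises amounts to one callback threaded down the call chain and fired per chunk. A stubbed illustration follows: the `invoke`/`_run`/`_run_model_turn` names mirror the review's description, but everything else here is invented for demonstration.

```python
import asyncio

# Stubbed illustration of the on_chunk plumbing: the callback is passed
# top-down through the adapter layers and fired once per streamed chunk.


async def _run_model_turn(on_chunk=None):
    for chunk in ["thinking...", "answer part 1", "answer part 2"]:
        if on_chunk is not None:
            await on_chunk(chunk)  # fires as chunks arrive
    return "final assembled response"


async def _run(task_input, on_chunk=None):
    return await _run_model_turn(on_chunk=on_chunk)


async def invoke(task_input, on_chunk=None):
    return await _run(task_input, on_chunk=on_chunk)


chunks = []


async def collect(chunk):
    chunks.append(chunk)


result = asyncio.run(invoke("hello", on_chunk=collect))
print(chunks, result)
```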
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (1)
292-310: ⚠️ Potential issue | 🟠 Major

Always-streaming path creates a silent usage tracking regression for non-Claude providers.
`acompletion_checking_response` now unconditionally routes all completions through `StreamingCompletion`, whereas it previously used non-streaming `litellm.acompletion` directly. Before this change, `response._hidden_params["response_cost"]` and `response.usage` were reliably populated for all providers. Now, usage data depends on whether each provider includes it in streaming chunks — providers that only include usage in non-streaming responses, or emit it in ways `stream_chunk_builder` doesn't correctly reassemble, will silently return `Usage()` with all `None` fields.

This is an intentional change to support Claude extended thinking (which requires streaming), but it affects all providers. Usage tracking is not tested with streaming responses, creating a gap between the feature change and test coverage.
Recommended actions:

- Either preserve the non-streaming path when `on_chunk` is `None`:

  ```python
  if on_chunk is not None:
      stream = StreamingCompletion(**kwargs)
      async for chunk in stream:
          await on_chunk(chunk)
      response = stream.response
  else:
      import litellm

      response = await litellm.acompletion(**kwargs)
  ```

- Or add an explicit test verifying `usage_from_response` correctly populates tokens and cost from a streamed response for at least one non-Claude provider (e.g., OpenAI GPT-4).

The `build_extra_body` thinking_level fix (`reasoning_effort` + `allowed_openai_params`) is correct.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py` around lines 292 - 310, The change in acompletion_checking_response routes all completions through StreamingCompletion causing usage/_hidden_params["response_cost"] to be missing for providers that only populate usage on the non-streaming path; restore the previous non-streaming behavior when no chunk handler is provided or add tests to ensure streaming reconstructs usage. Update acompletion_checking_response so that if on_chunk is None it calls litellm.acompletion(**kwargs) and assigns that to response, otherwise use StreamingCompletion(**kwargs) and iterate chunks; alternatively add a test that verifies usage_from_response (or response._hidden_params["response_cost"]) is correctly populated for a non-Claude provider (e.g., OpenAI GPT-4) when using StreamingCompletion/stream_chunk_builder. Ensure references to StreamingCompletion, litellm.acompletion, acompletion_checking_response, and usage_from_response are used to locate/edit the code and tests.
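The first recommended action can be demonstrated end-to-end with the transport stubbed out. In the sketch below, `FakeStream` and `fake_acompletion` are invented stand-ins for `StreamingCompletion` and `litellm.acompletion`, not the adapter's real code; only the gating logic matches the suggestion.

```python
import asyncio


class FakeStream:
    """Stand-in for StreamingCompletion: yields canned chunks, then exposes .response."""

    def __init__(self, **kwargs):
        self.response = None

    async def __aiter__(self):
        for chunk in ["a", "b"]:
            yield chunk
        self.response = "streamed:ab"  # assembled only after full consumption


async def fake_acompletion(**kwargs):
    return "non-streaming response with full usage"


async def completion_checking_response(on_chunk=None):
    # The suggested gate: stream only when a live consumer exists, so
    # providers that report usage only on non-streaming calls stay accurate.
    if on_chunk is None:
        return await fake_acompletion()
    stream = FakeStream()
    async for chunk in stream:
        await on_chunk(chunk)
    return stream.response


async def main():
    seen = []

    async def cb(c):
        seen.append(c)

    plain = await completion_checking_response()       # non-streaming path
    streamed = await completion_checking_response(cb)  # streaming path
    return plain, streamed, seen


plain, streamed, seen = asyncio.run(main())
print(plain, streamed, seen)
```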
🧹 Nitpick comments (2)
libs/core/kiln_ai/adapters/model_adapters/base_adapter.py (1)
7-8: `litellm` dependency introduced in `BaseAdapter`.

`from litellm.types.utils import ModelResponseStream` couples the abstract base class (and all its non-litellm subclasses) to the `litellm` package. Consider moving `StreamCallback` to a standalone `streaming_types.py` module (using `Any` or a protocol) so non-litellm adapters don't carry a transitive `litellm` dependency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/base_adapter.py` around lines 7 - 8, BaseAdapter currently imports ModelResponseStream from litellm which forces a transitive litellm dependency; create a new standalone module streaming_types.py that defines a StreamCallback type (either as typing.Any or a lightweight typing.Protocol matching ModelResponseStream's public API) and export any minimal streaming types there, then update libs/core/kiln_ai/adapters/model_adapters/base_adapter.py to import StreamCallback from streaming_types instead of ModelResponseStream and update any references in BaseAdapter to use StreamCallback; leave direct litellm imports only in adapters that actually need litellm-specific ModelResponseStream and adjust their imports accordingly.

libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (1)
77-92: Double `getattr` in `render_chunk` for `reasoning_content`.

Lines 85-87 call `getattr(chunk.choices[0].delta, "reasoning_content", None)` twice — once in the `elif` condition and once to assign `text`. The second call is redundant.

♻️ Proposed simplification

```diff
-        elif getattr(chunk.choices[0].delta, "reasoning_content", None) is not None:
-            text = getattr(chunk.choices[0].delta, "reasoning_content", None)
-            if text is not None:
-                self.render_reasoning(text)
+        elif (text := getattr(chunk.choices[0].delta, "reasoning_content", None)) is not None:
+            self.render_reasoning(text)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 77 - 92, In render_chunk, avoid calling getattr(chunk.choices[0].delta, "reasoning_content", None) twice: first evaluate and store it in a local variable (e.g., reasoning_text) then use that variable both for the truthy check and for passing to render_reasoning; update the branch under render_chunk where it currently has the two getattr calls so the condition checks the stored reasoning_text and the subsequent call passes that same variable to render_reasoning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libs/core/kiln_ai/adapters/ml_model_list.py`:
- Line 1348: The provider entries that set thinking_level (e.g., the
dict/constructor call containing thinking_level="medium") must also set
reasoning_capable=True so Claude providers use single-turn reasoning and extract
"thinking" output; update every provider config that currently only sets
thinking_level (lines referenced include the occurrences at
thinking_level="medium" and the other positions) to include
reasoning_capable=True. For Anthropic providers where applicable, instead add or
also set anthropic_extended_thinking=True. Locate the provider definitions in
ml_model_list.py (the entries that include thinking_level) and add the
appropriate boolean flag to those same dict/constructor calls.
---
Outside diff comments:
In `@libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py`:
- Around line 292-310: The change in acompletion_checking_response routes all
completions through StreamingCompletion causing
usage/_hidden_params["response_cost"] to be missing for providers that only
populate usage on the non-streaming path; restore the previous non-streaming
behavior when no chunk handler is provided or add tests to ensure streaming
reconstructs usage. Update acompletion_checking_response so that if on_chunk is
None it calls litellm.acompletion(**kwargs) and assigns that to response,
otherwise use StreamingCompletion(**kwargs) and iterate chunks; alternatively
add a test that verifies usage_from_response (or
response._hidden_params["response_cost"]) is correctly populated for a
non-Claude provider (e.g., OpenAI GPT-4) when using
StreamingCompletion/stream_chunk_builder. Ensure references to
StreamingCompletion, litellm.acompletion, acompletion_checking_response, and
usage_from_response are used to locate/edit the code and tests.
---
Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/base_adapter.py`:
- Around line 7-8: BaseAdapter currently imports ModelResponseStream from
litellm which forces a transitive litellm dependency; create a new standalone
module streaming_types.py that defines a StreamCallback type (either as
typing.Any or a lightweight typing.Protocol matching ModelResponseStream’s
public API) and export any minimal streaming types there, then update
libs/core/kiln_ai/adapters/model_adapters/base_adapter.py to import
StreamCallback from streaming_types instead of ModelResponseStream and update
any references in BaseAdapter to use StreamCallback; leave direct litellm
imports only in adapters that actually need litellm-specific ModelResponseStream
and adjust their imports accordingly.
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 77-92: In render_chunk, avoid calling
getattr(chunk.choices[0].delta, "reasoning_content", None) twice: first evaluate
and store it in a local variable (e.g., reasoning_text) then use that variable
both for the truthy check and for passing to render_reasoning; update the branch
under render_chunk where it currently has the two getattr calls so the condition
checks the stored reasoning_text and the subsequent call passes that same
variable to render_reasoning.
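The `streaming_types.py` decoupling suggested above for `base_adapter.py` could be sketched with a structural Protocol. The `StreamChunk` and `FakeChunk` names below are illustrative; a real module would mirror only the attributes the adapters actually read from `ModelResponseStream`.

```python
# Hypothetical streaming_types.py: a litellm-free home for StreamCallback.
from typing import Any, Awaitable, Callable, Protocol, runtime_checkable


@runtime_checkable
class StreamChunk(Protocol):
    """Anything chunk-shaped: only the attribute the adapters actually read."""

    choices: Any


# Async callback invoked once per streamed chunk.
StreamCallback = Callable[[StreamChunk], Awaitable[None]]


class FakeChunk:
    """Duck-typed chunk for tests; no litellm import required."""

    def __init__(self, choices):
        self.choices = choices


ok = isinstance(FakeChunk(choices=[]), StreamChunk)  # structural check
print(ok)
```

Because the check is structural, litellm's real `ModelResponseStream` would also satisfy `StreamChunk` without `base_adapter.py` ever importing litellm.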
📊 Coverage Report

Overall Coverage: 91% (Diff: origin/main...HEAD)
Summary
Line-by-line

libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (Lines 297-309)

```
  297
  298     async def acompletion_checking_response(
  299         self, on_chunk: StreamCallback | None = None, **kwargs
  300     ) -> Tuple[ModelResponse, Choices]:
! 301         stream = StreamingCompletion(**kwargs)
! 302         async for chunk in stream:
! 303             if on_chunk is not None:
! 304                 await on_chunk(chunk)
! 305         response = stream.response
  306
  307         if (
  308             not isinstance(response, ModelResponse)
  309             or not response.choices
```

libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (Lines 418-426)

```
  418
  419         # anthropic does not need allowed_openai_params, and we get an error if we pass it in
  420         # but openrouter for example does need it or throws an error
  421         if provider.name == ModelProviderName.openrouter:
! 422             extra_body["allowed_openai_params"] = ["reasoning_effort"]
  423
  424         if provider.require_openrouter_reasoning:
  425             # https://openrouter.ai/docs/use-cases/reasoning-tokens
  426             extra_body["reasoning"] = {
```

libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (Lines 82-90)

```
   82         input: InputType,
   83         input_source: DataSource | None = None,
   84         on_chunk: StreamCallback | None = None,
   85     ) -> TaskRun:
!  86         run_output, _ = await self.invoke_returning_run_output(
   87             input, input_source, on_chunk=on_chunk
   88         )
   89         return run_output
```
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 1
🧹 Nitpick comments (9)
libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py (1)
30-36: Re-iterating the same instance silently makes a duplicate API call.

Every `async for chunk in stream:` invocation calls `__aiter__`, which resets `_response`/`_iterated` and issues a fresh `litellm.acompletion`. Since this is intended as a single-use wrapper (matching the pattern in `acompletion_checking_response`), this should either be documented in the class docstring or enforced with a guard:

```python
def __aiter__(self):
    if self._iterated:
        raise RuntimeError("StreamingCompletion has already been iterated; create a new instance.")
    ...
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py` around lines 30 - 36, The wrapper silently allows re-iteration causing duplicate litellm.acompletion calls; update the StreamingCompletion iterator logic (the __aiter__ method) to enforce single-use by checking self._iterated and raising a RuntimeError("StreamingCompletion has already been iterated; create a new instance.") if True, and only call litellm.acompletion and set self._response/_iterated on the first iteration; alternatively add this single-use behavior note to the class docstring for clarity.

libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (4)
76-91: `render_chunk` assumes `chunk.choices` is non-empty.

Line 77 accesses `chunk.choices[0]` without a guard. While litellm streaming chunks should always contain at least one choice, a defensive check would prevent a confusing `IndexError` if an unexpected chunk shape arrives.

Suggested guard

```diff
     async def render_chunk(self, chunk: litellm.ModelResponseStream):
+        if not chunk.choices:
+            return
         if chunk.choices[0].finish_reason is not None:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 76 - 91, The render_chunk function assumes chunk.choices[0] exists and can raise IndexError for empty choices; add a defensive guard at the top of render_chunk (in the render_chunk method) that checks if not chunk.choices (or len(chunk.choices) == 0) and handle that case (e.g., call self.render_unknown(chunk) or return) before referencing chunk.choices[0]; then proceed with the existing logic for tool_calls, reasoning_content, content, and finish_reason.
127-153: `StructuredOutputMode.unknown` is fragile — prefer `default`.

Line 139 uses `StructuredOutputMode.unknown`, which raises `ValueError("Structured output mode is unknown.")` in `response_format_options()`. It currently works only because the task has no output schema, so `has_structured_output()` short-circuits. If anyone later adds an output schema to the `task` fixture, these tests will fail with an opaque error. Using `StructuredOutputMode.default` would be safer and more representative of real usage.

Suggested fix

```diff
-            structured_output_mode=StructuredOutputMode.unknown,
+            structured_output_mode=StructuredOutputMode.default,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 127 - 153, The test fixture uses StructuredOutputMode.unknown which can later raise ValueError in response_format_options() if the task gains an output schema; update the adapter_factory fixture to use StructuredOutputMode.default instead of .unknown when constructing the LiteLlmAdapter’s LiteLlmConfig/KilnAgentRunConfigProperties so has_structured_output() won’t short-circuit and tests remain stable (look for adapter_factory, LiteLlmAdapter, LiteLlmConfig, KilnAgentRunConfigProperties, response_format_options).
97-107: Dead `current_block_type` field in `ChunkRawRenderer`.

Line 100 initializes `self.current_block_type` but it's never read or written afterwards. Looks like a copy-paste artifact from `ChunkRenderer`.

Fix

```diff
 class ChunkRawRenderer(ChunkRendererAbstract):
     def __init__(self):
         self.chunks: list[litellm.ModelResponseStream] = []
-        self.current_block_type: str | None = None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 97 - 107, The ChunkRawRenderer class defines an unused field current_block_type (set in __init__) that is never read or mutated—remove this dead field to clean up the class; update the __init__ of ChunkRawRenderer to only initialize self.chunks and leave render_chunk and get_stream_text unchanged (methods: ChunkRawRenderer.__init__, ChunkRawRenderer.render_chunk, ChunkRawRenderer.get_stream_text).
156-172: Duplicated parametrize lists across 4 tests.

The same 11-entry model/provider list is copy-pasted in all four `@pytest.mark.parametrize` decorators. Extract it to a module-level constant to reduce maintenance burden and risk of drift.

Suggested approach

```diff
+STREAMING_TEST_MODELS = [
+    ("claude_sonnet_4_5_thinking", ModelProviderName.openrouter),
+    ("claude_sonnet_4_5_thinking", ModelProviderName.anthropic),
+    # ... all entries ...
+    ("minimax_m2_5", ModelProviderName.openrouter),
+]
+
 @pytest.mark.paid
-@pytest.mark.parametrize(
-    "model_id,provider_name",
-    [
-        ("claude_sonnet_4_5_thinking", ModelProviderName.openrouter),
-        ...
-    ],
-)
+@pytest.mark.parametrize("model_id,provider_name", STREAMING_TEST_MODELS)
 async def test_acompletion_streaming_response(...):
```

Also applies to: 271-287, 360-376, 389-405

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 156 - 172, Extract the repeated 11-entry list used in the pytest.mark.parametrize(...) decorators into a single module-level constant (e.g., SUPPORTED_MODEL_PROVIDER_PAIRS) and replace each duplicated inline list with that constant in the four tests that parametrize "model_id,provider_name"; keep the parameter names ("model_id", "provider_name") and the pytest.mark.parametrize call but pass SUPPORTED_MODEL_PROVIDER_PAIRS as the second argument to avoid copy-paste drift.libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py (1)
494-511: Dead `litellm.acompletion` patches remain in `_run_model_turn` tests.

In `test_run_model_turn_parallel_tools`, `test_run_model_turn_sequential_tools`, and `test_run_model_turn_max_tool_calls_exceeded`, the `patch("litellm.acompletion", ...)` is now unreachable because `acompletion_checking_response` (which is also patched) is what would call it. These patches are harmless but misleading dead code.

Example cleanup for test_run_model_turn_parallel_tools

```diff
     with patch.object(
         litellm_adapter, "cached_available_tools", return_value=[multiply_spy, add_spy]
     ):
-        with patch(
-            "litellm.acompletion",
-            side_effect=[mock_response, final_response],
-        ):
-            with patch.object(
-                litellm_adapter, "build_completion_kwargs", return_value={}
-            ):
-                with patch.object(
-                    litellm_adapter,
-                    "acompletion_checking_response",
-                    side_effect=[
-                        (mock_response, mock_response.choices[0]),
-                        (final_response, final_response.choices[0]),
-                    ],
-                ):
-                    result = await litellm_adapter._run_model_turn(
-                        provider, prior_messages, None, False
-                    )
+        with patch.object(
+            litellm_adapter, "build_completion_kwargs", return_value={}
+        ):
+            with patch.object(
+                litellm_adapter,
+                "acompletion_checking_response",
+                side_effect=[
+                    (mock_response, mock_response.choices[0]),
+                    (final_response, final_response.choices[0]),
+                ],
+            ):
+                result = await litellm_adapter._run_model_turn(
+                    provider, prior_messages, None, False
+                )
```

Also applies to: 615-633, 703-717

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py` around lines 494 - 511, Remove the dead patch of "litellm.acompletion" from the three tests where it is never reached because acompletion_checking_response is patched to provide responses; specifically, edit test_run_model_turn_parallel_tools, test_run_model_turn_sequential_tools, and test_run_model_turn_max_tool_calls_exceeded to delete the with patch("litellm.acompletion", ...) context managers (the side_effect=[mock_response, final_response] blocks) and keep the existing patches for litellm_adapter.acompletion_checking_response, litellm_adapter.cached_available_tools, and litellm_adapter.build_completion_kwargs so the tests remain functionally identical but without the misleading, unreachable patch.
96-103: `on_chunk` is forwarded to every tool-call iteration turn.

In `_run_model_turn`, the `on_chunk` callback fires for every inner loop iteration (tool-call turns), not just the final content turn. Consumers will receive interleaved chunks from reasoning, tool-call deltas, and final content across multiple model calls. This is probably fine for streaming UIs but worth documenting so callers know chunks aren't scoped to a single logical response.

Also applies to: 125-127, 189-191, 224-231

🤖 Prompt for AI Agents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py` around lines 96 - 103, The on_chunk callback passed into _run_model_turn is currently forwarded to every inner tool-call iteration, causing streaming chunks from reasoning, tool deltas, and final content to be interleaved; either explicitly document this behavior in the _run_model_turn docstring (and the same places noted for the other iterations) or change the forwarding so on_chunk is only invoked for the final content turn (e.g., pass None or a no-op for tool-call iterations and only pass the real on_chunk when emitting the final response); update the comments/docstrings for the related inner-loop call sites so callers know chunks are not scoped to a single logical response.
298-316: Always-streaming design is tested and working, but consider a non-streaming fallback and verify logprobs coverage.

The concern about `stream_chunk_builder` preserving response fields is partially validated by existing tests. `test_litellm_adapter_streaming.py` extensively verifies that `reasoning_content` and `tool_calls` are correctly reassembled during streaming. However, two gaps remain:

- Logprobs are untested in streaming: The codebase tracks `supports_logprobs` as a model capability, but there are no tests verifying logprobs are preserved through `stream_chunk_builder` reassembly. If logprobs are needed downstream during streaming, add coverage.
- Known upstream limitation: Some LiteLLM providers emit reasoning in streamed `delta.content` but do not provide structured `reasoning_content` during streaming — only in non-streaming responses. This is a provider-specific issue that may silently affect certain model/provider combinations.

Optional: Consider keeping a non-streaming path when `on_chunk is None` to avoid the streaming overhead (`stream_chunk_builder` reassembly) for callers that don't need live callbacks. This maintains backward compatibility for non-streaming callers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py` around lines 298 - 316, The acompletion_checking_response method currently always builds a StreamingCompletion and reassembles chunks even when no on_chunk callback is provided; change it to use a non-streaming path when on_chunk is None by calling the synchronous/non-streaming completion API (or awaiting a single-shot completion) instead of instantiating StreamingCompletion, and ensure the returned ModelResponse/Choices still include logprobs by preserving whatever field/flag the adapter uses (see supports_logprobs and stream_chunk_builder reassembly logic) so streamed and non-streamed responses have equivalent logprobs and reasoning_content/tool_calls; add tests exercising logprobs through streaming reassembly (test_litellm_adapter_streaming.py) and a new non-streaming case to verify behavior when on_chunk is None.

libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (1)
49-78: `on_chunk` is accepted but never forwarded to the tool execution.

The `on_chunk` parameter is threaded through the call chain but silently ignored in `_run` (Line 77 – `tool.run(...)` receives no chunk callback). This is fine for interface conformance with `BaseAdapter._run`, but worth a brief inline comment so future readers know streaming isn't supported for MCP tools yet.

Suggested comment

```diff
+        # Note: on_chunk is accepted for interface conformance but MCP tools
+        # do not support streaming, so it is intentionally unused here.
         result = await tool.run(context=None, **tool_kwargs)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py` around lines 49 - 78, The _run method currently accepts an on_chunk callback but never forwards or documents it; update McpAdapter._run to either forward on_chunk into the tool execution if the tool supports streaming or, if streaming isn't supported for MCP tools yet, add a brief inline comment just above the tool.run(...) call explaining that on_chunk is intentionally ignored (mentioning the parameters _run and on_chunk and the call site tool.run) so future readers understand this limitation and don't assume a bug.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py`:
- Around line 48-60: The async iterator __aiter__ in litellm_streaming.py can be
aborted leaving _iterated False and _response unset and never closing the
underlying litellm stream; wrap the streaming logic in a try/finally: create the
stream as before, iterate and yield chunks inside try, and in finally always
call stream.aclose() if stream exists, set self._response =
litellm.stream_chunk_builder(chunks) (even if empty) and self._iterated = True
so stream.response works after interruption; ensure any exceptions are re-raised
after finalization so behavior is preserved.
---
Duplicate comments:
In `@libs/core/kiln_ai/adapters/ml_model_list.py`:
- Around line 1304-1326: The Claude “Thinking” model providers set
thinking_level="medium" but lack explicit reasoning flags; search the model
adapter logic that consumes
thinking_level/reasoning_capable/anthropic_extended_thinking (look for usages of
thinking_level, reasoning_capable, anthropic_extended_thinking in the
model_adapters) and then update the KilnModel provider entries (the KilnModel
instance for ModelName.claude_4_5_haiku_thinking and the other listed Claude
"Thinking" KilnModel blocks) to include reasoning_capable=True on providers that
support single-call reasoning and anthropic_extended_thinking=True only on the
ModelProviderName.anthropic provider entries; apply the same flag changes to the
other referenced blocks (around lines 1394-1439, 1459-1479, 1606-1651,
1691-1731) so the adapter will choose the single-call reasoning path where
appropriate.
---
Nitpick comments:
In `@libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py`:
- Around line 30-36: The wrapper silently allows re-iteration causing duplicate
litellm.acompletion calls; update the StreamingCompletion iterator logic (the
__aiter__ method) to enforce single-use by checking self._iterated and raising a
RuntimeError("StreamingCompletion has already been iterated; create a new
instance.") if True, and only call litellm.acompletion and set
self._response/_iterated on the first iteration; alternatively add this
single-use behavior note to the class docstring for clarity.
In `@libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py`:
- Around line 96-103: The on_chunk callback passed into _run_model_turn is
currently forwarded to every inner tool-call iteration, causing streaming chunks
from reasoning, tool deltas, and final content to be interleaved; either
explicitly document this behavior in the _run_model_turn docstring (and the same
places noted for the other iterations) or change the forwarding so on_chunk is
only invoked for the final content turn (e.g., pass None or a no-op for
tool-call iterations and only pass the real on_chunk when emitting the final
response); update the comments/docstrings for the related inner-loop call sites
so callers know chunks are not scoped to a single logical response.
- Around line 298-316: The acompletion_checking_response method currently always
builds a StreamingCompletion and reassembles chunks even when no on_chunk
callback is provided; change it to use a non-streaming path when on_chunk is
None by calling the synchronous/non-streaming completion API (or awaiting a
single-shot completion) instead of instantiating StreamingCompletion, and ensure
the returned ModelResponse/Choices still include logprobs by preserving whatever
field/flag the adapter uses (see supports_logprobs and stream_chunk_builder
reassembly logic) so streamed and non-streamed responses have equivalent
logprobs and reasoning_content/tool_calls; add tests exercising logprobs through
streaming reassembly (test_litellm_adapter_streaming.py) and a new non-streaming
case to verify behavior when on_chunk is None.
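The branching this comment asks for looks roughly like the sketch below, where `completion_nonstream`, `completion_stream`, and `build_from_chunks` are stand-ins for the single-shot call, the streaming call, and `litellm.stream_chunk_builder` respectively (all hypothetical names):

```python
import asyncio


async def completion_nonstream(**kw):
    # Stand-in for a single-shot, non-streaming completion call.
    return {"content": "hi", "logprobs": [0.1, 0.2]}


async def completion_stream(**kw):
    for chunk in [{"content": "h"}, {"content": "i"}]:
        yield chunk


def build_from_chunks(chunks):
    # Stand-in for stream_chunk_builder reassembly; must preserve logprobs
    # so streamed and non-streamed responses stay equivalent.
    return {"content": "".join(c["content"] for c in chunks), "logprobs": [0.1, 0.2]}


async def acompletion_checking_response(on_chunk=None, **kw):
    # Skip the streaming machinery entirely when no callback was supplied.
    if on_chunk is None:
        return await completion_nonstream(**kw)
    chunks = []
    async for chunk in completion_stream(**kw):
        chunks.append(chunk)
        on_chunk(chunk)
    return build_from_chunks(chunks)


seen = []
r1 = asyncio.run(acompletion_checking_response())                 # non-streaming path
r2 = asyncio.run(acompletion_checking_response(on_chunk=seen.append))  # streaming path
print(r1 == r2, len(seen))  # True 2
```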
In `@libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py`:
- Around line 49-78: The _run method currently accepts an on_chunk callback but
never forwards or documents it; update McpAdapter._run to either forward
on_chunk into the tool execution if the tool supports streaming or, if streaming
isn't supported for MCP tools yet, add a brief inline comment just above the
tool.run(...) call explaining that on_chunk is intentionally ignored (mentioning
the parameters _run and on_chunk and the call site tool.run) so future readers
understand this limitation and don't assume a bug.
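If the comment route is taken, the shape is simply this (a toy `Tool` stands in for an MCP tool; names are illustrative):

```python
import asyncio


class Tool:
    async def run(self, payload):
        return {"echo": payload}


async def _run(tool, payload, on_chunk=None):
    # on_chunk is intentionally ignored here: MCP tool execution is not
    # streamed yet, so there are no incremental chunks to forward.
    del on_chunk
    return await tool.run(payload)


out = asyncio.run(_run(Tool(), "hi"))
print(out)  # {'echo': 'hi'}
```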
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 76-91: The render_chunk function assumes chunk.choices[0] exists
and can raise IndexError for empty choices; add a defensive guard at the top of
render_chunk (in the render_chunk method) that checks if not chunk.choices (or
len(chunk.choices) == 0) and handle that case (e.g., call
self.render_unknown(chunk) or return) before referencing chunk.choices[0]; then
proceed with the existing logic for tool_calls, reasoning_content, content, and
finish_reason.
- Around line 127-153: The test fixture uses StructuredOutputMode.unknown which
can later raise ValueError in response_format_options() if the task gains an
output schema; update the adapter_factory fixture to use
StructuredOutputMode.default instead of .unknown when constructing the
LiteLlmAdapter’s LiteLlmConfig/KilnAgentRunConfigProperties so
has_structured_output() won’t short-circuit and tests remain stable (look for
adapter_factory, LiteLlmAdapter, LiteLlmConfig, KilnAgentRunConfigProperties,
response_format_options).
- Around line 97-107: The ChunkRawRenderer class defines an unused field
current_block_type (set in __init__) that is never read or mutated—remove this
dead field to clean up the class; update the __init__ of ChunkRawRenderer to
only initialize self.chunks and leave render_chunk and get_stream_text unchanged
(methods: ChunkRawRenderer.__init__, ChunkRawRenderer.render_chunk,
ChunkRawRenderer.get_stream_text).
- Around line 156-172: Extract the repeated 11-entry list used in the
pytest.mark.parametrize(...) decorators into a single module-level constant
(e.g., SUPPORTED_MODEL_PROVIDER_PAIRS) and replace each duplicated inline list
with that constant in the four tests that parametrize "model_id,provider_name";
keep the parameter names ("model_id", "provider_name") and the
pytest.mark.parametrize call but pass SUPPORTED_MODEL_PROVIDER_PAIRS as the
second argument to avoid copy-paste drift.
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py`:
- Around line 494-511: Remove the dead patch of "litellm.acompletion" from the
three tests where it is never reached because acompletion_checking_response is
patched to provide responses; specifically, edit
test_run_model_turn_parallel_tools, test_run_model_turn_sequential_tools, and
test_run_model_turn_max_tool_calls_exceeded to delete the with
patch("litellm.acompletion", ...) context managers (the
side_effect=[mock_response, final_response] blocks) and keep the existing
patches for litellm_adapter.acompletion_checking_response,
litellm_adapter.cached_available_tools, and
litellm_adapter.build_completion_kwargs so the tests remain functionally
identical but without the misleading, unreachable patch.
```python
async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
    self._response = None
    self._iterated = False

    chunks: list[ModelResponseStream] = []
    stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)

    async for chunk in stream:
        chunks.append(chunk)
        yield chunk

    self._response = litellm.stream_chunk_builder(chunks)
    self._iterated = True
```
Missing try/finally leaves response inaccessible after any interrupted iteration.
If the consumer's async for body raises (e.g., an on_chunk callback throws) or if the streaming call itself fails, Python sends GeneratorExit into the generator at the yield point. The two lines after the loop never execute, so _iterated stays False and stream.response will always raise RuntimeError — callers cannot distinguish "not yet started" from "stream failed".
Additionally, the litellm stream object (a CustomStreamWrapper) won't have .aclose() called implicitly when the generator is abandoned without exhaustion.
🐛 Proposed fix — `try/finally` for guaranteed state finalization
```diff
 async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
     self._response = None
     self._iterated = False
     chunks: list[ModelResponseStream] = []
-    stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)
-
-    async for chunk in stream:
-        chunks.append(chunk)
-        yield chunk
-
-    self._response = litellm.stream_chunk_builder(chunks)
-    self._iterated = True
+    try:
+        stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)
+        async for chunk in stream:
+            chunks.append(chunk)
+            yield chunk
+    finally:
+        self._response = litellm.stream_chunk_builder(chunks) if chunks else None
+        self._iterated = True
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py` around lines
48 - 60, The async iterator __aiter__ in litellm_streaming.py can be aborted
leaving _iterated False and _response unset and never closing the underlying
litellm stream; wrap the streaming logic in a try/finally: create the stream as
before, iterate and yield chunks inside try, and in finally always call
stream.aclose() if stream exists, set self._response =
litellm.stream_chunk_builder(chunks) (even if empty) and self._iterated = True
so stream.response works after interruption; ensure any exceptions are re-raised
after finalization so behavior is preserved.
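The fuller fix the prompt describes (guaranteed finalization plus closing the source stream) can be exercised in isolation like this; `fetch_stream` and the `build` callable stand in for `litellm.acompletion(stream=True)` and `litellm.stream_chunk_builder`, so this is a sketch of the shape, not the adapter's real code:

```python
import asyncio


class StreamingCompletion:
    """Sketch: finalize state and close the source even on abandoned iteration."""

    def __init__(self, fetch_stream, build):
        self._fetch_stream = fetch_stream
        self._build = build
        self._response = None
        self._iterated = False

    async def __aiter__(self):
        chunks = []
        stream = None
        try:
            stream = await self._fetch_stream()
            async for chunk in stream:
                chunks.append(chunk)
                yield chunk
        finally:
            # Runs on normal exhaustion AND when the consumer abandons the
            # stream, so partial state is recorded and the source is closed.
            if stream is not None:
                await stream.aclose()
            self._response = self._build(chunks) if chunks else None
            self._iterated = True

    @property
    def response(self):
        if not self._iterated:
            raise RuntimeError("Stream has not been iterated yet")
        return self._response


async def fake_fetch():
    async def gen():
        for part in ["a", "b", "c"]:
            yield part
    return gen()


async def main():
    sc = StreamingCompletion(fake_fetch, "".join)
    agen = sc.__aiter__()
    got = []
    async for chunk in agen:
        got.append(chunk)
        if chunk == "b":
            break  # abandon mid-stream
    await agen.aclose()  # deterministic cleanup instead of relying on GC
    return got, sc.response


got, response = asyncio.run(main())
print(got, response)  # ['a', 'b'] ab
```

Even after the consumer breaks out at "b", `response` returns the partial reassembly rather than raising.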
Replaced with PR that handles multiturn + streaming + protocols: #1107
What does this PR do?

Changes:
- `thinking_level` on Claude models

Checklists
Summary by CodeRabbit
New Features
Tests
Dependencies