
feat: streaming + multiturn #1100

Closed
leonardmq wants to merge 43 commits into leonard/kil-421-adapter-support-multiturn from leonard/kil-420-adapter-add-streaming-2

Conversation

@leonardmq
Collaborator

@leonardmq leonardmq commented Mar 3, 2026

What does this PR do?

PR going into #1088

Merge our current streaming MVP with multiturn.

Checklists

  • Tests have been run locally and passed
  • New tests have been added to any work in /lib

Summary by CodeRabbit

  • New Features

    • Real-time streaming for model completions with chunked delivery and SDK-style stream events.
    • New UI error details component with troubleshooting steps and a Tool Server Config endpoint.
    • In-memory config properties for transient settings (e.g., autosave behavior).
  • Behavior Changes

    • Session continuation via prior run IDs removed — runs now start fresh unless explicitly continued.
  • Tests

    • Extensive streaming and converter tests covering chunking, tool-call flows, and edge cases.
  • Docs

    • Added Telemetry/MLOps integration guidance.
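
The in-memory config behavior summarized above can be sketched as a property flag that routes transient values to a memory-only store. All names below are hypothetical illustrations, not Kiln's actual API:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ConfigPropertySketch:
    """Hypothetical mirror of a config property with an in_memory flag."""

    name: str
    in_memory: bool = False


class ConfigSketch:
    """Routes in-memory properties to a transient store; only
    persistent properties are included when saving settings."""

    def __init__(self, properties: list[ConfigPropertySketch]) -> None:
        self._props = {p.name: p for p in properties}
        self._memory: dict[str, Any] = {}
        self._persisted: dict[str, Any] = {}

    def set(self, name: str, value: Any) -> None:
        store = self._memory if self._props[name].in_memory else self._persisted
        store[name] = value

    def get(self, name: str) -> Any:
        store = self._memory if self._props[name].in_memory else self._persisted
        return store.get(name)

    def settings_to_save(self) -> dict[str, Any]:
        # In-memory values never reach disk.
        return dict(self._persisted)
```

The key design point is that saving settings only serializes the persistent store, so transient toggles such as autosave behavior survive for the session but are never written out.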

sfierro and others added 29 commits February 19, 2026 17:53
Allow tool server edit when it fails to connect
@coderabbitai
Contributor

coderabbitai bot commented Mar 3, 2026

Caution

Review failed

Pull request was closed or merged during review

Walkthrough

Adds streaming-first infrastructure: a StreamingCompletion wrapper for litellm streaming, an AdapterStream orchestration with AI-SDK event conversion, threads streaming hooks through adapters, updates MCP error handling, introduces in-memory config properties, many tests, and related UI/backend endpoints and version bumps.

Changes

Cohort / File(s) Summary
Streaming utils & tests
libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py, libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
Adds StreamingCompletion async-iterable wrapper for litellm.acompletion (collects chunks, exposes .response), plus unit tests validating chunk ordering, arg forwarding, re-iteration, and lifecycle.
Adapter streaming core
libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py, libs/core/kiln_ai/adapters/model_adapters/stream_events.py
New streaming orchestration (AdapterStream, AdapterStreamResult) and AI-SDK event converter (AiSdkStreamConverter, event types) implementing per-turn streaming, tool-call events, and final aggregation.
Base & litellm adapters
libs/core/kiln_ai/adapters/model_adapters/base_adapter.py, libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
Adds streaming entry points and types, threads streaming callback/AdapterStream through adapter flow, integrates StreamingCompletion and _create_run_stream hook for provider streaming.
MCP adapter & MCP tooling
libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py, libs/core/kiln_ai/tools/mcp_session_manager.py, libs/core/kiln_ai/tools/mcp_server_tool.py
Renames/threads prior_trace across signatures, adds KilnMCPError for MCP failures, changes MCPServerTool error return to structured JSON error instead of raising.
Adapter tests & streaming test suites
libs/core/kiln_ai/adapters/model_adapters/test_*, libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py, libs/core/kiln_ai/adapters/model_adapters/test_adapter_stream.py, libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
Large additions/updates: new comprehensive streaming tests (renderers, fixtures, parameterized provider/model combos), refactors mocks to patch LiteLlmAdapter.acompletion_checking_response, adapts tests to prior_trace semantics and **kwargs.
Config in-memory support & tests
libs/core/kiln_ai/utils/config.py, libs/core/kiln_ai/utils/test_config.py
Adds in_memory flag to ConfigProperty, in-memory settings store, masking/persistence logic, and extensive unit tests covering in-memory behavior and persistence.
Tool API & UI for tool servers
app/desktop/studio_server/tool_api.py, app/web_ui/src/lib/api_schema.d.ts, app/web_ui/src/routes/.../tool_servers/*, app/web_ui/src/routes/.../edit_tool_server/+page.svelte
Adds richer MCP error handling and diagnostics in tool_api, new endpoint get_tool_server_config, updates API schema, and replaces various UI error blocks with ErrorDetailsBlock using the new endpoint.
Run API / session continuation removal
libs/server/kiln_server/run_api.py, libs/server/kiln_server/test_run_api.py, app/web_ui/src/lib/utils/update.ts
Removes task_run_id from RunTaskRequest (no session continuation via API), updates tests; bumps app versions (0.24.0→0.25.0) in UI and packaging files.
CI / tooling and minor UI changes
.github/workflows/*, checks.sh, hooks_mcp.yaml, various Svelte pages
Fixes uvx→uv run command usage in CI and scripts; adds tooltips and small UI tweaks (spec builder: tool_id plumbing; prompts table tooltip; error UI component).
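
The MCP error-handling change in the table above, returning a structured JSON error instead of raising, can be sketched as follows. The payload shape and function name are assumptions for illustration, not the actual Kiln format:

```python
import json


def tool_error_payload(exc: Exception) -> str:
    """Return a structured JSON error the model can inspect,
    rather than propagating the exception to the caller."""
    return json.dumps(
        {
            "error": type(exc).__name__,
            "message": str(exc),
        }
    )
```

A tool-call wrapper would catch the MCP failure and return this string as the tool result, letting the model see the error and recover on the next turn instead of aborting the run.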

Sequence Diagram(s)

sequenceDiagram
    participant Client as Application/Caller
    participant Adapter as LiteLlmAdapter / BaseAdapter
    participant Stream as StreamingCompletion
    participant LiteLLM as litellm (provider)
    participant Callback as on_chunk Callback

    Client->>Adapter: invoke(..., on_chunk=callback)
    Adapter->>Adapter: prepare streaming run / build kwargs
    Adapter->>Stream: instantiate StreamingCompletion(*args, **kwargs)
    Note over Stream,LiteLLM: StreamingCompletion starts litellm.acompletion(stream=True)
    loop per streaming chunk
        LiteLLM-->>Stream: ModelResponseStream chunk
        Stream-->>Adapter: yield chunk
        Adapter->>Callback: await on_chunk(chunk)
        Callback-->>Adapter: handled
    end
    LiteLLM-->>Stream: stream complete
    Stream->>Stream: stream_chunk_builder(chunks) -> final response
    Stream-->>Adapter: expose .response
    Adapter-->>Client: return final aggregated RunOutput / response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • sfierro

Poem

🐰 I hop on streams and nibble each byte,
Chunks parade by in morning light.
I stitch them gentle, one, two, three—
A final answer blooms for thee.
Hooray for streaming: carrot celebratory!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check ⚠️ Warning: The PR description is incomplete and missing required template sections. It lacks Related Issues and the Contributor License Agreement confirmation required by the template. Resolution: add the Related Issues section with issue links (PR #1088, #1091) and include the CLA confirmation with the GitHub username before merge.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 37.93%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check ✅ Passed: The PR title 'feat: streaming + multiturn' clearly and concisely describes the main changes: adding streaming and multiturn support as a feature. It is directly relevant to the changeset.




@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the interactivity and responsiveness of AI agent interactions by introducing robust streaming capabilities. It provides a foundational StreamingCompletion utility for LiteLLM, integrates a flexible on_chunk callback into the core adapter architecture, and updates the LiteLLM adapter to leverage this streaming for real-time chunk processing. This allows for immediate feedback during model responses and supports streaming in multi-turn conversations, improving the user experience for long-running or interactive AI tasks.

Highlights

  • Streaming Completion Utility: Introduced a new StreamingCompletion class to wrap litellm.acompletion for asynchronous streaming, allowing chunks to be yielded as they arrive and the final response to be assembled after iteration.
  • Streaming Callback Integration: Added an on_chunk callback parameter to the BaseAdapter's invoke and _run methods, enabling real-time processing of streaming chunks across different model adapters.
  • LiteLLM Adapter Streaming Support: Refactored the LiteLlmAdapter's acompletion_checking_response to utilize the new StreamingCompletion class, effectively integrating streaming capabilities into LiteLLM model interactions.
  • Comprehensive Streaming Tests: Added a dedicated test file (test_litellm_adapter_streaming.py) with extensive tests for the new streaming functionality, covering chunk collection, content rendering, tool calls, and multi-turn conversation streaming across various LiteLLM models and providers.
  • Adapter Method Signature Updates: Updated the _run methods in BaseAdapter and its mock implementations to accept arbitrary keyword arguments (**kwargs), accommodating the new on_chunk parameter without breaking existing functionality.
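
The StreamingCompletion behavior described in the highlights can be sketched independently of litellm as an async-iterable wrapper that yields chunks as they arrive and exposes the aggregated response only after iteration completes. The class name and the plain-string aggregation below are illustrative assumptions; the real class wraps litellm.acompletion chunks:

```python
from typing import AsyncIterator


class StreamingCompletionSketch:
    """Yield chunks as they arrive; expose the assembled
    response only after the stream has been fully consumed."""

    def __init__(self, stream: AsyncIterator[str]) -> None:
        self._stream = stream
        self._chunks: list[str] = []
        self._response: str | None = None

    async def __aiter__(self) -> AsyncIterator[str]:
        async for chunk in self._stream:
            self._chunks.append(chunk)
            yield chunk
        # The real class assembles the final response from the collected
        # chunks (litellm provides a chunk-builder helper for this);
        # plain concatenation stands in here.
        self._response = "".join(self._chunks)

    @property
    def response(self) -> str:
        if self._response is None:
            raise RuntimeError("Iterate the stream before accessing .response")
        return self._response
```

Accessing `.response` before the stream is exhausted raises, which mirrors the lifecycle checks the PR's tests validate.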


Changelog
  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
    • Added StreamingCompletion class for asynchronous LiteLLM streaming.
  • libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
    • Added unit tests for the StreamingCompletion class, verifying chunk yielding, response assembly, and argument forwarding.
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
    • Imported Awaitable, Callable, and ModelResponseStream types.
    • Defined StreamCallback type for asynchronous chunk processing.
    • Added on_chunk parameter to invoke and _run_returning_run_output methods.
    • Passed on_chunk parameter to internal _run calls.
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
    • Removed direct litellm import.
    • Imported StreamingCompletion and StreamCallback.
    • Added on_chunk parameter to _run_model_turn and _run methods.
    • Modified acompletion_checking_response to use StreamingCompletion and invoke on_chunk for each received chunk.
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
    • Imported StreamCallback type.
    • Added on_chunk parameter to _run, invoke, and invoke_returning_run_output methods.
  • libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
    • Updated MockAdapter._run signature to accept **kwargs.
    • Modified mock _run functions in tests to accept **kwargs and retrieve prior_trace from them.
    • Added TestStreamCallback class with tests to verify on_chunk parameter forwarding and default behavior.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
    • Replaced direct litellm.acompletion patches with patch.object on LiteLlmAdapter.acompletion_checking_response to align with streaming changes.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
    • Added new test file to verify LiteLLM streaming functionality.
    • Implemented ChunkRendererAbstract, ChunkRenderer, and ChunkRawRenderer for testing chunk processing and rendering.
    • Included parameterized tests for acompletion_streaming_response, acompletion_streaming_chunks, acompletion_streaming_rendering, acompletion_streaming_rendering_raw_chunks, and acompletion_streaming_with_existing_run across various models and providers.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py
    • Updated run_simple_task_with_tools to include on_chunk handler for streaming tests.
    • Modified test_tools_simplied_mocked and test_tools_mocked to patch LiteLlmAdapter.acompletion_checking_response with AsyncMock and handle on_chunk.
  • libs/core/kiln_ai/adapters/model_adapters/test_saving_adapter_results.py
    • Updated MockAdapter._run signature to accept **kwargs.
    • Modified mock _run function in test_invoke_continue_session to retrieve prior_trace from kwargs.
  • libs/core/kiln_ai/adapters/model_adapters/test_structured_output.py
    • Imported AsyncMock and LiteLlmAdapter.
    • Updated MockAdapter._run signature to accept **kwargs.
    • Replaced direct litellm.acompletion patches with patch.object on LiteLlmAdapter.acompletion_checking_response.
  • libs/core/kiln_ai/adapters/test_prompt_adaptors.py
    • Imported AsyncMock and ModelResponse from litellm.types.utils.
    • Replaced direct litellm.acompletion patch with patch.object on LiteLlmAdapter.acompletion_checking_response.
  • libs/core/kiln_ai/adapters/test_prompt_builders.py
    • Updated MockAdapter._run signature to accept **kwargs.
  • libs/core/kiln_ai/datamodel/test_basemodel.py
    • Updated MockAdapter._run signature to accept **kwargs.
Activity
  • The pull request introduces a new StreamingCompletion class to manage asynchronous streaming from LiteLLM, providing a structured way to handle incoming data chunks.
  • A StreamCallback type was defined and integrated into the BaseAdapter's invoke and _run methods, allowing external functions to process streaming data in real-time.
  • The LiteLlmAdapter was updated to use the new StreamingCompletion class, ensuring that all LiteLLM interactions can now support streaming.
  • Extensive new tests were added in test_litellm_adapter_streaming.py to validate the streaming functionality across various models and providers, including scenarios for chunk collection, content rendering, tool calls, and multi-turn conversations.
  • Existing mock adapter implementations and test patches were updated to accommodate the new on_chunk parameter and the refactored acompletion_checking_response method, ensuring backward compatibility and proper test coverage.

@leonardmq
Collaborator Author

Paid integration tests will fail until #1091 is merged; it holds the reasoning fix for Claude (that PR is currently incomplete and partially broken).

@github-actions

github-actions bot commented Mar 3, 2026

📊 Coverage Report

Overall Coverage: 91%

Diff: origin/leonard/kil-421-adapter-support-multiturn...HEAD

  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py (100%)
  • libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py (92.8%): Missing lines 28,78,93,103,122,132,172,202,279,291
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py (63.9%): Missing lines 65,315-317,319,390,400,409-410,426-428,430-432,434-438,441,447-448,452,455,467,471-472,480,489,493,498-500,502
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (25.0%): Missing lines 270-272,274-275,279
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (100%)
  • libs/core/kiln_ai/adapters/model_adapters/stream_events.py (93.6%): Missing lines 95,197-199,266,270-273

Summary

  • Total: 412 lines
  • Missing: 60 lines
  • Coverage: 85%

Line-by-line

View line-by-line diff coverage

libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py

Lines 24-32

  24 from kiln_ai.adapters.run_output import RunOutput
  25 from kiln_ai.datamodel import Usage
  26 
  27 if TYPE_CHECKING:
! 28     from kiln_ai.adapters.model_adapters.litellm_adapter import LiteLlmAdapter
  29 
  30 MAX_CALLS_PER_TURN = 10
  31 MAX_TOOL_CALLS_PER_TURN = 30

Lines 74-82

  74                 "AdapterStream has not been iterated yet. "
  75                 "Use 'async for event in stream:' before accessing .result"
  76             )
  77         if self._result is None:
! 78             raise RuntimeError("AdapterStream completed without producing a result")
  79         return self._result
  80 
  81     async def __aiter__(self) -> AsyncIterator[AdapterStreamEvent]:
  82         self._result = None

Lines 89-97

  89 
  90         while True:
  91             turns += 1
  92             if turns > MAX_CALLS_PER_TURN:
! 93                 raise RuntimeError(
  94                     f"Too many turns ({turns}). Stopping iteration to avoid using too many tokens."
  95                 )
  96 
  97             turn = self._chat_formatter.next_turn(prior_output)

Lines 99-107

   99                 break
  100 
  101             for message in turn.messages:
  102                 if message.content is None:
! 103                     raise ValueError("Empty message content isn't allowed")
  104                 self._messages.append(
  105                     {"role": message.role, "content": message.content}  # type: ignore[arg-type]
  106                 )

Lines 118-126

  118                 else:
  119                     yield event
  120 
  121             if not prior_output:
! 122                 raise RuntimeError("No assistant message/output returned from model")
  123 
  124         logprobs = self._adapter._extract_and_validate_logprobs(final_choice)
  125 
  126         intermediate_outputs = self._chat_formatter.intermediate_outputs()

Lines 128-136

  128             final_choice, intermediate_outputs
  129         )
  130 
  131         if not isinstance(prior_output, str):
! 132             raise RuntimeError(f"assistant message is not a string: {prior_output}")
  133 
  134         trace = self._adapter.all_messages_to_trace(self._messages)
  135         self._result = AdapterStreamResult(
  136             run_output=RunOutput(

Lines 168-176

  168 
  169             content = response_choice.message.content
  170             tool_calls = response_choice.message.tool_calls
  171             if not content and not tool_calls:
! 172                 raise ValueError(
  173                     "Model returned an assistant message, but no content or tool calls. This is not supported."
  174                 )
  175 
  176             self._messages.append(response_choice.message)

Lines 198-206

  198                     usage=usage,
  199                 )
  200                 return
  201 
! 202             raise RuntimeError(
  203                 "Model returned neither content nor tool calls. It must return at least one of these."
  204             )
  205 
  206         raise RuntimeError(

Lines 275-283

  275         or not response.choices
  276         or len(response.choices) == 0
  277         or not isinstance(response.choices[0], Choices)
  278     ):
! 279         raise RuntimeError(
  280             f"Expected ModelResponse with Choices, got {type(response)}."
  281         )
  282     return response, response.choices[0]

Lines 287-292

  287 ) -> str:
  288     for tc in tool_calls:
  289         if tc.id == tool_call_id:
  290             return tc.function.name or "unknown"
! 291     return "unknown"

libs/core/kiln_ai/adapters/model_adapters/base_adapter.py

Lines 61-69

  61 from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
  62 from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
  63 
  64 if TYPE_CHECKING:
! 65     from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream
  66 
  67 
  68 @dataclass
  69 class AdapterConfig:

Lines 311-323

  311 
  312         try:
  313             adapter_stream = self._prepare_stream(input, existing_run)
  314 
! 315             async for event in adapter_stream:
! 316                 if isinstance(event, ModelResponseStream):
! 317                     yield event
  318 
! 319             self._finalize_stream(adapter_stream, input, input_source, existing_run)
  320         finally:
  321             if is_root_agent:
  322                 try:
  323                     run_id = get_agent_run_id()

Lines 386-394

  386         input: InputType,
  387         existing_run: TaskRun | None,
  388     ) -> AdapterStream:
  389         if self.input_schema is not None:
! 390             validate_schema_with_value_error(
  391                 input,
  392                 self.input_schema,
  393                 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  394                 require_object=False,

Lines 396-404

  396 
  397         if existing_run is not None and (
  398             not existing_run.trace or len(existing_run.trace) == 0
  399         ):
! 400             raise ValueError(
  401                 "Run has no trace. Cannot continue session without conversation history."
  402             )
  403 
  404         prior_trace = existing_run.trace if existing_run else None

Lines 405-414

  405 
  406         formatted_input = input
  407         formatter_id = self.model_provider().formatter
  408         if formatter_id is not None:
! 409             formatter = request_formatter_from_id(formatter_id)
! 410             formatted_input = formatter.format_input(input)
  411 
  412         return self._create_run_stream(formatted_input, prior_trace)
  413 
  414     def _finalize_stream(

Lines 422-445

  422         At the end of the stream, we still need to validate the output, create a run and everything
  423         else that a non-streaming invocation would do.
  424         """
  425 
! 426         result: AdapterStreamResult = adapter_stream.result
! 427         run_output = result.run_output
! 428         usage = result.usage
  429 
! 430         provider = self.model_provider()
! 431         parser = model_parser_from_id(provider.parser)
! 432         parsed_output = parser.parse_output(original_output=run_output)
  433 
! 434         if self.output_schema is not None:
! 435             if isinstance(parsed_output.output, str):
! 436                 parsed_output.output = parse_json_string(parsed_output.output)
! 437             if not isinstance(parsed_output.output, dict):
! 438                 raise RuntimeError(
  439                     f"structured response is not a dict: {parsed_output.output}"
  440                 )
! 441             validate_schema_with_value_error(
  442                 parsed_output.output,
  443                 self.output_schema,
  444                 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  445             )

Lines 443-459

  443                 self.output_schema,
  444                 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  445             )
  446         else:
! 447             if not isinstance(parsed_output.output, str):
! 448                 raise RuntimeError(
  449                     f"response is not a string for non-structured task: {parsed_output.output}"
  450                 )
  451 
! 452         trace_has_toolcalls = parsed_output.trace is not None and any(
  453             message.get("role", None) == "tool" for message in parsed_output.trace
  454         )
! 455         if (
  456             provider.reasoning_capable
  457             and (
  458                 not parsed_output.intermediate_outputs
  459                 or "reasoning" not in parsed_output.intermediate_outputs

Lines 463-476

  463                 and self.has_structured_output()
  464             )
  465             and not trace_has_toolcalls
  466         ):
! 467             raise RuntimeError(
  468                 "Reasoning is required for this model, but no reasoning was returned."
  469             )
  470 
! 471         if existing_run is not None:
! 472             merged_output = RunOutput(
  473                 output=parsed_output.output,
  474                 intermediate_outputs=parsed_output.intermediate_outputs
  475                 or run_output.intermediate_outputs,
  476                 output_logprobs=parsed_output.output_logprobs

Lines 476-484

  476                 output_logprobs=parsed_output.output_logprobs
  477                 or run_output.output_logprobs,
  478                 trace=run_output.trace,
  479             )
! 480             run = self.generate_run(
  481                 input,
  482                 input_source,
  483                 merged_output,
  484                 usage,

Lines 485-506

  485                 run_output.trace,
  486                 existing_run=existing_run,
  487             )
  488         else:
! 489             run = self.generate_run(
  490                 input, input_source, parsed_output, usage, run_output.trace
  491             )
  492 
! 493         if (
  494             self.base_adapter_config.allow_saving
  495             and Config.shared().autosave_runs
  496             and self.task.path is not None
  497         ):
! 498             run.save_to_file()
! 499         elif existing_run is None:
! 500             run.id = None
  501 
! 502         return run
  503 
  504     def has_structured_output(self) -> bool:
  505         return self.output_schema is not None

libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py

Lines 266-283

  266         self,
  267         input: InputType,
  268         prior_trace: list[ChatCompletionMessageParam] | None = None,
  269     ) -> AdapterStream:
! 270         provider = self.model_provider()
! 271         if not provider.model_id:
! 272             raise ValueError("Model ID is required for OpenAI compatible models")
  273 
! 274         chat_formatter = self.build_chat_formatter(input, prior_trace)
! 275         initial_messages: list[ChatCompletionMessageIncludingLiteLLM] = copy.deepcopy(
  276             chat_formatter.initial_messages()
  277         )
  278 
! 279         return AdapterStream(
  280             adapter=self,
  281             provider=provider,
  282             chat_formatter=chat_formatter,
  283             initial_messages=initial_messages,

libs/core/kiln_ai/adapters/model_adapters/stream_events.py

Lines 91-99

  91                 self._finish_reason = choice.finish_reason
  92 
  93             delta = choice.delta
  94             if delta is None:
! 95                 continue
  96 
  97             reasoning_content = getattr(delta, "reasoning_content", None)
  98             if reasoning_content is not None:
  99                 if not self._reasoning_started:

Lines 193-203

  193                             )
  194                         )
  195 
  196         if not chunk.choices:
! 197             usage = getattr(chunk, "usage", None)
! 198             if usage is not None:
! 199                 self._usage_data = usage
  200 
  201         return events
  202 
  203     def convert_tool_event(self, event: ToolCallEvent) -> list[AiSdkStreamEvent]:

Lines 262-277

  262         if self._finish_reason is not None:
  263             finish_payload["finishReason"] = self._finish_reason.replace("_", "-")
  264 
  265         if self._usage_data is not None:
! 266             usage_payload: dict[str, Any] = {
  267                 "promptTokens": self._usage_data.prompt_tokens,
  268                 "completionTokens": self._usage_data.completion_tokens,
  269             }
! 270             total = getattr(self._usage_data, "total_tokens", None)
! 271             if total is not None:
! 272                 usage_payload["totalTokens"] = total
! 273             finish_payload["usage"] = usage_payload
  274 
  275         if finish_payload:
  276             events.append(
  277                 AiSdkStreamEvent(


Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces streaming and multi-turn capabilities, which is a significant feature enhancement. The implementation is well-structured, with a new StreamingCompletion class that neatly encapsulates the streaming logic from litellm. The on_chunk callback is cleanly plumbed through the adapter layers. The tests have been updated to accommodate the new functionality, and new comprehensive tests for streaming have been added. The changes are robust and well-thought-out. I have one minor suggestion for improvement in a test utility class to make it more robust.

Note: Security Review did not run due to the size of the PR.

…_streaming.py

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (1)

239-242: Consider relaxing the tool call count assertion.

The assertion len(tool_calls) == 1 assumes the model will make exactly one tool call for "123 + 321 = ?". While this is likely true for this simple math, some models might:

  • Make intermediate tool calls before the final calculation
  • Use a multi-step approach

If this becomes flaky, consider:

assert len(tool_calls) >= 1, "Expected at least one tool call (to do the math)"

However, since these are paid integration tests with specific model versions, the current assertion may be intentionally strict to detect behavioral changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`
around lines 239 - 242, The test's strict check on tool call count (the
assertion using tool_calls in test_litellm_adapter_streaming.py) can be flaky;
update the assertion that currently enforces len(tool_calls) == 1 to allow one
or more calls (e.g., require len(tool_calls) >= 1) so the test accepts
multi-step or intermediate tool calls while keeping the other assertions on
reasoning_contents and contents intact; locate the assertion referencing
tool_calls and change the equality check to a greater-or-equal check.
libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (1)

49-54: Clarify: on_chunk parameter accepted but unused in MCP adapter.

The on_chunk callback is added to the public API (invoke, invoke_returning_run_output, _run) for signature consistency with BaseAdapter, but it's never invoked or forwarded. Since MCP tools are single-turn and don't stream, this is likely intentional, but the silent acceptance without usage could be confusing to callers who expect streaming behavior.

Consider either:

  1. Documenting that streaming is not supported for MCP adapter
  2. Logging a warning if on_chunk is provided but won't be used
📝 Optional: Add warning when on_chunk is provided
 async def _run(
     self,
     input: InputType,
     prior_trace: list[ChatCompletionMessageParam] | None = None,
     on_chunk: StreamCallback | None = None,
 ) -> Tuple[RunOutput, Usage | None]:
     if prior_trace is not None:
         raise NotImplementedError(
             "Session continuation is not supported for MCP adapter. "
             "MCP tools are single-turn and do not maintain conversation state."
         )
+    if on_chunk is not None:
+        import warnings
+        warnings.warn(
+            "on_chunk callback provided but MCP adapter does not support streaming. "
+            "The callback will not be invoked.",
+            stacklevel=2
+        )

Also applies to: 89-103, 107-113

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py` around lines 49 -
54, The MCP adapter currently accepts an on_chunk callback in the public
signatures (MCPAdapter._run, MCPAdapter.invoke,
MCPAdapter.invoke_returning_run_output) but never uses it; add a clear warning
when callers supply on_chunk to avoid confusion: in each of the methods (_run,
invoke, invoke_returning_run_output) check if on_chunk is not None and emit a
debug/warn log (using the adapter's logger or process logger) stating that
MCPAdapter does not support streaming and on_chunk will be ignored, then proceed
as before; alternatively you can also update the docstring of MCPAdapter to
state streaming is unsupported, but prefer adding the runtime warning to make
the behavior explicit to callers.
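The warning variant suggested above can be sketched in isolation; `run_single_turn` below is a hypothetical stand-in for `MCPAdapter._run`, not the adapter's actual code:

```python
import warnings
from typing import Any, Callable, Optional


def run_single_turn(
    prompt: str,
    on_chunk: Optional[Callable[[str], Any]] = None,
) -> str:
    """Single-turn call that accepts on_chunk for signature parity but never streams."""
    if on_chunk is not None:
        warnings.warn(
            "on_chunk callback provided but this adapter does not support "
            "streaming; the callback will not be invoked.",
            stacklevel=2,
        )
    # ... the real adapter would invoke the MCP tool here ...
    return f"result for {prompt!r}"
```

Callers that pass a callback get an explicit, non-fatal signal instead of silently missing chunks.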

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04748e4 and 1464fb8.

📒 Files selected for processing (15)
  • libs/core/kiln_ai/adapters/litellm_utils/__init__.py
  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
  • libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py
  • libs/core/kiln_ai/adapters/model_adapters/test_saving_adapter_results.py
  • libs/core/kiln_ai/adapters/model_adapters/test_structured_output.py
  • libs/core/kiln_ai/adapters/test_prompt_adaptors.py
  • libs/core/kiln_ai/adapters/test_prompt_builders.py
  • libs/core/kiln_ai/datamodel/test_basemodel.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (1)

157-171: Deduplicate the repeated model/provider matrix.

The same parameter list is copied across tests, which increases drift risk when updating coverage.

Suggested refactor
+STREAMING_MODEL_MATRIX = [
+    ("claude_sonnet_4_5", ModelProviderName.openrouter),
+    ("claude_sonnet_4_5", ModelProviderName.anthropic),
+    ("claude_sonnet_4_6", ModelProviderName.openrouter),
+    ("claude_sonnet_4_6", ModelProviderName.anthropic),
+    ("claude_opus_4_5", ModelProviderName.openrouter),
+    ("claude_opus_4_5", ModelProviderName.anthropic),
+    ("claude_opus_4_6", ModelProviderName.openrouter),
+    ("claude_opus_4_6", ModelProviderName.anthropic),
+    ("minimax_m2_5", ModelProviderName.openrouter),
+    ("claude_4_5_haiku", ModelProviderName.openrouter),
+    ("claude_4_5_haiku", ModelProviderName.anthropic),
+]
+
 @pytest.mark.paid
-@pytest.mark.parametrize("model_id,provider_name", [ ... ])
+@pytest.mark.parametrize("model_id,provider_name", STREAMING_MODEL_MATRIX)

Also applies to: 274-286, 363-375, 392-402, 419-429

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`
around lines 157 - 171, Create a shared parameter matrix constant (e.g.,
MODEL_PROVIDER_MATRIX) containing the tuple pairs used in
pytest.mark.parametrize and replace the repeated inline lists in each test
decorator (the parametrize calls in this file such as those decorating tests
around lines shown in test_litellm_adapter_streaming.py) with a reference to
that constant; update all affected decorators (the ones at the ranges referenced
in the comment) to use the same constant to eliminate duplication and keep
future updates centralized.
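A runnable sketch of that refactor (the model IDs below are a shortened, illustrative subset, not the test file's full matrix):

```python
# Shared matrix: one place to add or retire a model/provider pair.
STREAMING_MODEL_MATRIX = [
    ("claude_sonnet_4_5", "openrouter"),
    ("claude_sonnet_4_5", "anthropic"),
    ("claude_4_5_haiku", "openrouter"),
]

# Each test then references the constant instead of an inline copy:
#
#   @pytest.mark.paid
#   @pytest.mark.parametrize("model_id,provider_name", STREAMING_MODEL_MATRIX)
#   async def test_streaming(model_id, provider_name): ...

# Sanity check the shape every entry must have.
for model_id, provider_name in STREAMING_MODEL_MATRIX:
    assert isinstance(model_id, str) and isinstance(provider_name, str)
```

With one constant, adding a model to coverage touches a single list rather than five decorators.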
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 386-387: The assertions using renderer.get_stream_text() currently
check "is not None" which is a no-op for a str; change both occurrences (the
assertions around get_stream_text()) to assert the string is non-empty, e.g.
assert renderer.get_stream_text().strip() != "" or assert
len(renderer.get_stream_text()) > 0, and preferably also assert it contains an
expected substring/token if available (use the same renderer.get_stream_text()
calls in the surrounding test function to strengthen the rendering checks).

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1464fb8 and dee10b3.

📒 Files selected for processing (1)
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py

Comment on lines +82 to +89
if chunk.choices[0].delta.tool_calls is not None:
    self.render_tool_call(chunk.choices[0].delta.tool_calls)
elif getattr(chunk.choices[0].delta, "reasoning_content", None) is not None:
    text = getattr(chunk.choices[0].delta, "reasoning_content", None)
    if text is not None:
        self.render_reasoning(text)
elif chunk.choices[0].delta.content is not None:
    self.render_content(chunk.choices[0].delta.content)
Contributor


⚠️ Potential issue | 🟠 Major

Handle chunk fields independently instead of elif-chaining.

These branches currently discard data when multiple fields are present in a single chunk (e.g., tool calls plus reasoning/content), which can make these tests flaky or miss valid streamed output.

Suggested fix
-            if chunk.choices[0].delta.tool_calls is not None:
+            if chunk.choices[0].delta.tool_calls is not None:
                 self.render_tool_call(chunk.choices[0].delta.tool_calls)
-            elif getattr(chunk.choices[0].delta, "reasoning_content", None) is not None:
+            if getattr(chunk.choices[0].delta, "reasoning_content", None) is not None:
                 text = getattr(chunk.choices[0].delta, "reasoning_content", None)
                 if text is not None:
                     self.render_reasoning(text)
-            elif chunk.choices[0].delta.content is not None:
+            if chunk.choices[0].delta.content is not None:
                 self.render_content(chunk.choices[0].delta.content)
-        if delta.tool_calls is not None:
+        if delta.tool_calls is not None:
             tool_calls.extend(delta.tool_calls)
-        elif getattr(delta, "reasoning_content", None) is not None:
+        if getattr(delta, "reasoning_content", None) is not None:
             text = getattr(delta, "reasoning_content", None)
             if text is not None:
                 reasoning_contents.append(text)
-        elif delta.content is not None:
+        if delta.content is not None:
             contents.append(delta.content)

Also applies to: 318-325

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`
around lines 82 - 89, The current chunk handling uses an elif chain that drops
subsequent fields when multiple are present; update the logic in the streaming
handler (the block calling render_tool_call, render_reasoning, render_content on
chunk.choices[0].delta) to check each field independently (use separate if
checks for delta.tool_calls, getattr(delta, "reasoning_content", None), and
delta.content) so all applicable render_* methods run for a single chunk; apply
the same change to the other identical block later in the file that also uses
render_tool_call/render_reasoning/render_content.
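The failure mode is easy to reproduce with a delta that carries two fields at once; the `SimpleNamespace` objects below stand in for real litellm chunk deltas:

```python
from types import SimpleNamespace


def collect_elif(deltas):
    """Original elif chain: only the first matching field per delta survives."""
    tool_calls, reasoning, contents = [], [], []
    for delta in deltas:
        if delta.tool_calls is not None:
            tool_calls.extend(delta.tool_calls)
        elif getattr(delta, "reasoning_content", None) is not None:
            reasoning.append(delta.reasoning_content)
        elif delta.content is not None:
            contents.append(delta.content)
    return tool_calls, reasoning, contents


def collect_independent(deltas):
    """Fixed version: each field is checked on its own, so nothing is dropped."""
    tool_calls, reasoning, contents = [], [], []
    for delta in deltas:
        if delta.tool_calls is not None:
            tool_calls.extend(delta.tool_calls)
        if getattr(delta, "reasoning_content", None) is not None:
            reasoning.append(delta.reasoning_content)
        if delta.content is not None:
            contents.append(delta.content)
    return tool_calls, reasoning, contents


# A single chunk delta carrying both a tool call and text content.
delta = SimpleNamespace(tool_calls=["call_1"], reasoning_content=None, content="partial")
```

Running both collectors on the same delta shows the elif chain keeping the tool call but discarding the content, while the independent checks keep both.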

@leonardmq leonardmq marked this pull request as draft March 3, 2026 17:26
@leonardmq leonardmq marked this pull request as ready for review March 8, 2026 10:44
@leonardmq
Copy link
Collaborator Author

Closing - replaced with a larger PR that does multiturn + streaming + protocols here: #1107

@leonardmq leonardmq closed this Mar 8, 2026

4 participants