
feat: multiturn, streaming, AI SDK and OpenAI protocol support#1107

Open
leonardmq wants to merge 19 commits into main from
leonard/kil-447-feat-stream-multiturn-ai-sdk-openai-protocols

Conversation


@leonardmq leonardmq commented Mar 8, 2026

What does this PR do?

This PR adds support for:

  • multiturn conversations: invoking a task allows passing in an existing trace to continue the conversation
  • streaming: support for OpenAI Chat protocol and AI SDK protocol

Demo here: Kiln-AI/ai-sdk-python-streaming#1

Checklists

  • Tests have been run locally and passed
  • New tests have been added to any work in /lib

Summary by CodeRabbit

  • New Features

    • Resume multi-turn conversations from prior traces.
    • Real-time, re-iterable streaming for progressive model responses.
    • Rich streaming events that surface tool-call lifecycle (input/output/error) to clients.
  • Bug Fixes / Behavior

    • Adapters now enforce session semantics; some will reject session continuation when unsupported.
  • Tests

    • Expanded coverage for multi-turn, streaming, tool-call workflows and edge cases.
  • Chores

    • test_output/ added to ignore list.


coderabbitai bot commented Mar 8, 2026

Walkthrough

Threads prior_trace through adapters, adds MultiturnFormatter and initial_messages, introduces StreamingCompletion for litellm, implements AdapterStream and AiSdkStreamConverter for streaming + tool-call orchestration, extends BaseAdapter with OpenAI/AI‑SDK streaming entry points, and adds tests and MCPAdapter guards rejecting prior_trace.

Changes

  • Ignore (/.gitignore): Adds test_output/ to the ignore list.
  • Chat formatting (libs/core/kiln_ai/adapters/chat/__init__.py, libs/core/kiln_ai/adapters/chat/chat_formatter.py): Exports ChatCompletionMessageIncludingLiteLLM, adds an initial_messages() API and a MultiturnFormatter to continue prior traces.
  • Chat formatter tests (libs/core/kiln_ai/adapters/chat/test_chat_formatter.py): New tests for MultiturnFormatter: initial_messages, next_turn behavior, and preservation of tool-call messages.
  • LiteLLM streaming util (libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py, .../test_litellm_streaming.py): Adds a StreamingCompletion async wrapper that yields ModelResponseStream chunks and exposes the final response after iteration; tests cover iteration, re-iteration, arg passthrough, and errors.
  • Streaming event conversion (libs/core/kiln_ai/adapters/model_adapters/stream_events.py, .../test_stream_events.py): Adds AiSdkStreamConverter, AiSdkStreamEvent, ToolCallEvent, and enums to convert streaming chunks and tool-call events into AI SDK event streams with stateful tracking and finalize/reset semantics.
  • Adapter stream orchestration (libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py, .../test_adapter_stream.py): Introduces AdapterStream and a _ModelTurnComplete sentinel, a multi‑turn streaming loop, tool-call INPUT/OUTPUT events, usage/run aggregation, and AdapterStreamResult; tests cover flow, limits, and errors.
  • BaseAdapter streaming surface (libs/core/kiln_ai/adapters/model_adapters/base_adapter.py, .../test_base_adapter.py): Adds invoke_openai_stream/invoke_ai_sdk_stream, OpenAIStreamResult/AiSdkStreamResult, _prepare_stream/_finalize_stream hooks, an abstract _create_run_stream, and prior_trace propagation; tests updated for streaming.
  • LiteLlm adapter (libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py, .../test_litellm_adapter*.py): Adds prior_trace handling to _run, implements _create_run_stream, tightens response validation, and updates tests to use acompletion_checking_response and cover multiturn/tool preservation and streaming.
  • MCP adapter (libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py, .../test_mcp_adapter.py): Propagates the prior_trace param and adds explicit NotImplementedError guards rejecting non‑null prior_trace (MCP is single‑turn); tests assert the guard.
  • Test harness updates (multiple test files: .../test_saving_adapter_results.py, .../test_structured_output.py, test_prompt_builders.py, datamodel/test_basemodel.py, etc.): Many MockAdapter._run signatures expanded to accept **kwargs for prior_trace; session-continuation and run-generation tests added, and mocks updated to patch adapter-level completion helpers.
  • Server tests (libs/server/kiln_server/test_run_api.py): Adds adapter sanity-check test scaffolding for math tools, helpers for test output, and task run trace injections.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant BaseAdapter as BaseAdapter
    participant ChatFormatter as ChatFormatter
    participant AdapterStream as AdapterStream
    participant LiteLLM as LiteLLM
    participant ToolSystem as ToolSystem

    Client->>BaseAdapter: invoke_openai_stream(input, prior_trace)
    BaseAdapter->>ChatFormatter: build_chat_formatter(input, prior_trace)
    ChatFormatter-->>BaseAdapter: initial_messages() (MultiturnFormatter)
    BaseAdapter->>AdapterStream: _prepare_stream(...)
    Client->>AdapterStream: async iterate
    loop turns
        AdapterStream->>LiteLLM: acompletion(messages, stream=True)
        LiteLLM-->>AdapterStream: ModelResponseStream chunks
        AdapterStream->>AdapterStream: parse chunks, detect tool_call
        alt tool_call detected
            AdapterStream->>ToolSystem: call tool(arguments)
            ToolSystem-->>AdapterStream: tool result
            AdapterStream-->>Client: ToolCallEvent (OUTPUT_AVAILABLE)
            AdapterStream->>LiteLLM: acompletion(messages + tool_result)
        end
        AdapterStream-->>Client: AdapterStreamEvent (text/reasoning delta)
    end
    AdapterStream->>BaseAdapter: finalize -> AdapterStreamResult
    BaseAdapter-->>Client: TaskRun
sequenceDiagram
    participant Client as Client
    participant AiSdkConverter as AiSdkStreamConverter
    participant LiteLLM as LiteLLM

    Client->>AiSdkConverter: new()
    loop for each chunk
        LiteLLM-->>Client: ModelResponseStream (delta)
        Client->>AiSdkConverter: convert_chunk(chunk)
        AiSdkConverter-->>Client: AiSdkStreamEvent (TEXT/REASONING/TOOL events)
    end
    Client->>AiSdkConverter: finalize()
    AiSdkConverter-->>Client: FINISH event (usage/finishReason)
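
The converter loop in the second diagram can be mimicked with a small stateful class: a start event the first time text arrives, a delta event per chunk, and a finish event carrying accumulated state on finalize(). The event names and payload fields are simplified assumptions, not the real AiSdkStreamConverter surface.

```python
from enum import Enum


class EventType(Enum):
    TEXT_START = "text-start"
    TEXT_DELTA = "text-delta"
    FINISH = "finish"


class ConverterSketch:
    """Toy stateful converter: raw text deltas in, typed events out."""

    def __init__(self):
        self._text_started = False
        self._chars = 0

    def convert_chunk(self, delta: str):
        events = []
        if not self._text_started:
            # Emit a start event exactly once, on the first text delta.
            self._text_started = True
            events.append((EventType.TEXT_START, None))
        self._chars += len(delta)
        events.append((EventType.TEXT_DELTA, delta))
        return events

    def finalize(self):
        # Close the stream with accumulated state (stand-in for usage/finishReason).
        return [(EventType.FINISH, {"chars": self._chars})]


conv = ConverterSketch()
out = conv.convert_chunk("Hi") + conv.convert_chunk("!") + conv.finalize()
print([e[0].value for e in out])  # → ['text-start', 'text-delta', 'text-delta', 'finish']
```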

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • scosman
  • sfierro

Poem

"🐰 I hopped through traces, stitched turns with care,
Streams of chunks and tool‑calls leapt in the air.
Multiturns remembered old words with delight,
I nudged prior traces, then streamed through the night.
Carrots of code — a small celebratory bite. 🥕"

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 24.62%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Description check (❓ Inconclusive): The PR description covers the main features added but the 'Related Issues' section is empty and the template's structure is incomplete; the core changes are documented with a demo link. Resolution: add the 'Related Issues' section with links to related GitHub issues (if applicable) and ensure all template sections are completed before merging.

✅ Passed checks (1 passed)

  • Title check (✅ Passed): The PR title accurately and concisely summarizes the main changes: adding multiturn conversation support, streaming capabilities, and AI SDK/OpenAI protocol support.


@gemini-code-assist

Summary of Changes


This pull request significantly enhances the interaction capabilities with AI models by introducing comprehensive streaming and multi-turn conversation support. It allows users to maintain conversation history across multiple interactions and receive model responses in real-time streams, aligning with both OpenAI and AI SDK standards. This refactoring provides a more dynamic and responsive user experience, particularly for complex tasks involving tool use and extended dialogues.

Highlights

  • Multiturn Conversation Support: Introduced the ability to continue conversations by passing an existing trace to tasks, enabling stateful interactions with models. This is facilitated by a new MultiturnFormatter.
  • OpenAI Chat Protocol Streaming: Added support for streaming responses that adhere to the OpenAI Chat protocol, allowing for real-time chunk delivery from models.
  • AI SDK Protocol Streaming: Implemented streaming capabilities compatible with the AI SDK protocol, providing granular events for text, reasoning, and tool call lifecycles.
  • Core Streaming Orchestration: Developed AdapterStream to orchestrate complex streaming flows, managing chat turns, tool calls, and event emission for both OpenAI and AI SDK protocols.
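
The orchestration highlighted above (a turn loop that executes tool calls and stops past a cap) can be sketched as follows. The MAX_TURNS constant mirrors the MAX_CALLS_PER_TURN guard visible in the coverage excerpts later in this thread; the model and tool stand-ins are toy functions, not the adapter's real interfaces.

```python
MAX_TURNS = 10  # mirrors MAX_CALLS_PER_TURN from the coverage excerpts


def run_turn_loop(model, tools, messages):
    turns = 0
    while True:
        turns += 1
        if turns > MAX_TURNS:
            raise RuntimeError(f"Too many turns ({turns}).")
        reply = model(messages)
        messages.append(reply)
        tool_call = reply.get("tool_call")
        if tool_call is None:
            return reply["content"]  # model produced a final answer
        # Execute the requested tool and feed its result back for the next turn.
        result = tools[tool_call["name"]](*tool_call["args"])
        messages.append({"role": "tool", "content": str(result)})


def toy_model(messages):
    # First turn: request a tool; once a tool result is present, answer with it.
    if not any(m.get("role") == "tool" for m in messages):
        return {"role": "assistant", "content": None,
                "tool_call": {"name": "add", "args": (2, 3)}}
    return {"role": "assistant", "content": messages[-1]["content"], "tool_call": None}


answer = run_turn_loop(toy_model, {"add": lambda a, b: a + b},
                       [{"role": "user", "content": "what is 2+3?"}])
print(answer)  # → 5
```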


Changelog
  • .gitignore
    • Added test_output/ to the ignore list to exclude test artifacts.
  • libs/core/kiln_ai/adapters/chat/__init__.py
    • Exported new type aliases ChatCompletionMessageIncludingLiteLLM and MultiturnFormatter.
  • libs/core/kiln_ai/adapters/chat/chat_formatter.py
    • Defined ChatCompletionMessageIncludingLiteLLM as a TypeAlias for chat messages.
    • Added an initial_messages method to the ChatFormatter abstract base class.
    • Implemented a new MultiturnFormatter class to handle conversation continuation with a prior_trace.
    • Modified build_chat_formatter to instantiate MultiturnFormatter when a prior_trace is provided.
  • libs/core/kiln_ai/adapters/chat/test_chat_formatter.py
    • Added new test cases for the MultiturnFormatter to verify its functionality with initial messages, next turns, and preservation of tool call messages.
  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
    • Added StreamingCompletion class, an async iterable wrapper for litellm.acompletion to handle streaming responses and assemble final model responses.
  • libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
    • Added comprehensive unit tests for the StreamingCompletion class, covering chunk yielding, response availability, argument passing, and state resetting.
  • libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py
    • Added AdapterStream class to orchestrate streaming task execution, managing chat turns, tool calls, and yielding ModelResponseStream chunks and ToolCallEvent instances.
    • Defined AdapterStreamResult to encapsulate the final run output and usage from a streaming process.
    • Introduced _ModelTurnComplete as an internal sentinel for model turn completion within the stream.
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
    • Updated invoke and _run_returning_run_output methods to accept an optional prior_trace for multi-turn conversations.
    • Added invoke_openai_stream and invoke_ai_sdk_stream methods to provide streaming interfaces for OpenAI and AI SDK protocols, respectively.
    • Introduced _prepare_stream and _finalize_stream helper methods for managing the lifecycle of streaming invocations.
    • Modified the _run abstract method signature to include an optional prior_trace parameter.
    • Added a new abstract method _create_run_stream for adapters to implement streaming functionality.
    • Updated build_chat_formatter to use MultiturnFormatter when a prior_trace is present.
    • Refactored generate_run to create TaskOutput as a separate object before constructing TaskRun.
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
    • Removed the local ChatCompletionMessageIncludingLiteLLM TypeAlias, now importing it from kiln_ai.adapters.chat.
    • Updated the _run method to accept an optional prior_trace and initialize messages using chat_formatter.initial_messages().
    • Implemented the _create_run_stream method to return an AdapterStream instance for streaming operations.
    • Modified acompletion_checking_response to accept Any for kwargs for broader compatibility.
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
    • Updated _run, invoke, and invoke_returning_run_output methods to accept an optional prior_trace parameter.
    • Added logic to raise NotImplementedError in MCPAdapter methods if prior_trace is provided, as MCP tools do not support session continuation.
  • libs/core/kiln_ai/adapters/model_adapters/stream_events.py
    • Added AiSdkEventType enum to define various event types for the AI SDK streaming protocol.
    • Introduced AiSdkStreamEvent dataclass for structured AI SDK stream events.
    • Defined ToolCallEventType enum and ToolCallEvent dataclass for tool call specific events.
    • Implemented AiSdkStreamConverter to convert raw OpenAI streaming chunks and ToolCallEvent instances into AI SDK protocol events.
  • libs/core/kiln_ai/adapters/model_adapters/test_adapter_stream.py
    • Added new test file for AdapterStream to verify its functionality, including simple content responses, tool call event emission, handling of task_response tool calls, and error conditions like too many tool calls or unparseable arguments.
  • libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
    • Updated _run method signatures in mock adapters to accept **kwargs.
    • Added tests to confirm that build_chat_formatter correctly returns MultiturnFormatter when prior_trace is provided.
    • Introduced tests for invoke_openai_stream and invoke_ai_sdk_stream to ensure proper error handling for unsupported adapters and correct event emission/state management for AI SDK streams.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
    • Updated _run method signature to accept prior_trace and added tests to confirm MultiturnFormatter usage and preservation of tool calls within the trace.
    • Modified existing tests to use acompletion_checking_response instead of directly mocking litellm.acompletion.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
    • Added new paid test file to validate OpenAI and AI SDK streaming functionality across various models, including tests for reasoning content, tool calls, and session continuation with prior_trace.
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py
    • Updated litellm.acompletion mocks to use acompletion_checking_response for consistency and better testing practices.
  • libs/core/kiln_ai/adapters/model_adapters/test_mcp_adapter.py
    • Added tests to explicitly verify that MCPAdapter raises NotImplementedError when prior_trace is used in invoke, invoke_returning_run_output, or _run methods, confirming its single-turn nature.
  • libs/core/kiln_ai/adapters/model_adapters/test_saving_adapter_results.py
    • Updated _run method signature in mock adapter to accept **kwargs.
    • Added tests for invoke with prior_trace to ensure session continuation correctly creates new runs and saves them.
    • Added a test for generate_run to confirm it always creates a new TaskRun instance.
  • libs/core/kiln_ai/adapters/test_prompt_adaptors.py
    • Updated litellm.acompletion mock to use acompletion_checking_response for consistency.
  • libs/core/kiln_ai/adapters/test_prompt_builders.py
    • Updated _run method signature in mock adapter to accept **kwargs.
  • libs/core/kiln_ai/datamodel/test_basemodel.py
    • Updated _run method signature in mock adapter to accept **kwargs.
  • libs/server/kiln_server/test_run_api.py
    • Added a default trace to task_run_setup fixture for more realistic test scenarios.
    • Introduced new fixtures adapter_sanity_check_setup and adapter_sanity_check_math_tools_setup for paid tests.
    • Added _assert_math_tools_response helper function to validate responses from math tool interactions.
    • Included paid tests for test_run_task_adapter_sanity_math_tools to verify built-in math tools work correctly across independent runs via the API.
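
The MCPAdapter guard described in the changelog above follows a simple pattern: reject a non-null prior_trace before doing any work. A minimal sketch, with the error message taken from the coverage excerpt in this thread and the function body a stand-in:

```python
def invoke(input, prior_trace=None):
    # Guard first: MCP is single-turn, so session continuation is rejected.
    if prior_trace is not None:
        raise NotImplementedError(
            "Session continuation is not supported for MCP adapter. "
            "MCP tools are single-turn and do not maintain conversation state."
        )
    return f"ran: {input}"  # stand-in for the real single-turn invocation


print(invoke("hello"))  # → ran: hello
try:
    invoke("hello", prior_trace=[{"role": "user", "content": "hi"}])
except NotImplementedError:
    print("rejected")
```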


github-actions bot commented Mar 8, 2026

📊 Coverage Report

Overall Coverage: 91%

Diff: origin/main...HEAD

  • libs/core/kiln_ai/adapters/chat/chat_formatter.py (92.0%): Missing lines 288,293
  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py (100%)
  • libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py (92.8%): Missing lines 28,78,93,103,122,132,172,202,279,291
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py (80.7%): Missing lines 65,308,320-321,337-339,341-343,345-349,352,358-359,363,366,378,382,386,391,393,395
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (53.8%): Missing lines 270-272,274-275,279
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (88.9%): Missing lines 99
  • libs/core/kiln_ai/adapters/model_adapters/stream_events.py (93.9%): Missing lines 95,207-209,276,280-283

Summary

  • Total: 496 lines
  • Missing: 54 lines
  • Coverage: 89%

Line-by-line

View line-by-line diff coverage

libs/core/kiln_ai/adapters/chat/chat_formatter.py

Lines 284-297

  284             return ChatTurn(messages=[user_msg], final_call=True)
  285 
  286         if self._state == "awaiting_final":
  287             if previous_output is None:
! 288                 raise ValueError("previous_output required for final step")
  289             self._messages.append(BasicChatMessage("assistant", previous_output))
  290             self._state = "done"
  291             return None
  292 
! 293         return None
  294 
  295 
  296 def get_chat_formatter(
  297     strategy: ChatStrategy,

libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py

Lines 24-32

  24 from kiln_ai.adapters.run_output import RunOutput
  25 from kiln_ai.datamodel import Usage
  26 
  27 if TYPE_CHECKING:
! 28     from kiln_ai.adapters.model_adapters.litellm_adapter import LiteLlmAdapter
  29 
  30 MAX_CALLS_PER_TURN = 10
  31 MAX_TOOL_CALLS_PER_TURN = 30

Lines 74-82

  74                 "AdapterStream has not been iterated yet. "
  75                 "Use 'async for event in stream:' before accessing .result"
  76             )
  77         if self._result is None:
! 78             raise RuntimeError("AdapterStream completed without producing a result")
  79         return self._result
  80 
  81     async def __aiter__(self) -> AsyncIterator[AdapterStreamEvent]:
  82         self._result = None

Lines 89-97

  89 
  90         while True:
  91             turns += 1
  92             if turns > MAX_CALLS_PER_TURN:
! 93                 raise RuntimeError(
  94                     f"Too many turns ({turns}). Stopping iteration to avoid using too many tokens."
  95                 )
  96 
  97             turn = self._chat_formatter.next_turn(prior_output)

Lines 99-107

   99                 break
  100 
  101             for message in turn.messages:
  102                 if message.content is None:
! 103                     raise ValueError("Empty message content isn't allowed")
  104                 self._messages.append(
  105                     {"role": message.role, "content": message.content}  # type: ignore[arg-type]
  106                 )

Lines 118-126

  118                 else:
  119                     yield event
  120 
  121             if not prior_output:
! 122                 raise RuntimeError("No assistant message/output returned from model")
  123 
  124         logprobs = self._adapter._extract_and_validate_logprobs(final_choice)
  125 
  126         intermediate_outputs = self._chat_formatter.intermediate_outputs()

Lines 128-136

  128             final_choice, intermediate_outputs
  129         )
  130 
  131         if not isinstance(prior_output, str):
! 132             raise RuntimeError(f"assistant message is not a string: {prior_output}")
  133 
  134         trace = self._adapter.all_messages_to_trace(self._messages)
  135         self._result = AdapterStreamResult(
  136             run_output=RunOutput(

Lines 168-176

  168 
  169             content = response_choice.message.content
  170             tool_calls = response_choice.message.tool_calls
  171             if not content and not tool_calls:
! 172                 raise ValueError(
  173                     "Model returned an assistant message, but no content or tool calls. This is not supported."
  174                 )
  175 
  176             self._messages.append(response_choice.message)

Lines 198-206

  198                     usage=usage,
  199                 )
  200                 return
  201 
! 202             raise RuntimeError(
  203                 "Model returned neither content nor tool calls. It must return at least one of these."
  204             )
  205 
  206         raise RuntimeError(

Lines 275-283

  275         or not response.choices
  276         or len(response.choices) == 0
  277         or not isinstance(response.choices[0], Choices)
  278     ):
! 279         raise RuntimeError(
  280             f"Expected ModelResponse with Choices, got {type(response)}."
  281         )
  282     return response, response.choices[0]

Lines 287-292

  287 ) -> str:
  288     for tc in tool_calls:
  289         if tc.id == tool_call_id:
  290             return tc.function.name or "unknown"
! 291     return "unknown"

libs/core/kiln_ai/adapters/model_adapters/base_adapter.py

Lines 61-69

  61 from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
  62 from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
  63 
  64 if TYPE_CHECKING:
! 65     from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream
  66 
  67 
  68 @dataclass
  69 class AdapterConfig:

Lines 304-312

  304         input: InputType,
  305         prior_trace: list[ChatCompletionMessageParam] | None,
  306     ) -> AdapterStream:
  307         if self.input_schema is not None:
! 308             validate_schema_with_value_error(
  309                 input,
  310                 self.input_schema,
  311                 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  312                 require_object=False,

Lines 316-325

  316 
  317         formatted_input = input
  318         formatter_id = self.model_provider().formatter
  319         if formatter_id is not None:
! 320             formatter = request_formatter_from_id(formatter_id)
! 321             formatted_input = formatter.format_input(input)
  322 
  323         return self._create_run_stream(formatted_input, prior_trace)
  324 
  325     def _finalize_stream(

Lines 333-356

  333         At the end of the stream, we still need to validate the output, create a run and everything
  334         else that a non-streaming invocation would do.
  335         """
  336 
! 337         result: AdapterStreamResult = adapter_stream.result
! 338         run_output = result.run_output
! 339         usage = result.usage
  340 
! 341         provider = self.model_provider()
! 342         parser = model_parser_from_id(provider.parser)
! 343         parsed_output = parser.parse_output(original_output=run_output)
  344 
! 345         if self.output_schema is not None:
! 346             if isinstance(parsed_output.output, str):
! 347                 parsed_output.output = parse_json_string(parsed_output.output)
! 348             if not isinstance(parsed_output.output, dict):
! 349                 raise RuntimeError(
  350                     f"structured response is not a dict: {parsed_output.output}"
  351                 )
! 352             validate_schema_with_value_error(
  353                 parsed_output.output,
  354                 self.output_schema,
  355                 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  356             )

Lines 354-370

  354                 self.output_schema,
  355                 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
  356             )
  357         else:
! 358             if not isinstance(parsed_output.output, str):
! 359                 raise RuntimeError(
  360                     f"response is not a string for non-structured task: {parsed_output.output}"
  361                 )
  362 
! 363         trace_has_toolcalls = parsed_output.trace is not None and any(
  364             message.get("role", None) == "tool" for message in parsed_output.trace
  365         )
! 366         if (
  367             provider.reasoning_capable
  368             and (
  369                 not parsed_output.intermediate_outputs
  370                 or "reasoning" not in parsed_output.intermediate_outputs

Lines 374-399

  374                 and self.has_structured_output()
  375             )
  376             and not trace_has_toolcalls
  377         ):
! 378             raise RuntimeError(
  379                 "Reasoning is required for this model, but no reasoning was returned."
  380             )
  381 
! 382         run = self.generate_run(
  383             input, input_source, parsed_output, usage, run_output.trace
  384         )
  385 
! 386         if (
  387             self.base_adapter_config.allow_saving
  388             and Config.shared().autosave_runs
  389             and self.task.path is not None
  390         ):
! 391             run.save_to_file()
  392         else:
! 393             run.id = None
  394 
! 395         return run
  396 
  397     def has_structured_output(self) -> bool:
  398         return self.output_schema is not None

libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py

Lines 266-283

  266         self,
  267         input: InputType,
  268         prior_trace: list[ChatCompletionMessageParam] | None = None,
  269     ) -> AdapterStream:
! 270         provider = self.model_provider()
! 271         if not provider.model_id:
! 272             raise ValueError("Model ID is required for OpenAI compatible models")
  273 
! 274         chat_formatter = self.build_chat_formatter(input, prior_trace)
! 275         initial_messages: list[ChatCompletionMessageIncludingLiteLLM] = copy.deepcopy(
  276             chat_formatter.initial_messages()
  277         )
  278 
! 279         return AdapterStream(
  280             adapter=self,
  281             provider=provider,
  282             chat_formatter=chat_formatter,
  283             initial_messages=initial_messages,

libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py

Lines 95-103

   95                 "Session continuation is not supported for MCP adapter. "
   96                 "MCP tools are single-turn and do not maintain conversation state."
   97             )
   98 
!  99         run_output, _ = await self.invoke_returning_run_output(
  100             input, input_source, prior_trace
  101         )
  102         return run_output

libs/core/kiln_ai/adapters/model_adapters/stream_events.py

Lines 91-99

  91                 self._finish_reason = choice.finish_reason
  92 
  93             delta = choice.delta
  94             if delta is None:
! 95                 continue
  96 
  97             reasoning_content = getattr(delta, "reasoning_content", None)
  98             if reasoning_content:
  99                 if not self._reasoning_started:

Lines 203-213

  203                             )
  204                         )
  205 
  206         if not chunk.choices:
! 207             usage = getattr(chunk, "usage", None)
! 208             if usage is not None:
! 209                 self._usage_data = usage
  210 
  211         return events
  212 
  213     def convert_tool_event(self, event: ToolCallEvent) -> list[AiSdkStreamEvent]:

Lines 272-287

  272         if self._finish_reason is not None:
  273             finish_payload["finishReason"] = self._finish_reason.replace("_", "-")
  274 
  275         if self._usage_data is not None:
! 276             usage_payload: dict[str, Any] = {
  277                 "promptTokens": self._usage_data.prompt_tokens,
  278                 "completionTokens": self._usage_data.completion_tokens,
  279             }
! 280             total = getattr(self._usage_data, "total_tokens", None)
! 281             if total is not None:
! 282                 usage_payload["totalTokens"] = total
! 283             finish_payload["usage"] = usage_payload
  284 
  285         if finish_payload:
  286             events.append(
  287                 AiSdkStreamEvent(


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces comprehensive streaming capabilities and session continuation for AI model interactions. Key changes include the addition of a StreamingCompletion class for async streaming of litellm responses and an AdapterStream to orchestrate multi-turn conversations and tool calls, yielding raw model response chunks and structured ToolCallEvents.

A new stream_events.py module defines AiSdkEventType, AiSdkStreamEvent, and AiSdkStreamConverter to transform raw model and tool call events into a standardized AI SDK event format, supporting detailed streaming feedback.

The BaseAdapter and LiteLlmAdapter are updated to support prior_trace (conversation history) for continuing multi-turn sessions, utilizing a new MultiturnFormatter. New invoke_openai_stream and invoke_ai_sdk_stream methods are added to BaseAdapter for streaming interactions. The MCPAdapter explicitly raises NotImplementedError for prior_trace, indicating that multi-turn session continuation is not supported for single-turn MCP tools.

Extensive unit and paid integration tests have been added or updated to validate these new features, covering various scenarios including tool calls, error handling, and session continuation. Additionally, .gitignore is updated to exclude test_output/, and existing test mocks are adjusted to align with the new streaming architecture.

Note: Security Review did not run due to the size of the PR.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/stream_events.py (1)

287-290: Consider resetting text/reasoning state in reset_for_next_step.

Currently reset_for_next_step only clears _tool_calls_state and _finish_reason. If a model turn ends with active reasoning or text (without an explicit end event), and then tool calls occur followed by a new model turn, the _text_started and _reasoning_started flags remain set. This could cause issues if the next model turn starts emitting new text or reasoning without proper START events.

However, looking at the usage in base_adapter.py (Context snippet 1), reset_for_next_step is called when transitioning from a ToolCallEvent to a new ModelResponseStream, which typically represents a new model response after tool execution. At this point, the previous model turn should have completed its text/reasoning blocks naturally.

Consider also resetting text/reasoning state for robustness
 def reset_for_next_step(self) -> None:
     """Reset per-step state between LLM calls in a multi-step flow."""
     self._tool_calls_state = {}
     self._finish_reason = None
+    # Reset text/reasoning state for new model turn
+    self._text_started = False
+    self._reasoning_started = False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py` around lines 287
- 290, reset_for_next_step currently only clears _tool_calls_state and
_finish_reason but leaves _text_started and _reasoning_started set, which can
cause the next model turn to misinterpret new text/reasoning emits; update
reset_for_next_step in stream_events.py to also reset the per-turn
text/reasoning state by clearing _text_started and _reasoning_started (and any
associated per-turn buffers such as _current_text_block/_current_reasoning_block
if present) so a new ModelResponseStream begins with a clean slate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libs/core/kiln_ai/adapters/chat/chat_formatter.py`:
- Around line 273-283: initial_messages() returns the prior trace but _messages
is never populated from _prior_trace, so messages() and message_dicts() only
include new turns; ensure formatter-owned state preserves the prior trace by
seeding _messages from _prior_trace when starting the conversation. Modify
next_turn (the branch where self._state == "start") to initialize or extend
self._messages with list(self._prior_trace) before appending the new
BasicChatMessage, so the existing prior trace is retained for subsequent
messages() / message_dicts() calls (references: initial_messages, next_turn,
_prior_trace, _messages, messages(), message_dicts()).

In `@libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py`:
- Around line 54-68: The issue is that AdapterStream reuses the same mutable
_messages list across iterations, causing duplicated conversation traces; fix by
storing the original initial_messages in a new attribute (e.g.,
_initial_messages) inside __init__ and on __aiter__ reset self._messages =
list(self._initial_messages) (or a shallow copy) in addition to clearing
self._result and self._iterated so each iteration starts with a fresh message
buffer; update both __init__ (add _initial_messages) and __aiter__ (reset
_messages) accordingly.
- Around line 178-191: The code is including synthetic "task_response" entries
in the tool call execution and counting, allowing synthetic items to be executed
and bypass MAX_TOOL_CALLS_PER_TURN; update the logic around
self._handle_tool_calls, self._extract_task_response and tool_calls_count so you
first separate/filter out any synthetic task_response (use the presence/format
returned by _extract_task_response or an explicit flag) from the list of
tool_calls, call process/iterate only over the real tool calls, and increment
tool_calls_count by the number of real tool calls processed (not counting the
synthetic task_response); apply the same filtering/counting fix to the later
duplicate block (around lines 214-236) so both paths exclude task_response from
execution and from the MAX_TOOL_CALLS_PER_TURN accounting.

In `@libs/server/kiln_server/test_run_api.py`:
- Around line 1757-1785: The _assert_math_tools_response helper currently allows
false positives; update it to (1) assert that the assistant's final content
includes the exact bracketed final answer pattern (e.g., contains "[" +
expected_in_output + "]"), (2) assert that the most recent user message in trace
matches the current user prompt (verify by finding the last message with role
"user" and checking its content equals expected_in_output or another provided
prompt string), and (3) tighten tool call counts by enforcing scenario-specific
minima: require at least 2 tool-calling assistant messages for multi-step cases
(use assistant_with_tool_calls length) and at least 1 tool message for simple
cases, or accept a passed-in minimum count parameter; also keep existing checks
for output, tool messages, last_assistant, and that intermediate_outputs values
are strings. Reference: function _assert_math_tools_response, variables output,
expected_in_output, trace, assistant_with_tool_calls, tool_messages,
last_assistant, intermediate_outputs.
- Around line 1686-1729: The fixture adapter_sanity_check_setup should stop
using the hardcoded /Users/.../Downloads path and instead default to using the
pytest-provided tmp_path; update adapter_sanity_check_setup to construct
project_path under tmp_path (e.g., tmp_path / "adapter_sanity_project" /
"project.kiln"), create parent dirs as needed, and create/load Project and Task
the same way; add support for an opt-in environment variable (e.g.,
KILN_USE_PERSISTENT_ADAPTER_SANITY_PATH) that, when set, uses the original
persistent location for manual runs; ensure you still update
Config.shared()._settings["projects"] and restore original_projects in the
teardown exactly as before.

---

Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py`:
- Around line 287-290: reset_for_next_step currently only clears
_tool_calls_state and _finish_reason but leaves _text_started and
_reasoning_started set, which can cause the next model turn to misinterpret new
text/reasoning emits; update reset_for_next_step in stream_events.py to also
reset the per-turn text/reasoning state by clearing _text_started and
_reasoning_started (and any associated per-turn buffers such as
_current_text_block/_current_reasoning_block if present) so a new
ModelResponseStream begins with a clean slate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b95fd129-637e-46f9-bf66-5f387475d44d

📥 Commits

Reviewing files that changed from the base of the PR and between f18aa3b and eb537ed.

📒 Files selected for processing (25)
  • .gitignore
  • libs/core/kiln_ai/adapters/chat/__init__.py
  • libs/core/kiln_ai/adapters/chat/chat_formatter.py
  • libs/core/kiln_ai/adapters/chat/test_chat_formatter.py
  • libs/core/kiln_ai/adapters/litellm_utils/__init__.py
  • libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
  • libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
  • libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py
  • libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/stream_events.py
  • libs/core/kiln_ai/adapters/model_adapters/test_adapter_stream.py
  • libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py
  • libs/core/kiln_ai/adapters/model_adapters/test_mcp_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_saving_adapter_results.py
  • libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
  • libs/core/kiln_ai/adapters/model_adapters/test_structured_output.py
  • libs/core/kiln_ai/adapters/test_prompt_adaptors.py
  • libs/core/kiln_ai/adapters/test_prompt_builders.py
  • libs/core/kiln_ai/datamodel/test_basemodel.py
  • libs/server/kiln_server/test_run_api.py

Comment on lines +273 to +283
def initial_messages(self) -> list[ChatCompletionMessageIncludingLiteLLM]:
"""Messages to seed the conversation (prior trace)."""
return list(self._prior_trace)

def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
if self._state == "start":
# prior trace is already in the messages list and contains system and so on, we only need
# to append the latest new user message
user_msg = BasicChatMessage("user", format_user_message(self.user_input))
self._state = "awaiting_final"
self._messages.append(user_msg)

⚠️ Potential issue | 🟠 Major

Preserve the prior trace in formatter-owned state.

initial_messages() carries the continuation history, but _messages is still empty here, so messages() and message_dicts() only describe the newly appended turn. Any caller that serializes the formatter state will drop the earlier conversation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/chat/chat_formatter.py` around lines 273 - 283,
initial_messages() returns the prior trace but _messages is never populated from
_prior_trace, so messages() and message_dicts() only include new turns; ensure
formatter-owned state preserves the prior trace by seeding _messages from
_prior_trace when starting the conversation. Modify next_turn (the branch where
self._state == "start") to initialize or extend self._messages with
list(self._prior_trace) before appending the new BasicChatMessage, so the
existing prior trace is retained for subsequent messages() / message_dicts()
calls (references: initial_messages, next_turn, _prior_trace, _messages,
messages(), message_dicts()).
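The seeding pattern the prompt describes can be sketched in isolation. This is a minimal illustration, not Kiln's actual code: `ContinuationFormatter` and `ChatMessage` are hypothetical stand-ins for the real `ChatFormatter` and message types, and the point is only that `_messages` starts from a copy of the prior trace so later `messages()` calls describe the whole conversation:

```python
from dataclasses import dataclass


@dataclass
class ChatMessage:
    role: str
    content: str


class ContinuationFormatter:
    """Minimal sketch: seed formatter-owned state from the prior trace."""

    def __init__(self, prior_trace: list[ChatMessage], user_input: str):
        self._prior_trace = list(prior_trace)
        self._user_input = user_input
        # Seed _messages from the prior trace up front, so messages()
        # reflects the full conversation rather than only new turns.
        self._messages: list[ChatMessage] = list(self._prior_trace)

    def next_turn(self) -> ChatMessage:
        user_msg = ChatMessage("user", self._user_input)
        self._messages.append(user_msg)
        return user_msg

    def messages(self) -> list[ChatMessage]:
        return list(self._messages)


prior = [
    ChatMessage("system", "Be concise."),
    ChatMessage("user", "Hi"),
    ChatMessage("assistant", "Hello!"),
]
fmt = ContinuationFormatter(prior, "What did I just say?")
fmt.next_turn()
print(len(fmt.messages()))  # 4: the three prior-trace messages plus the new user turn
```

Serializing `fmt.messages()` here keeps the system prompt and earlier turns, which is the state the review says the current `next_turn` drops.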

Comment on lines +54 to +68
def __init__(
self,
adapter: LiteLlmAdapter,
provider: KilnModelProvider,
chat_formatter: ChatFormatter,
initial_messages: list[ChatCompletionMessageIncludingLiteLLM],
top_logprobs: int | None,
) -> None:
self._adapter = adapter
self._provider = provider
self._chat_formatter = chat_formatter
self._messages = initial_messages
self._top_logprobs = top_logprobs
self._result: AdapterStreamResult | None = None
self._iterated = False

⚠️ Potential issue | 🟡 Minor

Reset _messages before a second iteration.

__aiter__() clears result and _iterated, but it reuses the message buffer mutated by the previous run. Re-iterating the same AdapterStream will resend the old conversation and duplicate the saved trace.

🛠️ Proposed fix
     def __init__(
         self,
         adapter: LiteLlmAdapter,
         provider: KilnModelProvider,
         chat_formatter: ChatFormatter,
         initial_messages: list[ChatCompletionMessageIncludingLiteLLM],
         top_logprobs: int | None,
     ) -> None:
         self._adapter = adapter
         self._provider = provider
         self._chat_formatter = chat_formatter
-        self._messages = initial_messages
+        self._initial_messages = list(initial_messages)
+        self._messages = list(self._initial_messages)
         self._top_logprobs = top_logprobs
         self._result: AdapterStreamResult | None = None
         self._iterated = False
@@
     async def __aiter__(self) -> AsyncIterator[AdapterStreamEvent]:
         self._result = None
         self._iterated = False
+        self._messages = list(self._initial_messages)

Also applies to: 81-83

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py` around lines 54
- 68, The issue is that AdapterStream reuses the same mutable _messages list
across iterations, causing duplicated conversation traces; fix by storing the
original initial_messages in a new attribute (e.g., _initial_messages) inside
__init__ and on __aiter__ reset self._messages = list(self._initial_messages)
(or a shallow copy) in addition to clearing self._result and self._iterated so
each iteration starts with a fresh message buffer; update both __init__ (add
_initial_messages) and __aiter__ (reset _messages) accordingly.

Comment on lines +178 to +191
if tool_calls and len(tool_calls) > 0:
async for event in self._handle_tool_calls(tool_calls):
yield event

assistant_msg = self._extract_task_response(tool_calls)
if assistant_msg is not None:
yield _ModelTurnComplete(
assistant_message=assistant_msg,
model_choice=response_choice,
usage=usage,
)
return

tool_calls_count += 1

⚠️ Potential issue | 🟠 Major

Exclude task_response from execution and count real tool calls.

task_response is treated as synthetic for events, but the full list still goes into process_tool_calls(), and the guard only increments once per round. That can execute the synthetic tool and let a large batch of tool calls bypass MAX_TOOL_CALLS_PER_TURN.

🛠️ Proposed fix
-            if tool_calls and len(tool_calls) > 0:
-                async for event in self._handle_tool_calls(tool_calls):
-                    yield event
+            if tool_calls:
+                real_tool_calls = [
+                    tc for tc in tool_calls if tc.function.name != "task_response"
+                ]
+                tool_calls_count += len(real_tool_calls)
+                if tool_calls_count > MAX_TOOL_CALLS_PER_TURN:
+                    raise RuntimeError(
+                        f"Too many tool calls ({tool_calls_count}). Stopping iteration to avoid using too many tokens."
+                    )
+                if real_tool_calls:
+                    async for event in self._handle_tool_calls(real_tool_calls):
+                        yield event
 
                 assistant_msg = self._extract_task_response(tool_calls)
                 if assistant_msg is not None:
                     yield _ModelTurnComplete(
                         assistant_message=assistant_msg,
                         model_choice=response_choice,
                         usage=usage,
                     )
                     return
 
-                tool_calls_count += 1
-                continue
+                if real_tool_calls:
+                    continue
-        real_tool_calls = [
-            tc for tc in tool_calls if tc.function.name != "task_response"
-        ]
-
-        for tc in real_tool_calls:
+        for tc in tool_calls:
             try:
                 parsed_args = json.loads(tc.function.arguments)
             except (json.JSONDecodeError, TypeError):
                 parsed_args = None

Also applies to: 214-236

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py` around lines 178
- 191, The code is including synthetic "task_response" entries in the tool call
execution and counting, allowing synthetic items to be executed and bypass
MAX_TOOL_CALLS_PER_TURN; update the logic around self._handle_tool_calls,
self._extract_task_response and tool_calls_count so you first separate/filter
out any synthetic task_response (use the presence/format returned by
_extract_task_response or an explicit flag) from the list of tool_calls, call
process/iterate only over the real tool calls, and increment tool_calls_count by
the number of real tool calls processed (not counting the synthetic
task_response); apply the same filtering/counting fix to the later duplicate
block (around lines 214-236) so both paths exclude task_response from execution
and from the MAX_TOOL_CALLS_PER_TURN accounting.

Comment on lines +1686 to +1729
@pytest.fixture
def adapter_sanity_check_setup(tmp_path):
"""Setup for paid adapter sanity check tests - real project/task, no adapter mocking."""
# if project at the path does not exist, create it, otherwise reuse
project_path = (
Path("/Users/leonardmarcq/Downloads/")
/ "adapter_sanity_project"
/ "project.kiln"
)
if not project_path.exists():
project_path.parent.mkdir()

project = Project(name="Adapter Sanity Project", path=str(project_path))
project.save_to_file()

task = Task(
name="Adapter Sanity Task",
instruction="You are a helpful assistant. Respond concisely.",
description="Task for adapter sanity checking",
parent=project,
)
task.save_to_file()

else:
project = Project.load_from_file(project_path)
task = next(
(
t
for t in project.tasks(readonly=True)
if t.name == "Adapter Sanity Task"
),
None,
)
if task is None:
raise ValueError("Task not found")

config = Config.shared()
original_projects = list(config.projects) if config.projects else []
config._settings["projects"] = [*original_projects, str(project.path)]

yield {"project": project, "task": task}

config._settings["projects"] = original_projects


⚠️ Potential issue | 🟠 Major

Make the adapter sanity fixture machine-agnostic.

This writes into a fixed /Users/.../Downloads location and reuses whatever is already there, so even as a paid/manual test it's non-portable and can silently depend on stale local state. Default to tmp_path, and only use a persistent location when an env var explicitly opts into it.

Suggested change
 `@pytest.fixture`
 def adapter_sanity_check_setup(tmp_path):
     """Setup for paid adapter sanity check tests - real project/task, no adapter mocking."""
-    # if project at the path does not exist, create it, otherwise reuse
-    project_path = (
-        Path("/Users/leonardmarcq/Downloads/")
-        / "adapter_sanity_project"
-        / "project.kiln"
-    )
+    project_root = os.environ.get("KILN_ADAPTER_SANITY_PROJECT_DIR")
+    if project_root:
+        project_path = Path(project_root) / "project.kiln"
+    else:
+        project_path = tmp_path / "adapter_sanity_project" / "project.kiln"
+
     if not project_path.exists():
-        project_path.parent.mkdir()
+        project_path.parent.mkdir(parents=True, exist_ok=True)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/server/kiln_server/test_run_api.py` around lines 1686 - 1729, The
fixture adapter_sanity_check_setup should stop using the hardcoded
/Users/.../Downloads path and instead default to using the pytest-provided
tmp_path; update adapter_sanity_check_setup to construct project_path under
tmp_path (e.g., tmp_path / "adapter_sanity_project" / "project.kiln"), create
parent dirs as needed, and create/load Project and Task the same way; add
support for an opt-in environment variable (e.g.,
KILN_USE_PERSISTENT_ADAPTER_SANITY_PATH) that, when set, uses the original
persistent location for manual runs; ensure you still update
Config.shared()._settings["projects"] and restore original_projects in the
teardown exactly as before.

Comment on lines +1757 to +1785
def _assert_math_tools_response(res: dict, expected_in_output: str) -> None:
"""Assert response has correct output, trace with tool calls, and output matches latest message."""
assert res["id"] is not None

output = res.get("output", {}).get("output", "")
assert output is not None
assert expected_in_output in output

trace = res.get("trace") or []
assistant_with_tool_calls = [
m for m in trace if m.get("role") == "assistant" and m.get("tool_calls")
]
assert len(assistant_with_tool_calls) >= 1

tool_messages = [m for m in trace if m.get("role") == "tool"]
assert len(tool_messages) >= 1

last_assistant = next(
(m for m in reversed(trace) if m.get("role") == "assistant"), None
)
assert last_assistant is not None
last_content = last_assistant.get("content") or ""
assert expected_in_output in last_content

intermediate_outputs = res.get("intermediate_outputs") or {}
if intermediate_outputs:
for key, value in intermediate_outputs.items():
assert isinstance(value, str)


⚠️ Potential issue | 🟡 Minor

Tighten the math-tool sanity assertions.

Right now expected_in_output in output plus >= 1 tool message can false-pass the single-digit cases, and the 7 * 8 + 3 run can still pass after only one tool call. Per Line 1742, assert the bracketed final answer, the current user prompt, and the minimum tool-message count for each scenario.

Suggested change
-def _assert_math_tools_response(res: dict, expected_in_output: str) -> None:
+def _assert_math_tools_response(
+    res: dict,
+    *,
+    expected_answer: str,
+    expected_prompt: str,
+    expected_min_tool_messages: int,
+) -> None:
     """Assert response has correct output, trace with tool calls, and output matches latest message."""
     assert res["id"] is not None
 
     output = res.get("output", {}).get("output", "")
     assert output is not None
-    assert expected_in_output in output
+    assert output.rstrip().endswith(f"[{expected_answer}]")
 
     trace = res.get("trace") or []
+    user_messages = [m.get("content") for m in trace if m.get("role") == "user"]
+    assert user_messages == [expected_prompt]
+
     assistant_with_tool_calls = [
         m for m in trace if m.get("role") == "assistant" and m.get("tool_calls")
     ]
     assert len(assistant_with_tool_calls) >= 1
 
     tool_messages = [m for m in trace if m.get("role") == "tool"]
-    assert len(tool_messages) >= 1
+    assert len(tool_messages) >= expected_min_tool_messages
 
     last_assistant = next(
         (m for m in reversed(trace) if m.get("role") == "assistant"), None
     )
     assert last_assistant is not None
     last_content = last_assistant.get("content") or ""
-    assert expected_in_output in last_content
+    assert last_content.rstrip().endswith(f"[{expected_answer}]")
@@
-    _assert_math_tools_response(res1, "4")
+    _assert_math_tools_response(
+        res1,
+        expected_answer="4",
+        expected_prompt="What is 2 + 2? Use the tools to calculate.",
+        expected_min_tool_messages=1,
+    )
@@
-    _assert_math_tools_response(res2, "12")
+    _assert_math_tools_response(
+        res2,
+        expected_answer="12",
+        expected_prompt="What is 3 times 4? Use the tools to calculate.",
+        expected_min_tool_messages=1,
+    )
@@
-    _assert_math_tools_response(res3, "59")
+    _assert_math_tools_response(
+        res3,
+        expected_answer="59",
+        expected_prompt="What is 7 times 8 plus 3? Use the tools to calculate.",
+        expected_min_tool_messages=2,
+    )
@@
-    _assert_math_tools_response(res4, "7")
+    _assert_math_tools_response(
+        res4,
+        expected_answer="7",
+        expected_prompt="What is 10 minus 3? Use the tools to calculate.",
+        expected_min_tool_messages=1,
+    )

Also applies to: 1814-1856

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/server/kiln_server/test_run_api.py` around lines 1757 - 1785, The
_assert_math_tools_response helper currently allows false positives; update it
to (1) assert that the assistant's final content includes the exact bracketed
final answer pattern (e.g., contains "[" + expected_in_output + "]"), (2) assert
that the most recent user message in trace matches the current user prompt
(verify by finding the last message with role "user" and checking its content
equals expected_in_output or another provided prompt string), and (3) tighten
tool call counts by enforcing scenario-specific minima: require at least 2
tool-calling assistant messages for multi-step cases (use
assistant_with_tool_calls length) and at least 1 tool message for simple cases,
or accept a passed-in minimum count parameter; also keep existing checks for
output, tool messages, last_assistant, and that intermediate_outputs values are
strings. Reference: function _assert_math_tools_response, variables output,
expected_in_output, trace, assistant_with_tool_calls, tool_messages,
last_assistant, intermediate_outputs.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (1)

355-367: Make the continuation assertion prove prior_trace was used.

Right now this only checks that the second call streamed something, which still passes if prior_trace is accidentally dropped and the model answers generically. Aggregate the continuation text and assert it references the first turn’s result so the test actually guards the multi-turn path.

Suggested assertion
     continuation_chunks: list[litellm.ModelResponseStream] = []
+    continuation_text_parts: list[str] = []
     async for chunk in adapter.invoke_openai_stream(
         input="What was the result? Reply in one short sentence.",
         prior_trace=initial_run.trace,
     ):
         continuation_chunks.append(chunk)
+        if chunk.choices:
+            delta = chunk.choices[0].delta
+            if delta is not None and delta.content is not None:
+                continuation_text_parts.append(delta.content)

     _dump_paid_test_output(request, continuation_chunks=continuation_chunks)
     assert len(continuation_chunks) > 0, "No continuation chunks collected"
+    assert "444" in "".join(continuation_text_parts), (
+        "Continuation response did not reflect the previous turn"
+    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`
around lines 355 - 367, The test only checks that adapter.invoke_openai_stream
produced continuation_chunks but not that prior_trace was honored; aggregate the
streamed text from continuation_chunks (e.g., join the chunk text fields) and
assert the aggregated continuation references the first turn's result from
initial_run (for the "123 + 321 = ?" prompt assert the continuation contains
"444" or otherwise includes the value/text from initial_run.trace), using
adapter.invoke_openai_stream, initial_run, prior_trace and continuation_chunks
to locate where to change the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py`:
- Line 84: self._usage_data is currently overwritten by each tool step so
finalize() reports only the last turn; change the logic that assigns to
_usage_data to merge/accumulate usage instead: when a new usage dict/object
arrives (where code currently does self._usage_data = ... in the stream event
handlers), if self._usage_data is None set it to the new usage, otherwise sum
numeric token fields (e.g., prompt_tokens, completion_tokens, total_tokens) and
merge any other keys (prefer summing counts and keeping latest non-numeric
values) into the existing self._usage_data; ensure finalize() then reads the
accumulated totals. Update all places that set _usage_data (the spots handling
model usage events and where _usage_data is currently assigned) to use this
merge/accumulate logic and keep the attribute type consistent.
- Around line 250-295: In finalize(), emit a matching finish-step event before
the terminal FINISH to complete the start/start-step lifecycle: add
events.append(AiSdkStreamEvent(AiSdkEventType.FINISH_STEP)) (or include the
appropriate step id/payload if you track one) immediately before the block that
appends AiSdkEventType.FINISH so the AiSdkStreamConverter produces the
corresponding FINISH_STEP event to mirror BaseAdapter's start-step; update
finalize() (and use AiSdkStreamEvent and AiSdkEventType.FINISH_STEP)
accordingly.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 148-153: The loop over chunks dereferences chunk.choices[0]
without ensuring choices exists; update the loop (the "for chunk in chunks:"
block) to skip chunks with no choices first (e.g., if not chunk.choices:
continue) before checking finish_reason or accessing delta, so you never access
choices[0] on an empty/usage-only chunk.

---

Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 355-367: The test only checks that adapter.invoke_openai_stream
produced continuation_chunks but not that prior_trace was honored; aggregate the
streamed text from continuation_chunks (e.g., join the chunk text fields) and
assert the aggregated continuation references the first turn's result from
initial_run (for the "123 + 321 = ?" prompt assert the continuation contains
"444" or otherwise includes the value/text from initial_run.trace), using
adapter.invoke_openai_stream, initial_run, prior_trace and continuation_chunks
to locate where to change the test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 56fbfe5d-c3d8-47e8-87b0-73664216ad4f

📥 Commits

Reviewing files that changed from the base of the PR and between eb537ed and 66b3151.

📒 Files selected for processing (3)
  • libs/core/kiln_ai/adapters/model_adapters/stream_events.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
  • libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py

self._reasoning_block_count = 0
self._tool_calls_state: dict[int, dict[str, Any]] = {}
self._finish_reason: str | None = None
self._usage_data: Any = None
⚠️ Potential issue | 🟠 Major

Aggregate usage across tool steps instead of overwriting it.

The same converter instance spans the whole streamed run, but _usage_data only keeps one usage object. In a tool loop, later model turns overwrite earlier ones, so finalize() reports only the last turn’s token counts instead of the total for the stream.

Possible fix
@@
-        self._usage_data: Any = None
+        self._usage_totals = {
+            "prompt_tokens": 0,
+            "completion_tokens": 0,
+            "total_tokens": 0,
+        }
+        self._saw_usage = False
@@
-        if not chunk.choices:
-            usage = getattr(chunk, "usage", None)
-            if usage is not None:
-                self._usage_data = usage
+        usage = getattr(chunk, "usage", None)
+        if usage is not None:
+            self._saw_usage = True
+            self._usage_totals["prompt_tokens"] += (
+                getattr(usage, "prompt_tokens", 0) or 0
+            )
+            self._usage_totals["completion_tokens"] += (
+                getattr(usage, "completion_tokens", 0) or 0
+            )
+            self._usage_totals["total_tokens"] += (
+                getattr(usage, "total_tokens", 0) or 0
+            )
@@
-        if self._usage_data is not None:
+        if self._saw_usage:
             usage_payload: dict[str, Any] = {
-                "promptTokens": self._usage_data.prompt_tokens,
-                "completionTokens": self._usage_data.completion_tokens,
+                "promptTokens": self._usage_totals["prompt_tokens"],
+                "completionTokens": self._usage_totals["completion_tokens"],
             }
-            total = getattr(self._usage_data, "total_tokens", None)
-            if total is not None:
-                usage_payload["totalTokens"] = total
+            if self._usage_totals["total_tokens"]:
+                usage_payload["totalTokens"] = self._usage_totals["total_tokens"]

Also applies to: 206-209, 275-283

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py` at line 84,
self._usage_data is currently overwritten by each tool step so finalize()
reports only the last turn; change the logic that assigns to _usage_data to
merge/accumulate usage instead: when a new usage dict/object arrives (where code
currently does self._usage_data = ... in the stream event handlers), if
self._usage_data is None set it to the new usage, otherwise sum numeric token
fields (e.g., prompt_tokens, completion_tokens, total_tokens) and merge any
other keys (prefer summing counts and keeping latest non-numeric values) into
the existing self._usage_data; ensure finalize() then reads the accumulated
totals. Update all places that set _usage_data (the spots handling model usage
events and where _usage_data is currently assigned) to use this merge/accumulate
logic and keep the attribute type consistent.

Comment on lines +250 to +295
def finalize(self) -> list[AiSdkStreamEvent]:
    events: list[AiSdkStreamEvent] = []

    if self._reasoning_started:
        events.append(
            AiSdkStreamEvent(
                AiSdkEventType.REASONING_END,
                {"id": self._reasoning_id},
            )
        )
        self._reasoning_started = False

    if self._text_started:
        events.append(
            AiSdkStreamEvent(
                AiSdkEventType.TEXT_END,
                {"id": self._text_id},
            )
        )
        self._text_started = False

    finish_payload: dict[str, Any] = {}
    if self._finish_reason is not None:
        finish_payload["finishReason"] = self._finish_reason.replace("_", "-")

    if self._usage_data is not None:
        usage_payload: dict[str, Any] = {
            "promptTokens": self._usage_data.prompt_tokens,
            "completionTokens": self._usage_data.completion_tokens,
        }
        total = getattr(self._usage_data, "total_tokens", None)
        if total is not None:
            usage_payload["totalTokens"] = total
        finish_payload["usage"] = usage_payload

    if finish_payload:
        events.append(
            AiSdkStreamEvent(
                AiSdkEventType.FINISH,
                {"messageMetadata": finish_payload},
            )
        )
    else:
        events.append(AiSdkStreamEvent(AiSdkEventType.FINISH))

    return events
⚠️ Potential issue | 🔴 Critical

Emit finish-step before the terminal finish.

BaseAdapter already emits start / start-step and then hands the remaining lifecycle to AiSdkStreamConverter. finalize() currently closes open blocks and emits only finish, so the AI SDK stream never produces the matching finish-step. That leaves the protocol sequence incomplete and is what test_invoke_ai_sdk_stream() is checking for.
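The pairing requirement can be stated as a small invariant check. This is an illustrative sketch, not the Kiln event classes: event names are plain strings and `validate_lifecycle` is a hypothetical helper.

```python
# Sketch of the AI SDK-style lifecycle rule: the stream opens with
# "start", every "start-step" must be closed by a matching
# "finish-step", and the stream ends with a terminal "finish".

def validate_lifecycle(events: list[str]) -> bool:
    """True if start/finish frame the stream and step events pair up."""
    if not events or events[0] != "start" or events[-1] != "finish":
        return False
    open_steps = 0
    for ev in events[1:-1]:
        if ev == "start-step":
            open_steps += 1
        elif ev == "finish-step":
            if open_steps == 0:
                return False  # close without a matching open
            open_steps -= 1
    return open_steps == 0  # every step closed before the terminal finish

# A finalize() that emits only "finish" leaves the step open:
complete = ["start", "start-step", "text-delta", "finish-step", "finish"]
incomplete = ["start", "start-step", "text-delta", "finish"]
```

Under this check `complete` passes and `incomplete` fails, which is the shape of the regression `test_invoke_ai_sdk_stream()` guards against.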

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py` around lines 250
- 295, In finalize(), emit a matching finish-step event before the terminal
FINISH to complete the start/start-step lifecycle: add
events.append(AiSdkStreamEvent(AiSdkEventType.FINISH_STEP)) (or include the
appropriate step id/payload if you track one) immediately before the block that
appends AiSdkEventType.FINISH so the AiSdkStreamConverter produces the
corresponding FINISH_STEP event to mirror BaseAdapter's start-step; update
finalize() (and use AiSdkStreamEvent and AiSdkEventType.FINISH_STEP)
accordingly.

Comment on lines +148 to +153
for chunk in chunks:
    if chunk.choices[0].finish_reason is not None:
        continue
    delta = chunk.choices[0].delta
    if delta is None:
        continue
⚠️ Potential issue | 🟡 Minor

Handle usage-only chunks before touching choices[0].

This loop dereferences chunk.choices[0] unconditionally. The production converter already treats not chunk.choices as a valid chunk shape, so this test will raise IndexError on that terminal chunk instead of skipping it.

Suggested fix
     for chunk in chunks:
+        if not chunk.choices:
+            continue
         if chunk.choices[0].finish_reason is not None:
             continue
         delta = chunk.choices[0].delta
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`
around lines 148 - 153, The loop over chunks dereferences chunk.choices[0]
without ensuring choices exists; update the loop (the "for chunk in chunks:"
block) to skip chunks with no choices first (e.g., if not chunk.choices:
continue) before checking finish_reason or accessing delta, so you never access
choices[0] on an empty/usage-only chunk.

@leonardmq leonardmq mentioned this pull request Mar 9, 2026
…eat-stream-multiturn-ai-sdk-openai-protocols
@coderabbitai coderabbitai bot left a comment
🧹 Nitpick comments (3)
libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py (2)

493-496: Assert that prior_trace is forwarded explicitly.

call_args[1].get("prior_trace") is None also passes when the kwarg is omitted entirely, so this test can miss the regression it is meant to catch.

🧪 Tighten the assertion
-    assert adapter._run.call_args[1].get("prior_trace") is None
+    assert "prior_trace" in adapter._run.call_args.kwargs
+    assert adapter._run.call_args.kwargs["prior_trace"] is None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py` around lines
493 - 496, The test currently checks
adapter._run.call_args[1].get("prior_trace") is None which doesn't fail if the
kwarg is omitted; update the assertion to assert that the prior_trace kwarg is
present and explicitly None by checking the call keyword dict contains the key
and its value is None (i.e., inspect adapter._run.call_args[1] for the
"prior_trace" key and assert adapter._run.call_args[1]["prior_trace"] is None)
so the test for adapter.invoke and adapter._run will catch regressions where
prior_trace is not forwarded.

1060-1239: Add one streaming test that threads prior_trace through.

The new multiturn assertions only cover invoke / invoke_returning_run_output. A regression where invoke_openai_stream or invoke_ai_sdk_stream drops conversation history would still pass this suite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py` around lines
1060 - 1239, Add a streaming test that verifies prior_trace is threaded through
invoke_openai_stream and invoke_ai_sdk_stream: create a stream_adapter whose
base_task includes a non-empty prior_trace, patch stream_adapter._prepare_stream
to return a FakeAdapterStream (yielding one chunk) and patch _finalize_stream to
return a TaskRun-like object; call invoke_openai_stream (and a separate test for
invoke_ai_sdk_stream), iterate the stream to completion, and assert that the
TaskRun or the object captured in the patched _prepare_stream/_finalize_stream
contains the same prior_trace from the base task (use symbols stream_adapter,
_prepare_stream, _finalize_stream, invoke_openai_stream, invoke_ai_sdk_stream,
TaskRun) so the test fails if conversation history is dropped.
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py (1)

1485-1599: Also assert that the caller's prior_trace stays untouched.

The production path deep-copies seeded messages specifically because LiteLLM mutates message objects. Adding a regression assertion here would protect that guarantee for the nested tool-call trace as well.

🧪 Suggested assertion
+    original_prior_trace = json.loads(json.dumps(prior_trace))
     run_output, _ = await adapter._run("what else?", prior_trace=prior_trace)
+    assert prior_trace == original_prior_trace
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py` around
lines 1485 - 1599, The test test_run_with_prior_trace_preserves_tool_calls must
also assert that the caller's prior_trace is not mutated by the adapter; after
calling await adapter._run("what else?", prior_trace=prior_trace) add an
assertion comparing the original prior_trace structure (e.g., checking specific
nested fields like the first assistant tool_calls id "call_abc123" and
subsequent tool response contents "28" and "172", or doing a deep-equality
check) to ensure adapter._run (and its helper mock_run_model_turn) did not
modify prior_trace; reference the prior_trace variable and the
adapter._run/_run_model_turn symbols when adding this assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py`:
- Around line 493-496: The test currently checks
adapter._run.call_args[1].get("prior_trace") is None which doesn't fail if the
kwarg is omitted; update the assertion to assert that the prior_trace kwarg is
present and explicitly None by checking the call keyword dict contains the key
and its value is None (i.e., inspect adapter._run.call_args[1] for the
"prior_trace" key and assert adapter._run.call_args[1]["prior_trace"] is None)
so the test for adapter.invoke and adapter._run will catch regressions where
prior_trace is not forwarded.
- Around line 1060-1239: Add a streaming test that verifies prior_trace is
threaded through invoke_openai_stream and invoke_ai_sdk_stream: create a
stream_adapter whose base_task includes a non-empty prior_trace, patch
stream_adapter._prepare_stream to return a FakeAdapterStream (yielding one
chunk) and patch _finalize_stream to return a TaskRun-like object; call
invoke_openai_stream (and a separate test for invoke_ai_sdk_stream), iterate the
stream to completion, and assert that the TaskRun or the object captured in the
patched _prepare_stream/_finalize_stream contains the same prior_trace from the
base task (use symbols stream_adapter, _prepare_stream, _finalize_stream,
invoke_openai_stream, invoke_ai_sdk_stream, TaskRun) so the test fails if
conversation history is dropped.

In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py`:
- Around line 1485-1599: The test test_run_with_prior_trace_preserves_tool_calls
must also assert that the caller's prior_trace is not mutated by the adapter;
after calling await adapter._run("what else?", prior_trace=prior_trace) add an
assertion comparing the original prior_trace structure (e.g., checking specific
nested fields like the first assistant tool_calls id "call_abc123" and
subsequent tool response contents "28" and "172", or doing a deep-equality
check) to ensure adapter._run (and its helper mock_run_model_turn) did not
modify prior_trace; reference the prior_trace variable and the
adapter._run/_run_model_turn symbols when adding this assertion.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 6f85a09d-8db8-4c3f-92d8-aa3ef3bcbd84

📥 Commits

Reviewing files that changed from the base of the PR and between 4460580 and c516dca.

📒 Files selected for processing (3)
  • libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
  • libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py

@leonardmq leonardmq requested a review from scosman March 11, 2026 10:29
