Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3f83187
refactor: support multiturn (existing conversation history with task …
leonardmq Feb 26, 2026
6c7cc58
refactor: use correct typing in chat formatter
leonardmq Feb 27, 2026
04748e4
refactor: retrieve task_run one level up
leonardmq Feb 27, 2026
57896a8
Proof of concept streaming API
scosman Feb 18, 2026
3d6ced2
test: paid integration test for streaming
leonardmq Mar 3, 2026
1464fb8
test: add test for session + streaming together
leonardmq Mar 3, 2026
dee10b3
Update libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter…
leonardmq Mar 3, 2026
456088d
refactor: stream with support for AI SDK (with tool events) and OpenA…
leonardmq Mar 8, 2026
0ea65b4
refactor: ai sdk events as pydantic models
leonardmq Mar 8, 2026
a98d886
fix: model_dump implementation and remove to_see to leave transport s…
leonardmq Mar 8, 2026
e989dca
fix: should reset before next round of toolcalls
leonardmq Mar 8, 2026
11710f2
refactor: take in a trace instead of a task_run for session continuation
leonardmq Mar 8, 2026
3ad6a27
refactor: remove ability to continue task run at api level
leonardmq Mar 8, 2026
3f08ed5
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-420-a…
leonardmq Mar 8, 2026
4d8e99f
refactor: wrap stream iterators to allow exposing task run at the end
leonardmq Mar 8, 2026
eb537ed
fix: remove autosave_runs hardcoded
leonardmq Mar 8, 2026
66b3151
fix: close text when opening toolcall
leonardmq Mar 8, 2026
4460580
fix: reset fully
leonardmq Mar 8, 2026
c516dca
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-447-f…
leonardmq Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ libs/server/build
dist/

.mcp.json

test_output/
4 changes: 4 additions & 0 deletions libs/core/kiln_ai/adapters/chat/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .chat_formatter import (
BasicChatMessage,
ChatCompletionMessageIncludingLiteLLM,
ChatFormatter,
ChatMessage,
ChatStrategy,
MultiturnFormatter,
ToolCallMessage,
ToolResponseMessage,
get_chat_formatter,
Expand All @@ -11,9 +13,11 @@

__all__ = [
"BasicChatMessage",
"ChatCompletionMessageIncludingLiteLLM",
"ChatFormatter",
"ChatMessage",
"ChatStrategy",
"MultiturnFormatter",
"ToolCallMessage",
"ToolResponseMessage",
"build_tool_call_messages",
Expand Down
61 changes: 59 additions & 2 deletions libs/core/kiln_ai/adapters/chat/chat_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Literal, Optional, Sequence, Union
from typing import Dict, List, Literal, Optional, Sequence, TypeAlias, Union

from litellm.types.utils import Message as LiteLLMMessage

from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
from kiln_ai.utils.open_ai_types import ChatCompletionMessageToolCallParam
from kiln_ai.utils.open_ai_types import (
ChatCompletionMessageParam,
ChatCompletionMessageToolCallParam,
)

COT_FINAL_ANSWER_PROMPT = "Considering the above, return a final result."


# Message type for conversation traces that may interleave OpenAI-style
# ChatCompletion message dicts with LiteLLM's own Message objects (as
# returned by litellm completions). Used by initial_messages() so a prior
# trace can be replayed without converting every entry to one format.
ChatCompletionMessageIncludingLiteLLM: TypeAlias = Union[
    ChatCompletionMessageParam, LiteLLMMessage
]


@dataclass
class BasicChatMessage:
role: Literal["system", "assistant", "user"]
Expand Down Expand Up @@ -90,6 +100,10 @@ def intermediate_outputs(self) -> Dict[str, str]:
"""Get the intermediate outputs from the chat formatter."""
return self._intermediate_outputs

def initial_messages(self) -> list[ChatCompletionMessageIncludingLiteLLM]:
"""Messages to seed the conversation. Empty for fresh runs; prior trace for continuation."""
return []

@abstractmethod
def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
"""Advance the conversation and return the next messages if any."""
Expand Down Expand Up @@ -236,6 +250,49 @@ def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
return None


class MultiturnFormatter(ChatFormatter):
    """
    Formatter for continuing a multi-turn conversation with prior trace.
    Takes prior_trace (existing conversation) and appends the new user message.
    Produces a single turn: the new user message. Tool calls and multi-turn
    model responses are handled by _run_model_turn's internal loop.

    NOTE(review): the prior trace is exposed only via initial_messages();
    it is never copied into self._messages, so messages()/message_dicts()
    describe only the newly appended turn. Callers that serialize formatter
    state must combine initial_messages() with messages() themselves —
    confirm this is intended before relying on messages() alone.
    """

    def __init__(
        self,
        # Existing conversation history (system prompt, user/assistant turns,
        # tool calls/results) to seed the continuation with.
        prior_trace: list[ChatCompletionMessageParam],
        # The new user input to append as the next turn.
        user_input: InputType,
    ) -> None:
        # system_message is empty because the prior trace already contains
        # the system message (and any thinking instructions) from the
        # original run; re-adding them would duplicate the prompt.
        super().__init__(
            system_message="",
            user_input=user_input,
            thinking_instructions=None,
        )
        self._prior_trace = prior_trace

    def initial_messages(self) -> list[ChatCompletionMessageIncludingLiteLLM]:
        """Messages to seed the conversation (prior trace)."""
        # Return a shallow copy so callers cannot mutate our stored trace.
        return list(self._prior_trace)

    def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
        """Advance the two-state machine: emit the new user message, then
        record the model's final answer and finish.

        Raises:
            ValueError: if called in the "awaiting_final" state without
                the model's previous output.
        """
        if self._state == "start":
            # prior trace is already in the messages list and contains system and so on, we only need
            # to append the latest new user message
            user_msg = BasicChatMessage("user", format_user_message(self.user_input))
            self._state = "awaiting_final"
            self._messages.append(user_msg)
            # final_call=True: this single turn is expected to produce the
            # final answer (tool-call loops happen inside the model turn).
            return ChatTurn(messages=[user_msg], final_call=True)

        if self._state == "awaiting_final":
            if previous_output is None:
                raise ValueError("previous_output required for final step")
            # Record the assistant's answer so the formatter's own message
            # log reflects the completed turn, then terminate.
            self._messages.append(BasicChatMessage("assistant", previous_output))
            self._state = "done"
            return None

        # "done" (or any unexpected state): nothing further to produce.
        return None


def get_chat_formatter(
strategy: ChatStrategy,
system_message: str,
Expand Down
71 changes: 71 additions & 0 deletions libs/core/kiln_ai/adapters/chat/test_chat_formatter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from kiln_ai.adapters.chat import ChatStrategy, get_chat_formatter
from kiln_ai.adapters.chat.chat_formatter import (
COT_FINAL_ANSWER_PROMPT,
MultiturnFormatter,
format_user_message,
)

Expand Down Expand Up @@ -119,6 +120,76 @@ def test_chat_formatter_r1_style():
assert formatter.intermediate_outputs() == {}


def test_multiturn_formatter_initial_messages():
    # The formatter must hand back the continuation history untouched.
    history = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
    ]
    fmt = MultiturnFormatter(prior_trace=history, user_input="new input")
    assert fmt.initial_messages() == history


def test_multiturn_formatter_next_turn():
    history = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
    ]
    fmt = MultiturnFormatter(prior_trace=history, user_input="follow-up")

    # First turn: exactly one message — the new user input — marked final.
    turn = fmt.next_turn()
    assert turn is not None
    assert turn.final_call
    (msg,) = turn.messages
    assert msg.role == "user"
    assert msg.content == "follow-up"

    # Supplying the model's answer ends the conversation.
    assert fmt.next_turn("assistant response") is None


def test_multiturn_formatter_preserves_tool_call_messages():
    # A realistic trace with reasoning, a tool call, and its tool result.
    trace = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "4"},
        {
            "role": "assistant",
            "content": "",
            "reasoning_content": "Let me multiply 4 by 7.\n",
            "tool_calls": [
                {
                    "id": "call_abc123",
                    "function": {"arguments": '{"a": 4, "b": 7}', "name": "multiply"},
                    "type": "function",
                }
            ],
        },
        {
            "content": "28",
            "role": "tool",
            "tool_call_id": "call_abc123",
            "kiln_task_tool_data": None,
        },
        {
            "role": "assistant",
            "content": "4 multiplied by 7 is 28.",
            "reasoning_content": "Done.\n",
        },
    ]
    fmt = MultiturnFormatter(prior_trace=trace, user_input="now double it")

    # The seeded history must round-trip untouched, including tool plumbing.
    seeded = fmt.initial_messages()
    assert seeded == trace
    tool_call = seeded[2]["tool_calls"][0]
    assert tool_call["id"] == "call_abc123"
    assert tool_call["function"]["name"] == "multiply"
    tool_result = seeded[3]
    assert tool_result["role"] == "tool"
    assert tool_result["tool_call_id"] == "call_abc123"

    # The only produced turn is the new user message, marked final.
    turn = fmt.next_turn()
    assert turn is not None
    assert turn.final_call
    (msg,) = turn.messages
    assert msg.role == "user"
    assert msg.content == "now double it"


def test_format_user_message():
# String
assert format_user_message("test input") == "test input"
Expand Down
Empty file.
60 changes: 60 additions & 0 deletions libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import Any, AsyncIterator, Optional, Union

import litellm
from litellm.types.utils import (
ModelResponse,
ModelResponseStream,
TextCompletionResponse,
)


class StreamingCompletion:
    """
    Async iterable wrapper around ``litellm.acompletion`` with streaming.

    Yields ``ModelResponseStream`` chunks as they arrive. After iteration
    completes, the assembled ``ModelResponse`` is available via the
    ``.response`` property.

    Usage::

        stream = StreamingCompletion(model=..., messages=...)
        async for chunk in stream:
            # handle chunk however you like (print, log, send over WS, …)
            pass
        final = stream.response  # fully assembled ModelResponse
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # We always stream; strip any caller-supplied "stream" flag so it
        # cannot collide with the stream=True we pass in __aiter__.
        self._args = args
        self._kwargs = {k: v for k, v in kwargs.items() if k != "stream"}
        self._iterated: bool = False
        self._response: Optional[Union[ModelResponse, TextCompletionResponse]] = None

    @property
    def response(self) -> Optional[Union[ModelResponse, TextCompletionResponse]]:
        """The final assembled response. Only available after iteration."""
        if self._iterated:
            return self._response
        raise RuntimeError(
            "StreamingCompletion has not been iterated yet. "
            "Use 'async for chunk in stream:' before accessing .response"
        )

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # Reset state so each fresh iteration rebuilds the response.
        self._iterated = False
        self._response = None

        collected: list[ModelResponseStream] = []
        raw_stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)

        async for piece in raw_stream:
            collected.append(piece)
            yield piece

        # Stitch the streamed chunks back into a full ModelResponse.
        self._response = litellm.stream_chunk_builder(collected)
        self._iterated = True
Loading
Loading