Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1c120bf
last todos
sfierro Feb 20, 2026
2b69ecb
Merge pull request #1067 from Kiln-AI/sfierro/last-changes
scosman Feb 20, 2026
60e7a48
version bump for release
scosman Feb 20, 2026
e80c067
version bump for release (lockfile)
scosman Feb 20, 2026
18a0b60
refactor: autosave_runs in memory (not persisted)
leonardmq Feb 24, 2026
f5af283
cr: explicitly prevent dumping in memory to yaml
leonardmq Feb 24, 2026
ec915f1
refactor: combine in memory and persisted in settings
leonardmq Feb 24, 2026
69bb336
cr feedback (mostly about locking)
leonardmq Feb 24, 2026
c8091a3
cr lock in setattr
leonardmq Feb 24, 2026
ce6b564
telemetry docs
scosman Feb 24, 2026
07e0fe3
Merge pull request #1083 from Kiln-AI/scosman/telemetry_docs_2
scosman Feb 24, 2026
cf56e58
Allow tool server edit when it fails to connect
chiang-daniel Feb 24, 2026
905dee5
CR from Leonard
chiang-daniel Feb 25, 2026
05f9d4d
Merge pull request #1084 from Kiln-AI/dchiang/fix-tool-edit-button
chiang-daniel Feb 25, 2026
f04a785
update MCP error stack
chiang-daniel Feb 26, 2026
a520b78
use markdown
chiang-daniel Feb 26, 2026
d802886
better description
chiang-daniel Feb 26, 2026
2aee473
CR
chiang-daniel Feb 26, 2026
f086060
CR from Leonard
chiang-daniel Feb 26, 2026
cd21709
clean up
chiang-daniel Feb 26, 2026
6d0d3c9
update tests to use custom_error
chiang-daniel Feb 26, 2026
544ee18
Merge pull request #1086 from Kiln-AI/dchiang/KIL-415/mcp-error-cleanup
chiang-daniel Feb 26, 2026
c4326d8
chore: update litellm
leonardmq Feb 27, 2026
20d2450
Merge pull request #1090 from Kiln-AI/leonard/kil-439-chore-update-li…
leonardmq Feb 27, 2026
a0d1646
save tool id for tool use spec
sfierro Feb 27, 2026
33fd958
Merge pull request #1097 from Kiln-AI/sfierro/tool-id-bug
sfierro Feb 27, 2026
57896a8
Proof of concept streaming API
scosman Feb 18, 2026
3d6ced2
test: paid integration test for streaming
leonardmq Mar 3, 2026
1464fb8
test: add test for session + streaming together
leonardmq Mar 3, 2026
dee10b3
Update libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter…
leonardmq Mar 3, 2026
3fabd00
Merge pull request #1082 from Kiln-AI/leonard/kil-428-make-autosave_r…
leonardmq Mar 6, 2026
c520e27
fix: pin uv tools in CI and checks
leonardmq Mar 6, 2026
1eb0fe6
fix: pinned uv run in another workflow and mcp hooks
leonardmq Mar 6, 2026
f18aa3b
Merge pull request #1104 from Kiln-AI/leonard/kil-442-fix-pin-uv-tool…
leonardmq Mar 6, 2026
456088d
refactor: stream with support for AI SDK (with tool events) and OpenA…
leonardmq Mar 8, 2026
0ea65b4
refactor: ai sdk events as pydantic models
leonardmq Mar 8, 2026
a98d886
fix: model_dump implementation and remove to_see to leave transport s…
leonardmq Mar 8, 2026
e989dca
fix: should reset before next round of toolcalls
leonardmq Mar 8, 2026
11710f2
refactor: take in a trace instead of a task_run for session continuation
leonardmq Mar 8, 2026
3ad6a27
refactor: remove ability to continue task run at api level
leonardmq Mar 8, 2026
3f08ed5
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-420-a…
leonardmq Mar 8, 2026
4d8e99f
refactor: wrap stream iterators to allow exposing task run at the end
leonardmq Mar 8, 2026
eb537ed
fix: remove autosave_runs hardcoded
leonardmq Mar 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
60 changes: 60 additions & 0 deletions libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import Any, AsyncIterator, Optional, Union

import litellm
from litellm.types.utils import (
ModelResponse,
ModelResponseStream,
TextCompletionResponse,
)


class StreamingCompletion:
    """
    Async-iterable wrapper around ``litellm.acompletion`` in streaming mode.

    Iterating the wrapper yields ``ModelResponseStream`` chunks as they
    arrive from the provider. Once the stream is exhausted, the full
    ``ModelResponse`` (assembled via ``litellm.stream_chunk_builder``) can be
    read from the ``.response`` property.

    Usage::

        stream = StreamingCompletion(model=..., messages=...)
        async for chunk in stream:
            # forward the chunk anywhere (print, log, send over WS, …)
            pass
        final = stream.response  # fully assembled ModelResponse
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Copy the kwargs so the caller's dict is never mutated, and drop any
        # caller-supplied "stream" flag — streaming is always forced on.
        sanitized = {key: value for key, value in kwargs.items() if key != "stream"}
        self._args = args
        self._kwargs = sanitized
        self._response: Optional[Union[ModelResponse, TextCompletionResponse]] = None
        self._iterated: bool = False

    @property
    def response(self) -> Optional[Union[ModelResponse, TextCompletionResponse]]:
        """The final assembled response. Only available after iteration."""
        if not self._iterated:
            raise RuntimeError(
                "StreamingCompletion has not been iterated yet. "
                "Use 'async for chunk in stream:' before accessing .response"
            )
        return self._response

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # Reset state up front so the wrapper supports being iterated more
        # than once; every iteration issues a fresh completion call.
        self._iterated = False
        self._response = None

        collected: list[ModelResponseStream] = []
        raw_stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)

        async for piece in raw_stream:
            collected.append(piece)
            yield piece

        # Stream fully consumed: assemble the final response from all chunks.
        self._response = litellm.stream_chunk_builder(collected)
        self._iterated = True
139 changes: 139 additions & 0 deletions libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, List
from unittest.mock import MagicMock, patch

import pytest

from kiln_ai.adapters.litellm_utils.litellm_streaming import StreamingCompletion


def _make_chunk(content: str | None = None, finish_reason: str | None = None) -> Any:
    """Construct a minimal stand-in for a litellm streaming chunk."""
    return SimpleNamespace(
        choices=[
            SimpleNamespace(
                delta=SimpleNamespace(content=content, role="assistant"),
                finish_reason=finish_reason,
                index=0,
            )
        ],
        id="chatcmpl-test",
        model="test-model",
    )


async def _async_iter(items: List[Any]):
    """Adapt a plain list into an async iterator (mocks a streaming response)."""
    for element in items:
        yield element


@pytest.fixture
def mock_acompletion():
    # Patch litellm.acompletion for the duration of a single test.
    with patch("litellm.acompletion") as patched:
        yield patched


@pytest.fixture
def mock_chunk_builder():
    # Patch litellm.stream_chunk_builder for the duration of a single test.
    with patch("litellm.stream_chunk_builder") as patched:
        yield patched


class TestStreamingCompletion:
    async def test_yields_all_chunks(self, mock_acompletion, mock_chunk_builder):
        expected = [_make_chunk("Hello"), _make_chunk(" world"), _make_chunk("!")]
        mock_acompletion.return_value = _async_iter(expected)
        mock_chunk_builder.return_value = MagicMock(name="final_response")

        wrapper = StreamingCompletion(model="test", messages=[])
        collected = []
        async for piece in wrapper:
            collected.append(piece)

        assert collected == expected

    async def test_response_available_after_iteration(
        self, mock_acompletion, mock_chunk_builder
    ):
        mock_acompletion.return_value = _async_iter([_make_chunk("hi")])
        final = MagicMock(name="final_response")
        mock_chunk_builder.return_value = final

        wrapper = StreamingCompletion(model="test", messages=[])
        async for _ in wrapper:
            pass

        assert wrapper.response is final

    async def test_response_raises_before_iteration(self):
        wrapper = StreamingCompletion(model="test", messages=[])
        with pytest.raises(RuntimeError, match="not been iterated"):
            _ = wrapper.response

    async def test_stream_kwarg_is_stripped(self, mock_acompletion, mock_chunk_builder):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        # Even an explicit stream=False from the caller must be overridden.
        wrapper = StreamingCompletion(model="test", messages=[], stream=False)
        async for _ in wrapper:
            pass

        assert mock_acompletion.call_args.kwargs["stream"] is True

    async def test_passes_args_and_kwargs_through(
        self, mock_acompletion, mock_chunk_builder
    ):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        wrapper = StreamingCompletion(
            model="gpt-4", messages=[{"role": "user", "content": "hi"}], temperature=0.5
        )
        async for _ in wrapper:
            pass

        forwarded = mock_acompletion.call_args.kwargs
        assert forwarded["model"] == "gpt-4"
        assert forwarded["messages"] == [{"role": "user", "content": "hi"}]
        assert forwarded["temperature"] == 0.5
        assert forwarded["stream"] is True

    async def test_chunks_passed_to_builder(self, mock_acompletion, mock_chunk_builder):
        pieces = [_make_chunk("a"), _make_chunk("b")]
        mock_acompletion.return_value = _async_iter(pieces)
        mock_chunk_builder.return_value = MagicMock()

        wrapper = StreamingCompletion(model="test", messages=[])
        async for _ in wrapper:
            pass

        mock_chunk_builder.assert_called_once_with(pieces)

    async def test_re_iteration_resets_state(
        self, mock_acompletion, mock_chunk_builder
    ):
        # Each pass through the wrapper should trigger a fresh completion
        # call and replace the previously assembled response.
        round_one = [_make_chunk("first")]
        round_two = [_make_chunk("second")]
        response_one = MagicMock(name="first_response")
        response_two = MagicMock(name="second_response")

        mock_acompletion.side_effect = [
            _async_iter(round_one),
            _async_iter(round_two),
        ]
        mock_chunk_builder.side_effect = [response_one, response_two]

        wrapper = StreamingCompletion(model="test", messages=[])

        async for _ in wrapper:
            pass
        assert wrapper.response is response_one

        async for _ in wrapper:
            pass
        assert wrapper.response is response_two

    async def test_empty_stream(self, mock_acompletion, mock_chunk_builder):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        wrapper = StreamingCompletion(model="test", messages=[])
        collected = [piece async for piece in wrapper]

        assert collected == []
        assert wrapper.response is None
17 changes: 14 additions & 3 deletions libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
from abc import ABCMeta, abstractmethod
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Dict, Tuple

from litellm.types.utils import ModelResponseStream

from kiln_ai.adapters.chat.chat_formatter import (
ChatFormatter,
MultiturnFormatter,
Expand Down Expand Up @@ -49,6 +52,8 @@
from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam

StreamCallback = Callable[[ModelResponseStream], Awaitable[None]]


@dataclass
class AdapterConfig:
Expand Down Expand Up @@ -128,9 +133,10 @@ async def invoke(
input: InputType,
input_source: DataSource | None = None,
existing_run: TaskRun | None = None,
on_chunk: StreamCallback | None = None,
) -> TaskRun:
run_output, _ = await self.invoke_returning_run_output(
input, input_source, existing_run
input, input_source, existing_run, on_chunk=on_chunk
)
return run_output

Expand All @@ -139,6 +145,7 @@ async def _run_returning_run_output(
input: InputType,
input_source: DataSource | None = None,
existing_run: TaskRun | None = None,
on_chunk: StreamCallback | None = None,
) -> Tuple[TaskRun, RunOutput]:
# validate input, allowing arrays
if self.input_schema is not None:
Expand Down Expand Up @@ -166,7 +173,9 @@ async def _run_returning_run_output(
formatted_input = formatter.format_input(input)

# Run
run_output, usage = await self._run(formatted_input, prior_trace=prior_trace)
run_output, usage = await self._run(
formatted_input, prior_trace=prior_trace, on_chunk=on_chunk
)

# Parse
provider = self.model_provider()
Expand Down Expand Up @@ -256,6 +265,7 @@ async def invoke_returning_run_output(
input: InputType,
input_source: DataSource | None = None,
existing_run: TaskRun | None = None,
on_chunk: StreamCallback | None = None,
) -> Tuple[TaskRun, RunOutput]:
# Determine if this is the root agent (no existing run context)
is_root_agent = get_agent_run_id() is None
Expand All @@ -266,7 +276,7 @@ async def invoke_returning_run_output(

try:
return await self._run_returning_run_output(
input, input_source, existing_run
input, input_source, existing_run, on_chunk=on_chunk
)
finally:
if is_root_agent:
Expand All @@ -289,6 +299,7 @@ async def _run(
self,
input: InputType,
prior_trace: list[ChatCompletionMessageParam] | None = None,
on_chunk: StreamCallback | None = None,
) -> Tuple[RunOutput, Usage | None]:
pass

Expand Down
17 changes: 13 additions & 4 deletions libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

import litellm
from litellm.types.utils import (
ChatCompletionMessageToolCall,
ChoiceLogprobs,
Expand All @@ -20,6 +19,7 @@

import kiln_ai.datamodel as datamodel
from kiln_ai.adapters.chat import ChatCompletionMessageIncludingLiteLLM
from kiln_ai.adapters.litellm_utils.litellm_streaming import StreamingCompletion
from kiln_ai.adapters.ml_model_list import (
KilnModelProvider,
ModelProviderName,
Expand All @@ -29,6 +29,7 @@
AdapterConfig,
BaseAdapter,
RunOutput,
StreamCallback,
Usage,
)
from kiln_ai.adapters.model_adapters.litellm_config import LiteLlmConfig
Expand Down Expand Up @@ -95,6 +96,7 @@ async def _run_model_turn(
prior_messages: list[ChatCompletionMessageIncludingLiteLLM],
top_logprobs: int | None,
skip_response_format: bool,
on_chunk: StreamCallback | None = None,
) -> ModelTurnResult:
"""
Call the model for a single top level turn: from user message to agent message.
Expand All @@ -118,7 +120,7 @@ async def _run_model_turn(

# Make the completion call
model_response, response_choice = await self.acompletion_checking_response(
**completion_kwargs
on_chunk=on_chunk, **completion_kwargs
)

# count the usage
Expand Down Expand Up @@ -185,6 +187,7 @@ async def _run(
self,
input: InputType,
prior_trace: list[ChatCompletionMessageParam] | None = None,
on_chunk: StreamCallback | None = None,
) -> tuple[RunOutput, Usage | None]:
usage = Usage()

Expand Down Expand Up @@ -229,6 +232,7 @@ async def _run(
messages,
self.base_adapter_config.top_logprobs if turn.final_call else None,
skip_response_format,
on_chunk=on_chunk,
)

usage += turn_result.usage
Expand Down Expand Up @@ -297,9 +301,14 @@ def _extract_reasoning_to_intermediate_outputs(
intermediate_outputs["reasoning"] = stripped_reasoning_content

async def acompletion_checking_response(
self, **kwargs
self, on_chunk: StreamCallback | None = None, **kwargs
) -> Tuple[ModelResponse, Choices]:
response = await litellm.acompletion(**kwargs)
stream = StreamingCompletion(**kwargs)
async for chunk in stream:
if on_chunk is not None:
await on_chunk(chunk)
response = stream.response

if (
not isinstance(response, ModelResponse)
or not response.choices
Expand Down
11 changes: 9 additions & 2 deletions libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import json
from typing import Tuple

from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig, BaseAdapter
from kiln_ai.adapters.model_adapters.base_adapter import (
AdapterConfig,
BaseAdapter,
StreamCallback,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import DataSource, Task, TaskRun, Usage
Expand Down Expand Up @@ -46,6 +50,7 @@ async def _run(
self,
input: InputType,
prior_trace: list[ChatCompletionMessageParam] | None = None,
on_chunk: StreamCallback | None = None,
) -> Tuple[RunOutput, Usage | None]:
if prior_trace is not None:
raise NotImplementedError(
Expand Down Expand Up @@ -86,6 +91,7 @@ async def invoke(
input: InputType,
input_source: DataSource | None = None,
existing_run: TaskRun | None = None,
on_chunk: StreamCallback | None = None,
) -> TaskRun:
if existing_run is not None:
raise NotImplementedError(
Expand All @@ -94,7 +100,7 @@ async def invoke(
)

run_output, _ = await self.invoke_returning_run_output(
input, input_source, existing_run
input, input_source, existing_run, on_chunk=on_chunk
)
return run_output

Expand All @@ -103,6 +109,7 @@ async def invoke_returning_run_output(
input: InputType,
input_source: DataSource | None = None,
existing_run: TaskRun | None = None,
on_chunk: StreamCallback | None = None,
) -> Tuple[TaskRun, RunOutput]:
"""
Runs the task and returns both the persisted TaskRun and raw RunOutput.
Expand Down
Loading
Loading