Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a2ac67d
Add support for stream responses
gtopper Jan 20, 2026
904f2a6
Improvement and more tests
gtopper Jan 20, 2026
7abd93b
More tests and fixes
gtopper Jan 20, 2026
404cf24
New tests and related fixes
gtopper Jan 20, 2026
388bfd7
Fmt
gtopper Jan 20, 2026
78e676c
Shorten code
gtopper Jan 20, 2026
cf293ab
Improve comment
gtopper Jan 20, 2026
260f105
Fix style
gtopper Jan 20, 2026
d6b4f0c
Remove unreachable yields
gtopper Jan 20, 2026
e3bdfdc
Local refactor
gtopper Jan 22, 2026
b563da7
Refactor
gtopper Jan 22, 2026
228785d
Add async test
gtopper Jan 22, 2026
e081b1a
Use Union until #606 is merged
gtopper Jan 22, 2026
17e1ccb
Minor improvements to streaming tests
gtopper Jan 22, 2026
c8960fb
Merge remote-tracking branch 'mlrun/development' into ML-11875
gtopper Jan 25, 2026
951da8d
Delete unused method
gtopper Jan 25, 2026
bf0f33e
Use repr
gtopper Jan 25, 2026
51a1bfb
Improve comments, minor refactoring
gtopper Jan 25, 2026
4ff9e23
Dedup local code
gtopper Jan 25, 2026
824f5bf
Minor refactoring
gtopper Jan 25, 2026
5e47bbf
Remove unnecessary elif
gtopper Jan 25, 2026
3d2a238
Minor local refactoring
gtopper Jan 25, 2026
ff95a06
Minor local refactoring
gtopper Jan 25, 2026
3e24d55
Improve execution_mechanism + streaming validation
gtopper Jan 25, 2026
9981f46
Minor change for static check
gtopper Jan 25, 2026
ed6b1ab
Replace Reduce with Complete in streaming test suite
gtopper Jan 25, 2026
5e52f64
Merge streaming + error handling test suites
gtopper Jan 25, 2026
4369c59
Improve test
gtopper Jan 25, 2026
a458ae7
Merge two streaming completion-related test suites
gtopper Jan 25, 2026
1a61f3e
Improve test names, descriptions
gtopper Jan 25, 2026
896ed90
Extract common streaming test helpers
gtopper Jan 25, 2026
9b30186
Improve docstrings and type annotations
gtopper Jan 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions storey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .dtypes import FixedWindowType # noqa: F401
from .dtypes import LateDataHandling # noqa: F401
from .dtypes import SlidingWindows # noqa: F401
from .dtypes import StreamingError # noqa: F401
from .flow import Batch # noqa: F401
from .flow import Choice # noqa: F401
from .flow import Complete # noqa: F401
Expand Down Expand Up @@ -66,6 +67,7 @@
from .sources import SQLSource # noqa: F401
from .sources import SyncEmitSource # noqa: F401
from .sql_driver import SQLDriver # noqa: F401
from .steps import Collector # noqa: F401
from .table import Table # noqa: F401
from .targets import CSVTarget # noqa: F401
from .targets import KafkaTarget # noqa: F401
Expand Down
47 changes: 46 additions & 1 deletion storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class Event:
:param content_type: Request content type (HTTP only) (Optional)
:param awaitable_result: Generally not passed directly. (Optional)
:type awaitable_result: AwaitableResult (Optional)

Streaming attributes (set dynamically by streaming steps):
streaming_step: Name of the step that initiated the stream (set on chunk events).
chunk_id: Sequential identifier for chunks within a stream (0, 1, 2, ...).
"""

def __init__(
Expand Down Expand Up @@ -88,7 +92,7 @@ def __eq__(self, other):
) # noqa: E127

def __str__(self):
return f"Event(id={self.id}, key={str(self.key)}, body={self.body})"
return f"Event(id={self.id!r}, key={str(self.key)!r}, body={self.body!r})"


class V3ioError(Exception):
Expand All @@ -103,6 +107,47 @@ class FlowError(Exception):
pass


class StreamingError(Exception):
"""Exception raised for streaming-related errors."""

pass


class StreamChunk:
"""Wrapper for streaming chunks in the AwaitableResult queue.

When a streaming step emits chunks, Complete wraps each chunk body
in a StreamChunk before pushing to the queue. This allows await_result()
to detect streaming responses and return a generator.
"""

def __init__(self, body):
self.body = body

def __repr__(self):
return f"StreamChunk({self.body!r})"


class StreamCompletion:
"""Sentinel marking the end of a stream from a specific step.

When a streaming step finishes yielding chunks, this sentinel is
pushed downstream and eventually to the AwaitableResult queue to
signal that the stream is complete.

:param streaming_step: Name of the step that originated the stream.
:param original_event: Reference to the original event that was streamed.
"""

def __init__(self, streaming_step: str, original_event: Event):
self.streaming_step = streaming_step
self.original_event = original_event

def __repr__(self):
event_id = self.original_event.id if self.original_event else None
return f"StreamCompletion(streaming_step={self.streaming_step!r}, event_id={event_id!r})"


class WindowBase:
def __init__(self, window, period, window_str):
self.window_millis = window
Expand Down
Loading