Skip to content

Commit 74e361b

Browse files
committed
Implement support for stream responses
Using the new functionality introduced in storey 1.11.8 / mlrun/storey#605. [ML-11876](https://iguazio.atlassian.net/browse/ML-11876)
1 parent 1026887 commit 74e361b

File tree

18 files changed

+2007
-1307
lines changed

18 files changed

+2007
-1307
lines changed

dockerfiles/gpu/locked-requirements.txt

Lines changed: 155 additions & 155 deletions
Large diffs are not rendered by default.

dockerfiles/jupyter/locked-requirements.txt

Lines changed: 155 additions & 155 deletions
Large diffs are not rendered by default.

dockerfiles/mlrun-api/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ ENV UV_SYSTEM_PYTHON=true UV_LINK_MODE=copy UV_COMPILE_BYTECODE=1
4141
RUN --mount=from=uv-image,source=/uv,target=/bin/uv \
4242
--mount=type=cache,id=pip-${MLRUN_PYTHON_VERSION},target=/root/.cache/uv \
4343
--mount=type=bind,source=dockerfiles/mlrun-api/locked-requirements.txt,target=locked-requirements.txt \
44-
uv pip sync --require-hashes locked-requirements.txt --python-version ${MLRUN_PYTHON_VERSION}
44+
uv pip sync locked-requirements.txt --python-version ${MLRUN_PYTHON_VERSION}
4545

4646
WORKDIR /tmp/mlrun
4747
COPY *.txt *.md *.py *.toml ./

dockerfiles/mlrun-api/locked-requirements.txt

Lines changed: 158 additions & 158 deletions
Large diffs are not rendered by default.

dockerfiles/mlrun-kfp/locked-requirements.txt

Lines changed: 155 additions & 155 deletions
Large diffs are not rendered by default.

dockerfiles/mlrun/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ENV UV_SYSTEM_PYTHON=true UV_LINK_MODE=copy UV_COMPILE_BYTECODE=1
7070
RUN --mount=from=uv-image,source=/uv,target=/bin/uv \
7171
--mount=type=cache,id=pip-${MLRUN_PYTHON_VERSION},target=/root/.cache/uv \
7272
--mount=type=bind,source=dockerfiles/mlrun/locked-requirements.txt,target=locked-requirements.txt \
73-
uv pip install --no-deps --require-hashes -r locked-requirements.txt --python-version ${MLRUN_PYTHON_VERSION}
73+
uv pip install --no-deps -r locked-requirements.txt --python-version ${MLRUN_PYTHON_VERSION}
7474

7575
WORKDIR /tmp/mlrun
7676

dockerfiles/mlrun/locked-requirements.txt

Lines changed: 155 additions & 155 deletions
Large diffs are not rendered by default.

dockerfiles/test-system/locked-requirements.txt

Lines changed: 254 additions & 254 deletions
Large diffs are not rendered by default.

dockerfiles/test/locked-requirements.txt

Lines changed: 257 additions & 257 deletions
Large diffs are not rendered by default.

mlrun/runtimes/nuclio/serving.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class ServingSpec(nuclio_function.NuclioSpec):
9797
"default_class",
9898
"secret_sources",
9999
"track_models",
100+
"streaming",
100101
]
101102

102103
def __init__(
@@ -154,6 +155,7 @@ def __init__(
154155
model_endpoint_creation_task_name=None,
155156
serving_spec=None,
156157
auth=None,
158+
streaming: Optional[bool] = None,
157159
):
158160
super().__init__(
159161
command=command,
@@ -212,6 +214,7 @@ def __init__(
212214
self.secret_sources = secret_sources or []
213215
self.default_content_type = default_content_type
214216
self.model_endpoint_creation_task_name = model_endpoint_creation_task_name
217+
self.streaming = streaming
215218

216219
@property
217220
def graph(self) -> Union[RouterStep, RootFlowStep]:
@@ -384,6 +387,63 @@ def set_tracking(
384387
if stream_args:
385388
self.spec.parameters["stream_args"] = stream_args
386389

390+
def set_streaming(self, enabled: bool = True) -> None:
    """Enable or disable streaming mode for the serving function.

    When streaming is enabled, the function handler yields results as they
    arrive from streaming steps in the graph, allowing for real-time
    streaming responses (e.g., for LLM token streaming).

    Streaming is only supported with HTTP triggers. When streaming is enabled,
    non-HTTP triggers cannot be added to the function.

    :param enabled: Enable or disable streaming mode. Default is True.
    :raises mlrun.errors.MLRunInvalidArgumentError: if ``enabled`` is truthy and
        a trigger already present in ``self.spec.config`` declares a kind
        other than ``"http"``.

    Example::

        # Create a serving function with streaming enabled
        serving_fn = mlrun.code_to_function(kind="serving")
        serving_fn.set_topology("flow", engine="async")
        serving_fn.set_streaming(enabled=True)

    """
    # Only enabling needs validation; disabling is always allowed.
    if enabled:
        # Triggers are stored as "spec.triggers.<name>" keys in the config dict
        trigger_prefix = "spec.triggers."
        for key, trigger_spec in self.spec.config.items():
            if not key.startswith(trigger_prefix):
                continue
            # Strip the prefix rather than taking the last dot-segment so the
            # whole trigger name is reported in the error message.
            trigger_name = key[len(trigger_prefix):]
            # A trigger spec without an explicit kind is treated as HTTP.
            trigger_kind = trigger_spec.get("kind", "http")
            if trigger_kind != "http":
                raise mlrun.errors.MLRunInvalidArgumentError(
                    "Streaming is only supported with HTTP triggers. "
                    f"Found non-HTTP trigger '{trigger_name}' of kind '{trigger_kind}'. "
                    "Remove non-HTTP triggers before enabling streaming."
                )

    self.spec.streaming = enabled
426+
def add_trigger(self, name, spec):
    """Add a nuclio trigger object/dict.

    Overrides parent to validate streaming compatibility: while
    ``self.spec.streaming`` is truthy, only triggers whose kind is ``"http"``
    (or that declare no kind) are accepted.

    :param name: trigger name
    :param spec: trigger object or dict
    :raises mlrun.errors.MLRunInvalidArgumentError: if streaming is enabled and
        the trigger kind is not ``"http"``.
    :return: whatever the parent ``add_trigger`` returns
    """
    # Validate streaming compatibility
    if self.spec.streaming:
        # Trigger objects expose to_dict(); plain dicts are inspected as-is.
        trigger_spec = spec.to_dict() if hasattr(spec, "to_dict") else spec
        # A trigger spec without an explicit kind is treated as HTTP.
        trigger_kind = trigger_spec.get("kind", "http")
        if trigger_kind != "http":
            raise mlrun.errors.MLRunInvalidArgumentError(
                f"Cannot add non-HTTP trigger '{name}' (kind='{trigger_kind}') "
                "when streaming is enabled. Streaming only supports HTTP triggers. "
                "Either disable streaming with set_streaming(False) or use HTTP triggers only."
            )

    # The original, unconverted spec is handed to the parent implementation.
    return super().add_trigger(name, spec)
387447
def add_model(
388448
self,
389449
key: str,
@@ -889,6 +949,13 @@ def to_job(
889949
f"Cannot convert function '{self.metadata.name}' to a job because it has child functions"
890950
)
891951

952+
if self.spec.streaming:
953+
raise mlrun.errors.MLRunInvalidArgumentError(
954+
f"Cannot convert function '{self.metadata.name}' to a job because streaming "
955+
f"is enabled. Streaming functions return real-time HTTP responses and cannot "
956+
f"run as batch jobs. Please disable streaming with set_streaming(False) first."
957+
)
958+
892959
self._add_steps_requirements()
893960

894961
spec = pod_runtime.KubeResourceSpec(

0 commit comments

Comments
 (0)