@@ -97,6 +97,7 @@ class ServingSpec(nuclio_function.NuclioSpec):
9797 "default_class" ,
9898 "secret_sources" ,
9999 "track_models" ,
100+ "streaming" ,
100101 ]
101102
102103 def __init__ (
@@ -154,6 +155,7 @@ def __init__(
154155 model_endpoint_creation_task_name = None ,
155156 serving_spec = None ,
156157 auth = None ,
158+ streaming : Optional [bool ] = None ,
157159 ):
158160 super ().__init__ (
159161 command = command ,
@@ -212,6 +214,7 @@ def __init__(
212214 self .secret_sources = secret_sources or []
213215 self .default_content_type = default_content_type
214216 self .model_endpoint_creation_task_name = model_endpoint_creation_task_name
217+ self .streaming = streaming
215218
216219 @property
217220 def graph (self ) -> Union [RouterStep , RootFlowStep ]:
@@ -384,6 +387,63 @@ def set_tracking(
384387 if stream_args :
385388 self .spec .parameters ["stream_args" ] = stream_args
386389
390+ def set_streaming (self , enabled : bool = True ) -> None :
391+ """Enable or disable streaming mode for the serving function.
392+
393+ When streaming is enabled, the function handler yields results as they
394+ arrive from streaming steps in the graph, allowing for real-time
395+ streaming responses (e.g., for LLM token streaming).
396+
397+ Streaming is only supported with HTTP triggers. When streaming is enabled,
398+ non-HTTP triggers cannot be added to the function.
399+
400+ :param enabled: Enable or disable streaming mode. Default is True.
401+
402+ Example::
403+
404+ # Create a serving function with streaming enabled
405+ serving_fn = mlrun.code_to_function(kind="serving")
406+ serving_fn.set_topology("flow", engine="async")
407+ serving_fn.set_streaming(enabled=True)
408+
409+ """
410+ # Validate that only HTTP triggers are configured when enabling streaming
411+ if enabled :
412+ # Triggers are stored as "spec.triggers.<name>" keys in the config dict
413+ for key , trigger_spec in self .spec .config .items ():
414+ if key .startswith ("spec.triggers." ):
415+ trigger_name = key .split ("." )[- 1 ]
416+ trigger_kind = trigger_spec .get ("kind" , "http" )
417+ if trigger_kind != "http" :
418+ raise mlrun .errors .MLRunInvalidArgumentError (
419+ f"Streaming is only supported with HTTP triggers. "
420+ f"Found non-HTTP trigger '{ trigger_name } ' of kind '{ trigger_kind } '. "
421+ f"Remove non-HTTP triggers before enabling streaming."
422+ )
423+
424+ self .spec .streaming = enabled
425+
426+ def add_trigger (self , name , spec ):
427+ """Add a nuclio trigger object/dict.
428+
429+ Overrides parent to validate streaming compatibility.
430+
431+ :param name: trigger name
432+ :param spec: trigger object or dict
433+ """
434+ # Validate streaming compatibility
435+ if self .spec .streaming :
436+ trigger_spec = spec .to_dict () if hasattr (spec , "to_dict" ) else spec
437+ trigger_kind = trigger_spec .get ("kind" , "http" )
438+ if trigger_kind != "http" :
439+ raise mlrun .errors .MLRunInvalidArgumentError (
440+ f"Cannot add non-HTTP trigger '{ name } ' (kind='{ trigger_kind } ') "
441+ f"when streaming is enabled. Streaming only supports HTTP triggers. "
442+ f"Either disable streaming with set_streaming(False) or use HTTP triggers only."
443+ )
444+
445+ return super ().add_trigger (name , spec )
446+
387447 def add_model (
388448 self ,
389449 key : str ,
@@ -889,6 +949,13 @@ def to_job(
889949 f"Cannot convert function '{ self .metadata .name } ' to a job because it has child functions"
890950 )
891951
952+ if self .spec .streaming :
953+ raise mlrun .errors .MLRunInvalidArgumentError (
954+ f"Cannot convert function '{ self .metadata .name } ' to a job because streaming "
955+ f"is enabled. Streaming functions return real-time HTTP responses and cannot "
956+ f"run as batch jobs. Please disable streaming with set_streaming(False) first."
957+ )
958+
892959 self ._add_steps_requirements ()
893960
894961 spec = pod_runtime .KubeResourceSpec (
0 commit comments