Add timing metadata for streaming ParallelExecution responses #613
Merged
gtopper merged 6 commits into mlrun:development on Feb 12, 2026
[ML-11879](https://iguazio.atlassian.net/browse/ML-11879)

Streaming responses from `ParallelExecution` were missing the `when` and `microsec` timing metadata that non-streaming responses include. This metadata is required for model monitoring in MLRun.

## Changes

* Add `_StreamingResult` class to wrap streaming generators with timing info
* Set timing metadata on events before emitting streaming chunks
* Handle both in-process streaming (`_StreamingResult`) and process-based streaming (raw generators)

## Notes

* For streaming, `microsec` is set to `None` since total runtime isn't available until streaming completes
* For process-based streaming, `when` uses the timestamp when chunks start arriving (timing from the subprocess isn't available)
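The two streaming paths described above can be pictured with a minimal sketch. This is not the code from the PR: `StreamingResultSketch`, `run_streaming`, and `attach_timing` are hypothetical names, and the only behavior taken from the description is that an in-process stream carries its own `when`, a raw generator gets `when` at arrival time, and `microsec` stays `None` in both cases.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Iterator, Optional, Tuple


class StreamingResultSketch:
    """Hypothetical stand-in for `_StreamingResult`: a generator plus timing info."""

    def __init__(self, generator: Iterator[Any], when: datetime,
                 microsec: Optional[int] = None):
        self.generator = generator
        self.when = when          # timestamp taken when the call started
        self.microsec = microsec  # unknown until the stream is fully consumed


def run_streaming(fn: Callable[..., Iterator[Any]], *args: Any) -> StreamingResultSketch:
    """In-process path: record `when` before creating the generator."""
    when = datetime.now(timezone.utc)
    return StreamingResultSketch(fn(*args), when=when)


def attach_timing(result: Any) -> Tuple[Iterator[Any], datetime, Optional[int]]:
    """Return (chunks, when, microsec) for either kind of streaming result."""
    if isinstance(result, StreamingResultSketch):
        return result.generator, result.when, result.microsec
    # Raw generator (assumed to be the process-based path): the producer's
    # timing is not available, so fall back to the time chunks start arriving.
    return result, datetime.now(timezone.utc), None
```

A caller would then copy `when` and `microsec` onto the event before emitting the first chunk, so downstream steps see the same fields they get for non-streaming responses.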
gtopper added a commit to mlrun/mlrun that referenced this pull request on Feb 16, 2026
[ML-11879](https://iguazio.atlassian.net/browse/ML-11879)

Enables model monitoring for streaming `ModelRunnerStep`s (MRS). When a serving function has streaming enabled, the MM pipeline needs to aggregate streaming chunks into a single event before processing.

**Key changes:**

* Bumps storey to get the changes in mlrun/storey#613
* **Collector insertion** (`server.py`): When `streaming=True`, dynamically inserts a `storey.Collector` step between each MRS and the MM pipeline to aggregate streaming chunks into a single event.
* **Chunk aggregation** (`system_steps.py`): Adds aggregation logic to `MonitoringPreProcessor` that detects collected streaming chunks and merges them: concatenating string outputs, summing numeric metrics, and taking first values for other fields.
* **ProcessEndpointEvent fixes** (`stream_processing.py`): Returns event with `body=None` instead of bare `None` on validation failure (fixes downstream `AttributeError`). Removes `microsec` `is_not_none` validation since streaming latency is calculated asynchronously by `Collector`.

## Tests

* Unit tests for chunk detection and all aggregation paths
* Graph structure test verifying `Collector` insertion when streaming is enabled
* System test (`test_monitoring_with_streaming_model_runner`) verifying end-to-end: streaming model → `Collector` → MM pipeline → model endpoint creation
* `test_app_flow` passed

[ML-11879]: https://iguazio.atlassian.net/browse/ML-11879?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
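The merge rule in the chunk aggregation item (concatenate strings, sum numeric metrics, keep the first value otherwise) can be illustrated with a short sketch. The function name `merge_chunks` and the flat-dict chunk shape are assumptions made for illustration; the actual `MonitoringPreProcessor` logic and its event layout are not shown in this description.

```python
from typing import Any, Dict, List


def _is_number(value: Any) -> bool:
    # bool is a subclass of int, so exclude it from "numeric metrics".
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def merge_chunks(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge streaming chunks into one dict (hypothetical helper).

    Strings are concatenated in arrival order, numbers are summed,
    and any other field keeps the value from the first chunk that had it.
    """
    merged: Dict[str, Any] = {}
    for chunk in chunks:
        for key, value in chunk.items():
            if key not in merged:
                merged[key] = value
            elif isinstance(value, str) and isinstance(merged[key], str):
                merged[key] += value
            elif _is_number(value) and _is_number(merged[key]):
                merged[key] += value
            # Otherwise keep the first value seen for this key.
    return merged
```

For example, two chunks with partial text and per-chunk token counts collapse into one record with the full text and the total count, while a non-string, non-numeric field such as a label list keeps its first value.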
[ML-11879](https://iguazio.atlassian.net/browse/ML-11879)

Streaming responses from `ParallelExecution` were missing the `when` and `microsec` timing metadata that non-streaming responses include. This metadata is required for model monitoring in MLRun.

## Changes

* Add `_StreamingResult` class to wrap streaming generators with timing info
* Set timing metadata on events before emitting streaming chunks
* Handle both in-process streaming (`_StreamingResult`) and process-based streaming (raw generators)
* `Collector` calculates total streaming duration (`microsec`) when the stream completes

## Notes

* `ParallelExecution` sets `microsec` to `None` initially since total runtime isn't known until streaming completes
* `Collector` calculates the actual `microsec` value (total elapsed time from stream start to completion)
* `when` uses the timestamp when chunks start arriving
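How a collecting step can fill in `microsec` once the stream ends is sketched below. This is an illustration under stated assumptions, not the `storey.Collector` implementation: `collect_stream` is a hypothetical helper, the stream start is taken to be the moment consumption begins, and the clock is injectable only so the behavior can be checked deterministically.

```python
import time
from typing import Any, Callable, Dict, Iterable


def collect_stream(chunks: Iterable[Any],
                   clock: Callable[[], float] = time.perf_counter) -> Dict[str, Any]:
    """Drain a stream and report its total duration in microseconds.

    Hypothetical helper: `microsec` is unknown (None) while chunks are still
    arriving and is computed only after the last chunk has been received.
    """
    start = clock()
    collected = list(chunks)       # blocks until the stream completes
    elapsed_seconds = clock() - start
    return {
        "chunks": collected,
        "microsec": round(elapsed_seconds * 1_000_000),
    }
```

Because the duration exists only after the last chunk, any validation that requires `microsec` to be set before collection would reject every streaming event, which is consistent with the value being `None` initially.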