feat(iorails): IORails support for streaming output rails#1765
Conversation
Greptile SummaryThis PR adds streaming output rail support to One logic issue in
|
| Filename | Overview |
|---|---|
| nemoguardrails/guardrails/iorails.py | Core streaming implementation: adds stream_async(), _generation_task(), _wrapped_iterator(), and _run_output_rails_in_streaming(); has a logic bug where content chunks preceding an error in the same buffer batch are silently dropped in stream_first=True mode. |
| nemoguardrails/guardrails/model_engine.py | Adds stream_call() with correct SSE line-by-line parsing via readline(); refactors shared request-building logic into _prepare_request()/_ensure_running()/_raise_for_status(); sock_read timeout reuses total timeout which may be too tight for slow generators. |
| nemoguardrails/guardrails/model_manager.py | Adds stream_async() as a thin async generator proxy over engine.stream_call(); straightforward and correct. |
| tests/guardrails/test_iorails_streaming.py | New streaming test suite covering validation, no-output-rails path, stream_first/gated modes, error propagation, and concurrency; does not assert against content-chunk loss when error shares a buffer batch with real chunks. |
| tests/guardrails/test_iorails.py | Adds two tests for generate_async: dict options forwarding and exception propagation; both correct. |
| tests/guardrails/test_model_engine.py | Tests for the new stream_call() and refactored helper methods; coverage looks adequate. |
| tests/guardrails/test_model_manager.py | Adds tests for ModelManager.stream_async(); straightforward and correct. |
Sequence Diagram
sequenceDiagram
participant Caller
participant _wrapped_iterator
participant _generation_task
participant StreamingHandler
participant RollingBuffer
participant RailsManager
participant ModelManager
Caller->>_wrapped_iterator: async for chunk in stream_async()
_wrapped_iterator->>_wrapped_iterator: semaphore.acquire()
_wrapped_iterator->>_generation_task: asyncio.create_task()
par Background Task
_generation_task->>RailsManager: is_input_safe(messages)
RailsManager-->>_generation_task: RailResult
alt input safe
_generation_task->>ModelManager: stream_async("main", messages)
loop SSE chunks
ModelManager-->>StreamingHandler: push_chunk(delta)
end
_generation_task->>StreamingHandler: push_chunk(END_OF_STREAM)
else input blocked
_generation_task->>StreamingHandler: push_chunk(REFUSAL_MESSAGE)
_generation_task->>StreamingHandler: push_chunk(END_OF_STREAM)
end
and Consumer (with output rails)
_wrapped_iterator->>RollingBuffer: buffer_strategy(streaming_handler)
loop chunk_batch from RollingBuffer
RollingBuffer->>RollingBuffer: accumulate until chunk_size tokens
RollingBuffer-->>_wrapped_iterator: ChunkBatch
alt stream_first=True
_wrapped_iterator-->>Caller: yield user_output_chunks
end
_wrapped_iterator->>RailsManager: is_output_safe(messages, processing_context)
RailsManager-->>_wrapped_iterator: RailResult
alt output safe AND stream_first=False
_wrapped_iterator-->>Caller: yield user_output_chunks
else output blocked
_wrapped_iterator-->>Caller: yield error JSON
end
end
end
_wrapped_iterator->>_generation_task: task.cancel() (if not done)
_wrapped_iterator->>_wrapped_iterator: semaphore.release()
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemoguardrails/guardrails/iorails.py
Line: 352-363
Comment:
**Content chunks silently dropped in same-batch error scenario**
The error-detection loop runs before the `if stream_first: yield` block. When a generation error and content chunks land in the same flush batch (e.g. `chunk_size=5` but the stream raises after only 2 tokens), all preceding content chunks in `user_output_chunks` are discarded — only the error JSON is yielded. In `stream_first=True` mode this directly contradicts the "deliver eagerly" contract.
This is reproducible with the existing `_mid_stream_failure` mock (`chunk_size=5`, 2 real chunks then raise): all three items end up in a single flush at `END_OF_STREAM`, `real1`/`real2` are silently lost.
```python
# Separate error from real content first
real_chunks: list = []
error_chunk: str | None = None
for chunk in user_output_chunks:
try:
parsed = json.loads(chunk)
if isinstance(parsed, dict) and parsed.get("error", {}).get("type") == _GENERATION_ERROR_TYPE:
error_chunk = chunk
break
except (json.JSONDecodeError, TypeError):
real_chunks.append(chunk)
if stream_first:
for chunk in real_chunks:
yield chunk
if error_chunk:
yield error_chunk
return
# Run output rails on the accumulated context
req_id = get_request_id()
log.info("[%s] Running output rails", req_id)
output_result = await self.rails_manager.is_output_safe(messages, bot_response_chunk)
if not output_result.is_safe:
log.info("[%s] Output blocked: %s", req_id, output_result.reason)
error_data = {
"error": {
"message": f"Blocked by output rails: {output_result.reason}",
"type": "guardrails_violation",
"code": "content_blocked",
}
}
yield json.dumps(error_data)
return
if not stream_first:
for chunk in real_chunks:
yield chunk
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemoguardrails/guardrails/model_engine.py
Line: 286-290
Comment:
**`sock_read` reuses the non-streaming total timeout**
`self._timeout.total` was sized for a complete blocking request. Using it as the per-chunk `sock_read` deadline means a model configured with `timeout=30` will abort streaming if no SSE chunk arrives within 30 seconds — which is plausible for a slow first-token scenario. Consider exposing a separate `timeout_sock_read` parameter, or documenting that operators should set `timeout` to the expected inter-chunk interval rather than the total generation duration.
```python
stream_timeout = aiohttp.ClientTimeout(
total=None,
connect=self._timeout.connect,
sock_read=float(params.get("timeout_sock_read", self._timeout.total)),
)
```
How can I resolve this? If you propose a fix, please make it concise.Reviews (6): Last reviewed commit: "Address Greptile feedback on SSE parsing..." | Re-trigger Greptile
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
📝 WalkthroughWalkthroughThis PR introduces streaming support to the NemoGuardrails framework across multiple layers. It adds a Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant IORails
participant ModelManager
participant ModelEngine
participant BufferStrategy
Client->>IORails: stream_async(messages, options)
IORails->>IORails: _validate_streaming_with_output_rails()
IORails->>IORails: Apply input rails (synchronous)
alt Input Unsafe
IORails-->>Client: REFUSAL_MESSAGE
else Input Safe
IORails->>ModelManager: stream_async(model_type, messages)
ModelManager->>ModelEngine: stream_call(messages, kwargs)
loop Streaming
ModelEngine-->>ModelManager: SSE chunk (delta.content)
ModelManager-->>IORails: chunk
alt Output Rails Enabled
IORails->>BufferStrategy: Buffer chunk
BufferStrategy-->>IORails: buffered content
IORails->>IORails: Check is_output_safe()
alt stream_first=true
IORails-->>Client: Yield chunk (optimistic)
alt Unsafe Detected
IORails-->>Client: Error JSON payload
end
else stream_first=false
alt Unsafe Detected
IORails-->>Client: Error JSON payload
else Safe
IORails-->>Client: Yield chunk
end
end
else Output Rails Disabled
IORails-->>Client: Yield chunk
end
end
ModelEngine-->>IORails: [DONE] signal
IORails-->>Client: Stream completion
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nemoguardrails/guardrails/iorails.py`:
- Around line 284-302: The output-buffering path is incompatible with
metadata-emitting chunks: before calling get_buffer_strategy(buffer_strategy)
and async for chunk_batch in buffer_strategy(streaming_handler) detect if
output_streaming_config.rails.output.streaming is enabled and the incoming
streaming_handler yields dicts/has include_metadata=True; either raise a clear
error rejecting this configuration or wrap/normalize the streaming_handler into
a transformer that maps dict chunks to plain text (extracting the text field)
before passing to buffer_strategy and then reattach the original metadata to
each yielded chunk after buffering/rails checks; update logic around
streaming_handler, buffer_strategy, get_buffer_strategy, and stream_first to use
the chosen approach so buffering always consumes List[str] and output yields
preserve metadata.
- Around line 252-256: The pre-check using self._stream_semaphore._value is
racy; replace it with a truly non-blocking acquisition attempt using
asyncio.wait_for to avoid blocking. Change the code to attempt await
asyncio.wait_for(self._stream_semaphore.acquire(), timeout=0) inside a
try/except asyncio.TimeoutError and on TimeoutError raise
asyncio.QueueFull("Streaming concurrency limit reached"); keep using
self._stream_semaphore.acquire() for the actual acquisition but only via the
non-blocking wait_for wrapper so the loser does not block.
- Around line 275-278: The finally block in _wrapped_iterator() releases
self._stream_semaphore before cancelling/awaiting self._generation_task, which
lets the streaming slot free while the background generation keeps pulling
tokens; change the teardown to first cancel the background task (call
self._generation_task.cancel()), then await it while suppressing
asyncio.CancelledError using contextlib.suppress (import suppress from
contextlib), and only after the task has been cancelled/awaited release
self._stream_semaphore and call reset_request_id(token); ensure this ordering
covers both early consumer exit and _run_output_rails_in_streaming() early
returns.
In `@nemoguardrails/guardrails/model_engine.py`:
- Around line 318-321: The SSE parser in model_engine.py currently assumes
chunk.get("choices", [{}])[0] exists and will IndexError on events with an empty
choices list; change the code to retrieve choices = chunk.get("choices") and
skip/continue if not choices (or if len(choices) == 0) before accessing [0],
then extract delta = choices[0].get("delta", {}) and yield content as before.
Also add a regression test that feeds an SSE event with "choices": [] (the
include_usage case) into the parser to assert it does not raise and simply skips
that event.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8a3dadd0-d15f-49e8-84ee-1f776e73e4ba
📒 Files selected for processing (6)
examples/configs/gs_content_safety/config/config.ymlnemoguardrails/guardrails/iorails.pynemoguardrails/guardrails/model_engine.pynemoguardrails/guardrails/model_manager.pytests/guardrails/test_iorails_streaming.pytests/guardrails/test_model_engine.py
|
@greptile check the latest commit and update summary and score |
|
@coderabbitai Re: Pre-merge checks. Re-check docstring coverage for latest commit. For "Test Results For Major Changes" see the |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chainThe PR description image shows a Jupyter notebook titled "IORails streaming integration test" serving as the Test Plan. It demonstrates:
This confirms end-to-end streaming and non-streaming functionality works with the content safety rails config. Now let me check docstring coverage on the latest commit's changed files. 🏁 Script executed: Length of output: 1668
🔍 Pre-Merge Checks — Updated
Summary of docstring coverage by file:
🧠 Learnings used |
|
@Pouyanpi , @cparisien , @trebedea This is ready for review |
Pouyanpi
left a comment
There was a problem hiding this comment.
Looks good 👍🏻 just some comments to review before merging. just opened an issue which seems necessary #1780
the only duplication is output-rails-in-streaming glue loop If we do extract, the shared helper would look like:
async def apply_output_rails_to_stream(
streaming_handler,
buffer_strategy,
stream_first: bool,
check_fn: Callable[[str], Awaitable[RailResult]],
) -> AsyncIterator[str]:no need to address in this PR we might do a follow up, but i don't see any need for that either.
…nc. Make logging and request ID generation symmetric over both
…s streaming output rails
Description
Adds output-rail streaming to IORails. Integrations into the API service, chat, etc via Guardrails top-level in follow-on PR #1766 . Key points:
from it
High-level overview of the change below:
Test Plan
Pre-commit
Unit-tests
Local integration test
20260406_iorails_streaming_integ_tests.ipynb
Checklist