fix(presidio): handle Anthropic native SSE bytes in streaming unmask hook #22882
firestaerter3 wants to merge 9 commits into BerriAI:main
Conversation
…put_response()

All concrete implementations of `process_output_response()` already accept a `request_data` keyword argument, but the abstract base class (`BaseTranslation`) did not declare it, causing a type/mypy inconsistency. Add `request_data: Optional[dict] = None` to the abstract signature so the interface matches all implementations. Part of BerriAI#22821 — Presidio guardrail: end-to-end PII masking broken with Anthropic native API.
Greptile Summary

This PR fixes two related issues blocking PII unmasking on the Anthropic native streaming path: (1) the streaming hook now detects raw SSE bytes chunks and unmasks PII across delta events; (2) `request_data` is threaded through so `pii_tokens` reach the output handlers.

Confidence Score: 3/5
| Filename | Overview |
|---|---|
| litellm/proxy/guardrails/guardrail_hooks/presidio.py | Core streaming hook change: adds Anthropic native SSE bytes handling for both masking and unmasking paths. The unmasking path correctly parses SSE events, reassembles text deltas, calls _unmask_pii_text, and rebuilds the stream in original order — but the rebuilt text_delta_event includes a trailing \n that produces three newlines when the \n\n event terminator is appended, violating the SSE spec. The apply_to_output=True bytes-only path silently skips masking (only a WARNING is logged), which is a behavior change not covered by the PR description. |
| litellm/llms/anthropic/chat/guardrail_translation/handler.py | Adds request_data: Optional[dict] = None parameter to process_output_response() and merges it into effective_request_data so pii_tokens forwarded from the input masking phase are preserved and passed to apply_guardrail(). |
| litellm/llms/base_llm/guardrail_translation/base_translation.py | Abstract base updated to declare request_data: Optional[dict] = None on both process_output_response() and process_output_streaming_response(), keeping the interface in sync with all implementing subclasses. |
| litellm/proxy/guardrails/guardrail_hooks/unified_guardrail/unified_guardrail.py | Threads the full request_data (or data) dict through to each process_output_response() and process_output_streaming_response() call so downstream handlers receive pii_tokens without any other side effects. |
| litellm/llms/openai/chat/guardrail_translation/handler.py | Mirrors the Anthropic handler change: adds request_data parameter, renames local dict to effective_request_data, and merges caller data before passing to apply_guardrail(). Both the non-streaming and streaming output paths are updated. |
| tests/guardrails_tests/test_presidio_sse_unmask.py | Four new mock-only unit tests covering: (1) process_output_response() preserves pii_tokens from caller request_data, (2) streaming hook unmasks PII in Anthropic native SSE bytes, (3) pass-through when output_parse_pii=False, (4) token reassembly across split delta events. All tests use mocks with no real network calls, satisfying the project rule. Uses __new__ to bypass __init__ which is fragile but works given the limited attribute access in the tested paths. |
| tests/guardrails_tests/test_guardrail_request_data_passthrough.py | Two introspection-based tests verifying that BaseTranslation and a representative set of concrete handlers all declare the request_data parameter with a None default, guarding against signature drift. |
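The signature-drift check described for `test_guardrail_request_data_passthrough.py` can be sketched with `inspect`. The `BaseTranslation` stand-in below is a local simplification (the real class lives in litellm); only the introspection logic mirrors the test's intent:

```python
import inspect
from typing import Optional


# Local stand-in for the real BaseTranslation abstract base class.
class BaseTranslation:
    def process_output_response(
        self, response, guardrail, request_data: Optional[dict] = None
    ):
        raise NotImplementedError


def declares_request_data_with_none_default(func) -> bool:
    # True if the callable declares a request_data parameter defaulting
    # to None -- the property the passthrough tests guard against drift.
    params = inspect.signature(func).parameters
    p = params.get("request_data")
    return p is not None and p.default is None


assert declares_request_data_with_none_default(
    BaseTranslation.process_output_response
)
```

Running the same check over each concrete handler is what keeps the 13 implementations in sync with the abstract signature.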
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant UnifiedGuardrail
    participant PresidioHook
    participant AnthropicHandler
    participant Anthropic
    Client->>UnifiedGuardrail: async_post_call_streaming_iterator_hook(request_data)
    UnifiedGuardrail->>PresidioHook: async_post_call_streaming_iterator_hook(response, request_data)
    Note over PresidioHook: output_parse_pii=True, pii_tokens in request_data
    PresidioHook->>PresidioHook: collect bytes chunks (Anthropic native SSE)
    Note over PresidioHook: remaining_bytes_chunks populated
    PresidioHook->>PresidioHook: split on \\n\\n → raw_events
    PresidioHook->>PresidioHook: classify events (text_delta vs non-text)
    PresidioHook->>PresidioHook: _unmask_pii_text(combined_text, pii_tokens)
    PresidioHook->>PresidioHook: rebuild merged event at original slot position
    PresidioHook-->>Client: yield event + b"\\n\\n" (per event)
    Client->>UnifiedGuardrail: async_post_call_success_hook(response, request_data)
    UnifiedGuardrail->>AnthropicHandler: process_output_response(response, guardrail, request_data)
    Note over AnthropicHandler: effective_request_data = {**request_data, "response": response}
    AnthropicHandler->>PresidioHook: apply_guardrail(inputs, effective_request_data)
    PresidioHook-->>AnthropicHandler: guardrailed_inputs (pii_tokens preserved)
    AnthropicHandler-->>UnifiedGuardrail: updated response
```
Comments Outside Diff (1)
litellm/proxy/guardrails/guardrail_hooks/presidio.py, lines 1229-1232

Malformed SSE output — extra trailing newline in rebuilt `text_delta_event`

`text_delta_event` is constructed with a trailing `\n` after the `data:` line. When it is later yielded via `yield event + b"\n\n"`, the resulting byte stream has three consecutive newlines (`\n\n\n`) instead of the required two (`\n\n`) that terminate an SSE event. Non-text events don't have this problem: they come from `combined.split(b"\n\n")` and therefore carry no trailing newline before the `\n\n` is appended. A strict SSE parser will correctly terminate the event at the first blank line, but will then see an extra empty "event" from the residual third newline. Most consumers silently discard empty events, but this is a protocol violation that can trip up spec-compliant clients.
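The framing the reviewer describes can be shown with a minimal sketch; `build_sse_event` and `emit` are illustrative names, not functions from the PR:

```python
import json
from typing import List


def build_sse_event(event_type: str, payload: dict) -> bytes:
    # One SSE event: field lines separated by a single \n, and no
    # trailing newline after the data line -- the blank-line terminator
    # is appended exactly once at emit time.
    return (
        b"event: " + event_type.encode() + b"\n"
        + b"data: " + json.dumps(payload).encode()
    )


def emit(events: List[bytes]) -> bytes:
    # Joining with \n\n and appending one final \n\n yields exactly one
    # blank line per event, so strict SSE parsers see no empty events.
    return b"\n\n".join(events) + b"\n\n"
```

Adding a `\n` inside `build_sse_event` and then joining with `\n\n` reproduces the triple-newline bug flagged above.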
Last reviewed commit: 8b1eb08
```python
    b"event: content_block_delta\n"
    b"data: " + json.dumps(text_delta_payload).encode() + b"\n"
)
rebuilt = b"\n\n".join(non_text_events + [text_delta_event]) + b"\n\n"
```
Broken SSE event ordering — protocol events emitted before text content
The rebuilt SSE byte stream is assembled as non_text_events + [text_delta_event]. However, a real Anthropic SSE stream delivers lifecycle events that come after text deltas (i.e., content_block_stop, message_delta, message_stop) in that same non_text_events list. Placing them all before the new text_delta_event breaks the Anthropic SSE protocol order for any consumer that follows the spec.
Example: for a normal stream content_block_start → content_block_delta → content_block_stop → message_delta → message_stop, the rebuilt stream becomes:
content_block_start ← correct
content_block_stop ← wrong: comes before the text delta
message_delta ← wrong: comes before the text delta
message_stop ← wrong: comes before the text delta
content_block_delta ← actual text, after "stop" events
A correct approach would preserve the original stream order, inserting the merged unmasked content_block_delta in the position of the first text_delta event, and dropping subsequent text_delta events:

```python
# Instead of splitting into text/non-text buckets, preserve event order
rebuilt_events: List[bytes] = []
text_injected = False
for raw_event in raw_events:
    # ... parse as before ...
    if is_text_delta:
        if not text_injected:
            rebuilt_events.append(merged_text_delta_event)
            text_injected = True
        # else: skip additional text_delta events (merged above)
    else:
        rebuilt_events.append(raw_event)
```

```python
    "index": 0,
    "delta": {"type": "text_delta", "text": unmasked_text},
}
```
Hardcoded index: 0 loses the original content block index
The rebuilt text_delta_payload always uses "index": 0. In responses that contain multiple content blocks (e.g., extended thinking at index 0 and actual text at index 1), all collected text parts are re-emitted at index 0, misreporting the block identity.
The correct index should be tracked from the original text_delta events — all text deltas are expected to share the same block index, so the index from the first text_delta event can be reused:
```python
# Track the original index of the text content block
text_block_index = 0  # default fallback
for raw_event in raw_events:
    # ... parse as before ...
    if is_text_delta:
        text_block_index = payload.get("index", 0)  # capture once
        text_parts.append(delta.get("text", ""))

text_delta_payload = {
    "type": "content_block_delta",
    "index": text_block_index,  # use original index
    "delta": {"type": "text_delta", "text": unmasked_text},
}
```

```python
# delivery at response completion when output_parse_pii is enabled.
# This is unavoidable for correct PII token restoration because tokens
# may span multiple delta events.
if remaining_bytes_chunks and not remaining_chunks:
```
Mixed-type chunks — bytes chunks are silently dropped
The branch condition if remaining_bytes_chunks and not remaining_chunks: only enters the Anthropic native SSE path when all chunks are bytes. If a stream happens to contain both ModelResponseStream and bytes chunks (e.g., a wrapped or middleware-modified stream), the bytes chunks are silently discarded — they are collected into remaining_bytes_chunks (line 1119) but are never yielded or processed, because the code falls through to the ModelResponseStream path.
The exception handler (lines 1223–1224) does yield both types, confirming mixed streams were considered possible. The normal path should also handle them:
```python
if remaining_bytes_chunks and not remaining_chunks:
    # ... Anthropic native SSE path ...
elif remaining_bytes_chunks and remaining_chunks:
    # Mixed stream — unexpected, log and pass bytes through
    verbose_proxy_logger.warning(
        "Unexpected mixed bytes+ModelResponseStream chunks; bytes chunks passed through unmodified"
    )
    for chunk in remaining_bytes_chunks:
        yield chunk  # type: ignore[misc]
    # Continue to handle ModelResponseStream chunks below
```

…handlers

The abstract base was updated in the first commit, but two things were missing:

1. unified_guardrail.py did not pass `request_data=data` at the call site
2. All 13 concrete handler signatures lacked `request_data: Optional[dict] = None`, which would cause a TypeError if the call site were corrected

Also fix a `pytest.skip()` inside a for-loop bug in the test — use `continue` instead so the loop processes all modules rather than terminating on the first ImportError.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix variable shadowing in 6 process_output_response handlers: merge incoming request_data instead of replacing it with a new dict, so upstream context (e.g. pii_tokens) is not discarded.
- Add request_data param to process_output_streaming_response in base class and concrete handlers (openai/chat, a2a/chat); propagate from unified_guardrail.py call sites and through the internal process_output_response delegate call in the openai/chat streaming path.
- Add docstring for request_data param in base class abstract method.
- Strengthen test: add validated_count guard so the test cannot pass vacuously if all handler imports fail.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hook

When the LLM is called via the Anthropic native API path (not OpenAI compat), streaming chunks are raw SSE bytes rather than ModelResponseStream objects. stream_chunk_builder() cannot handle bytes, so PII tokens embedded in the response text were never unmasked. Additionally, AnthropicMessagesHandler.process_output_response() constructed a fresh request_data dict, discarding any pii_tokens stored during input masking.

Changes (Patches 3, 6a, 6b):

presidio.py — async_post_call_streaming_iterator_hook (Patch 3):
- Detect bytes chunks alongside ModelResponseStream in the collection loop
- Add Anthropic native SSE path: buffer all bytes, parse SSE events, concatenate text_delta texts, apply _unmask_pii_text() in one pass (handles tokens split across multiple delta events), rebuild SSE bytes with unmasked text, yield as a single chunk
- Extend the exception fallback to replay bytes chunks on error

anthropic/chat/guardrail_translation/handler.py (Patches 6a + 6b):
- Accept request_data: Optional[dict] = None in process_output_response()
- Merge caller request_data with the local response dict so pii_tokens forwarded by unified_guardrail (via PR 1) are preserved for apply_guardrail()

Trade-off: when output_parse_pii is enabled, progressive streaming is replaced by a single delivery at response completion. This is unavoidable because PII tokens may span multiple delta events and must be restored from the full text.

Depends on fix/guardrail-request-data-passthrough for pii_tokens plumbing. Part of BerriAI#22821.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix broken SSE event ordering: the previous implementation collected all non-text events and appended the merged text_delta at the end, placing message_stop before text content and violating the Anthropic SSE protocol. The new slot-based approach inserts the merged text_delta at the position of the FIRST original text_delta event, preserving correct protocol order.
- Fix hardcoded index:0 in rebuilt text_delta event: capture the `index` value from the original text_delta payloads and use it in the rebuilt event.
- Add warning log when both bytes and ModelResponseStream chunks are present in the same stream (bytes will be discarded in that case), making the silent discard visible for diagnostics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
In the mixed bytes+model chunk case, pass the bytes chunks through to the caller unmodified rather than silently discarding them. Also update the warning message to reflect the new behaviour. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
presidio.py:
- Lock onto first text content-block index to avoid merging text from distinct content blocks (e.g. a thinking block at index 0 + reply at index 1 would previously be concatenated incorrectly)
- Yield each rebuilt SSE event individually instead of one large chunk to preserve chunked-transfer framing for downstream consumers

handler files (anthropic, openai/chat, openai/completion, openai/responses, openai/transcriptions, a2a):
- Rename local `request_data: dict = ...` rebinding to `effective_request_data` to avoid mypy "Name already defined" errors caused by re-annotating the Optional[dict] parameter with a narrower type in the same scope
masking path: collect bytes chunks alongside ModelResponseStream; when only bytes arrive (Anthropic native SSE), pass them through unmodified with a warning instead of silently dropping the entire stream.

test: fix Test 2 chunk count assertion — the hook yields one chunk per SSE event (text_delta + message_stop = 2 chunks, not 1); assert on chunks[0] for the text_delta content.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ming

- Add explicit mixed bytes+model chunk handling in the apply_to_output=True masking path; bytes chunks are now yielded unmodified (with a warning) before model chunks are processed through stream_chunk_builder, matching the symmetric handling already present in the unmask path
- Remove spurious trailing \n from _make_sse_bytes test helper data lines; the \n\n event separator was creating triple newlines, violating the SSE spec

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Superseded by #23037 — a single clean PR covering the full Presidio PII round-trip fix end-to-end (position bug, cross-instance token loss, apply_guardrail unmask path, Anthropic native dict + SSE streaming). Closing in favour of that PR. |
Problem
Two related issues prevent PII unmasking from working on the Anthropic native
streaming path:
1. Wrong chunk type in streaming hook: `async_post_call_streaming_iterator_hook` assembles chunks with `stream_chunk_builder()`, which expects `ModelResponseStream` objects. With the Anthropic native API path, chunks are raw SSE bytes. The hook silently yields them unprocessed — PII tokens in the response text are never restored.
2. `pii_tokens` discarded in non-streaming output handler: `AnthropicMessagesHandler.process_output_response()` builds a fresh local `request_data = {"response": response}`, discarding any `pii_tokens` the unified guardrail forwarded from the input masking phase (via #22879).
Part of #22821.
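The fix for issue 2 amounts to a dict merge instead of a replacement. A minimal sketch, with `merge_request_data` as a hypothetical stand-in for the logic inside `process_output_response()`:

```python
from typing import Optional


def merge_request_data(request_data: Optional[dict], response: dict) -> dict:
    # Merge the caller's request_data rather than replacing it, so keys
    # forwarded from the input masking phase (e.g. pii_tokens) survive
    # into the guardrail's apply_guardrail() call.
    return {**(request_data or {}), "response": response}
```

The merge also keeps the handler correct when no caller data is supplied: with `request_data=None` the result is just `{"response": response}`, matching the old behaviour.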
Approach and trade-off
PII unmasking requires the complete response text before tokens can be restored — token strings may span multiple SSE delta events. This implementation buffers all chunks, detects the format (`bytes` vs `ModelResponseStream`), and for Anthropic native SSE:
- splits the raw bytes on the `\n\n` event separator
- concatenates all `text_delta` texts
- applies `_unmask_pii_text()` in one pass
- emits a rebuilt `content_block_delta` containing the fully unmasked text

Known trade-off: when `output_parse_pii` is enabled, progressive streaming output is replaced by a single delivery at response completion. This is unavoidable for correct PII token restoration.
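The buffer-split-unmask-rebuild approach can be sketched as a standalone model. This is a simplification, not the PR's code: `unmask_sse_stream` and its `unmask` callback are illustrative names (the real hook uses `_unmask_pii_text` and yields events rather than returning a list):

```python
import json
from typing import Callable, List, Optional


def unmask_sse_stream(raw: bytes, unmask: Callable[[str], str]) -> List[bytes]:
    # Split buffered Anthropic-native SSE bytes into events, merge all
    # text_delta texts, unmask in one pass (tokens may span deltas),
    # then rebuild with the merged delta in the first text-delta slot.
    events = [e for e in raw.split(b"\n\n") if e.strip()]
    text_parts: List[str] = []
    text_block_index = 0
    slots: List[Optional[bytes]] = []  # None marks the merged-delta slot
    for ev in events:
        payload = None
        for line in ev.split(b"\n"):
            if line.startswith(b"data: "):
                payload = json.loads(line[len(b"data: "):])
        is_text = (
            isinstance(payload, dict)
            and payload.get("type") == "content_block_delta"
            and payload.get("delta", {}).get("type") == "text_delta"
        )
        if is_text:
            if not text_parts:
                slots.append(None)  # remember first text-delta position
                text_block_index = payload.get("index", 0)
            text_parts.append(payload["delta"]["text"])
        else:
            slots.append(ev)
    merged = {
        "type": "content_block_delta",
        "index": text_block_index,
        "delta": {"type": "text_delta", "text": unmask("".join(text_parts))},
    }
    merged_event = (
        b"event: content_block_delta\ndata: " + json.dumps(merged).encode()
    )
    # Each rebuilt event carries exactly one \n\n terminator.
    return [(merged_event if s is None else s) + b"\n\n" for s in slots]
```

Because non-text events keep their original slots, lifecycle events such as `message_stop` still follow the text content, matching the ordering fix described in the review.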
Changes

`presidio.py` — streaming hook (Patch 3):
- detect `bytes` chunks alongside `ModelResponseStream` in the accumulation loop

`anthropic/chat/guardrail_translation/handler.py` (Patches 6a + 6b):
- accept `request_data: Optional[dict] = None` in `process_output_response()`
- merge `request_data = {**(request_data or {}), "response": response}` so `pii_tokens` forwarded by the unified guardrail are preserved

Dependency

Requires #22879 (`fix/guardrail-request-data-passthrough`) for `pii_tokens` plumbing through the unified guardrail layer.
Testing
`tests/guardrails_tests/test_presidio_sse_unmask.py` — four tests:
- `process_output_response()` merges caller `request_data` including `pii_tokens`
- streaming hook unmasks PII in Anthropic native SSE bytes
- pass-through when `output_parse_pii=False`
- token reassembly across split delta events
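The split-token case (the fourth test) can be sketched as a standalone unit test; `reassemble_and_unmask` is a hypothetical helper standing in for the hook's concatenate-then-`_unmask_pii_text` behaviour:

```python
from typing import Dict, List


def reassemble_and_unmask(delta_texts: List[str], pii_tokens: Dict[str, str]) -> str:
    # Token strings can be split across SSE delta events, so unmasking
    # must run on the concatenated text, not per chunk.
    text = "".join(delta_texts)
    for token, original in pii_tokens.items():
        text = text.replace(token, original)
    return text


def test_token_split_across_deltas():
    deltas = ["My name is <PE", "RSON_1> and I", " live in <LOCATION_1>."]
    tokens = {"<PERSON_1>": "Roland", "<LOCATION_1>": "Berlin"}
    assert reassemble_and_unmask(deltas, tokens) == (
        "My name is Roland and I live in Berlin."
    )
```

Per-chunk replacement would miss `<PERSON_1>` entirely here, since no single delta contains the whole token.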