[Client encryption] Adds streaming JSON processing support to feed iterators #5478
MartinSarkany wants to merge 103 commits into
Introduces JsonProcessorPropertyBag to centralize handling of JsonProcessor overrides using RequestOptions.Properties. Updates encryption and decryption workflows to support processor selection, adds related tests and helpers, and refactors code to use the new override mechanism for both production and test scenarios.
Introduces CosmosDiagnosticsContext for lightweight diagnostics and scope tracking, with ActivitySource integration for telemetry. Updates EncryptionProcessor to record diagnostic scopes and enforce streaming mode restrictions. Adds comprehensive unit tests for diagnostics context and processor, and merges stream processor end-to-end tests into MdeCustomEncryptionTests.
Moved diagnostics scope creation for MDE decryption to cover the entire decryption logic in EncryptionProcessor. Updated tests to assert scope presence/absence more robustly and removed conditional compilation for ENCRYPTION_CUSTOM_PREVIEW. Project files now always define ENCRYPTION_CUSTOM_PREVIEW constant.
Introduces direct serialization to output streams via WriteToStream to reduce intermediate memory allocations. Refactors decryption logic to streamline processor selection and diagnostics scope naming, and adds EncryptionDiagnostics constants for improved diagnostics context management. Updates MDE encryption processor to support direct stream handling and processor selection for .NET 8+ preview builds.
Introduces TestEncryptorFactory to centralize and simplify the creation of mock Encryptor and DataEncryptionKey instances in tests, reducing repetitive Moq setup code. Updates all relevant test classes to use this factory. Refactors MdeEncryptionProcessor to improve fallback handling for legacy and unencrypted streams, ensuring correct stream positioning and graceful fallback to legacy decryption paths.
Replaces separate diagnostic prefixes for JSON processor selection with a unified prefix in EncryptionDiagnostics and updates all usages and tests accordingly. Adds documentation for the JSON processor property bag override and introduces new tests for large payloads, concurrency, and cancellation scenarios with the streaming JSON processor.
Adds validation to ensure that streaming encryption only processes JSON documents with an object as the root element. Root arrays and other root types are now explicitly rejected, improving contract enforcement and error handling.
Refactored root validation logic to ensure streaming encryption only accepts JSON documents with an object root. Added tests to verify rejection of root arrays and primitive values, improving contract enforcement and error messaging.
Expanded EncryptionProcessorDiagnosticsTests to cover diagnostics scope emission for various decryption paths, including MDE payloads, provided-output, stream override, malformed JSON, duplicate scope prevention, null input, cancellation, and encryption. Added a fake MDE DataEncryptionKey implementation and a SlowCancelableStream helper for cancellation tests. Conditional assertions ensure correct scope behavior for NET8 preview builds.
Introduces IMdeJsonProcessorAdapter and concrete adapters for Newtonsoft and Stream processors to unify JSON processing logic in MdeEncryptionProcessor. Diagnostic scope tracking for encryption and decryption is improved, and related tests are updated to verify scope tracking for both processors. Legacy logic for processor selection and property inspection is moved into the respective adapters for better maintainability.
Unifies and updates diagnostics scope naming for encryption and decryption selection, removing legacy/impl scopes and ensuring only one selection scope is emitted per operation. Refactors MDE processor logic to directly handle Newtonsoft and Stream processors, improving fallback and error handling. Adds comprehensive unit tests for NewtonsoftAdapter and StreamAdapter, covering encryption, decryption, legacy/unencrypted payloads, and diagnostics scope assertions.
…rypted stream used; sync request options with processor; add legacy override guard and fallback JObject decrypt
Added a test to verify that explicitly overriding the JSON processor with Newtonsoft maintains the expected encryption/decryption behavior. Also added assertions to change feed processor tests to ensure both documents are processed and the processor does not time out.
Changed expected diagnostics.Scopes.Count from 1 to 0 in multiple StreamAdapterTests to reflect updated behavior. Ensures tests align with current diagnostics tracking implementation.
Introduces EncryptionRequestOptionsExperimental for configuring the JSON processor pipeline via request options, including helper methods and tests. Refactors JsonProcessorPropertyBag for improved property handling and updates test helpers to use the new API.
Replaces static calls to RequestOptionsPropertiesExtensions with instance extension method calls on RequestOptions throughout the codebase and tests. This improves code readability and aligns with extension method usage patterns.
Simplifies and unifies decryption logic by removing preprocessor directives and consolidating JsonProcessor selection. Refactors MdeEncryptionProcessor to use switch expressions and helper methods for both stream and Newtonsoft processors, improving maintainability and clarity. Ensures consistent handling of legacy and MDE-encrypted documents, and introduces utility methods for stream position management and result writing.
    this.ResponseFactory,
    this.Encryptor,
    this.CosmosSerializer,
    changeFeedRequestOptions?.GetJsonProcessor(this.DefaultJsonProcessor) ?? this.DefaultJsonProcessor);
nit: the trailing ?? this.DefaultJsonProcessor should not be needed; the extension method already returns the fallback processor when no override is found in the request options.
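Whether the trailing fallback is redundant depends on how the extension method treats a null receiver. The sketch below is a minimal illustration of the pattern, not the SDK's code: `RequestOptionsSketch`, `OverrideKey`, and this `GetJsonProcessor` body are all hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; the real SDK types are not reproduced here.
public enum JsonProcessor { Newtonsoft, Stream }

public sealed class RequestOptionsSketch
{
    public IDictionary<string, object> Properties { get; } = new Dictionary<string, object>();
}

public static class RequestOptionsSketchExtensions
{
    // Assumed key name, for illustration only.
    public const string OverrideKey = "json-processor-override";

    // Extension methods may be invoked on a null receiver, so the fallback
    // can be resolved here once instead of at every call site.
    public static JsonProcessor GetJsonProcessor(this RequestOptionsSketch options, JsonProcessor fallback)
    {
        if (options != null
            && options.Properties.TryGetValue(OverrideKey, out object value)
            && value is JsonProcessor processor)
        {
            return processor;
        }

        return fallback;
    }
}
```

If the real extension tolerates a null receiver in the same way, the call site can use a plain `options.GetJsonProcessor(fallback)`. With the null-conditional form `options?.GetJsonProcessor(fallback)` the result is a nullable value that is null whenever `options` is null, so the `??` only becomes removable once the `?.` is dropped as well.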
…imeout (Azure#5689)

## Description

## DTX Commit Retry: Design

### Two-loop architecture

DTX commits flow through two retry loops with distinct responsibilities:

```
DistributedTransactionCommitter.ExecuteCommitWithRetryAsync        ← OUTER LOOP
└─ clientContext.ProcessResourceOperationStreamAsync
   └─ AbstractRetryHandler (inner while-loop)
      └─ ClientRetryPolicy.ShouldRetryInternalAsync                ← INNER LOOP
```

The coordinator returns **two response shapes** for the same HTTP status codes, which is why a single loop cannot own retries:

| Shape | Status / sub-status | Body | Retriability signal | Owner |
|---|---|---|---|---|
| **Envelope failure** | 408, 449/5352, 500/5411-5413 | `Content == null` | implicit (status code) | Inner (CRP) |
| **Semantic failure** | 408, 449/5352, 452 | JSON with per-op results | `"isRetriable": true` in body | Outer (committer) |
| **Throttle** | 429/3200 | varies | `Retry-After` header | `ResourceThrottleRetryPolicy` |

### The amplification problem

Without a routing rule, a 449/5352 carrying a body would be retried by **both** loops: inner CRP (10 attempts) × outer committer (10 attempts) = up to **100 wire requests per user call**.

### Options considered

#### Option A: Status-code partition (outer-loop exclusion list)

CRP retries its owned codes unconditionally. The committer adds an `IsOuterLoopRetriable()` helper that mirrors CRP's owned set and excludes them.

- ❌ **Dual-maintenance hazard.** The owned-code set lives in two files. Any drift re-introduces the 10×10 amplification.
- ❌ **Discards body data.** `AbstractRetryHandler` abandons the `ResponseMessage` without reading it on inner-loop retries. Per-op session tokens, ETags, and `ResourceStream` payloads are silently dropped on every CRP retry.
- ❌ **Brittle to contract evolution.** If the coordinator starts attaching bodies to 449 in a future revision, CRP retries blindly and the `isRetriable` signal never reaches the committer.

#### Option B: Body-presence deferral ✅ **(chosen)**

Before CRP retries `{408, 449/5352, 429/3200}`, it checks `hasResponseBody = (responseMessage?.Content != null)`. If a body is present, CRP returns `NoRetry()` and defers to the outer loop, which parses `isRetriable` from that body. If no body, CRP retries immediately.

- ✅ **Single source of truth:** one predicate, one file.
- ✅ **Preserves per-op data:** body-bearing responses always reach `FromResponseMessageAsync → MergeSessionTokens`.
- ✅ **Forward compatible:** new body-bearing codes route through `isRetriable` automatically with no SDK change.

### Retry budgets and delays (after this PR)

| Code | Loop | Budget | Delay |
|---|---|---|---|
| 408, 449/5352 (no body) | Inner (CRP) | 10 | 1 s flat (honors `Retry-After`) |
| 500/5411-5413 | Inner (CRP) | 10 | 100 ms → 5 s exponential + jitter |
| 408, 449/5352, 452 (body + `isRetriable: true`) | Outer (committer) | 10 | 1 s → 32 s exponential + jitter |
| 429/3200 | `ResourceThrottleRetryPolicy` | 9 (default) | `Retry-After` header |

### Other notable changes in this PR

- **408 endpoint-marking guard.** Added `if (!isDtxRequest)` before `TryMarkEndpointUnavailableForPkRange`. DTX always goes through gateway; marking a partition endpoint unavailable on a coordinator 408 would skew routing for unrelated requests.
- **Idempotency on retry.** `DistributedTransactionServerRequest` pre-serializes the body to `byte[]` and vends a fresh `MemoryStream` per attempt via `CreateBodyStream()`. The idempotency token header is re-applied by the `requestEnricher` closure on each attempt. Both inner and outer retries are safe.
- **Body parser hardening.** `DistributedTransactionResponse` disposes the original `ResponseMessage` before constructing the synthetic 500 fallback, and emits a `DefaultTrace.TraceWarning` on `JsonException`.

---

## PR Summary

This pull request introduces significant improvements to the retry logic for distributed transactions (DTX) in Cosmos DB, focusing on more robust, efficient, and nuanced handling of retriable error codes and infrastructure failures. The changes include new retry limits and backoff strategies specifically for DTX scenarios, improved differentiation of retry logic based on request type, and the introduction of jitter to avoid synchronized client retries. Additionally, the code is refactored to better support testing and configuration of retry behavior.

**Distributed Transaction (DTX) Retry Logic Enhancements:**

* Introduced new retry counters and configurable limits for DTX retries (`MaxDtxRetryCount`, `MaxDtxInfraFailureRetryCount`) and added request-type detection (`isDtxRequest`) to apply DTX-specific logic.
* Implemented DTX-specific handling in `ShouldRetryInternalAsync`, including distinct logic for various DTX error codes (e.g., 408, 449/5352, 429/3200, 500/5411-5413), with separate retry budgets and backoff strategies for infrastructure failures.
* Added bounded exponential backoff with ±25% jitter for DTX infrastructure failures to prevent synchronized retries across clients.
* Ensured DTX retries are not mistakenly counted as endpoint failures for non-DTX traffic, preventing negative impact on routing.

**General Retry and Committer Improvements:**

* Updated `ShouldRetryAsync` and related methods to pass additional context (e.g., `RetryAfter`, response body presence) to internal retry logic, allowing more nuanced retry decisions.
* Refactored `DistributedTransactionCommitter` to allow configurable base delay and injected delay provider for improved testability and flexibility, and added thread-local jitter for retry delays.

**Session Token Retry Policy Adjustments:**

* Updated session token retry logic to clarify and simplify when to set the hub region processing header, and to stop retries after the correct number of attempts.

These changes collectively make the retry policy for distributed transactions more resilient, efficient, and easier to maintain and test, especially in the face of transient infrastructure failures and complex error scenarios.

## Type of change

Please delete options that are not relevant.

- [] Bug fix (non-breaking change which fixes an issue)
- [✓] New feature (non-breaking change which adds functionality)
- [] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [] This change requires a documentation update

## Closing issues

To automatically close an issue: closes #IssueNumber

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
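The "bounded exponential backoff with ±25% jitter" described in the commit message above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the class and method names are hypothetical, and applying the jitter after the cap (rather than before it) is a guess. The 100 ms base and 5 s cap in the usage note come from the budget table for 500/5411-5413.

```csharp
using System;

public static class DtxBackoffSketch
{
    // Assumed jitter width, taken from the "±25%" wording in the PR summary.
    private const double JitterFraction = 0.25;

    // attempt is zero-based: attempt 0 yields roughly baseDelay.
    public static TimeSpan ComputeDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random random)
    {
        if (random == null)
        {
            throw new ArgumentNullException(nameof(random));
        }

        // Clamp the exponent so very large attempt numbers cannot overflow.
        int exponent = Math.Max(0, Math.Min(attempt, 30));

        // Exponential growth, bounded by the cap.
        double nominalMs = Math.Min(
            baseDelay.TotalMilliseconds * Math.Pow(2, exponent),
            maxDelay.TotalMilliseconds);

        // Uniform multiplier in [0.75, 1.25) so clients do not retry in lockstep.
        double factor = 1.0 + (((random.NextDouble() * 2.0) - 1.0) * JitterFraction);

        return TimeSpan.FromMilliseconds(nominalMs * factor);
    }
}
```

A call such as `DtxBackoffSketch.ComputeDelay(3, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(5), rng)` lands between 600 ms and 1000 ms. `System.Random` is not thread-safe, which is presumably why the commit message mentions thread-local jitter; the sketch leaves that choice to the caller by taking the generator as a parameter.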
Review findings:
All should be fixed now.
/azp run
Azure Pipelines successfully started running 1 pipeline(s).
kundadebdatta left a comment
Deep Review — [Client encryption] Adds streaming JSON processing support to feed iterators @ e09e0bd
Overall assessment: in good shape. The major concerns raised earlier by @juraj-blazek (depth‑1 guard for _ei/encrypted-path matching, decryptedStream disposal on FromStream failure, pooled-stream cascade through DecryptableFeedResponse) all hold up at head. No correctness blockers found in the streaming decrypt path itself.
Cross-PR interaction check: PRs #5780 / #5829 / #5870 / #5844 / #5920 all touch the core Microsoft.Azure.Cosmos client; this PR is entirely contained in Microsoft.Azure.Cosmos.Encryption.Custom. No file or surface overlap.
Findings below in severity order. None are merge-blockers; 1–3 are worth addressing before merge.
🟡 1. DecryptableFeedResponse.DisposeAsync aborts cascade on first item exception (buffer leak)
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/DecryptableFeedResponse.cs:87-96
    if (this.Resource != null)
    {
        foreach (T item in this.Resource)
        {
            if (item is IAsyncDisposable asyncDisposable)
            {
                await asyncDisposable.DisposeAsync().ConfigureAwait(false);
            }
        }
    }

If any one item's DisposeAsync throws (e.g., a StreamDecryptableItem whose SemaphoreSlim.WaitAsync observes cancellation), the foreach is aborted and the remaining items never have their pooled buffers returned. The class's own XML doc explicitly says callers "rely on this cascade to release those buffers"; that contract is silently violated.
Suggested fix: Capture per-item exceptions, dispose all items, then aggregate/rethrow:
    List<Exception> errors = null;
    foreach (T item in this.Resource)
    {
        if (item is IAsyncDisposable asyncDisposable)
        {
            // Keep disposing the remaining items even if one of them throws.
            try { await asyncDisposable.DisposeAsync().ConfigureAwait(false); }
            catch (Exception ex) { (errors ??= new()).Add(ex); }
        }
    }
    if (errors != null) throw new AggregateException(errors);

🟡 2. StreamDecryptableItem never disposes its SemaphoreSlim (handle leak)
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/StreamDecryptableItem.cs:30, 252-282
    private readonly SemaphoreSlim asyncLock = new (1, 1);
    ...
    public override async ValueTask DisposeAsync()
    {
        await this.asyncLock.WaitAsync().ConfigureAwait(false);
        try { ... }
        finally { this.asyncLock.Release(); }
        GC.SuppressFinalize(this);
    }

SemaphoreSlim allocates a kernel ManualResetEvent lazily on first contended wait; that handle is only released by Dispose(). A query returning 1000 docs creates 1000 StreamDecryptableItems; under any contention these accumulate OS wait handles until GC.
Suggested fix: after the finally block, call this.asyncLock.Dispose();.
🟡 3. Over-broad NotSupportedException catch silently triggers fallback for unrelated errors
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionProcessor.cs:456-475
    try
    {
        return await MdeEncryptionProcessor.DecryptJsonArrayStreamInPlaceAsync(
            content, encryptor, CosmosDiagnosticsContext.Create(null), cancellationToken);
    }
    catch (NotSupportedException)
    {
        content.Position = 0;
        return await DecryptJsonArrayNewtonsoftAsync(content, encryptor, cancellationToken);
    }

The catch is intended for the seekability check at StreamProcessor.Decryptor.cs:45-48. But NotSupportedException is also thrown from DecryptStreamAsync for EncryptionFormatVersion != Mde (StreamProcessor.Decryptor.cs:333) and from MdeEncryptor.GetAdapter (MdeEncryptionProcessor.cs:178) for an unsupported processor enum. Both would now be silently retried on Newtonsoft.
Suggested fix: Pre-validate seekability up-front and let the stream path's exceptions propagate, or introduce a dedicated StreamNotSupportedException.
🟢 4. Fallback path is unreachable for its only legitimate trigger
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionProcessor.cs:471
    catch (NotSupportedException)
    {
        content.Position = 0; // <- throws NotSupportedException on non-seekable streams
        return await DecryptJsonArrayNewtonsoftAsync(content, encryptor, cancellationToken);
    }

The NotSupportedException this catch was designed to handle is raised exactly when !content.CanSeek. Setting Position = 0 on a non-seekable stream throws another NotSupportedException, masking the original and breaking the fallback. In practice Cosmos returns seekable MemoryStream content, so this never trips today, but the safety net doesn't catch the case it advertises. Combined with finding 3, if you take the pre-validate approach this disappears.
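The pre-validate approach suggested in findings 3 and 4 could look roughly like the sketch below. It is an illustration only: `DecryptRoutingSketch` and the two delegates are hypothetical stand-ins for the stream and Newtonsoft decrypt paths, and the real method takes an encryptor and a cancellation token that are omitted here.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class DecryptRoutingSketch
{
    // streamPath and newtonsoftPath are hypothetical stand-ins for the two decrypt paths.
    public static Task<Stream> DecryptAsync(
        Stream content,
        Func<Stream, Task<Stream>> streamPath,
        Func<Stream, Task<Stream>> newtonsoftPath)
    {
        if (content == null)
        {
            throw new ArgumentNullException(nameof(content));
        }

        // Decide up front instead of catching NotSupportedException afterwards.
        // No Position reset is needed because nothing has been read yet.
        if (!content.CanSeek)
        {
            return newtonsoftPath(content);
        }

        // Any exception from the stream path, including NotSupportedException
        // for an unsupported format or processor, now propagates to the caller.
        return streamPath(content);
    }
}
```

Routing on `CanSeek` before any read removes both problems at once: unrelated `NotSupportedException`s are no longer swallowed, and the fallback no longer depends on resetting `Position` on a stream that cannot seek.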
🟢 5. _ei skip is fragile if the value isn't a JSON object
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/Transformation/StreamProcessor.Decryptor.cs:469-477 (set) and :403-407 (reset)

    else if (reader.ValueTextEquals(this.encryptionPropertiesNameBytes))
    {
        if (!reader.TrySkip())
        {
            isIgnoredBlock = true; // <- assumes the value is an object
        }
        break;
    }
    ...
    if (isIgnoredBlock && reader.CurrentDepth == 1 && tokenType == JsonTokenType.EndObject)
    {
        isIgnoredBlock = false; // <- only resets on EndObject
        continue;
    }

The current serializer always emits _ei as a JSON object, but the reader makes no such guarantee. If a forged/corrupt response had _ei as an array or a primitive, isIgnoredBlock would never reset (no depth‑1 EndObject) and the remainder of the document would be silently dropped, including the closing }, which would then corrupt downstream JSON state.
Suggested fix: Validate that the next non-skipped token is StartObject before entering ignore mode, or also reset on EndArray/primitives at depth 1. Add a unit test covering _ei: [] and _ei: "foo".
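The invariant finding 5 asks for (skipping `_ei` must leave the reader aligned whatever the value's type) can be demonstrated with `Utf8JsonReader` on a fully buffered payload. This is a sketch under simplifying assumptions, not the streaming decryptor: the real code reads in chunks, where `Skip()` is unavailable and `TrySkip()` can fail, and the class and method names here are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class EncryptionInfoSkipSketch
{
    // Returns the depth-1 property names of a JSON object, ignoring "_ei"
    // regardless of whether its value is an object, an array, or a primitive.
    public static List<string> TopLevelNamesExceptEi(string json)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(json);

        // The whole payload is in memory (final block), so Skip() is allowed.
        var reader = new Utf8JsonReader(utf8);
        var names = new List<string>();

        if (!reader.Read() || reader.TokenType != JsonTokenType.StartObject)
        {
            throw new InvalidOperationException("Expected a JSON object at the root.");
        }

        while (reader.Read() && reader.TokenType == JsonTokenType.PropertyName)
        {
            if (!reader.ValueTextEquals("_ei"))
            {
                names.Add(reader.GetString());
            }

            // From a PropertyName token, Skip() moves to the value and then past
            // all of its children, whatever the value's token type is.
            reader.Skip();
        }

        return names;
    }
}
```

Inputs such as `{"id":"1","_ei":[],"a":2}` and `{"_ei":"foo","b":{"_ei":1}}` exercise the array and primitive cases named in the suggested unit test; a nested `_ei` below depth 1 is never inspected because its parent value is skipped as a whole.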
🟢 6. StreamDecryptableItem caches the deserialized item as object — second GetItemAsync<U> cast throws opaquely
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/StreamDecryptableItem.cs:34-90
    private object cachedItem;
    ...
    if (this.isDecrypted)
    {
        return ((T)this.cachedItem, this.cachedDecryptionContext);
    }

After a successful GetItemAsync<Foo>(), the stream is disposed and only the boxed Foo remains. A later GetItemAsync<Bar>() does (Bar)this.cachedItem, which throws InvalidCastException with no context. The previous DecryptableItemCore (Newtonsoft) didn't have this restriction since it cached the JToken. Subtle behavioural divergence between the two processors.
Suggested fix: Store the first-materialized type and throw a descriptive exception on mismatch, or store enough state to re-deserialize.
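The first option in the suggested fix (remember the first-materialized type and fail descriptively on mismatch) might look like this. `TypedItemCacheSketch` is a hypothetical stand-in for the caching portion of `StreamDecryptableItem`; locking and the decryption context are left out, and a cached null item is not handled.

```csharp
using System;

public sealed class TypedItemCacheSketch
{
    private object cachedItem;
    private Type cachedType;

    // Records the item together with the type it was first materialized as.
    public void Store<T>(T item)
    {
        this.cachedItem = item;
        this.cachedType = typeof(T);
    }

    public T Get<T>()
    {
        if (this.cachedType == null)
        {
            throw new InvalidOperationException("No item has been materialized yet.");
        }

        // Compatible requests (same type, base type, or implemented interface) succeed.
        if (this.cachedItem is T typed)
        {
            return typed;
        }

        // Replace the opaque InvalidCastException with a message naming both types.
        throw new InvalidOperationException(
            $"The item was already materialized as {this.cachedType.FullName} and cannot be returned as {typeof(T).FullName}.");
    }
}
```

This keeps the single-deserialization behaviour of the stream path and only changes what the caller sees on a mismatched second request; restoring parity with the Newtonsoft processor would need the second option, keeping enough state to re-deserialize.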
🟢 7. Missing ConfigureAwait(false) on several awaits in netstandard2.0 paths
Files:
Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionFeedIterator.cs:42, 50
Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionContainer.cs:1054, 1059
Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionProcessor.cs:399, 401, 463, 467, 473
Several awaits lack ConfigureAwait(false) while the rest of the file/PR uses it. The library targets netstandard2.0 consumers (ASP.NET classic / WPF SynchronizationContext) where capturing the context can deadlock or cause unnecessary thread switches.
💬 8. Observation: UseStreamingJsonProcessingByDefault is a write-only, one-shot toggle with no synchronization story
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionContainer.cs:19, 1028-1031 + EncryptionContainerExtensions.cs:31-47
- Mutable state, no opposite method, no idempotency contract documented.
- Reads in GetItemQueryIterator etc. aren't synchronized with the setter (enum writes are atomic in .NET so this is observably benign today, but the design is awkward).
- The extension on Container returns the same Container reference after mutation, which is easy to misuse (caller may assume a wrapping pattern).
Not a blocker. Consider an options/builder shape or constructor-injected default if revisiting the public API.
💬 9. Observation: TryExtractEncryptionProperties re-parses each captured object
File: Microsoft.Azure.Cosmos.Encryption.Custom/src/Transformation/StreamProcessor.Decryptor.cs:280-294, 173
For every document the streaming path first buffers the entire object, then JsonSerializer.Deserialize<EncryptionPropertiesWrapper>(...) just to check for _ei, then re-parses with Utf8JsonReader to do the actual transform. Two full passes per document on the supposedly "stream-fast" path. A single targeted pre-scan via Utf8JsonReader for the _ei property at depth 1 would avoid the second deserialize. Performance, not correctness — but a worthwhile follow-up.
Recommendation
No blocking findings. The two highest-impact items raised in the prior review thread are clearly addressed. Recommend tightening before merge:
- Finding 1 (cascade aborts on first failure) — small change, real leak.
- Finding 2 (SemaphoreSlim leak) — one-line fix.
- Finding 3 (overly-broad catch) — tighten to avoid silent fallback for unrelated errors.
Findings 4–9 are nice-to-haves and can land here or as a small follow-up. Otherwise approve-with-nits.
…litter throws mid-iteration

ConvertResponseToDecryptableItemsStreamAsync builds a List<DecryptableItem> locally and only returns it after the JsonArrayStreamSplitter async-enumerator completes. If the splitter threw after yielding one or more documents (mid-feed transport error, cancellation, malformed payload past the first doc), every StreamDecryptableItem already added to the local list owned a PooledMemoryStream rented from ArrayPool<byte>.Shared. Because the partial list never reaches the caller and is never wrapped in a DecryptableFeedResponse, the disposal cascade addressed by an earlier round of review never sees those items. The rented buffers stay out of the pool (eventual GC reclamation only) and may retain plaintext residue.

Wrap the foreach in try/catch, drain any partial items via IAsyncDisposable (swallowing per-item failures so we don't mask the original cause), then rethrow the original exception unchanged.

Adds a regression test in EncryptionProcessorTests that feeds a ThrowAfterPrefixStream containing one complete document followed by an IOException, and asserts the original IOException identity is preserved.

Also strengthens XML doc on UseStreamingJsonProcessingByDefault and DecryptableItem.DisposeAsync to call out the FeedResponse<DecryptableItem> disposal contract that stream-mode callers must follow (the cascade is real but the public type doesn't advertise IAsyncDisposable, so callers must cast).

Adds the missing changelog entry for Azure#5478 under 1.0.0-preview09.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
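The drain-then-rethrow pattern this commit message describes can be sketched generically. The names below (`PartialListDrainSketch`, `PooledItemSketch`) are hypothetical; the real method works on `DecryptableItem` and `JsonArrayStreamSplitter`, which are not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical item that owns a pooled resource.
public sealed class PooledItemSketch : IAsyncDisposable
{
    public bool IsDisposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        this.IsDisposed = true; // a real item would return its rented buffer here
        return default;
    }
}

public static class PartialListDrainSketch
{
    public static async Task<List<T>> CollectOrDrainAsync<T>(IAsyncEnumerable<T> source)
        where T : IAsyncDisposable
    {
        var items = new List<T>();
        try
        {
            await foreach (T item in source.ConfigureAwait(false))
            {
                items.Add(item);
            }

            return items;
        }
        catch
        {
            // The partial list never reaches the caller, so release what it owns.
            foreach (T item in items)
            {
                try { await item.DisposeAsync().ConfigureAwait(false); }
                catch { /* swallowed so the original failure is not masked */ }
            }

            throw; // rethrow the original exception unchanged
        }
    }
}
```

Using a bare `throw;` rather than `throw ex;` keeps both the exception instance and its stack trace, which is what an identity assertion like the one in the regression test depends on.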
Picking this up while @MartinSarkany is on leave. New commit Adversarial-review verdict on the existing fixes (HEAD
One NEW issue this commit fixes (HIGH — orphan-leak narrower than #4 above):
The fix wraps the Also added the missing changelog entry for #5478 under Local validation: CI: The two failures on the previous build (
/azp run
…ade, and stream-mode buffer cap

Addresses second-pass review findings (HIGH/MEDIUM) on top of the orphan-leak fix already in 6e0d6d5.

HIGH: StreamDecryptableItem loses the DEK id in EncryptionException

Previously hard-coded dataEncryptionKeyId: string.Empty on every decryption failure, dropping the diagnostic surface customers use to correlate key-store / DEK-revocation failures. Newtonsoft DecryptableItemCore extracts _ei.DataEncryptionKeyId before throwing; stream mode now matches:

1. If decryption succeeded and serialization later threw, the DEK id is already in the in-flight DecryptionContext (most accurate source).
2. Else if contentStream is still readable, parse it out of _ei via the existing EncryptionPropertiesStreamReader (seekable-only path; resets Position to 0 on return so the subsequent encrypted-content read is unaffected).
3. Else fall back to string.Empty (matches the prior behavior and avoids re-throwing inside the catch; EncryptionException's ctor rejects null).

HIGH: DecryptableFeedResponse.DisposeAsync aborts cascade on first throw

The XML doc promises every pooled buffer is released, but a single throwing item silently stranded the rented buffers of every remaining item. The cascade is now best-effort:

- Per-item DisposeAsync is wrapped in try/catch.
- Failures are collected in a local list and surfaced AFTER the cascade.
- Single failure: rethrown via ExceptionDispatchInfo.Capture so the original stack/identity is preserved (existing DisposeAsync_PropagatesExceptionFromItemDispose test still passes).
- Multiple failures: surfaced as AggregateException.

The isDisposed flag is also tightened to int + Interlocked.Exchange so concurrent disposers can't both pass the idempotency guard.

MEDIUM: StreamProcessor per-document buffer grows unbounded

JsonArrayStreamSplitter / JsonFeedStreamHelper already enforce a 64 MiB cap on growth; the per-document inner loop in StreamProcessor.Decryptor and StreamProcessor.Encryptor did not. A single malformed or maliciously-large encrypted property could drive the buffer to OOM rather than throwing a clean InvalidOperationException. Both inner loops now mirror the splitter cap and message-style.

MEDIUM: JsonFeedStreamHelper.HandleLeftOver message clarified

Trip condition is no-progress on the chunk (document or token doesn't fit), not strictly a token. Reworded accordingly.

MEDIUM: UseStreamingJsonProcessingByDefault configure-once contract

Added XML doc note on the EncryptionContainer instance method explaining the one-way, configure-before-use semantics and the undefined behavior of mutating it while iterators are in flight. (No code change: adding Interlocked here would be over-engineering for a configuration setter and could mislead callers into thinking mid-operation mutation is supported.)

Tests

- StreamDecryptableItemTests.GetItemAsync_WhenDecryptionFails_PopulatesDataEncryptionKeyIdOnException asserts EncryptionException.DataEncryptionKeyId == dekId after a forced decryption failure (extracted via EncryptionPropertiesStreamReader from _ei).
- DecryptableFeedResponseTests.DisposeAsync_WhenItemThrows_StillDisposesRemainingItems asserts items before, between, and after a throwing item all see exactly one DisposeAsync call, and that the original InvalidOperationException identity is preserved (not wrapped in AggregateException for the single-failure case).
- DecryptableFeedResponseTests.DisposeAsync_WhenMultipleItemsThrow_AggregatesAndStillDrains asserts that two throwing items surface as an AggregateException with both inner exceptions, and that the non-throwing item between them is still disposed.

Local: net8.0 unit-test suite 692 passed / 0 failed (up from 689 after the three new regression tests). Build clean (0 warnings, 0 errors).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
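The best-effort cascade described in this commit message (dispose everything, rethrow a single failure with its identity intact, aggregate several, guard idempotency with `Interlocked.Exchange`) can be sketched as below. `DisposeCascadeSketch` and `CallbackDisposable` are hypothetical names used for illustration; the actual `DecryptableFeedResponse` iterates its `Resource` and type-checks each item.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical item whose disposal can be made to fail on demand.
public sealed class CallbackDisposable : IAsyncDisposable
{
    private readonly Action onDispose;

    public CallbackDisposable(Action onDispose) => this.onDispose = onDispose;

    public int DisposeCount { get; private set; }

    public ValueTask DisposeAsync()
    {
        this.DisposeCount++;
        this.onDispose?.Invoke();
        return default;
    }
}

public sealed class DisposeCascadeSketch : IAsyncDisposable
{
    private readonly IReadOnlyList<IAsyncDisposable> items;
    private int isDisposed; // 0 = live, 1 = disposed

    public DisposeCascadeSketch(IReadOnlyList<IAsyncDisposable> items)
    {
        this.items = items ?? throw new ArgumentNullException(nameof(items));
    }

    public async ValueTask DisposeAsync()
    {
        // Only the first caller proceeds, even when disposers race.
        if (Interlocked.Exchange(ref this.isDisposed, 1) != 0)
        {
            return;
        }

        List<Exception> errors = null;
        foreach (IAsyncDisposable item in this.items)
        {
            try { await item.DisposeAsync().ConfigureAwait(false); }
            catch (Exception ex) { (errors ??= new List<Exception>()).Add(ex); }
        }

        if (errors == null)
        {
            return;
        }

        if (errors.Count == 1)
        {
            // Rethrow the same exception instance with its original stack trace.
            ExceptionDispatchInfo.Capture(errors[0]).Throw();
        }

        throw new AggregateException(errors);
    }
}
```

Rethrowing a lone failure through `ExceptionDispatchInfo` keeps callers that catch a specific exception type working, while the `AggregateException` path is reserved for the case where wrapping is unavoidable.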
Second-pass review verdict (commit
Local validation: Status: PR HEAD is now
/azp run
…nts splitter test The test pins unusual semantics (yield from every Documents array seen when duplicated), which differs from the strict-JSON last-wins reading. The Cosmos gateway will never emit duplicate root-property keys, so the splitter is intentionally permissive rather than strict. Without this comment a future maintainer could read the test as accidentally exercising a bug and 'fix' it to last-wins, silently breaking the splitter's invariant that any object inside a Documents array is a document. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
H-4 and L-4 follow-up fixes pushed in commit
Validation: Status: PR HEAD is now
/azp run
Every existing Stream-variant decrypt test in StreamDecryptableItemTests encrypts with AEAes256CbcHmacSha256Randomized (legacy), which makes SystemTextJsonStreamAdapter.DecryptAsync throw NotSupportedException at its algorithm check and routes the request through the Newtonsoft legacy fallback (EncryptionProcessor.DecryptAsync with legacyFallback: true). Net effect: the MDE stream-decrypt path was never actually exercised by the StreamDecryptableItem suite, even though tests appeared to cover it. GetItemAsync_StreamMode_WithMdeAlgorithm_TakesGenuineMdeStreamPath encrypts with MdeAeadAes256CbcHmac256Randomized via TestEncryptorFactory.CreateMde (the only mock that supplies the DataEncryptionKey methods the MDE stream encryptor needs: EncryptByteCount / DecryptByteCount / EncryptData / DecryptData), then asserts via an ActivityListener on the 'Microsoft.Azure.Cosmos.Encryption.Custom' source that the ScopeDecryptModeSelectionPrefix+JsonProcessor.Stream scope WAS created AND the ScopeDecryptModeSelectionPrefix+JsonProcessor.Newtonsoft scope was NOT created. Together with round-trip equality assertions, this locks in 'we actually take the MDE/Stream branch', not the legacy fallback the existing tests silently exercised. Test-only; no shipped src changes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
b62e01e to 5cecb2b
Reverting L-4 (the Where CT could observe cancellation inside
Where CT cannot fire (the bulk of the work):
For the typical case (page already fetched, DEK cached, small-to-medium documents) Cost vs benefit: API-surface expansion + breaking change for external Reverted L-4 in commit Validation: Final PR HEAD:
Pull Request Template
Description
What this PR does
Adds stream-mode JSON processing for encryption feed iterators (query, LINQ, change-feed) to Microsoft.Azure.Cosmos.Encryption.Custom. Consumers opt in per-call via RequestOptions or per-container via Container.UseStreamingJsonProcessingByDefault(). Default remains Newtonsoft; existing callers see no behavioral change.
New public API
- EncryptionContainerExtensions.UseStreamingJsonProcessingByDefault(Container): opt-in to stream mode per container
- DecryptableItem.DisposeAsync(): DecryptableItem is now IAsyncDisposable so StreamDecryptableItem can release its pooled buffer
No public API removals. No behavioral changes to existing surface.
Dependency changes
- Microsoft.IO.RecyclableMemoryStream 3.0.1 | Microsoft.Azure.Cosmos.Encryption.Custom.Performance.Tests | PooledMemoryStream
- System.Text.RegularExpressions 4.3.1 | Microsoft.Azure.Cosmos.Encryption.Custom
No additions.
#4678
Type of change
Please delete options that are not relevant.