Skip to content

Commit 6f56e40

Browse files
CrossRegionHedgingAvailabilityStrategy: Fixes ArgumentNullException race condition in hedging cancellation (#5613)
## CrossRegionHedgingAvailabilityStrategy: Fixes `ArgumentNullException` race condition in hedging cancellation ### Bug Multiple production customers reported unobserved `ArgumentNullException: Value cannot be null. (Parameter 'request')` crashes originating from `CrossRegionHedgingAvailabilityStrategy.RequestSenderAndResultCheckAsync`. The exception was surfaced as a `TaskScheduler_UnobservedTaskException`, crashing the process. The affected code paths were: - `ContainerCore.ReadItemAsync` → `ReadItemStreamAsync` → `ProcessItemStreamAsync` → `RequestInvokerHandler.SendAsync` → `CrossRegionHedgingAvailabilityStrategy.ExecuteAvailabilityStrategyAsync` ### Root Cause A **race condition** caused by passing the wrong `CancellationToken` to the sender delegate: 1. `ExecuteAvailabilityStrategyAsync` creates a `hedgeRequestsCancellationTokenSource` (linked to the app-provided CT) to coordinate hedge request lifecycle. 2. `CloneAndSendAsync` clones the request inside a `using` block and calls `RequestSenderAndResultCheckAsync`. 3. **Bug:** `RequestSenderAndResultCheckAsync` called `sender.Invoke(request, cancellationToken)` with the **application-provided `CancellationToken`** — not `hedgeRequestsCancellationTokenSource.Token`. 4. When hedge Region B returned a final result (e.g., 200 OK), `hedgeRequestsCancellationTokenSource.Cancel()` was called. 5. **But the in-flight sender for Region A still held the app CT** (e.g., `CancellationToken.None`), which was **never cancelled**. 6. The `CloneAndSendAsync` `using` block exited, **disposing the cloned request**. 7. The Region A sender continued executing with a reference to the now-disposed request → **`ArgumentNullException: Value cannot be null. (Parameter 'request')`**. A secondary issue: when the application CT was cancelled (e2e timeout), the hedge timer (linked to app CT) would fire, and the old code would blindly continue the loop attempting to clone and send new requests on a cancelled path. ### Fix Two changes in `CrossRegionHedgingAvailabilityStrategy.cs`: **1. Pass `hedgeRequestsCancellationTokenSource.Token` to `sender.Invoke()` instead of the app CT** This ensures that when **any** hedge gets a final result and calls `hedgeRequestsCancellationTokenSource.Cancel()`, **all** in-flight senders immediately see their CT cancelled and stop before the cloned request is disposed. The `CancellationTokenSource` and `CancellationToken` parameters were also consolidated into a single `hedgeRequestsCancellationTokenSource` parameter passed through `CloneAndSendAsync` → `RequestSenderAndResultCheckAsync`. **2. Add `do/while` loop to handle spurious timer completions on app CT cancellation** When the app CT is cancelled (e2e timeout), the hedge timer fires via the linked CTS. The old code would `continue` the loop and try to clone a new request. The `do/while` loop now detects `applicationProvidedCancellationToken.IsCancellationRequested` and falls through to consolidate existing request outcomes instead of spawning new hedges. ### Tests Added (8 new unit tests) | Test | Validates | |---|---| | `HedgeCancellationCancelsInFlightRequests_NoNullRef` | Slow primary request's CT is cancelled when a hedge returns a final result — core regression test | | `SenderReceivesHedgeCancellationToken_NotAppToken` | Captures the actual CT passed to each sender and asserts all are from the hedge CTS, not the app CT | | `AppCancellationDuringHedging_DoesNotSpawnNewHedgeRequests` | E2e timeout (app CT cancelled) does not spawn new hedge requests — validates the do/while loop fix | | `MultiRegionHedging_RequestNotAccessedAfterDisposal` | Verifies the cloned request is still accessible when cancellation fires — exact scenario from the crash reports | | `HedgeCancellation_StreamRequest_NoNullRef` | Tests the stream-based code path (ReadItemStreamAsync) from the NullRef2/NullRef3 stack traces | | `PrimaryRequestFinalResult_NoAdditionalHedgesSent` | Fast primary response skips hedging entirely | | `AllHedgesTransientError_ReturnsLastResponse` | All regions return transient errors — strategy returns last response without NullRef | | `ConcurrentHedgingRequests_NoNullRef` | Stress test: 50 concurrent hedging requests with random delays — no NullRef under concurrency | ### Type of change - [x] Bug fix (non-breaking change which fixes an issue) --------- Co-authored-by: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com>
1 parent f9d76eb commit 6f56e40

2 files changed

Lines changed: 601 additions & 25 deletions

File tree

Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -125,23 +125,24 @@ internal bool ShouldHedge(RequestMessage request, CosmosClient client)
125125
/// <param name="sender"></param>
126126
/// <param name="client"></param>
127127
/// <param name="request"></param>
128-
/// <param name="cancellationToken"></param>
128+
/// <param name="applicationProvidedCancellationToken"></param>
129129
/// <returns>The response after executing cross region hedging</returns>
130130
internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
131131
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
132132
CosmosClient client,
133133
RequestMessage request,
134-
CancellationToken cancellationToken)
134+
CancellationToken applicationProvidedCancellationToken)
135135
{
136136
if (!this.ShouldHedge(request, client)
137137
|| client.DocumentClient.GlobalEndpointManager.ReadEndpoints.Count == 1)
138138
{
139-
return await sender(request, cancellationToken);
139+
return await sender(request, applicationProvidedCancellationToken);
140140
}
141141

142142
ITrace trace = request.Trace;
143143

144-
using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
144+
using (CancellationTokenSource hedgeRequestsCancellationTokenSource =
145+
CancellationTokenSource.CreateLinkedTokenSource(applicationProvidedCancellationToken))
145146
{
146147
using (CloneableStream clonedBody = (CloneableStream)(request.Content == null
147148
? null
@@ -161,7 +162,7 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
161162
{
162163
TimeSpan awaitTime = requestNumber == 0 ? this.Threshold : this.ThresholdStep;
163164

164-
using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
165+
using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(applicationProvidedCancellationToken))
165166
{
166167
CancellationToken timerToken = timerTokenSource.Token;
167168
using (Task hedgeTimer = Task.Delay(awaitTime, timerToken))
@@ -173,32 +174,50 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
173174
hedgeRegions: hedgeRegions,
174175
requestNumber: requestNumber,
175176
trace: trace,
176-
cancellationToken: cancellationToken,
177-
cancellationTokenSource: cancellationTokenSource);
177+
hedgeRequestsCancellationTokenSource: hedgeRequestsCancellationTokenSource);
178178

179179
requestTasks.Add(requestTask);
180180
requestTasks.Add(hedgeTimer);
181181

182-
Task completedTask = await Task.WhenAny(requestTasks);
183-
requestTasks.Remove(completedTask);
182+
Task completedTask;
183+
do
184+
{
185+
completedTask = await Task.WhenAny(requestTasks);
186+
requestTasks.Remove(completedTask);
187+
}
188+
while (
189+
completedTask == hedgeTimer &&
190+
// Ignore hedge timer signals if either the e2e timeout is hit
191+
// or the hedgeTimer task failed (or more commonly since this is a linked CTS was cancelled)
192+
// in both of these cases we do not want to spawn new hedge requests
193+
// but just consolidate the outcome of previous requests
194+
(!completedTask.IsCompleted || applicationProvidedCancellationToken.IsCancellationRequested));
184195

185196
if (completedTask == hedgeTimer)
186197
{
187198
continue;
188199
}
189200

190-
timerTokenSource.Cancel();
191201
requestTasks.Remove(hedgeTimer);
202+
timerTokenSource.Cancel();
192203

193-
if (completedTask.IsFaulted)
204+
if (completedTask.IsFaulted || completedTask.IsCanceled)
194205
{
195-
AggregateException innerExceptions = completedTask.Exception.Flatten();
206+
requestTasks.Remove(hedgeTimer);
207+
timerTokenSource.Cancel();
208+
209+
if (applicationProvidedCancellationToken.IsCancellationRequested)
210+
{
211+
await (Task<HedgingResponse>)completedTask;
212+
}
213+
214+
continue;
196215
}
197216

198217
hedgeResponse = await (Task<HedgingResponse>)completedTask;
199218
if (hedgeResponse.IsNonTransient)
200219
{
201-
cancellationTokenSource.Cancel();
220+
hedgeRequestsCancellationTokenSource.Cancel();
202221

203222
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
204223
HedgeConfig,
@@ -227,12 +246,19 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
227246
{
228247
AggregateException innerExceptions = completedTask.Exception.Flatten();
229248
lastException = innerExceptions.InnerExceptions.FirstOrDefault();
249+
continue;
250+
}
251+
252+
if (completedTask.IsCanceled)
253+
{
254+
lastException = new OperationCanceledException();
255+
continue;
230256
}
231257

232258
hedgeResponse = await (Task<HedgingResponse>)completedTask;
233259
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
234260
{
235-
cancellationTokenSource.Cancel();
261+
hedgeRequestsCancellationTokenSource.Cancel();
236262
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
237263
HedgeConfig,
238264
this.HedgeConfigText);
@@ -251,7 +277,16 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
251277
throw lastException;
252278
}
253279

254-
Debug.Assert(hedgeResponse != null);
280+
if (hedgeResponse == null)
281+
{
282+
if (applicationProvidedCancellationToken.IsCancellationRequested)
283+
{
284+
throw new CosmosOperationCanceledException(new OperationCanceledException(), trace);
285+
}
286+
287+
throw new InvalidOperationException("Cross-region hedging completed without producing a response.");
288+
}
289+
255290
return hedgeResponse.ResponseMessage;
256291
}
257292
}
@@ -264,8 +299,7 @@ private async Task<HedgingResponse> CloneAndSendAsync(
264299
IReadOnlyCollection<string> hedgeRegions,
265300
int requestNumber,
266301
ITrace trace,
267-
CancellationToken cancellationToken,
268-
CancellationTokenSource cancellationTokenSource)
302+
CancellationTokenSource hedgeRequestsCancellationTokenSource)
269303
{
270304
RequestMessage clonedRequest;
271305

@@ -287,8 +321,7 @@ private async Task<HedgingResponse> CloneAndSendAsync(
287321
sender,
288322
clonedRequest,
289323
hedgeRegions.ElementAt(requestNumber),
290-
cancellationToken,
291-
cancellationTokenSource,
324+
hedgeRequestsCancellationTokenSource,
292325
trace);
293326
}
294327
}
@@ -297,27 +330,30 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
297330
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
298331
RequestMessage request,
299332
string targetRegionName,
300-
CancellationToken cancellationToken,
301-
CancellationTokenSource cancellationTokenSource,
333+
CancellationTokenSource hedgeRequestsCancellationTokenSource,
302334
ITrace trace)
303335
{
304336
try
305337
{
306-
ResponseMessage response = await sender.Invoke(request, cancellationToken);
338+
ResponseMessage response = await sender.Invoke(request, hedgeRequestsCancellationTokenSource.Token);
307339
if (IsFinalResult((int)response.StatusCode, (int)response.Headers.SubStatusCode))
308340
{
309-
if (!cancellationToken.IsCancellationRequested)
341+
if (!hedgeRequestsCancellationTokenSource.IsCancellationRequested)
310342
{
311-
cancellationTokenSource.Cancel();
343+
// App has not reached e2e timeout - we can cancel any still remaining
344+
// hedge requests since we have a final response now
345+
hedgeRequestsCancellationTokenSource.Cancel();
312346
}
313347

314348
return new HedgingResponse(true, response, targetRegionName);
315349
}
316350

317351
return new HedgingResponse(false, response, targetRegionName);
318352
}
319-
catch (OperationCanceledException oce) when (cancellationTokenSource.IsCancellationRequested)
353+
catch (OperationCanceledException oce) when (hedgeRequestsCancellationTokenSource.IsCancellationRequested)
320354
{
355+
// hedgeRequestsCancellationTokenSource is a linked cancellation token source - so, would also signal
356+
// cancellation on e2e timeout via app provided CT
321357
throw new CosmosOperationCanceledException(oce, trace);
322358
}
323359
catch (Exception ex)

0 commit comments

Comments
 (0)