Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
CancellationToken cancellationToken)
{
this.retryContext = null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undo this file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. It's just a draft PR; I will not include it in the direct package PR.


ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode);
Expand All @@ -180,6 +179,7 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}

return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);

}

/// <summary>
Expand Down
98 changes: 74 additions & 24 deletions Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ namespace Microsoft.Azure.Documents
{
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.Collections;

//=================================================================================================================
// Strong read logic:
Expand Down Expand Up @@ -110,13 +113,26 @@ public async Task<StoreResponse> ReadStrongAsync(
this.authorizationTokenProvider,
secondaryQuorumReadResult.SelectedLsn,
secondaryQuorumReadResult.GlobalCommittedSelectedLsn);
if (await this.WaitForReadBarrierAsync(
barrierRequest,
allowPrimary: true,
readQuorum: readQuorumValue,
readBarrierLsn: secondaryQuorumReadResult.SelectedLsn,
targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn,
readMode: readMode))
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also check the QuorumWrite paths?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 429 errors, and implement the same there?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse) = await this.WaitForReadBarrierAsync(
barrierRequest,
allowPrimary: true,
readQuorum: readQuorumValue,
readBarrierLsn: secondaryQuorumReadResult.SelectedLsn,
targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn,
readMode: readMode);

if (isThrottled && throttledResponse != null)
{
// Handle throttling by delegating to ResourceThrottleRetryPolicy
DefaultTrace.TraceWarning("ReadStrongAsync: All replicas returned 429 Too Many Requests. Delegating to ResourceThrottleRetryPolicy.");

// Return the real 429 response upstream
return throttledResponse;

}

if (isSuccess)
{
return secondaryQuorumReadResult.GetResponseAndSkipStoreResultDispose();
}
Expand Down Expand Up @@ -256,7 +272,6 @@ private async Task<ReadQuorumResult> ReadQuorumAsync(
useSessionToken: false,
readMode: readMode));
IList<ReferenceCountedDisposable<StoreResult>> responseResult = disposableResponseResult.Value;

responsesForLogging = new StoreResult[responseResult.Count];
for (int i = 0; i < responseResult.Count; i++)
{
Expand Down Expand Up @@ -312,7 +327,15 @@ private async Task<ReadQuorumResult> ReadQuorumAsync(

// ReadBarrier required
DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(entity, this.authorizationTokenProvider, readLsn, globalCommittedLSN);
if (!await this.WaitForReadBarrierAsync(barrierRequest, false, readQuorum, readLsn, globalCommittedLSN, readMode))
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
(bool isSuccess, bool isThrottled, StoreResponse? throttledRespons) = await this.WaitForReadBarrierAsync(
barrierRequest,
false,
readQuorum,
readLsn,
globalCommittedLSN,
readMode);
if(!isSuccess)
{
return new ReadQuorumResult(
entity.RequestContext.RequestChargeTracker,
Expand Down Expand Up @@ -353,6 +376,17 @@ private async Task<ReadPrimaryResult> ReadPrimaryAsync(
requiresValidLsn: true,
useSessionToken: useSessionToken);
StoreResult storeResult = disposableStoreResult.Target;
if (storeResult.StatusCode == StatusCodes.TooManyRequests)
{
// Let ResourceThrottleRetryPolicy handle 429
// Instead of throwing an exception, return a result that indicates throttling
return new ReadPrimaryResult(
requestChargeTracker: entity.RequestContext.RequestChargeTracker,
isSuccessful: true,
shouldRetryOnSecondary: false,
response: disposableStoreResult.TryAddReference());
}

if (!storeResult.IsValid)
{
ExceptionDispatchInfo.Capture(storeResult.GetException()).Throw();
Expand Down Expand Up @@ -469,8 +503,8 @@ private async Task<PrimaryReadOutcome> WaitForPrimaryLsnAsync(

return PrimaryReadOutcome.QuorumNotMet;
}

private Task<bool> WaitForReadBarrierAsync(
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
private Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForReadBarrierAsync(
DocumentServiceRequest barrierRequest,
bool allowPrimary,
int readQuorum,
Expand All @@ -490,7 +524,7 @@ private Task<bool> WaitForReadBarrierAsync(
// (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back
// This old implementation will be removed (and the environment
// variable not been used anymore) after some bake time.
private async Task<bool> WaitForReadBarrierOldAsync(
private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForReadBarrierOldAsync(
DocumentServiceRequest barrierRequest,
bool allowPrimary,
int readQuorum,
Expand All @@ -506,7 +540,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
while (readBarrierRetryCount-- > 0)
{
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
barrierRequest,
includePrimary: allowPrimary,
replicaCountToRead: readQuorum,
Expand All @@ -517,11 +551,19 @@ private async Task<bool> WaitForReadBarrierOldAsync(
forceReadAll: true));
IList<ReferenceCountedDisposable<StoreResult>> responses = disposableResponses.Value;

// Check if all replicas returned 429
// TODO: check with Kiran/Fabian whether a 429 from one replica is enough to yield; that may turn out to be a false alarm, since the calls are async and the 429 state could be transient
if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any or All?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think All makes sense. I will remove the comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With exceptions, is the behavior all 429s from both replicas?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With one replica failure, it returns a successful response as expected. Updated the PR with additional test case coverage and included the diagnostics for the same.

{
DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response
}

long maxGlobalCommittedLsnInResponses = responses.Count > 0 ? responses.Max(response => response.Target.GlobalCommittedLSN) : 0;
if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) &&
(!(targetGlobalCommittedLSN > 0) || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN))
{
return true;
return (true, false, null);
}

maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ?
Expand All @@ -547,7 +589,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
while (readBarrierRetryCountMultiRegion-- > 0)
{
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
barrierRequest,
includePrimary: allowPrimary,
replicaCountToRead: readQuorum,
Expand All @@ -562,7 +604,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) &&
maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN)
{
return true;
return (true, false, null);
}

maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ?
Expand Down Expand Up @@ -590,10 +632,11 @@ private async Task<bool> WaitForReadBarrierOldAsync(

DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}.",
targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode);
return false;
return (false, false, null);
}

private async Task<bool> WaitForReadBarrierNewAsync(
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForReadBarrierNewAsync(
DocumentServiceRequest barrierRequest,
bool allowPrimary,
int readQuorum,
Expand All @@ -606,11 +649,11 @@ private async Task<bool> WaitForReadBarrierNewAsync(
long maxGlobalCommittedLsn = 0;
bool hasConvergedOnLSN = false;
int readBarrierRetryCount = 0;
while(readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero)
while (readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero)
{
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
ValueStopwatch barrierRequestStopWatch = ValueStopwatch.StartNew();
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
barrierRequest,
includePrimary: allowPrimary,
replicaCountToRead: hasConvergedOnLSN ? 1 : readQuorum, // for GCLSN a single replica is sufficient
Expand All @@ -621,11 +664,18 @@ private async Task<bool> WaitForReadBarrierNewAsync(
forceReadAll: !hasConvergedOnLSN)); // for GCLSN a single replica is sufficient - and requests should be issued sequentially
barrierRequestStopWatch.Stop();
IList<ReferenceCountedDisposable<StoreResult>> responses = disposableResponses.Value;

// Check if all replicas returned 429
if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
Comment thread
kirankumarkolli marked this conversation as resolved.
{
DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
return (false, true, responses.First().Target.ToResponse(null)); // Yield early if all replicas return 429
}
TimeSpan previousBarrierRequestLatency = barrierRequestStopWatch.Elapsed;

int readBarrierLsnReachedCount = 0;
long maxGlobalCommittedLsnInResponses = 0;
foreach(ReferenceCountedDisposable<StoreResult> response in responses)
foreach (ReferenceCountedDisposable<StoreResult> response in responses)
{
maxGlobalCommittedLsnInResponses = Math.Max(maxGlobalCommittedLsnInResponses, response.Target.GlobalCommittedLSN);
if (!hasConvergedOnLSN && response.Target.LSN >= readBarrierLsn)
Expand All @@ -642,7 +692,7 @@ private async Task<bool> WaitForReadBarrierNewAsync(
if (hasConvergedOnLSN &&
(targetGlobalCommittedLSN <= 0 || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN))
{
return true;
return (true, false, null);
}

maxGlobalCommittedLsn = Math.Max(maxGlobalCommittedLsn, maxGlobalCommittedLsnInResponses);
Expand Down Expand Up @@ -670,15 +720,15 @@ private async Task<bool> WaitForReadBarrierNewAsync(
}
else if (shouldDelay)
{
TimeSpan delay =maxDelay < remainingDelay ? maxDelay : remainingDelay;
TimeSpan delay = maxDelay < remainingDelay ? maxDelay : remainingDelay;
await Task.Delay(delay);
remainingDelay -= delay;
}
}

DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}, HasLSNConverged:{3}.",
targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode, hasConvergedOnLSN);
return false;
return (false, false, null);
}

private bool IsQuorumMet(
Expand Down
Loading