Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
CancellationToken cancellationToken)
{
this.retryContext = null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undo this file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do. its just draft PR I will not include it in the direct package PR>


ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode);
Expand All @@ -180,6 +179,7 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}

return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);

}

/// <summary>
Expand Down
64 changes: 53 additions & 11 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Documents
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
Expand Down Expand Up @@ -285,7 +286,18 @@ private async Task<StoreResponse> WritePrivateAsync(
{
using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN))
{
if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN))
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse) =
await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN);

if (isThrottled && throttledResponse != null)
{
// Handle throttling by returning the throttled response
DefaultTrace.TraceWarning("WritePrivateAsync: Throttling occurred during write barrier. Returning throttled response.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Info is fine.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets trace the status and sub status codes along.

return throttledResponse;
}

if (!isSuccess)
{
DefaultTrace.TraceError("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN);
throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet);
Expand All @@ -302,7 +314,18 @@ private async Task<StoreResponse> WritePrivateAsync(
{
using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN))
{
if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN))
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse) =
await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN);

if (isThrottled && throttledResponse != null)
{
// Handle throttling by returning the throttled response
DefaultTrace.TraceWarning("WritePrivateAsync: Throttling occurred during write barrier. Returning throttled response.");
return throttledResponse;
}

if (!isSuccess)
{
DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN);
throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet);
Expand Down Expand Up @@ -334,7 +357,8 @@ internal bool ShouldPerformWriteBarrierForGlobalStrong(StoreResult storeResult,
return false;
}

private Task<bool> WaitForWriteBarrierAsync(
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
private Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierAsync(
DocumentServiceRequest barrierRequest,
long selectedGlobalCommittedLsn)
{
Expand All @@ -350,7 +374,7 @@ private Task<bool> WaitForWriteBarrierAsync(
// (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back
// This old implementation will be removed (and the environment
// variable not been used anymore) after some bake time.
private async Task<bool> WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn)
private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn)
{
int writeBarrierRetryCount = ConsistencyWriter.maxNumberOfWriteBarrierReadRetries;

Expand All @@ -368,9 +392,20 @@ private async Task<bool> WaitForWriteBarrierOldAsync(DocumentServiceRequest barr
checkMinLSN: false,
forceReadAll: false);

if (responses != null && responses.Any(response => response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn))
if (responses != null)
{
return true;
// Check if all replicas returned 429
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will a helper method help avoid duplication?

if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
{
DefaultTrace.TraceWarning("WaitForWriteBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response
}

// Check if any response satisfies the barrier condition
if (responses.Any(response => response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn))
{
return (true, false, null); // Barrier condition met
}
}

//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
Expand Down Expand Up @@ -401,10 +436,10 @@ private async Task<bool> WaitForWriteBarrierOldAsync(DocumentServiceRequest barr

DefaultTrace.TraceInformation("ConsistencyWriter: Highest global committed lsn received for write barrier call is {0}", maxGlobalCommittedLsnReceived);

return false;
return (false, false, null); // Barrier condition not met
}

private async Task<bool> WaitForWriteBarrierNewAsync(
#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierNewAsync(
DocumentServiceRequest barrierRequest,
long selectedGlobalCommittedLsn)
{
Expand Down Expand Up @@ -432,11 +467,18 @@ private async Task<bool> WaitForWriteBarrierNewAsync(
long maxGlobalCommittedLsn = 0;
if (responses != null)
{
// Check if all replicas returned 429
if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
{
DefaultTrace.TraceWarning("WaitForWriteBarrierNewAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response
}

foreach (ReferenceCountedDisposable<StoreResult> response in responses)
{
if (response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn)
{
return true;
return (true, false, null); // Barrier condition met
}

if (response.Target.GlobalCommittedLSN >= maxGlobalCommittedLsn)
Expand Down Expand Up @@ -478,7 +520,7 @@ private async Task<bool> WaitForWriteBarrierNewAsync(

DefaultTrace.TraceInformation("ConsistencyWriter: Highest global committed lsn received for write barrier call is {0}", maxGlobalCommittedLsnReceived);

return false;
return (false, false, null); // Barrier condition not met
}
}
}
Loading