Skip to content

Commit d86e498

Browse files
committed
fix: add support for barrier calls
1 parent 458bef4 commit d86e498

2 files changed

Lines changed: 141 additions & 22 deletions

File tree

Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ namespace Microsoft.Azure.Documents
55
{
66
using System;
77
using System.Collections.Generic;
8+
using System.Collections.Specialized;
89
using System.Diagnostics;
910
using System.Globalization;
1011
using System.Linq;
12+
using System.Net;
1113
using System.Runtime.ExceptionServices;
1214
using System.Text;
1315
using System.Threading.Tasks;
1416
using Microsoft.Azure.Cosmos.Core.Trace;
17+
using Microsoft.Azure.Documents.Collections;
1518

1619
//=================================================================================================================
1720
// Strong read logic:
@@ -110,13 +113,27 @@ public async Task<StoreResponse> ReadStrongAsync(
110113
this.authorizationTokenProvider,
111114
secondaryQuorumReadResult.SelectedLsn,
112115
secondaryQuorumReadResult.GlobalCommittedSelectedLsn);
113-
if (await this.WaitForReadBarrierAsync(
114-
barrierRequest,
115-
allowPrimary: true,
116-
readQuorum: readQuorumValue,
117-
readBarrierLsn: secondaryQuorumReadResult.SelectedLsn,
118-
targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn,
119-
readMode: readMode))
116+
117+
(bool isSuccess, bool isThrottled) = await this.WaitForReadBarrierAsync(
118+
barrierRequest,
119+
allowPrimary: true,
120+
readQuorum: readQuorumValue,
121+
readBarrierLsn: secondaryQuorumReadResult.SelectedLsn,
122+
targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn,
123+
readMode: readMode);
124+
125+
if (isThrottled)
126+
{
127+
// Handle throttling by delegating to ResourceThrottleRetryPolicy
128+
DefaultTrace.TraceWarning("ReadStrongAsync: All replicas returned 429 Too Many Requests. Delegating to ResourceThrottleRetryPolicy.");
129+
return new StoreResponse
130+
{
131+
Status = (int)StatusCodes.TooManyRequests,
132+
Headers = new DictionaryNameValueCollection()
133+
};
134+
}
135+
136+
if (isSuccess)
120137
{
121138
return secondaryQuorumReadResult.GetResponseAndSkipStoreResultDispose();
122139
}
@@ -311,7 +328,14 @@ private async Task<ReadQuorumResult> ReadQuorumAsync(
311328

312329
// ReadBarrier required
313330
DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(entity, this.authorizationTokenProvider, readLsn, globalCommittedLSN);
314-
if (!await this.WaitForReadBarrierAsync(barrierRequest, false, readQuorum, readLsn, globalCommittedLSN, readMode))
331+
(bool isSuccess, bool isThrottled) = await this.WaitForReadBarrierAsync(
332+
barrierRequest,
333+
false,
334+
readQuorum,
335+
readLsn,
336+
globalCommittedLSN,
337+
readMode);
338+
if(!isSuccess)
315339
{
316340
return new ReadQuorumResult(
317341
entity.RequestContext.RequestChargeTracker,
@@ -480,7 +504,7 @@ private async Task<PrimaryReadOutcome> WaitForPrimaryLsnAsync(
480504
return PrimaryReadOutcome.QuorumNotMet;
481505
}
482506

483-
private Task<bool> WaitForReadBarrierAsync(
507+
private Task<(bool isSuccess, bool isThrottled)> WaitForReadBarrierAsync(
484508
DocumentServiceRequest barrierRequest,
485509
bool allowPrimary,
486510
int readQuorum,
@@ -500,7 +524,7 @@ private Task<bool> WaitForReadBarrierAsync(
500524
// (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back
501525
// This old implementation will be removed (and the environment
502526
// variable not been used anymore) after some bake time.
503-
private async Task<bool> WaitForReadBarrierOldAsync(
527+
private async Task<(bool isSuccess, bool isThrottled)> WaitForReadBarrierOldAsync(
504528
DocumentServiceRequest barrierRequest,
505529
bool allowPrimary,
506530
int readQuorum,
@@ -516,7 +540,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
516540
while (readBarrierRetryCount-- > 0)
517541
{
518542
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
519-
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
543+
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
520544
barrierRequest,
521545
includePrimary: allowPrimary,
522546
replicaCountToRead: readQuorum,
@@ -526,12 +550,17 @@ private async Task<bool> WaitForReadBarrierOldAsync(
526550
checkMinLSN: false,
527551
forceReadAll: true));
528552
IList<ReferenceCountedDisposable<StoreResult>> responses = disposableResponses.Value;
553+
if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
554+
{
555+
DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
556+
return (false, true); // Indicate throttling
557+
}
529558

530559
long maxGlobalCommittedLsnInResponses = responses.Count > 0 ? responses.Max(response => response.Target.GlobalCommittedLSN) : 0;
531560
if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) &&
532561
(!(targetGlobalCommittedLSN > 0) || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN))
533562
{
534-
return true;
563+
return (true, false);
535564
}
536565

537566
maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ?
@@ -557,7 +586,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
557586
while (readBarrierRetryCountMultiRegion-- > 0)
558587
{
559588
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
560-
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
589+
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
561590
barrierRequest,
562591
includePrimary: allowPrimary,
563592
replicaCountToRead: readQuorum,
@@ -572,7 +601,7 @@ private async Task<bool> WaitForReadBarrierOldAsync(
572601
if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) &&
573602
maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN)
574603
{
575-
return true;
604+
return (true, false);
576605
}
577606

578607
maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ?
@@ -600,10 +629,10 @@ private async Task<bool> WaitForReadBarrierOldAsync(
600629

601630
DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}.",
602631
targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode);
603-
return false;
632+
return (false, false);
604633
}
605634

606-
private async Task<bool> WaitForReadBarrierNewAsync(
635+
private async Task<(bool isSuccess, bool isThrottled)> WaitForReadBarrierNewAsync(
607636
DocumentServiceRequest barrierRequest,
608637
bool allowPrimary,
609638
int readQuorum,
@@ -616,11 +645,11 @@ private async Task<bool> WaitForReadBarrierNewAsync(
616645
long maxGlobalCommittedLsn = 0;
617646
bool hasConvergedOnLSN = false;
618647
int readBarrierRetryCount = 0;
619-
while(readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero)
648+
while (readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero)
620649
{
621650
barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed();
622651
ValueStopwatch barrierRequestStopWatch = ValueStopwatch.StartNew();
623-
using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync(
652+
using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync(
624653
barrierRequest,
625654
includePrimary: allowPrimary,
626655
replicaCountToRead: hasConvergedOnLSN ? 1 : readQuorum, // for GCLSN a single replica is sufficient
@@ -631,11 +660,18 @@ private async Task<bool> WaitForReadBarrierNewAsync(
631660
forceReadAll: !hasConvergedOnLSN)); // for GCLSN a single replica is sufficient - and requests should be issued sequentially
632661
barrierRequestStopWatch.Stop();
633662
IList<ReferenceCountedDisposable<StoreResult>> responses = disposableResponses.Value;
663+
664+
// Check if all replicas returned 429
665+
if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests))
666+
{
667+
DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy.");
668+
return (false, true); // Yield early if all replicas return 429
669+
}
634670
TimeSpan previousBarrierRequestLatency = barrierRequestStopWatch.Elapsed;
635671

636672
int readBarrierLsnReachedCount = 0;
637673
long maxGlobalCommittedLsnInResponses = 0;
638-
foreach(ReferenceCountedDisposable<StoreResult> response in responses)
674+
foreach (ReferenceCountedDisposable<StoreResult> response in responses)
639675
{
640676
maxGlobalCommittedLsnInResponses = Math.Max(maxGlobalCommittedLsnInResponses, response.Target.GlobalCommittedLSN);
641677
if (!hasConvergedOnLSN && response.Target.LSN >= readBarrierLsn)
@@ -652,7 +688,7 @@ private async Task<bool> WaitForReadBarrierNewAsync(
652688
if (hasConvergedOnLSN &&
653689
(targetGlobalCommittedLSN <= 0 || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN))
654690
{
655-
return true;
691+
return (true, false);
656692
}
657693

658694
maxGlobalCommittedLsn = Math.Max(maxGlobalCommittedLsn, maxGlobalCommittedLsnInResponses);
@@ -680,15 +716,15 @@ private async Task<bool> WaitForReadBarrierNewAsync(
680716
}
681717
else if (shouldDelay)
682718
{
683-
TimeSpan delay =maxDelay < remainingDelay ? maxDelay : remainingDelay;
719+
TimeSpan delay = maxDelay < remainingDelay ? maxDelay : remainingDelay;
684720
await Task.Delay(delay);
685721
remainingDelay -= delay;
686722
}
687723
}
688724

689725
DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}, HasLSNConverged:{3}.",
690726
targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode, hasConvergedOnLSN);
691-
return false;
727+
return (false, false);
692728
}
693729

694730
private bool IsQuorumMet(

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,89 @@ public async Task AssertExisting429BehaviourByTurningOffExceptionLess()
10001000
Assert.IsTrue(failureRequestRetryCount > 0, "No retries were made for 429 error code.");
10011001
}
10021002

1003+
[TestMethod]
1004+
public async Task AssertBarrierCallsWhenQuorumStateIsQuorumSelected()
1005+
{
1006+
int barrier429Count = 0;
1007+
string databaseName = "newdatabase";
1008+
bool isReadOperation = false;
1009+
1010+
CosmosClientOptions clientOptions = new CosmosClientOptions()
1011+
{
1012+
ConnectionMode = ConnectionMode.Direct,
1013+
ConnectionProtocol = Protocol.Tcp,
1014+
TransportClientHandlerFactory = (transport) => new TransportClientWrapper(transport,
1015+
interceptorAfterResult: (request, storeResponse) =>
1016+
{
1017+
// Force a barrier request on read item in strong consistency.
1018+
// There needs to be 2 regions and the GlobalCommittedLSN must be behind the LSN.
1019+
long lsn = storeResponse.LSN - 2;
1020+
if (request.OperationType == Documents.OperationType.Read)
1021+
{
1022+
// Simulate a barrier request by setting GLSN < LSN
1023+
storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
1024+
}
1025+
// only simulate 429 errors for read operations not write
1026+
if (request.OperationType == Documents.OperationType.Head && isReadOperation)
1027+
{
1028+
1029+
request.UseStatusCodeFor429 = false;
1030+
// Simulate a 429 for barrier requests
1031+
storeResponse.Status = (int)HttpStatusCode.TooManyRequests; // Simulate 429 error
1032+
storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
1033+
barrier429Count++;
1034+
}
1035+
1036+
return storeResponse;
1037+
})
1038+
};
1039+
1040+
CosmosClient cosmosClient = new CosmosClientBuilder(
1041+
connectionString: "points to test environment with one read and one write region")
1042+
.WithTransportClientHandlerFactory(clientOptions.TransportClientHandlerFactory)
1043+
.WithThrottlingRetryOptions(
1044+
maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
1045+
maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
1046+
.WithConnectionModeDirect()
1047+
.WithConsistencyLevel(Cosmos.ConsistencyLevel.Strong)
1048+
.Build();
1049+
1050+
Container container = cosmosClient.GetDatabase(databaseName).GetContainer("test");
1051+
1052+
dynamic testObject = new
1053+
{
1054+
id = Guid.NewGuid().ToString(),
1055+
Company = "Microsoft",
1056+
State = "WA"
1057+
};
1058+
await container.CreateItemAsync<dynamic>(testObject);
1059+
1060+
try
1061+
{
1062+
isReadOperation = true;
1063+
// Perform a read operation to trigger barrier calls
1064+
ItemResponse<dynamic> response = await container.ReadItemAsync<dynamic>(
1065+
testObject.id,
1066+
new Cosmos.PartitionKey(testObject.id));
1067+
1068+
1069+
Console.WriteLine("Diagnostics:");
1070+
Console.WriteLine(response.Diagnostics.ToString());
1071+
}
1072+
catch (CosmosException ex)
1073+
{
1074+
// Handle other Cosmos exceptions
1075+
Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
1076+
Console.WriteLine("Diagnostics:");
1077+
Console.WriteLine(ex.Diagnostics.ToString());
1078+
}
1079+
1080+
Console.WriteLine($"Total 429 responses on barrier calls: {barrier429Count}");
1081+
// Assert that retries occurred
1082+
Assert.IsTrue(barrier429Count > 0, "No retries were made for 429 error code.");
1083+
1084+
}
1085+
10031086

10041087
[TestMethod]
10051088
[DataRow(5 , 2, DisplayName = "Validate Read Item operation with simulated error count < 4 * (maxRetryAttempts + 1) means we will get eventually 200 status Code")]

0 commit comments

Comments
 (0)