diff --git a/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs index 11391ddcbd..182e4e996d 100644 --- a/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs @@ -153,7 +153,6 @@ public async Task ShouldRetryAsync( CancellationToken cancellationToken) { this.retryContext = null; - ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync( cosmosResponseMessage?.StatusCode, cosmosResponseMessage?.Headers.SubStatusCode); @@ -180,6 +179,7 @@ public async Task ShouldRetryAsync( } return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken); + } /// diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs index 126de9b10c..6c755fe2d1 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Documents using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.Linq; + using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Core.Trace; @@ -285,7 +286,18 @@ private async Task WritePrivateAsync( { using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN)) { - if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN)) +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + (bool isSuccess, bool isThrottled, StoreResponse? 
throttledResponse) = + await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN); + + if (isThrottled && throttledResponse != null) + { + // Handle throttling by returning the throttled response + DefaultTrace.TraceWarning("WritePrivateAsync: Throttling occurred during write barrier. Returning throttled response."); + return throttledResponse; + } + + if (!isSuccess) { DefaultTrace.TraceError("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); @@ -302,7 +314,18 @@ private async Task WritePrivateAsync( { using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN)) { - if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN)) +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + (bool isSuccess, bool isThrottled, StoreResponse? throttledResponse) = + await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN); + + if (isThrottled && throttledResponse != null) + { + // Handle throttling by returning the throttled response + DefaultTrace.TraceWarning("WritePrivateAsync: Throttling occurred during write barrier. Returning throttled response."); + return throttledResponse; + } + + if (!isSuccess) { DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for global strong request. 
SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); @@ -334,7 +357,8 @@ internal bool ShouldPerformWriteBarrierForGlobalStrong(StoreResult storeResult, return false; } - private Task WaitForWriteBarrierAsync( +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + private Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierAsync( DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { @@ -350,7 +374,7 @@ private Task WaitForWriteBarrierAsync( // (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back // This old implementation will be removed (and the environment // variable not been used anymore) after some bake time. - private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) + private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { int writeBarrierRetryCount = ConsistencyWriter.maxNumberOfWriteBarrierReadRetries; @@ -368,9 +392,20 @@ private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barr checkMinLSN: false, forceReadAll: false); - if (responses != null && responses.Any(response => response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn)) + if (responses != null) { - return true; + // Check if all replicas returned 429 + if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests)) + { + DefaultTrace.TraceWarning("WaitForWriteBarrierOldAsync: All replicas returned 429 Too Many Requests. 
Yielding early to ResourceThrottleRetryPolicy."); + return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response + } + + // Check if any response satisfies the barrier condition + if (responses.Any(response => response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn)) + { + return (true, false, null); // Barrier condition met + } } //get max global committed lsn from current batch of responses, then update if greater than max of all batches. @@ -401,10 +436,10 @@ private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barr DefaultTrace.TraceInformation("ConsistencyWriter: Highest global committed lsn received for write barrier call is {0}", maxGlobalCommittedLsnReceived); - return false; + return (false, false, null); // Barrier condition not met } - - private async Task WaitForWriteBarrierNewAsync( +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + private async Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForWriteBarrierNewAsync( DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { @@ -432,11 +467,18 @@ private async Task WaitForWriteBarrierNewAsync( long maxGlobalCommittedLsn = 0; if (responses != null) { + // Check if all replicas returned 429 + if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests)) + { + DefaultTrace.TraceWarning("WaitForWriteBarrierNewAsync: All replicas returned 429 Too Many Requests. 
Yielding early to ResourceThrottleRetryPolicy."); + return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response + } + foreach (ReferenceCountedDisposable response in responses) { if (response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn) { - return true; + return (true, false, null); // Barrier condition met } if (response.Target.GlobalCommittedLSN >= maxGlobalCommittedLsn) @@ -478,7 +520,7 @@ private async Task WaitForWriteBarrierNewAsync( DefaultTrace.TraceInformation("ConsistencyWriter: Highest global committed lsn received for write barrier call is {0}", maxGlobalCommittedLsnReceived); - return false; + return (false, false, null); // Barrier condition not met } } } diff --git a/Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs b/Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs index 5a0d9b5bf9..cb719bb638 100644 --- a/Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs +++ b/Microsoft.Azure.Cosmos/src/direct/QuorumReader.cs @@ -5,13 +5,16 @@ namespace Microsoft.Azure.Documents { using System; using System.Collections.Generic; + using System.Collections.Specialized; using System.Diagnostics; using System.Globalization; using System.Linq; + using System.Net; using System.Runtime.ExceptionServices; using System.Text; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Documents.Collections; //================================================================================================================= // Strong read logic: @@ -110,13 +113,26 @@ public async Task ReadStrongAsync( this.authorizationTokenProvider, secondaryQuorumReadResult.SelectedLsn, secondaryQuorumReadResult.GlobalCommittedSelectedLsn); - if (await this.WaitForReadBarrierAsync( - barrierRequest, - allowPrimary: true, - readQuorum: readQuorumValue, - readBarrierLsn: secondaryQuorumReadResult.SelectedLsn, - targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn, - readMode: 
readMode)) +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + (bool isSuccess, bool isThrottled, StoreResponse? throttledResponse) = await this.WaitForReadBarrierAsync( + barrierRequest, + allowPrimary: true, + readQuorum: readQuorumValue, + readBarrierLsn: secondaryQuorumReadResult.SelectedLsn, + targetGlobalCommittedLSN: secondaryQuorumReadResult.GlobalCommittedSelectedLsn, + readMode: readMode); + + if (isThrottled && throttledResponse != null) + { + // Handle throttling by delegating to ResourceThrottleRetryPolicy + DefaultTrace.TraceWarning("ReadStrongAsync: All replicas returned 429 Too Many Requests. Delegating to ResourceThrottleRetryPolicy."); + + // Return the real 429 response upstream + return throttledResponse; + + } + + if (isSuccess) { return secondaryQuorumReadResult.GetResponseAndSkipStoreResultDispose(); } @@ -256,7 +272,6 @@ private async Task ReadQuorumAsync( useSessionToken: false, readMode: readMode)); IList> responseResult = disposableResponseResult.Value; - responsesForLogging = new StoreResult[responseResult.Count]; for (int i = 0; i < responseResult.Count; i++) { @@ -312,7 +327,15 @@ private async Task ReadQuorumAsync( // ReadBarrier required DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(entity, this.authorizationTokenProvider, readLsn, globalCommittedLSN); - if (!await this.WaitForReadBarrierAsync(barrierRequest, false, readQuorum, readLsn, globalCommittedLSN, readMode)) +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + (bool isSuccess, bool isThrottled, StoreResponse? 
throttledRespons) = await this.WaitForReadBarrierAsync( + barrierRequest, + false, + readQuorum, + readLsn, + globalCommittedLSN, + readMode); + if(!isSuccess) { return new ReadQuorumResult( entity.RequestContext.RequestChargeTracker, @@ -353,6 +376,17 @@ private async Task ReadPrimaryAsync( requiresValidLsn: true, useSessionToken: useSessionToken); StoreResult storeResult = disposableStoreResult.Target; + if (storeResult.StatusCode == StatusCodes.TooManyRequests) + { + // Let ResourceThrottleRetryPolicy handle 429 + // Instead of throwing an exception, return a result that indicates throttling + return new ReadPrimaryResult( + requestChargeTracker: entity.RequestContext.RequestChargeTracker, + isSuccessful: true, + shouldRetryOnSecondary: false, + response: disposableStoreResult.TryAddReference()); + } + if (!storeResult.IsValid) { ExceptionDispatchInfo.Capture(storeResult.GetException()).Throw(); @@ -469,8 +503,8 @@ private async Task WaitForPrimaryLsnAsync( return PrimaryReadOutcome.QuorumNotMet; } - - private Task WaitForReadBarrierAsync( +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + private Task<(bool isSuccess, bool isThrottled, StoreResponse? throttledResponse)> WaitForReadBarrierAsync( DocumentServiceRequest barrierRequest, bool allowPrimary, int readQuorum, @@ -490,7 +524,7 @@ private Task WaitForReadBarrierAsync( // (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back // This old implementation will be removed (and the environment // variable not been used anymore) after some bake time. - private async Task WaitForReadBarrierOldAsync( + private async Task<(bool isSuccess, bool isThrottled, StoreResponse? 
throttledResponse)> WaitForReadBarrierOldAsync( DocumentServiceRequest barrierRequest, bool allowPrimary, int readQuorum, @@ -506,7 +540,7 @@ private async Task WaitForReadBarrierOldAsync( while (readBarrierRetryCount-- > 0) { barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed(); - using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync( + using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync( barrierRequest, includePrimary: allowPrimary, replicaCountToRead: readQuorum, @@ -517,11 +551,18 @@ private async Task WaitForReadBarrierOldAsync( forceReadAll: true)); IList> responses = disposableResponses.Value; + // Check if all replicas returned 429 + if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests)) + { + DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. Yielding early to ResourceThrottleRetryPolicy."); + return (false, true, responses.First().Target.ToResponse(null)); // Return the first 429 response + } + long maxGlobalCommittedLsnInResponses = responses.Count > 0 ? responses.Max(response => response.Target.GlobalCommittedLSN) : 0; if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) && (!(targetGlobalCommittedLSN > 0) || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN)) { - return true; + return (true, false, null); } maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ? 
@@ -547,7 +588,7 @@ private async Task WaitForReadBarrierOldAsync( while (readBarrierRetryCountMultiRegion-- > 0) { barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed(); - using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync( + using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync( barrierRequest, includePrimary: allowPrimary, replicaCountToRead: readQuorum, @@ -562,7 +603,7 @@ private async Task WaitForReadBarrierOldAsync( if ((responses.Count(response => response.Target.LSN >= readBarrierLsn) >= readQuorum) && maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN) { - return true; + return (true, false, null); } maxGlobalCommittedLsn = maxGlobalCommittedLsn > maxGlobalCommittedLsnInResponses ? @@ -590,10 +631,11 @@ private async Task WaitForReadBarrierOldAsync( DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}.", targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode); - return false; + return (false, false, null); } - private async Task WaitForReadBarrierNewAsync( +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + private async Task<(bool isSuccess, bool isThrottled, StoreResponse? 
throttledResponse)> WaitForReadBarrierNewAsync( DocumentServiceRequest barrierRequest, bool allowPrimary, int readQuorum, @@ -606,11 +648,11 @@ private async Task WaitForReadBarrierNewAsync( long maxGlobalCommittedLsn = 0; bool hasConvergedOnLSN = false; int readBarrierRetryCount = 0; - while(readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero) + while (readBarrierRetryCount < defaultBarrierRequestDelays.Length && remainingDelay >= TimeSpan.Zero) { barrierRequest.RequestContext.TimeoutHelper.ThrowGoneIfElapsed(); ValueStopwatch barrierRequestStopWatch = ValueStopwatch.StartNew(); - using StoreResultList disposableResponses = new(await this.storeReader.ReadMultipleReplicaAsync( + using StoreResultList disposableResponses = new (await this.storeReader.ReadMultipleReplicaAsync( barrierRequest, includePrimary: allowPrimary, replicaCountToRead: hasConvergedOnLSN ? 1 : readQuorum, // for GCLSN a single replica is sufficient @@ -621,11 +663,18 @@ private async Task WaitForReadBarrierNewAsync( forceReadAll: !hasConvergedOnLSN)); // for GCLSN a single replica is sufficient - and requests should be issued sequentially barrierRequestStopWatch.Stop(); IList> responses = disposableResponses.Value; + + // Check if all replicas returned 429 + if (responses.All(response => response.Target.StatusCode == StatusCodes.TooManyRequests)) + { + DefaultTrace.TraceWarning("WaitForReadBarrierOldAsync: All replicas returned 429 Too Many Requests. 
Yielding early to ResourceThrottleRetryPolicy."); + return (false, true, responses.First().Target.ToResponse(null)); // Yield early if all replicas return 429 + } TimeSpan previousBarrierRequestLatency = barrierRequestStopWatch.Elapsed; int readBarrierLsnReachedCount = 0; long maxGlobalCommittedLsnInResponses = 0; - foreach(ReferenceCountedDisposable response in responses) + foreach (ReferenceCountedDisposable response in responses) { maxGlobalCommittedLsnInResponses = Math.Max(maxGlobalCommittedLsnInResponses, response.Target.GlobalCommittedLSN); if (!hasConvergedOnLSN && response.Target.LSN >= readBarrierLsn) @@ -642,7 +691,7 @@ private async Task WaitForReadBarrierNewAsync( if (hasConvergedOnLSN && (targetGlobalCommittedLSN <= 0 || maxGlobalCommittedLsnInResponses >= targetGlobalCommittedLSN)) { - return true; + return (true, false, null); } maxGlobalCommittedLsn = Math.Max(maxGlobalCommittedLsn, maxGlobalCommittedLsnInResponses); @@ -670,7 +719,7 @@ private async Task WaitForReadBarrierNewAsync( } else if (shouldDelay) { - TimeSpan delay =maxDelay < remainingDelay ? maxDelay : remainingDelay; + TimeSpan delay = maxDelay < remainingDelay ? 
maxDelay : remainingDelay; await Task.Delay(delay); remainingDelay -= delay; } @@ -678,7 +727,7 @@ private async Task WaitForReadBarrierNewAsync( DefaultTrace.TraceInformation("QuorumReader: WaitForReadBarrierAsync - TargetGlobalCommittedLsn: {0}, MaxGlobalCommittedLsn: {1} ReadMode: {2}, HasLSNConverged:{3}.", targetGlobalCommittedLSN, maxGlobalCommittedLsn, readMode, hasConvergedOnLSN); - return false; + return (false, false, null); } private bool IsQuorumMet( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs index 21521a8cd0..ffd76793d3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs @@ -21,6 +21,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Threading.Tasks; using global::Azure; using Microsoft.Azure.Cosmos.FaultInjection; + using Microsoft.Azure.Cosmos.Fluent; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Services.Management.Tests.LinqProviderTests; using Microsoft.Azure.Cosmos.Telemetry; @@ -31,6 +32,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; + using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.TransportClientHelper; [TestClass] public class ClientTests @@ -927,6 +929,455 @@ public void PooledConnectionLifetimeTest() Assert.AreEqual(socketHandlerType, clientMessageHandlerType); } + // This Test is not part of this PR. It was written solely to verify the existing behaviour of 429 requests + // when exceptionLess is turned off for 429 errors.It still make multiple calls instead of just making call to one replica and yield. 
+        /// <summary>
+        /// Baseline check of the pre-existing 429 handling: every Read response is rewritten to
+        /// 429 with <c>UseStatusCodeFor429</c> switched off on the request, and the test counts
+        /// how many responses were rewritten.
+        /// </summary>
+        /// <remarks>
+        /// The connection string is a placeholder; the test needs an account with one read and
+        /// one write region.
+        /// </remarks>
+        [TestMethod]
+        public async Task AssertExisting429BehaviourByTurningOffExceptionLess()
+        {
+            int failureRequestRetryCount = 0;
+            string databaseName = "newdatabase";
+
+            CosmosClientOptions clientOptions = new CosmosClientOptions()
+            {
+                ConnectionMode = ConnectionMode.Direct,
+                ConnectionProtocol = Protocol.Tcp,
+                TransportClientHandlerFactory = (transport) => new TransportClientWrapper(transport,
+                    interceptorAfterResult: (request, storeResponse) =>
+                    {
+                        if (request.OperationType == Documents.OperationType.Read)
+                        {
+                            request.UseStatusCodeFor429 = false;
+                            // Count every Read response that is rewritten to 429.
+                            failureRequestRetryCount++;
+                            storeResponse.Status = (int)HttpStatusCode.TooManyRequests; // Simulate 429 error
+                        }
+
+                        return storeResponse;
+                    })
+            };
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(
+                connectionString: "points to test environment with one read and one write region")
+                .WithThrottlingRetryOptions(
+                    maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
+                    maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
+                .WithTransportClientHandlerFactory(clientOptions.TransportClientHandlerFactory)
+                .WithConnectionModeDirect()
+                .Build();
+
+            Container container = cosmosClient.GetDatabase(databaseName).GetContainer("test");
+            dynamic testObject = new
+            {
+                id = Guid.NewGuid().ToString(),
+                Company = "Microsoft",
+                State = "WA"
+            };
+            await container.CreateItemAsync<dynamic>(testObject);
+
+            try
+            {
+                // Attempt to read the item; the interceptor turns every Read response into a 429.
+                await container.ReadItemAsync<dynamic>(
+                    testObject.id,
+                    new Cosmos.PartitionKey(testObject.id));
+            }
+            catch (CosmosException ex)
+            {
+                // The failure itself is not asserted on; only the diagnostics are printed.
+                Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(ex.Diagnostics.ToString());
+            }
+
+            Console.Write("429 count of requests: " + failureRequestRetryCount);
+            // Assert that at least one Read response was rewritten to 429.
+            Assert.IsTrue(failureRequestRetryCount > 0, "No retries were made for 429 error code.");
+        }
+
+        /// <summary>
+        /// Forces a write barrier on a Strong-consistency create by reporting a
+        /// GlobalCommittedLSN two behind the LSN, answers every barrier (Head) request with 429,
+        /// and asserts that at least one barrier request was throttled.
+        /// </summary>
+        [TestMethod]
+        public async Task AssertBarrierCallsForStrongConsistencyWrite()
+        {
+            int barrier429Count = 0;
+            string databaseName = "newdatabase";
+
+            CosmosClientOptions clientOptions = new CosmosClientOptions()
+            {
+                ConnectionMode = ConnectionMode.Direct,
+                ConnectionProtocol = Protocol.Tcp,
+                TransportClientHandlerFactory = (transport) => new TransportClientWrapper(transport,
+                    interceptorAfterResult: (request, storeResponse) =>
+                    {
+                        // Force a barrier request on write item in strong consistency.
+                        // There needs to be 2 regions and the GlobalCommittedLSN must be behind the LSN.
+                        long lsn = storeResponse.LSN - 2;
+                        if (request.OperationType == Documents.OperationType.Create)
+                        {
+                            // Simulate a barrier request by setting GLSN < LSN
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                        }
+
+                        // Simulate 429 errors for write barrier requests
+                        if (request.OperationType == Documents.OperationType.Head)
+                        {
+                            request.UseStatusCodeFor429 = false;
+                            storeResponse.Status = (int)HttpStatusCode.TooManyRequests; // Simulate 429 error
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                            barrier429Count++;
+                        }
+
+                        return storeResponse;
+                    })
+            };
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(
+                connectionString: "points to test environment with one read and one write region")
+                .WithTransportClientHandlerFactory(clientOptions.TransportClientHandlerFactory)
+                .WithThrottlingRetryOptions(
+                    maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
+                    maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
+                .WithConnectionModeDirect()
+                .WithConsistencyLevel(Cosmos.ConsistencyLevel.Strong)
+                .Build();
+
+            Container container = cosmosClient.GetDatabase(databaseName).GetContainer("test");
+
+            dynamic testObject = new
+            {
+                id = Guid.NewGuid().ToString(),
+                Company = "Microsoft",
+                State = "WA"
+            };
+
+            try
+            {
+                // Perform a write operation to trigger barrier calls
+                ItemResponse<dynamic> response = await container.CreateItemAsync<dynamic>(
+                    testObject,
+                    new Cosmos.PartitionKey(testObject.id));
+
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(response.Diagnostics.ToString());
+            }
+            catch (CosmosException ex)
+            {
+                // The failure itself is not asserted on; only the diagnostics are printed.
+                Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(ex.Diagnostics.ToString());
+            }
+
+            Console.WriteLine($"Total 429 responses on barrier calls: {barrier429Count}");
+            // Assert that at least one barrier request was answered with 429.
+            Assert.IsTrue(barrier429Count > 0, "No retries were made for 429 error code.");
+        }
+
+        /// <summary>
+        /// Issues 1000 concurrent reads against a 400 RU/s container with throttling retries
+        /// disabled, and asserts that at least one read surfaced a 429 as a
+        /// <see cref="CosmosException"/>.
+        /// </summary>
+        [TestMethod]
+        public async Task Real429ReturnsLsnWithNoRetry()
+        {
+            string databaseName = "newdatabase";
+            string containerName = "testcontainer429";
+            string connectionString = "points to test environment with one read and one write region";
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(connectionString)
+                .WithConnectionModeDirect()
+                .WithThrottlingRetryOptions(TimeSpan.Zero, 0) // No retries on 429
+                .Build();
+
+            // Ensure database and container exist
+            await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
+            Container container = await cosmosClient.GetDatabase(databaseName)
+                .CreateContainerIfNotExistsAsync(new ContainerProperties(containerName, "/id"), throughput: 400);
+
+            // Create a test item
+            dynamic testItem = new { id = Guid.NewGuid().ToString(), value = "test" };
+            await container.CreateItemAsync<dynamic>(testItem);
+
+            int concurrentReads = 1000;
+            ConcurrentQueue<CosmosException> throttledExceptions = new ConcurrentQueue<CosmosException>();
+            List<Task> tasks = new List<Task>();
+
+            for (int i = 0; i < concurrentReads; i++)
+            {
+                tasks.Add(Task.Run(async () =>
+                {
+                    try
+                    {
+                        ItemResponse<dynamic> response = await container.ReadItemAsync<dynamic>(testItem.id, new Cosmos.PartitionKey(testItem.id));
+                        if (response.StatusCode == HttpStatusCode.TooManyRequests)
+                        {
+                            Console.WriteLine(response.Diagnostics.ToString());
+                        }
+                    }
+                    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
+                    {
+                        throttledExceptions.Enqueue(ex);
+                    }
+                }));
+            }
+
+            // All reads must finish before the client is disposed at the end of the method.
+            await Task.WhenAll(tasks);
+
+            Assert.IsTrue(throttledExceptions.Count > 0, "Did not receive any 429 responses from Cosmos DB.");
+        }
+
+        /// <summary>
+        /// Forces a read barrier on a Strong-consistency read, answers exactly one barrier (Head)
+        /// request with 429, and asserts that this single throttled barrier was observed.
+        /// </summary>
+        [TestMethod]
+        public async Task AssertOneBarrierCallShouldResultInASuccess()
+        {
+            int barrier429Count = 0;
+            string databaseName = "newdatabase";
+            bool isReadOperation = false;
+            bool hasFailedOnce = false;
+
+            CosmosClientOptions clientOptions = new CosmosClientOptions()
+            {
+                ConnectionMode = ConnectionMode.Direct,
+                ConnectionProtocol = Protocol.Tcp,
+                TransportClientHandlerFactory = (transport) => new TransportClientWrapper(transport,
+                    interceptorAfterResult: (request, storeResponse) =>
+                    {
+                        // Force a barrier request on read item in strong consistency.
+                        // There needs to be 2 regions and the GlobalCommittedLSN must be behind the LSN.
+                        long lsn = storeResponse.LSN - 2;
+                        if (request.OperationType == Documents.OperationType.Read)
+                        {
+                            // Simulate a barrier request by setting GLSN < LSN
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                        }
+
+                        // Only simulate 429 errors for the read phase, not for the initial write.
+                        if (request.OperationType == Documents.OperationType.Head && isReadOperation && !hasFailedOnce)
+                        {
+                            request.UseStatusCodeFor429 = false;
+                            storeResponse.Status = (int)HttpStatusCode.TooManyRequests; // Simulate 429 error
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                            barrier429Count++;
+                            hasFailedOnce = true; // Ensure only one failure
+                        }
+
+                        return storeResponse;
+                    })
+            };
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(
+                connectionString: "points to test environment with one read and one write region")
+                .WithTransportClientHandlerFactory(clientOptions.TransportClientHandlerFactory)
+                .WithThrottlingRetryOptions(
+                    maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
+                    maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
+                .WithConnectionModeDirect()
+                .WithConsistencyLevel(Cosmos.ConsistencyLevel.Strong)
+                .Build();
+
+            Container container = cosmosClient.GetDatabase(databaseName).GetContainer("test");
+
+            dynamic testObject = new
+            {
+                id = Guid.NewGuid().ToString(),
+                Company = "Microsoft",
+                State = "WA"
+            };
+            await container.CreateItemAsync<dynamic>(testObject);
+
+            try
+            {
+                isReadOperation = true;
+                // Perform a read operation to trigger barrier calls
+                ItemResponse<dynamic> response = await container.ReadItemAsync<dynamic>(
+                    testObject.id,
+                    new Cosmos.PartitionKey(testObject.id));
+
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(response.Diagnostics.ToString());
+            }
+            catch (CosmosException ex)
+            {
+                // The failure itself is not asserted on; only the diagnostics are printed.
+                Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(ex.Diagnostics.ToString());
+            }
+
+            Console.WriteLine($"Total 429 responses on barrier calls: {barrier429Count}");
+            // Assert that the barrier request was throttled at least once.
+            Assert.IsTrue(barrier429Count > 0, "No retries were made for 429 error code.");
+        }
+
+        /// <summary>
+        /// Forces a read barrier on a Strong-consistency read, answers every barrier (Head)
+        /// request issued during the read with 429, and asserts that at least one barrier
+        /// request was throttled.
+        /// </summary>
+        //StorePhysicalAddress
+        [TestMethod]
+        public async Task AssertBarrierCallsWhenQuorumStateIsQuorumSelected()
+        {
+            int barrier429Count = 0;
+            string databaseName = "newdatabase";
+            bool isReadOperation = false;
+
+            CosmosClientOptions clientOptions = new CosmosClientOptions()
+            {
+                ConnectionMode = ConnectionMode.Direct,
+                ConnectionProtocol = Protocol.Tcp,
+                TransportClientHandlerFactory = (transport) => new TransportClientWrapper(transport,
+                    interceptorAfterResult: (request, storeResponse) =>
+                    {
+                        // Force a barrier request on read item in strong consistency.
+                        // There needs to be 2 regions and the GlobalCommittedLSN must be behind the LSN.
+                        long lsn = storeResponse.LSN - 2;
+                        if (request.OperationType == Documents.OperationType.Read)
+                        {
+                            // Simulate a barrier request by setting GLSN < LSN
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                        }
+
+                        // Only simulate 429 errors for the read phase, not for the initial write.
+                        if (request.OperationType == Documents.OperationType.Head && isReadOperation)
+                        {
+                            storeResponse.Status = (int)HttpStatusCode.TooManyRequests; // Simulate 429 error
+                            storeResponse.Headers.Set(Documents.WFConstants.BackendHeaders.GlobalCommittedLSN, lsn.ToString());
+                            barrier429Count++;
+                        }
+
+                        return storeResponse;
+                    })
+            };
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(
+                connectionString: "points to test environment with one read and one write region")
+                .WithTransportClientHandlerFactory(clientOptions.TransportClientHandlerFactory)
+                .WithThrottlingRetryOptions(
+                    maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
+                    maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
+                .WithConnectionModeDirect()
+                .WithConsistencyLevel(Cosmos.ConsistencyLevel.Strong)
+                .Build();
+
+            Container container = cosmosClient.GetDatabase(databaseName).GetContainer("test");
+
+            dynamic testObject = new
+            {
+                id = Guid.NewGuid().ToString(),
+                Company = "Microsoft",
+                State = "WA"
+            };
+            await container.CreateItemAsync<dynamic>(testObject);
+
+            try
+            {
+                isReadOperation = true;
+                // Perform a read operation to trigger barrier calls
+                ItemResponse<dynamic> response = await container.ReadItemAsync<dynamic>(
+                    testObject.id,
+                    new Cosmos.PartitionKey(testObject.id));
+
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(response.Diagnostics.ToString());
+            }
+            catch (CosmosException ex)
+            {
+                // The failure itself is not asserted on; only the diagnostics are printed.
+                Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(ex.Diagnostics.ToString());
+            }
+
+            Console.WriteLine($"Total 429 responses on barrier calls: {barrier429Count}");
+            // Assert that at least one barrier request was answered with 429.
+            Assert.IsTrue(barrier429Count > 0, "No retries were made for 429 error code.");
+        }
+
+        /// <summary>
+        /// Injects a fixed number of 429 faults into ReadItem through the fault-injection library
+        /// and checks how many of them were actually hit.
+        /// </summary>
+        /// <param name="simulatedErrorCount">Number of 429 faults the rule is allowed to inject.</param>
+        /// <param name="maxRetryAttempts">Throttling retry attempts configured on the client.</param>
+        [TestMethod]
+        [DataRow(5, 2, DisplayName = "Validate Read Item operation with simulated error count < 4 * (maxRetryAttempts + 1) means we will get eventually 200 status Code")]
+        [DataRow(50, 2, DisplayName = "Validate Read Item operation with simulated error count > 4 * (maxRetryAttempts + 1) means no 200 status code")]
+        public async Task InjectTooManyRequestsFaultAndVerify429Count(int simulatedErrorCount, int maxRetryAttempts)
+        {
+            string databaseName = "newdatabase";
+
+            // Upper bound on rule hits used by the assertions below.
+            // NOTE(review): the factor 4 is presumably the replica count of the partition - confirm.
+            long maxExpectedHits = 4 * (maxRetryAttempts + 1);
+
+            // Create a fault injection rule for TooManyRequests (429) in direct mode
+            FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder(
+                id: "TooManyRequestsRule-" + Guid.NewGuid(),
+                condition: new FaultInjectionConditionBuilder()
+                    .WithOperationType(FaultInjectionOperationType.ReadItem)
+                    .Build(),
+                result: FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.TooManyRequests)
+                    .WithTimes(simulatedErrorCount)
+                    .Build())
+                .Build();
+
+            // Initialize the fault injector
+            FaultInjector faultInjector = new FaultInjector(new List<FaultInjectionRule> { tooManyRequestsRule });
+
+            // Dispose the client when the test ends so its connections are released.
+            using CosmosClient cosmosClient = new CosmosClientBuilder(
+                connectionString: "points to test environment with one read and one write region")
+                .WithConnectionModeDirect()
+                .WithFaultInjection(faultInjector)
+                .WithThrottlingRetryOptions(
+                    maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
+                    maxRetryAttemptsOnThrottledRequests: maxRetryAttempts) // Maximum retry attempts
+                .Build();
+
+            ContainerProperties containerProperties = new ContainerProperties(
+                id: "test",
+                partitionKeyPath: "/id");
+
+            // Create database and container
+            await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
+            Container container = await cosmosClient.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(containerProperties);
+
+            dynamic testObject = new
+            {
+                id = Guid.NewGuid().ToString(),
+                Company = "Microsoft",
+                State = "WA"
+            };
+
+            await container.CreateItemAsync<dynamic>(testObject);
+            try
+            {
+                // Attempt to read the item
+                ItemResponse<dynamic> itemResponse = await container.ReadItemAsync<dynamic>(
+                    testObject.id,
+                    new Cosmos.PartitionKey(testObject.id));
+
+                // Print diagnostics
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(itemResponse.Diagnostics.ToString());
+            }
+            catch (CosmosException ex)
+            {
+                // The failure itself is not asserted on; only the diagnostics are printed.
+                Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
+                Console.WriteLine("Diagnostics:");
+                Console.WriteLine(ex.Diagnostics.ToString());
+            }
+
+            long hitCount = tooManyRequestsRule.GetHitCount();
+            if (simulatedErrorCount < maxExpectedHits)
+            {
+                // Fewer faults than the upper bound: every injected fault must have been hit.
+                Assert.AreEqual(simulatedErrorCount, hitCount, $"hitcount of {hitCount} does not match simulatedErrorCount {simulatedErrorCount}");
+            }
+            else
+            {
+                // More faults than the upper bound: the hit count must equal the bound.
+                Assert.AreEqual(maxExpectedHits, hitCount, $"hitcount of {hitCount} does not match number of replicas {maxExpectedHits}");
+            }
+        }
+
         [TestMethod]
         [TestCategory("MultiRegion")]
         public async Task MultiRegionAccountTest()