diff --git a/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionRuleProcessor.cs b/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionRuleProcessor.cs index 6fa1c6e56d..37aa3e1a73 100644 --- a/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionRuleProcessor.cs +++ b/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionRuleProcessor.cs @@ -189,7 +189,8 @@ await BackoffRetryUtility>.ExecuteAsync( result.GetDelay(), result.GetSuppressServiceRequests(), result.GetInjectionRate(), - this.applicationContext)); + this.applicationContext, + this.globalEndpointManager)); } private async Task GetEffectiveConnectionErrorRule(FaultInjectionRule rule) diff --git a/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionServerErrorResultInternal.cs b/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionServerErrorResultInternal.cs index 6cac1ccbd1..e16cdb5489 100644 --- a/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionServerErrorResultInternal.cs +++ b/Microsoft.Azure.Cosmos/FaultInjection/src/implementation/FaultInjectionServerErrorResultInternal.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos.FaultInjection using System.Net; using System.Net.Http.Headers; using System.Text; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; using Microsoft.Azure.Documents.Rntbd; @@ -23,6 +24,7 @@ internal class FaultInjectionServerErrorResultInternal private readonly bool suppressServiceRequest; private readonly double injectionRate; private readonly FaultInjectionApplicationContext applicationContext; + private readonly GlobalEndpointManager globalEndpointManager; /// /// Constructor for FaultInjectionServerErrorResultInternal @@ -32,13 +34,15 @@ internal class FaultInjectionServerErrorResultInternal /// /// /// + /// public FaultInjectionServerErrorResultInternal( - FaultInjectionServerErrorType serverErrorType, - int times, - TimeSpan delay, + FaultInjectionServerErrorType serverErrorType, + int times, + TimeSpan delay, bool suppressServiceRequest, double injectionRate, - FaultInjectionApplicationContext applicationContext) + FaultInjectionApplicationContext applicationContext, + GlobalEndpointManager globalEndpointManager) { this.serverErrorType = serverErrorType; this.times = times; @@ -46,6 +50,7 @@ public FaultInjectionServerErrorResultInternal( this.suppressServiceRequest = suppressServiceRequest; this.injectionRate = injectionRate; this.applicationContext = applicationContext; + this.globalEndpointManager = globalEndpointManager; } /// @@ -164,7 +169,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru Headers = retryWithHeaders, ResponseBody = new MemoryStream(FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}")) }; - + return storeResponse; case FaultInjectionServerErrorType.TooManyRequests: @@ -205,7 +210,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru Headers = internalServerErrorHeaders, ResponseBody = new MemoryStream(FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}")) }; - + return storeResponse; case FaultInjectionServerErrorType.ReadSessionNotAvailable: @@ -223,7 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru Headers = readSessionHeaders, ResponseBody = new MemoryStream(FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Read Session Not Available, rule: {ruleId}")) }; - + return storeResponse; case FaultInjectionServerErrorType.PartitionIsMigrating: @@ -237,7 +242,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru Headers = partitionMigrationHeaders, ResponseBody = new MemoryStream(FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Partition Migrating, rule: {ruleId}")) }; - + return storeResponse; case FaultInjectionServerErrorType.PartitionIsSplitting: @@ -284,23 +289,27 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st //Global or Local lsn? string lsn = dsr.RequestContext.QuorumSelectedLSN.ToString(CultureInfo.InvariantCulture); INameValueCollection headers = dsr.Headers; + bool isProxyCall = this.IsProxyCall(dsr); switch (this.serverErrorType) { case FaultInjectionServerErrorType.Gone: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.Gone, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Gone, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.Gone, (int)SubStatusCodes.ServerGenerated410, "Gone", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Gone, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -309,43 +318,48 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.TooManyRequests: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.TooManyRequests, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: TooManyRequests, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.TooManyRequests, (int)SubStatusCodes.RUBudgetExceeded, "TooManyRequests", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: TooManyRequests, rule: {ruleId}"))), }; - - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.RetryAfter = new RetryConditionHeaderValue(TimeSpan.FromMilliseconds(500)); httpResponse.Headers.Add( - WFConstants.BackendHeaders.SubStatus, + WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture)); httpResponse.Headers.Add(WFConstants.BackendHeaders.LocalLSN, lsn); return httpResponse; case FaultInjectionServerErrorType.Timeout: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.RequestTimeout, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.RequestTimeout, (int)SubStatusCodes.Unknown, "Timeout", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -355,19 +369,22 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.InternalServerError: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.InternalServerError, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.InternalServerError, (int)SubStatusCodes.Unknown, "InternalServerError", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: InternalServerError, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -377,20 +394,23 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.ReadSessionNotAvailable: - + const string badSesstionToken = "1:1#1#1=1#1=1"; httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.NotFound, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Read Session Not Available, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.NotFound, (int)SubStatusCodes.ReadSessionNotAvailable, "ReadSessionNotAvailable", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: ReadSessionNotAvailable, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -401,19 +421,22 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.PartitionIsMigrating: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.Gone, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: PartitionIsMigrating, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.Gone, (int)SubStatusCodes.CompletingPartitionMigration, "PartitionIsMigrating", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: PartitionIsMigrating, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -423,19 +446,22 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.PartitionIsSplitting: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.Gone, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: PartitionIsSplitting, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.Gone, (int)SubStatusCodes.CompletingSplit, "PartitionIsSplitting", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: PartitionIsSplitting, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -448,16 +474,19 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.ServiceUnavailable, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.ServiceUnavailable, (int)SubStatusCodes.Unknown, "ServiceUnavailable", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: ServiceUnavailable, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -467,19 +496,22 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st return httpResponse; case FaultInjectionServerErrorType.DatabaseAccountNotFound: - + httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.Forbidden, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: DatabaseAccountNotFound, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.Forbidden, (int)SubStatusCodes.DatabaseAccountNotFound, "DatabaseAccountNotFound", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: DatabaseAccountNotFound, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -492,16 +524,19 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st httpResponse = new HttpResponseMessage { + Version = isProxyCall + ? new Version(2, 0) + : new Version(1, 1), StatusCode = HttpStatusCode.Gone, Content = new FauntInjectionHttpContent( new MemoryStream( - FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: LeaseNotFound, rule: {ruleId}"))), + isProxyCall + ? FaultInjectionResponseEncoding.GetBytes( + GetProxyResponseMessageString((int)StatusCodes.Gone, (int)SubStatusCodes.LeaseNotFound, "LeaseNotFound", ruleId)) + : FaultInjectionResponseEncoding.GetBytes($"Fault Injection Server Error: LeaseNotFound, rule: {ruleId}"))), }; - foreach (string header in headers.AllKeys()) - { - httpResponse.Headers.Add(header, headers.Get(header)); - } + this.SetHttpHeaders(httpResponse, headers, isProxyCall); httpResponse.Headers.Add( WFConstants.BackendHeaders.SubStatus, @@ -515,6 +550,41 @@ public HttpResponseMessage GetInjectedServerError(DocumentServiceRequest dsr, st } } + private bool IsProxyCall(DocumentServiceRequest dsr) + { + string gwUriString = dsr.Headers.Get("FAULTINJECTION_IS_PROXY"); + + return !string.IsNullOrEmpty(gwUriString); + } + + private void SetHttpHeaders( + HttpResponseMessage httpResponse, + INameValueCollection headers, + bool isProxyCall) + { + foreach (string header in headers.AllKeys()) + { + if (header != "FAULTINJECTION_IS_PROXY") + { + httpResponse.Headers.Add(header, headers.Get(header)); + } + } + + if (isProxyCall) + { + httpResponse.Headers.Add(ThinClientConstants.RoutedViaProxy,"1"); + } + } + + private static string GetProxyResponseMessageString( + int statusCode, + int subStatusCode, + string message, + string faultInjectionRuleId) + { + return $"{{\"code\": \"{statusCode}:{subStatusCode}\",\"message\":\"Fault Injection Server Error: {message}, rule: {faultInjectionRuleId}\"}}"; + } + internal class FauntInjectionHttpContent : HttpContent { private readonly Stream content; @@ -536,7 +606,7 @@ protected override bool TryComputeLength(out long length) } } - internal static class FaultInjectionResponseEncoding + internal static class FaultInjectionResponseEncoding { private static readonly UTF8Encoding Encoding = new UTF8Encoding(false); @@ -544,6 +614,11 @@ public static byte[] GetBytes(string value) { return Encoding.GetBytes(value); } + + public static byte[] GetBytesFromHexString(string hexString) + { + return Convert.FromHexString(hexString); + } } } } diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index dadeff6682..2311dd2c5d 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -1088,7 +1088,8 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeCli httpClient: this.httpClient, globalPartitionEndpointManager: this.PartitionKeyRangeLocation, isThinClientEnabled: this.isThinClientEnabled, - userAgentContainer: this.ConnectionPolicy.UserAgentContainer); + userAgentContainer: this.ConnectionPolicy.UserAgentContainer, + this.chaosInterceptor); this.GatewayStoreModel = gatewayStoreModel; diff --git a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs index 55597a8852..5a20581cf1 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs @@ -17,7 +17,8 @@ namespace Microsoft.Azure.Cosmos using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; - using Microsoft.Azure.Documents.Collections; + using Microsoft.Azure.Documents.Collections; + using Microsoft.Azure.Documents.FaultInjection; using Newtonsoft.Json; // Marking it as non-sealed in order to unit test it using Moq framework @@ -26,7 +27,8 @@ internal class GatewayStoreModel : IStoreModelExtension, IDisposable private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString(); private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager; private readonly ISessionContainer sessionContainer; - private readonly DocumentClientEventSource eventSource; + private readonly DocumentClientEventSource eventSource; + private readonly IChaosInterceptor chaosInterceptor; internal readonly GlobalEndpointManager endpointManager; internal readonly ConsistencyLevel defaultConsistencyLevel; @@ -48,13 +50,15 @@ public GatewayStoreModel( CosmosHttpClient httpClient, GlobalPartitionEndpointManager globalPartitionEndpointManager, bool isThinClientEnabled, - UserAgentContainer userAgentContainer = null) + UserAgentContainer userAgentContainer = null, + IChaosInterceptor chaosInterceptor = null) { this.endpointManager = endpointManager; this.sessionContainer = sessionContainer; this.defaultConsistencyLevel = defaultConsistencyLevel; this.eventSource = eventSource; - this.globalPartitionEndpointManager = globalPartitionEndpointManager; + this.globalPartitionEndpointManager = globalPartitionEndpointManager; + this.chaosInterceptor = chaosInterceptor; this.gatewayStoreClient = new GatewayStoreClient( httpClient, this.eventSource, @@ -68,7 +72,8 @@ public GatewayStoreModel( userAgentContainer, this.eventSource, globalPartitionEndpointManager, - serializerSettings); + serializerSettings, + this.chaosInterceptor); } this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask( diff --git a/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs b/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs index 75b42b2bf9..25b46ad0a6 100644 --- a/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs +++ b/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs @@ -359,7 +359,6 @@ private async Task SendHttpHelperAsync( { if (this.chaosInterceptor != null && documentServiceRequest != null) { - this.SetFaultInjectionHeader(documentServiceRequest, requestMessage); (bool hasFault, HttpResponseMessage fiResponseMessage) = await this.InjectFaultsAsync(cancellationTokenSource, documentServiceRequest, requestMessage); if (hasFault) { @@ -374,7 +373,6 @@ private async Task SendHttpHelperAsync( if (this.chaosInterceptor != null && documentServiceRequest != null) { - this.SetFaultInjectionHeader(documentServiceRequest, requestMessage); CancellationToken fiToken = cancellationTokenSource.Token; fiToken.ThrowIfCancellationRequested(); await this.chaosInterceptor.OnAfterHttpSendAsync(documentServiceRequest, fiToken); @@ -469,11 +467,6 @@ private async Task SendHttpHelperAsync( } } - private void SetFaultInjectionHeader(DocumentServiceRequest documentServiceRequest, HttpRequestMessage requestMessage) - { - documentServiceRequest.Headers.Set("FAULTINJECTION_GW_URI", requestMessage.RequestUri.ToString()); - } - private async Task<(bool, HttpResponseMessage)> InjectFaultsAsync( CancellationTokenSource cancellationTokenSource, DocumentServiceRequest documentServiceRequest, diff --git a/Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs b/Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs index ca80f9f24a..8279210ac8 100644 --- a/Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs +++ b/Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.FaultInjection; using Newtonsoft.Json; using static Microsoft.Azure.Cosmos.ThinClientTransportSerializer; @@ -26,13 +27,15 @@ internal class ThinClientStoreClient : GatewayStoreClient private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager; private readonly ObjectPool bufferProviderWrapperPool; private readonly UserAgentContainer userAgentContainer; + private readonly IChaosInterceptor chaosInterceptor; public ThinClientStoreClient( CosmosHttpClient httpClient, UserAgentContainer userAgentContainer, ICommunicationEventSource eventSource, GlobalPartitionEndpointManager globalPartitionEndpointManager, - JsonSerializerSettings serializerSettings = null) + JsonSerializerSettings serializerSettings = null, + IChaosInterceptor chaosInterceptor = null) : base(httpClient, eventSource, globalPartitionEndpointManager, @@ -43,6 +46,7 @@ public ThinClientStoreClient( this.userAgentContainer = userAgentContainer ?? throw new ArgumentNullException(nameof(userAgentContainer), "UserAgentContainer cannot be null when initializing ThinClientStoreClient."); + this.chaosInterceptor = chaosInterceptor; } public override async Task InvokeAsync( @@ -63,6 +67,19 @@ public override async Task InvokeAsync( clientCollectionCache, cancellationToken)) { + if (this.chaosInterceptor != null) + { + request.Headers.Set("FAULTINJECTION_IS_PROXY", "true"); + (bool hasFault, HttpResponseMessage fiResponseMessage) = await this.chaosInterceptor.OnHttpRequestCallAsync(request, cancellationToken); + if (hasFault) + { + DefaultTrace.TraceInformation("Chaos interceptor injected fault for request: {0}", request); + fiResponseMessage.RequestMessage = responseMessage.RequestMessage; + request.Headers.Remove("FAULTINJECTION_IS_PROXY"); + return await ThinClientStoreClient.ParseResponseAsync(fiResponseMessage, request.SerializerSettings ?? base.SerializerSettings, request); + } + } + HttpResponseMessage proxyResponse = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); return await ThinClientStoreClient.ParseResponseAsync(proxyResponse, request.SerializerSettings ?? base.SerializerSettings, request); }