diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index c5fe1350f3..27d69c1920 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6797,7 +6797,12 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory } private void CreateStoreModel(bool subscribeRntbdStatus) - { + { + AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties; + + bool enableNRegionSynchronousCommit = accountProperties.EnableNRegionSynchronousCommit; + AccountConfigurationProperties accountConfigurationProperties = new (EnableNRegionSynchronousCommit: enableNRegionSynchronousCommit); + //EnableReadRequestsFallback, if not explicity set on the connection policy, //is false if the account's consistency is bounded staleness, //and true otherwise. @@ -6812,7 +6817,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, enableReplicaValidation: this.isReplicaAddressValidationEnabled, - sessionRetryOptions: this.ConnectionPolicy.SessionRetryOptions); + sessionRetryOptions: this.ConnectionPolicy.SessionRetryOptions, + accountConfigurationProperties: accountConfigurationProperties); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/RMResources.Designer.cs b/Microsoft.Azure.Cosmos/src/RMResources.Designer.cs index 6093b9c9dd..8b91aea5c3 100644 --- a/Microsoft.Azure.Cosmos/src/RMResources.Designer.cs +++ b/Microsoft.Azure.Cosmos/src/RMResources.Designer.cs @@ -1211,6 +1211,17 @@ internal static string GlobalStrongWriteBarrierNotMet } } + /// + /// Looks up a localized string similar to Global Strong write barrier has not been met for the request.. + /// + internal static string NRegionCommitSynchronousWriteBarrierNotMet + { + get + { + return ResourceManager.GetString("NRegionCommitSynchronousWriteBarrierNotMet", resourceCulture); + } + } + /// /// Looks up a localized string similar to The requested resource is no longer available at the server.. /// @@ -3221,6 +3232,17 @@ internal static string Server_GlobalStrongWriteBarrierNotMet } } + /// + /// Looks up a localized string similar to Nregion commit barrier not met for less than strong accounts. + /// + internal static string Server_NRegionCommitWriteBarrierNotMet + { + get + { + return ResourceManager.GetString("Server_NRegionCommitWriteBarrierNotMet", resourceCulture); + } + + } /// /// Looks up a localized string similar to Container was re-created, exceeded retries to resolve new identifier. Please contact support with the full exception. /// diff --git a/Microsoft.Azure.Cosmos/src/RMResources.resx b/Microsoft.Azure.Cosmos/src/RMResources.resx index 56ca90f30d..112496d185 100644 --- a/Microsoft.Azure.Cosmos/src/RMResources.resx +++ b/Microsoft.Azure.Cosmos/src/RMResources.resx @@ -759,6 +759,9 @@ Global Strong write barrier has not been met for the request. + + NRegion Commit write barrier has not been met for the request. + At this time, only write operations made from the MongoDB SDKs are supported. Modifications to MongoDB collections using other SDKs is temporarily blocked. @@ -1307,6 +1310,9 @@ If you would like to serve this query through continuation tokens, then please r Service is currently unavailable. More info: https://aka.ms/cosmosdb-tsg-service-unavailable. Could not achieve backend quorum for Strong or Bounded Staleness after barrier requests. + + Service is currently unavailable. More info: https://aka.ms/cosmosdb-tsg-service-unavailable. N-Region Commit write barrier has not been met for the request. + Service is currently unavailable. More info: https://aka.ms/cosmosdb-tsg-service-unavailable. Container was re-created, exceeded retries to resolve new identifier. Please contact support with the full exception. diff --git a/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs b/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs index 9157964b9e..e8cd9ac67e 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs @@ -269,5 +269,7 @@ private IDictionary QueryStringToDictConverter() [JsonExtensionData] internal IDictionary AdditionalProperties { get; set; } + [JsonProperty(PropertyName = Constants.Properties.EnableNRegionSynchronousCommit, NullValueHandling = NullValueHandling.Ignore)] + internal bool EnableNRegionSynchronousCommit { get; set; } } } diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs index 44dbbc9ae5..51f770e16b 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs @@ -11,7 +11,9 @@ namespace Microsoft.Azure.Documents using System.Linq; using System.Threading; using System.Threading.Tasks; + using global::Azure.Core; using Microsoft.Azure.Cosmos.Core.Trace; + /* ConsistencyWriter has two modes for writing - local quorum-acked write and globally strong write. @@ -33,6 +35,16 @@ 1. Issue write request to write region 5. Each barrier response will contain its own LSN and GlobalCommittedLSN, check for any response that satisfies GlobalCommittedLSN >= SelectedGlobalCommittedLSN 6. Return to caller on success. +For Less than Strong accounts with EnableNRegionSynchronousCommit feature enabled: +Business logic: We send single request to primary of the Write region,which will take care of replicating to its secondaries, one of which is XPPrimary. XPPrimary in this case will replicate this request to n read regions, which will ack from within their region. + In the write region where the original request was sent to , the request returns from the backend once write quorum number of replicas commits the write - but at this time, the response cannot be returned to caller, since linearizability guarantees will be violated. + ConsistencyWriter will continuously issue barrier head requests against the partition in question, until GlobalNRegionCommittedGLSN is at least as big as the lsn of the original response. +Sequence of steps: +1. After receiving response from primary of write region, look at GlobalNRegionCommittedGLSN and LSN headers. +2. If GlobalNRegionCommittedGLSN == LSN, return response to caller +3. If GlobalNRegionCommittedGLSN < LSN && storeResponse.NumberOFReadRegions > 0 , cache LSN in request as SelectedGlobalNRegionCommittedGLSN, and issue barrier requests against any/all replicas. +4. Each barrier response will contain its own LSN and GlobalNRegionCommittedGLSN, check for any response that satisfies GlobalNRegionCommittedGLSN >= SelectedGlobalNRegionCommittedGLSN +5. Return to caller on success. */ [SuppressMessage("", "AvoidMultiLineComments", Justification = "Multi line business logic")] @@ -57,6 +69,7 @@ internal sealed class ConsistencyWriter private readonly IAuthorizationTokenProvider authorizationTokenProvider; private readonly bool useMultipleWriteLocations; private readonly ISessionRetryOptions sessionRetryOptions; + private readonly AccountConfigurationProperties accountConfigurationProperties; public ConsistencyWriter( AddressSelector addressSelector, @@ -82,6 +95,7 @@ public ConsistencyWriter( new AddressEnumerator(), sessionContainer: null, enableReplicaValidation); //we need store reader only for global strong, no session is needed*/ + this.accountConfigurationProperties = accountConfigurationProperties; } // Test hook @@ -264,54 +278,38 @@ private async Task WritePrivateAsync( throw new InternalServerErrorException(); } - if (ReplicatedResourceClient.IsGlobalStrongEnabled() && this.ShouldPerformWriteBarrierForGlobalStrong(storeResult.Target, request.OperationType)) - { - long lsn = storeResult.Target.LSN; - long globalCommittedLsn = storeResult.Target.GlobalCommittedLSN; - - if (lsn == -1 || globalCommittedLsn == -1) - { - DefaultTrace.TraceWarning("ConsistencyWriter: LSN {0} or GlobalCommittedLsn {1} is not set for global strong request", - lsn, globalCommittedLsn); - // Service Generated because no lsn and glsn set by service - throw new GoneException(RMResources.Gone, SubStatusCodes.ServerGenerated410); - } - - request.RequestContext.GlobalStrongWriteStoreResult = storeResult; - request.RequestContext.GlobalCommittedSelectedLSN = lsn; - - //if necessary we would have already refreshed cache by now. - request.RequestContext.ForceRefreshAddressCache = false; - - DefaultTrace.TraceInformation("ConsistencyWriter: globalCommittedLsn {0}, lsn {1}", globalCommittedLsn, lsn); - //barrier only if necessary, i.e. when write region completes write, but read regions have not. - if (globalCommittedLsn < lsn) - { -#pragma warning disable SA1001 // Commas should be spaced correctly - using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN , includeRegionContext: true)) - { - if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN)) - { - DefaultTrace.TraceError("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); - throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); - } - } -#pragma warning restore SA1001 // Commas should be spaced correctly - } - } - else + WriteBarrierKind barrierKind = this.ComputeBarrierKind(storeResult, request); + if (barrierKind == WriteBarrierKind.None) { + // If barrier is not performed, we can return the store result directly. return storeResult.Target.ToResponse(); } + + await this.TryBarrierRequestForWritesAsync(storeResult, request, barrierKind); + } else { - using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, request.RequestContext.GlobalCommittedSelectedLSN, includeRegionContext: true)) + WriteBarrierKind barrierKind = this.ComputeBarrierKind(request.RequestContext.GlobalStrongWriteStoreResult, request); + using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, this.authorizationTokenProvider, null, + request.RequestContext.GlobalCommittedSelectedLSN, + includeRegionContext: true)) { - if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN)) + Func lsnAttributeSelector = barrierKind == WriteBarrierKind.GlobalStrongWrite ? (sr => sr.GlobalCommittedLSN) : (sr => sr.GlobalNRegionCommittedGLSN); + if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN, + lsnAttributeSelector)) { - DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); - throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); + if (barrierKind == WriteBarrierKind.GlobalStrongWrite) + { + DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); + throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); + } + else + { + DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for n region synchronous commit request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); + throw new GoneException(RMResources.NRegionCommitSynchronousWriteBarrierNotMet, SubStatusCodes.Server_NRegionCommitWriteBarrierNotMet); + } + } } } @@ -339,24 +337,112 @@ internal bool ShouldPerformWriteBarrierForGlobalStrong(StoreResult storeResult, return false; } +#pragma warning disable CS1570 + /** + Attempt barrier requests if applicable. + Cases in which barrier head requests are applicable (refer to comments on the class definition for more details on the whole write protocol. + For globally strong write: + 1. After receiving response from primary of write region, look at GlobalCommittedLsn and LSN headers. + 2. If GlobalCommittedLSN == LSN, return response to caller + 3. If GlobalCommittedLSN < LSN, cache LSN in request as SelectedGlobalCommittedLSN, and issue barrier requests against any/all replicas. + 4. Each barrier response will contain its own LSN and GlobalCommittedLSN, check for any response that satisfies GlobalCommittedLSN >= SelectedGlobalCommittedLSN + 5. Return to caller on success. + For less than Strong Accounts and If EnableNRegionSynchronousCommit is enabled for the account: + 1. After receiving response from primary of write region, look at GlobalCommittedLsn and LSN headers. + 2. If GlobalNRegionCommittedGLSN == LSN, return response to caller + 3. If GlobalNRegionCommittedGLSN < LSN && storeResponse.NumberOFReadRegions > 0 , cache LSN in request as SelectedGlobalNRegionCommittedGLSN, and issue barrier requests against any/all replicas. + 4. Each barrier response will contain its own LSN and GlobalNRegionCommittedGLSN, check for any response that satisfies GlobalNRegionCommittedGLSN >= SelectedGlobalNRegionCommittedGLSN + 5. Return to caller on success. + **/ +#pragma warning restore CS1570 + private async Task TryBarrierRequestForWritesAsync(ReferenceCountedDisposable storeResult, DocumentServiceRequest request, WriteBarrierKind barrierKind) + { + long lsn = storeResult.Target.LSN; + long globalCommitLsnToBeTracked = -1; + Func lsnAttributeSelector = null; + + //No need to run any barriers. + if (barrierKind == WriteBarrierKind.None) + { + return true; + } + + string warningMessage; + if (barrierKind == WriteBarrierKind.GlobalStrongWrite) + { + globalCommitLsnToBeTracked = storeResult.Target.GlobalCommittedLSN; + warningMessage = "ConsistencyWriter: LSN {0} or GlobalCommittedLsn {1} is not set for global strong request"; + + DefaultTrace.TraceInformation("ConsistencyWriter: globalCommittedLsn {0}, lsn {1}", globalCommitLsnToBeTracked, lsn); + lsnAttributeSelector = sr => sr.GlobalCommittedLSN; + } + else + { + globalCommitLsnToBeTracked = storeResult.Target.GlobalNRegionCommittedGLSN; + warningMessage = "ConsistencyWriter: LSN {0} or globalNRegionCommittedLsn {1} is not set for less than strong request with EnableNRegionSynchronousCommit property enabled "; + lsnAttributeSelector = sr => sr.GlobalNRegionCommittedGLSN; + } + + if (lsn == -1 || globalCommitLsnToBeTracked == -1) + { + DefaultTrace.TraceWarning(warningMessage, lsn, globalCommitLsnToBeTracked); + // Service Generated because no lsn and glsn set by service + throw new GoneException(RMResources.Gone, SubStatusCodes.ServerGenerated410); + } + + request.RequestContext.GlobalCommittedSelectedLSN = lsn; + request.RequestContext.GlobalStrongWriteStoreResult = storeResult; + + //if necessary we would have already refreshed cache by now. + request.RequestContext.ForceRefreshAddressCache = false; + + //barrier only if necessary, i.e. when write region completes write, but read regions have not. + if (globalCommitLsnToBeTracked < lsn) + { +#pragma warning disable SA1001 // Commas should be spaced correctly + using (DocumentServiceRequest barrierRequest = await BarrierRequestHelper.CreateAsync(request, + this.authorizationTokenProvider, null, + request.RequestContext.GlobalCommittedSelectedLSN, + includeRegionContext: true)) + { + if (!await this.WaitForWriteBarrierAsync(barrierRequest, request.RequestContext.GlobalCommittedSelectedLSN, lsnAttributeSelector)) + { + if (barrierKind == WriteBarrierKind.GlobalStrongWrite) + { + DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); + throw new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet); + } + else + { + DefaultTrace.TraceWarning("ConsistencyWriter: Write barrier has not been met for n region synchronous commit request. SelectedGlobalCommittedLsn: {0}", request.RequestContext.GlobalCommittedSelectedLSN); + throw new GoneException(RMResources.NRegionCommitSynchronousWriteBarrierNotMet, SubStatusCodes.Server_NRegionCommitWriteBarrierNotMet); + } + } + } +#pragma warning restore SA1001 // Commas should be spaced correctly + } + return true; + } private Task WaitForWriteBarrierAsync( DocumentServiceRequest barrierRequest, - long selectedGlobalCommittedLsn) + long selectedGlobalCommittedLsn, + Func lsnAttributeSelector) { if (BarrierRequestHelper.IsOldBarrierRequestHandlingEnabled) { - return this.WaitForWriteBarrierOldAsync(barrierRequest, selectedGlobalCommittedLsn); + return this.WaitForWriteBarrierOldAsync(barrierRequest, selectedGlobalCommittedLsn, lsnAttributeSelector); } - return this.WaitForWriteBarrierNewAsync(barrierRequest, selectedGlobalCommittedLsn); + return this.WaitForWriteBarrierNewAsync(barrierRequest, selectedGlobalCommittedLsn, lsnAttributeSelector); } // NOTE this is only temporarily kept to have a feature flag // (Env variable 'AZURE_COSMOS_OLD_BARRIER_REQUESTS_HANDLING_ENABLED' allowing to fall back // This old implementation will be removed (and the environment // variable not been used anymore) after some bake time. - private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) + private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn, + Func lsnAttributeSelector) { int writeBarrierRetryCount = ConsistencyWriter.maxNumberOfWriteBarrierReadRetries; @@ -374,13 +460,13 @@ private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barr checkMinLSN: false, forceReadAll: false); - if (responses != null && responses.Any(response => response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn)) + if (responses != null && responses.Any(response => lsnAttributeSelector(response.Target) >= selectedGlobalCommittedLsn)) { return true; } //get max global committed lsn from current batch of responses, then update if greater than max of all batches. - long maxGlobalCommittedLsn = responses != null ? responses.Select(s => s.Target.GlobalCommittedLSN).DefaultIfEmpty(0).Max() : 0; + long maxGlobalCommittedLsn = responses != null ? responses.Select(s => lsnAttributeSelector(s.Target)).DefaultIfEmpty(0).Max() : 0; maxGlobalCommittedLsnReceived = maxGlobalCommittedLsnReceived > maxGlobalCommittedLsn ? maxGlobalCommittedLsnReceived : maxGlobalCommittedLsn; //only refresh on first barrier call, set to false for subsequent attempts. @@ -412,7 +498,8 @@ private async Task WaitForWriteBarrierOldAsync(DocumentServiceRequest barr private async Task WaitForWriteBarrierNewAsync( DocumentServiceRequest barrierRequest, - long selectedGlobalCommittedLsn) + long selectedGlobalCommittedLsn, + Func lsnAttributeSelector) { TimeSpan remainingDelay = totalAllowedBarrierRequestDelay; @@ -442,14 +529,15 @@ private async Task WaitForWriteBarrierNewAsync( { foreach (ReferenceCountedDisposable response in responses) { - if (response.Target.GlobalCommittedLSN >= selectedGlobalCommittedLsn) + long selectedLsn = lsnAttributeSelector(response.Target); + if (selectedLsn >= selectedGlobalCommittedLsn) { return true; } - if (response.Target.GlobalCommittedLSN >= maxGlobalCommittedLsn) + if (selectedLsn >= maxGlobalCommittedLsn) { - maxGlobalCommittedLsn = response.Target.GlobalCommittedLSN; + maxGlobalCommittedLsn = selectedLsn; } } } @@ -488,5 +576,27 @@ private async Task WaitForWriteBarrierNewAsync( return false; } + + public enum WriteBarrierKind + { + None = 0, // No barrier needed + GlobalStrongWrite = 1, // Barrier for global strong consistency writes + NRegionSynchronousCommit = 2 // Barrier for N-region synchronous commit writes + } + + private WriteBarrierKind ComputeBarrierKind(ReferenceCountedDisposable storeResult, DocumentServiceRequest request) + { + if (ReplicatedResourceClient.IsGlobalStrongEnabled() && this.ShouldPerformWriteBarrierForGlobalStrong(storeResult.Target, request.OperationType)) + { + return WriteBarrierKind.GlobalStrongWrite; + } + else if (this.serviceConfigReader.DefaultConsistencyLevel != ConsistencyLevel.Strong + && storeResult.Target.GlobalNRegionCommittedGLSN != -1 + && this.accountConfigurationProperties.EnableNRegionSynchronousCommit && storeResult.Target.NumberOfReadRegions > 0) + { + return WriteBarrierKind.NRegionSynchronousCommit; + } + return WriteBarrierKind.None; + } } } diff --git a/Microsoft.Azure.Cosmos/src/direct/HttpConstants.cs b/Microsoft.Azure.Cosmos/src/direct/HttpConstants.cs index 417d60c3cb..b2eb5782df 100644 Binary files a/Microsoft.Azure.Cosmos/src/direct/HttpConstants.cs and b/Microsoft.Azure.Cosmos/src/direct/HttpConstants.cs differ diff --git a/Microsoft.Azure.Cosmos/src/direct/ServiceUnavailableException.cs b/Microsoft.Azure.Cosmos/src/direct/ServiceUnavailableException.cs index 54c788c089..f02758cef4 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ServiceUnavailableException.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ServiceUnavailableException.cs @@ -109,6 +109,8 @@ private static string GetExceptionMessage(SubStatusCodes? subStatusCode) return RMResources.ServerGenerated410; case SubStatusCodes.Server_GlobalStrongWriteBarrierNotMet: return RMResources.Server_GlobalStrongWriteBarrierNotMet; + case SubStatusCodes.Server_NRegionCommitWriteBarrierNotMet: + return RMResources.Server_NRegionCommitWriteBarrierNotMet; case SubStatusCodes.Server_ReadQuorumNotMet: return RMResources.Server_ReadQuorumNotMet; case SubStatusCodes.ServerGenerated503: diff --git a/Microsoft.Azure.Cosmos/src/direct/StatusCodes.cs b/Microsoft.Azure.Cosmos/src/direct/StatusCodes.cs index 8dc8b460dc..e0b01f2d27 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StatusCodes.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StatusCodes.cs @@ -376,6 +376,7 @@ internal enum SubStatusCodes ServerGenerated503 = 21008, Server_NoValidStoreResponse = 21009, // ServerGenerated408 = 21010 - currently only applicable in Java + Server_NRegionCommitWriteBarrierNotMet = 21011, // Data Transfer Application related MissingPartitionKeyInDataTransfer = 22001, diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreResult.cs b/Microsoft.Azure.Cosmos/src/direct/StoreResult.cs index 38fd910f36..d52f8022f9 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreResult.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreResult.cs @@ -44,6 +44,7 @@ public static ReferenceCountedDisposable CreateStoreResult( long globalCommittedLSN = -1; int numberOfReadRegions = -1; long itemLSN = -1; + long globalNRegionCommittedGLSN = -1; if (storeResponse.TryGetHeaderValue( useLocalLSNBasedHeaders ? WFConstants.BackendHeaders.QuorumAckedLocalLSN : WFConstants.BackendHeaders.QuorumAckedLSN, out headerValue)) @@ -97,6 +98,11 @@ public static ReferenceCountedDisposable CreateStoreResult( lsn = storeResponse.LSN; } + if (storeResponse.TryGetHeaderValue(WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, out headerValue)) + { + globalNRegionCommittedGLSN = long.Parse(headerValue, CultureInfo.InvariantCulture); + } + ISessionToken sessionToken = null; if (StoreResult.UseSessionTokenHeader) { @@ -136,7 +142,8 @@ public static ReferenceCountedDisposable CreateStoreResult( backendRequestDurationInMs: backendRequestDurationMilliseconds, retryAfterInMs: retryAfterInMs, transportRequestStats: storeResponse.TransportRequestStats, - replicaHealthStatuses: replicaHealthStatuses)); + replicaHealthStatuses: replicaHealthStatuses, + globalNRegionCommittedGLSN: globalNRegionCommittedGLSN)); } else { @@ -148,6 +155,7 @@ public static ReferenceCountedDisposable CreateStoreResult( int currentWriteQuorum = -1; long globalCommittedLSN = -1; int numberOfReadRegions = -1; + long globalNRegionCommittedGLSN = -1; string headerValue = documentClientException.Headers[useLocalLSNBasedHeaders ? WFConstants.BackendHeaders.QuorumAckedLocalLSN : WFConstants.BackendHeaders.QuorumAckedLSN]; if (!string.IsNullOrEmpty(headerValue)) { @@ -199,6 +207,11 @@ public static ReferenceCountedDisposable CreateStoreResult( lsn = documentClientException.LSN; } + headerValue = documentClientException.Headers[WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN]; + if (!string.IsNullOrEmpty(headerValue)) + { + globalNRegionCommittedGLSN = long.Parse(headerValue, CultureInfo.InvariantCulture); + } ISessionToken sessionToken = null; if (StoreResult.UseSessionTokenHeader) { @@ -237,7 +250,8 @@ public static ReferenceCountedDisposable CreateStoreResult( backendRequestDurationInMs: documentClientException.Headers[HttpConstants.HttpHeaders.BackendRequestDurationMilliseconds], retryAfterInMs: documentClientException.Headers[HttpConstants.HttpHeaders.RetryAfterInMilliseconds], transportRequestStats: documentClientException.TransportRequestStats, - replicaHealthStatuses: replicaHealthStatuses)); + replicaHealthStatuses: replicaHealthStatuses, + globalNRegionCommittedGLSN: globalNRegionCommittedGLSN)); } else { @@ -262,7 +276,8 @@ public static ReferenceCountedDisposable CreateStoreResult( backendRequestDurationInMs: null, retryAfterInMs: null, transportRequestStats: null, - replicaHealthStatuses: replicaHealthStatuses)); + replicaHealthStatuses: replicaHealthStatuses, + globalNRegionCommittedGLSN: -1)); } } } @@ -272,7 +287,7 @@ public static ReferenceCountedDisposable CreateForTesting(StoreResp return new ReferenceCountedDisposable( new StoreResult( storeResponse, exception: null, null, default, default, default, default, default, default, - default, default, default, default, default, default, default, default, default, default, default)); + default, default, default, default, default, default, default, default, default, default, default, default)); } public static ReferenceCountedDisposable CreateForTesting(TransportRequestStats transportRequestStats) @@ -303,7 +318,8 @@ public static ReferenceCountedDisposable CreateForTesting(Transport "http://storephysicaladdress-2s.com:Unknown", "http://storephysicaladdress-3s.com:Unhealthy", "http://storephysicaladdress-4s.com:Unknown" - })); + }, + globalNRegionCommittedGLSN: -1)); } public static ReferenceCountedDisposable CreateForTesting(string partitionKeyRangeId) @@ -328,7 +344,8 @@ public static ReferenceCountedDisposable CreateForTesting(string pa backendRequestDurationInMs: "10", retryAfterInMs: "20", transportRequestStats: new TransportRequestStats(), - replicaHealthStatuses: null)); + replicaHealthStatuses: null, + globalNRegionCommittedGLSN: -1)); } private StoreResult( @@ -351,7 +368,8 @@ private StoreResult( string backendRequestDurationInMs, string retryAfterInMs, TransportRequestStats transportRequestStats, - IEnumerable replicaHealthStatuses) + IEnumerable replicaHealthStatuses, + long globalNRegionCommittedGLSN) { if (storeResponse == null && exception == null) { @@ -379,8 +397,9 @@ private StoreResult( this.RetryAfterInMs = retryAfterInMs; this.TransportRequestStats = transportRequestStats; this.ReplicaHealthStatuses = replicaHealthStatuses; + this.GlobalNRegionCommittedGLSN = globalNRegionCommittedGLSN; - this.StatusCode = (StatusCodes) (this.storeResponse != null ? this.storeResponse.StatusCode : + this.StatusCode = (StatusCodes)(this.storeResponse != null ? this.storeResponse.StatusCode : ((this.Exception != null && this.Exception.StatusCode.HasValue) ? this.Exception.StatusCode : 0)); this.SubStatusCode = this.storeResponse != null ? this.storeResponse.SubStatusCode : @@ -429,6 +448,8 @@ private StoreResult( public IEnumerable ReplicaHealthStatuses { get; private set; } + public long GlobalNRegionCommittedGLSN { get; private set; } + public DocumentClientException GetException() { if (this.Exception == null) @@ -505,7 +526,7 @@ public void AppendToBuilder(StringBuilder stringBuilder) stringBuilder.AppendFormat( CultureInfo.InvariantCulture, "StorePhysicalAddress: {0}, LSN: {1}, GlobalCommittedLsn: {2}, PartitionKeyRangeId: {3}, IsValid: {4}, StatusCode: {5}, SubStatusCode: {6}, " + - "RequestCharge: {7}, ItemLSN: {8}, SessionToken: {9}, UsingLocalLSN: {10}, TransportException: {11}, BELatencyMs: {12}, ActivityId: {13}, RetryAfterInMs: {14}", + "RequestCharge: {7}, ItemLSN: {8}, SessionToken: {9}, UsingLocalLSN: {10}, TransportException: {11}, BELatencyMs: {12}, ActivityId: {13}, RetryAfterInMs: {14}, globalNRegionCommittedGLSN: {15}", this.StorePhysicalAddress, this.LSN, this.GlobalCommittedLSN, @@ -520,7 +541,8 @@ public void AppendToBuilder(StringBuilder stringBuilder) this.Exception?.InnerException is TransportException ? this.Exception.InnerException.Message : "null", this.BackendRequestDurationInMs, this.ActivityId, - this.RetryAfterInMs); + this.RetryAfterInMs, + this.GlobalNRegionCommittedGLSN); if (this.ReplicaHealthStatuses != null && this.ReplicaHealthStatuses.Any()) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemConsistencyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemConsistencyTests.cs new file mode 100644 index 0000000000..a12a3d8cd6 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemConsistencyTests.cs @@ -0,0 +1,210 @@ +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Net.Http; + using System.Text; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Diagnostics; + using Microsoft.Azure.Cosmos.Fluent; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Cosmos.Tracing.TraceData; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json.Linq; + using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum; + + [TestClass] + public class CosmosItemConsistencyTests : BaseCosmosClientHelper + { + HttpClientHandlerHelper httpClientHandlerHelper; + + [TestInitialize] + public async Task TestInitialize() + { + await base.TestInit(); + this.httpClientHandlerHelper = new HttpClientHandlerHelper() + { + ResponseIntercepter = async (response, request) => + { + string json = await response?.Content?.ReadAsStringAsync(); + if (json.Length > 0 && json.Contains("databaseAccountEndpoint")) + { + JObject parsedDatabaseAccountResponse = JObject.Parse(json); + if (parsedDatabaseAccountResponse.ContainsKey("enableNRegionSynchronousCommit")) + { + parsedDatabaseAccountResponse.Property("enableNRegionSynchronousCommit").Value = true.ToString(); + } + else + { + parsedDatabaseAccountResponse.Add("enableNRegionSynchronousCommit", true); + } + string interceptedResponseStr = parsedDatabaseAccountResponse.ToString(); + HttpResponseMessage interceptedResponse = new() + { + StatusCode = response.StatusCode, + Content = new StringContent(interceptedResponseStr), + Version = response.Version, + ReasonPhrase = response.ReasonPhrase, + RequestMessage = response.RequestMessage, + }; + return interceptedResponse; + } + return response; + }, + }; + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task TestNRegionCommitEnabledThenWriteBarrier() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + int createDocumentCallCount = 0; + Func transportClientDelegate = (transportClient) => new TransportClientHelper.TransportClientWrapper( + transportClient, + null, + null, + (resourceOperation, storeResponse) => + { + // Intercept the StoreResponse after the transport client receives it. + if ((resourceOperation.ResourceType == ResourceType.Document && + resourceOperation.OperationType == OperationType.Create) || + (resourceOperation.ResourceType == ResourceType.Collection && + resourceOperation.OperationType == OperationType.Head)) + { + string lsnActualWrite = "100"; + string lsnGlobalNRegionCommitted = createDocumentCallCount++ == 5 ? lsnActualWrite : "90"; + storeResponse.Headers.Set(WFConstants.BackendHeaders.LSN, lsnActualWrite); + storeResponse.Headers.Set(WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_1"); + storeResponse.Headers.Set(WFConstants.BackendHeaders.GlobalCommittedLSN, "100"); + storeResponse.Headers.Set(WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, lsnGlobalNRegionCommitted); + storeResponse.Headers.Set(WFConstants.BackendHeaders.NumberOfReadRegions, "1"); + } + return storeResponse; + } + ); + + try + { + await base.TestInit((builder) => builder + .WithHttpClientFactory(() => new HttpClient(this.httpClientHandlerHelper)) + .WithTransportClientHandlerFactory(transportClientDelegate)); + + + Container container = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: "/pk")); + + ToDoActivity temp = ToDoActivity.CreateRandomToDoActivity("Item with Session Nregion commit writes"); + + ItemResponse itemResponse = await container.CreateItemAsync( + partitionKey: new Cosmos.PartitionKey(temp.pk), + item: temp); + + + // Assert we made 5 HEAD requests to ensure write barrier is met + CosmosTraceDiagnostics traceDiagnostic = itemResponse.Diagnostics as CosmosTraceDiagnostics; + + int headRequestCount = 0; + + ClientSideRequestStatisticsTraceDatum clientSideRequestStats = this.GetClientSideRequestStatsFromTrace(traceDiagnostic.Value, "Transport Request"); + foreach (StoreResponseStatistics responseStatistics in clientSideRequestStats.StoreResponseStatisticsList) + { + if (responseStatistics.RequestResourceType == ResourceType.Collection && + responseStatistics.RequestOperationType == OperationType.Head) + { + headRequestCount++; + } + } + + Assert.AreEqual(5, headRequestCount, "Expected 5 HEAD requests to be made to ensure write barrier is met"); + + + Console.WriteLine(itemResponse.Diagnostics); + } + catch (Exception ex) + { + Assert.Fail("Test failed with exception: " + ex.ToString()); + } + } + + [TestMethod] + public async Task TestNRegionLsnNeverRecoversThenWriteFailure() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + Func transportClientDelegate = (transportClient) => new TransportClientHelper.TransportClientWrapper( + transportClient, + null, + null, + (resourceOperation, storeResponse) => + { + // Intercept the StoreResponse after the transport client receives it. + if ((resourceOperation.ResourceType == ResourceType.Document && + resourceOperation.OperationType == OperationType.Create) || + (resourceOperation.ResourceType == ResourceType.Collection && + resourceOperation.OperationType == OperationType.Head)) + { + string lsnActualWrite = "100"; + string lsnGlobalNRegionCommitted = "90"; + storeResponse.Headers.Set(WFConstants.BackendHeaders.LSN, lsnActualWrite); + storeResponse.Headers.Set(WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_1"); + storeResponse.Headers.Set(WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, lsnGlobalNRegionCommitted); + storeResponse.Headers.Set(WFConstants.BackendHeaders.NumberOfReadRegions, "1"); + } + return storeResponse; + } + ); + try + { + await base.TestInit((builder) => builder + .WithHttpClientFactory(() => new HttpClient(this.httpClientHandlerHelper)) + .WithTransportClientHandlerFactory(transportClientDelegate)); + + Container container = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: "/pk")); + + ToDoActivity temp = ToDoActivity.CreateRandomToDoActivity("Item with Session nregion writes"); + CosmosException cosmosException = await Assert.ThrowsExceptionAsync(() => container.CreateItemAsync( + partitionKey: new Cosmos.PartitionKey(temp.pk), + item: temp)); + + Assert.AreEqual(System.Net.HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode); + } + catch (Exception ex) + { + Assert.Fail("Test failed with exception: " + ex.ToString()); + } + } + + private ClientSideRequestStatisticsTraceDatum GetClientSideRequestStatsFromTrace(ITrace trace, string traceToFind) + { + if (trace.Name.Contains(traceToFind)) + { + foreach (object datum in trace.Data.Values) + { + if (datum is ClientSideRequestStatisticsTraceDatum clientSideStats) + { + return clientSideStats; + } + } + } + + foreach (ITrace child in trace.Children) + { + ClientSideRequestStatisticsTraceDatum datum = this.GetClientSideRequestStatsFromTrace(child, traceToFind); + if (datum != null) + { + return datum; + } + } + return null; + } + + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs index 43fe495852..22939d78a2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs @@ -8,14 +8,18 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Globalization; using System.Net; using System.Net.Http; + using System.Net.Security; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Internal; using Microsoft.Azure.Cosmos.Linq; using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; + using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Cosmos.Utils; using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Client; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -272,6 +276,112 @@ private void TestRetryOnThrottled(int? numberOfRetries) Assert.IsTrue(throttled); } + [TestMethod] + [DataRow(false)] + [DataRow(true)] + public void EnableNRegionSynchronousCommit_PassedToStoreClient(bool nRegionCommitEnabled) + { + + StoreClient storeClient = new StoreClient( + new Mock().Object, + new SessionContainer(string.Empty), + new Mock().Object, + new Mock().Object, + Protocol.Tcp, + new Mock().Object); + // Arrange + Mock mockStoreClientFactory = new Mock(); + mockStoreClientFactory.Setup(f => f.CreateStoreClient( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny() + )).Returns(storeClient); + + DocumentClient documentClient = new DocumentClient( + new Uri("https://localhost:8081"), + new Mock().Object, + new EventHandler((s, e) => { }), + new ConnectionPolicy(), + null, // desiredConsistencyLevel + null, // serializerSettings + ApiType.None, + new EventHandler((s, e) => { }), + null, // handler + new Mock().Object, + null, // enableCpuMonitor + new Func(tc => tc), + mockStoreClientFactory.Object, + false, // isLocalQuorumConsistency + "testClientId", + new RemoteCertificateValidationCallback((sender, certificate, chain, sslPolicyErrors) => true), + new Mock().Object, + new Mock().Object, + true // enableAsyncCacheExceptionNoSharing + ); + + AccountProperties accountProperties = new AccountProperties + { + // Set the property to true for test + EnableNRegionSynchronousCommit = nRegionCommitEnabled, + }; + + AccountConsistency ac = new AccountConsistency(); + ac.DefaultConsistencyLevel = (Cosmos.ConsistencyLevel) ConsistencyLevel.Session; + accountProperties.Consistency = ac; + + Func> getDatabaseAccountFn = () => + // When called with any Uri, return the expected AccountProperties + Task.FromResult(accountProperties); + + CosmosAccountServiceConfiguration accountServiceConfiguration = new CosmosAccountServiceConfiguration( + getDatabaseAccountFn); + + typeof(CosmosAccountServiceConfiguration) + .GetProperty("AccountProperties", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(accountServiceConfiguration, accountProperties); + + //Inject the accountServiceConfiguration into the DocumentClient via reflection. + typeof(DocumentClient) + .GetProperty("accountServiceConfiguration", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(documentClient, accountServiceConfiguration); + + + typeof(DocumentClient) + .GetField("storeClientFactory", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(documentClient, mockStoreClientFactory.Object); + + // Act: Call the private method via reflection + typeof(DocumentClient) + .GetMethod("CreateStoreModel", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .Invoke(documentClient, new object[] { true }); + + // Assert: Verify the correct value was passed + mockStoreClientFactory.Verify(f => + f.CreateStoreClient( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.Is(config => config.EnableNRegionSynchronousCommit == accountProperties.EnableNRegionSynchronousCommit), + It.IsAny()), + Times.Once, + "EnableNRegionSynchronousCommit was not passed correctly to AccountConfigurationProperties and StoreClient."); + } private DocumentClientException CreateTooManyRequestException(int retryAfterInMilliseconds) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index cfca49d25b..697903921e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.Globalization; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.DocDBErrors; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; @@ -819,5 +820,181 @@ public void GlobalStrongConsistencyMockTest() Assert.IsTrue(nGlobalCommitedLSN == 90); } } + + private TransportClient GetMockTransportClientForNRegionSynchronousWrites( + AddressInformation[] addressInformation, bool globalNLsnNeverCatchesUp) + { + Mock mockTransportClient = new Mock(); + + // create mock store response object + StoreResponse mockStoreResponse1 = new StoreResponse(); + StoreResponse mockStoreResponse2 = new StoreResponse(); + StoreResponse mockStoreResponse3 = new StoreResponse(); + StoreResponse mockStoreResponse4 = new StoreResponse(); + StoreResponse mockStoreResponse5 = new StoreResponse(); + + + // set lsn and activityid on the store response. + mockStoreResponse1.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "100"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_1" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "90" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse2.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "100"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_2" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "95" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse3.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "103"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "98" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse4.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "103"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "99" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse5.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "106"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "100" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + for (int i = 0; i < addressInformation.Length; i++) + { + + if (globalNLsnNeverCatchesUp) + { + mockTransportClient.Setup(client => client.InvokeResourceOperationAsync( + new TransportAddressUri(new Uri(addressInformation[i].PhysicalUri)), It.IsAny())) + .Returns(Task.FromResult(mockStoreResponse1)); + + } + else + { + mockTransportClient.SetupSequence(client => client.InvokeResourceOperationAsync( + new TransportAddressUri(new Uri(addressInformation[i].PhysicalUri)), It.IsAny())) + .Returns(Task.FromResult(mockStoreResponse1)) // initial write response + .Returns(Task.FromResult(mockStoreResponse2)) // barrier retry, count 1 + .Returns(Task.FromResult(mockStoreResponse3)) // barrier retry, count 2 + .Returns(Task.FromResult(mockStoreResponse4)) // barrier retry, count 3 + .Returns(Task.FromResult(mockStoreResponse5)); // barrier retry, count 4 GlobalNRegionCommittedGLSN catches up. + } + + } + + return mockTransportClient.Object; + } + + /** + + Tests the feature called nregion synchronous commit. This is an account level feature enabled via the accountConfigProperties with property name "EnableNRegionSynchronousCommit" + + Business logic: We send single request to primary of the Write region,which will take care of replicating to its secondaries, one of which is XPPrimary. XPPrimary in this case will replicate this request to n read regions, which will ack from within their region. + In the write region where the original request was sent to , the request returns from the backend once write quorum number of replicas commits the write - but at this time, the response cannot be returned to caller, since linearizability guarantees will be violated. + ConsistencyWriter will continuously issue barrier head requests against the partition in question, until GlobalNRegionCommittedGLSN is at least as big as the lsn of the original response. + Sequence of steps: + 1. After receiving response from primary of write region, look at GlobalNRegionCommittedGLSN and LSN headers. + 2. If GlobalNRegionCommittedGLSN == LSN, return response to caller + 3. If GlobalNRegionCommittedGLSN < LSN && storeResponse.NumberOFReadRegions > 0 , cache LSN in request as SelectedGlobalNRegionCommittedGLSN, and issue barrier requests against any/all replicas. + 4. Each barrier response will contain its own LSN and GlobalNRegionCommittedGLSN, check for any response that satisfies GlobalNRegionCommittedGLSN >= SelectedGlobalNRegionCommittedGLSN + 5. Return to caller on success. + + **/ + [TestMethod] + public void TestWhenNRegionSynchronousCommitEnabledThenDoBarrierHead() + { + // create a real document service request (with auth token level = god) + DocumentServiceRequest entity = DocumentServiceRequest.Create(OperationType.Create, ResourceType.Document, AuthorizationTokenType.SystemAll); + + // set request charge tracker - this is referenced in store reader (ReadMultipleReplicaAsync) + DocumentServiceRequestContext requestContext = new DocumentServiceRequestContext + { + RequestChargeTracker = new RequestChargeTracker() + }; + entity.RequestContext = requestContext; + + // set a dummy resource id on the request. + entity.ResourceId = "1-MxAPlgMgA="; + + // set consistency level on the request to Bounded Staleness + entity.Headers[HttpConstants.HttpHeaders.ConsistencyLevel] = ConsistencyLevel.Session.ToString(); + + // also setup timeout helper, used in store reader + entity.RequestContext.TimeoutHelper = new TimeoutHelper(new TimeSpan(2, 2, 2)); + + // when the store reader throws Invalid Partition exception, the higher layer should + // clear this target identity. + entity.RequestContext.TargetIdentity = new ServiceIdentity("dummyTargetIdentity1", new Uri("http://dummyTargetIdentity1"), false); + entity.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange(); + + AddressInformation[] addressInformation = this.GetMockAddressInformationDuringUpgrade(); + Mock mockAddressCache = this.GetMockAddressCache(addressInformation); + + // validate that the mock works + PartitionAddressInformation partitionAddressInformation = mockAddressCache.Object.ResolveAsync(entity, false, new CancellationToken()).Result; + IReadOnlyList addressInfo = partitionAddressInformation.AllAddresses; + + Assert.IsTrue(addressInfo[0] == addressInformation[0]); + + AddressSelector addressSelector = new AddressSelector(mockAddressCache.Object, Protocol.Tcp); + Uri primaryAddress = addressSelector.ResolvePrimaryTransportAddressUriAsync(entity, false /*forceAddressRefresh*/).Result.Uri; + + // check if the address return from Address Selector matches the original address info + Assert.IsTrue(primaryAddress.Equals(addressInformation[0].PhysicalUri)); + + ISessionContainer sessionContainer = new SessionContainer(string.Empty); + + Mock mockServiceConfigReader = new Mock(); + mockServiceConfigReader.Setup(reader => reader.DefaultConsistencyLevel).Returns(Documents.ConsistencyLevel.Session); + + Mock mockAuthorizationTokenProvider = new Mock(); + mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(0)); + + TransportClient mockTransportClient = this.GetMockTransportClientForNRegionSynchronousWrites(addressInformation, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + + AccountConfigurationProperties accountConfigurationProperties = new AccountConfigurationProperties(true); + + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false, accountConfigurationProperties: accountConfigurationProperties); + StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(3000)), false).Result; + Assert.AreEqual(100, response.LSN); + + + try + { + mockTransportClient = this.GetMockTransportClientForNRegionSynchronousWrites(addressInformation, true); + storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false, accountConfigurationProperties: accountConfigurationProperties); + response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(3000)), false).Result; + Assert.Fail(); + } + catch (AggregateException ex) + { + if (ex.InnerException is DocumentClientException goneEx) + { + DefaultTrace.TraceInformation("Gone exception expected!"); + Assert.AreEqual(SubStatusCodes.Server_NRegionCommitWriteBarrierNotMet, goneEx.GetSubStatusCode()); + } + } + } } } \ No newline at end of file