diff --git a/Directory.Build.props b/Directory.Build.props index 05495528c9..5f2908cc98 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,7 +3,7 @@ 3.53.1 3.54.0 preview.1 - 3.40.3 + 3.41.0 1.0.0 beta.0 2.0.5 diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 2311dd2c5d..8176b020b9 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6778,6 +6778,8 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory private void CreateStoreModel(bool subscribeRntbdStatus) { + AccountConfigurationProperties accountConfigurationProperties = new (EnableNRegionSynchronousCommit: this.accountServiceConfiguration.AccountProperties.EnableNRegionSynchronousCommit); + //EnableReadRequestsFallback, if not explicity set on the connection policy, //is false if the account's consistency is bounded staleness, //and true otherwise. @@ -6792,7 +6794,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, enableReplicaValidation: this.isReplicaAddressValidationEnabled, - sessionRetryOptions: this.ConnectionPolicy.SessionRetryOptions); + sessionRetryOptions: this.ConnectionPolicy.SessionRetryOptions, + accountConfigurationProperties: accountConfigurationProperties); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs b/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs index 9157964b9e..e8cd9ac67e 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Settings/AccountProperties.cs @@ -269,5 +269,7 @@ private IDictionary QueryStringToDictConverter() [JsonExtensionData] internal IDictionary AdditionalProperties { get; set; } + [JsonProperty(PropertyName = Constants.Properties.EnableNRegionSynchronousCommit, NullValueHandling = NullValueHandling.Ignore)] + internal bool EnableNRegionSynchronousCommit { get; set; } } } diff --git a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs index 36859cfd94..f4a3cf0e79 100644 --- a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs +++ b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs @@ -113,7 +113,8 @@ public static async Task SerializeProxyRequestAsync( activityId, bufferProvider.Provider, accountName, - out _, + accountName, + out _, out _); int length = serializedRequest.RequestSize; diff --git a/Microsoft.Azure.Cosmos/src/Tracing/TraceWriter.TraceJsonWriter.cs b/Microsoft.Azure.Cosmos/src/Tracing/TraceWriter.TraceJsonWriter.cs index 8a2eabc13b..481c7255f9 100644 --- a/Microsoft.Azure.Cosmos/src/Tracing/TraceWriter.TraceJsonWriter.cs +++ b/Microsoft.Azure.Cosmos/src/Tracing/TraceWriter.TraceJsonWriter.cs @@ -409,7 +409,10 @@ public void Visit(StoreResult storeResult) this.WriteStringValueOrNull(storeResult.PartitionKeyRangeId); this.jsonWriter.WriteFieldName(nameof(storeResult.GlobalCommittedLSN)); - this.jsonWriter.WriteNumberValue(storeResult.GlobalCommittedLSN); + this.jsonWriter.WriteNumberValue(storeResult.GlobalCommittedLSN); + + this.jsonWriter.WriteFieldName(nameof(storeResult.GlobalNRegionCommittedGLSN)); + this.jsonWriter.WriteNumberValue(storeResult.GlobalNRegionCommittedGLSN); this.jsonWriter.WriteFieldName(nameof(storeResult.ItemLSN)); this.jsonWriter.WriteNumberValue(storeResult.ItemLSN); @@ -453,7 +456,7 @@ public void Visit(StoreResult storeResult) this.jsonWriter.WriteFieldName("TransportException"); TransportException transportException = storeResult.Exception?.InnerException as TransportException; - this.WriteStringValueOrNull(transportException?.Message); + this.WriteStringValueOrNull(transportException?.Message); this.jsonWriter.WriteObjectEnd(); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs index 43fe495852..34a35ccd46 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientUnitTests.cs @@ -8,14 +8,18 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Globalization; using System.Net; using System.Net.Http; + using System.Net.Security; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Internal; using Microsoft.Azure.Cosmos.Linq; using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; + using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Cosmos.Utils; using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Client; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -272,6 +276,112 @@ private void TestRetryOnThrottled(int? numberOfRetries) Assert.IsTrue(throttled); } + [TestMethod] + [DataRow(false, DisplayName = "NRegion Synchronous commit is disabled for the account")] + [DataRow(true, DisplayName = "NRegion Synchronous commit is enabled for the account")] + public void EnableNRegionSynchronousCommit_PassedToStoreClient(bool nRegionCommitEnabled) + { + + StoreClient storeClient = new StoreClient( + new Mock().Object, + new SessionContainer(string.Empty), + new Mock().Object, + new Mock().Object, + Protocol.Tcp, + new Mock().Object); + // Arrange + Mock mockStoreClientFactory = new Mock(); + mockStoreClientFactory.Setup(f => f.CreateStoreClient( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny() + )).Returns(storeClient); + + DocumentClient documentClient = new DocumentClient( + new Uri("https://localhost:8081"), + new Mock().Object, + new EventHandler((s, e) => { }), + new ConnectionPolicy(), + null, // desiredConsistencyLevel + null, // serializerSettings + ApiType.None, + new EventHandler((s, e) => { }), + null, // handler + new Mock().Object, + null, // enableCpuMonitor + new Func(tc => tc), + mockStoreClientFactory.Object, + false, // isLocalQuorumConsistency + "testClientId", + new RemoteCertificateValidationCallback((sender, certificate, chain, sslPolicyErrors) => true), + new Mock().Object, + new Mock().Object, + true // enableAsyncCacheExceptionNoSharing + ); + + AccountProperties accountProperties = new AccountProperties + { + // Set the property to true for test + EnableNRegionSynchronousCommit = nRegionCommitEnabled, + }; + + AccountConsistency ac = new AccountConsistency(); + ac.DefaultConsistencyLevel = (Cosmos.ConsistencyLevel) ConsistencyLevel.Session; + accountProperties.Consistency = ac; + + Func> getDatabaseAccountFn = () => + // When called with any Uri, return the expected AccountProperties + Task.FromResult(accountProperties); + + CosmosAccountServiceConfiguration accountServiceConfiguration = new CosmosAccountServiceConfiguration( + getDatabaseAccountFn); + + typeof(CosmosAccountServiceConfiguration) + .GetProperty("AccountProperties", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(accountServiceConfiguration, accountProperties); + + //Inject the accountServiceConfiguration into the DocumentClient via reflection. + typeof(DocumentClient) + .GetProperty("accountServiceConfiguration", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(documentClient, accountServiceConfiguration); + + + typeof(DocumentClient) + .GetField("storeClientFactory", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .SetValue(documentClient, mockStoreClientFactory.Object); + + // Act: Call the private method via reflection + typeof(DocumentClient) + .GetMethod("CreateStoreModel", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic) + .Invoke(documentClient, new object[] { true }); + + // Assert: Verify the correct value was passed + mockStoreClientFactory.Verify(f => + f.CreateStoreClient( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.Is(config => config.EnableNRegionSynchronousCommit == accountProperties.EnableNRegionSynchronousCommit), + It.IsAny()), + Times.Once, + "EnableNRegionSynchronousCommit was not passed correctly to AccountConfigurationProperties and StoreClient."); + } private DocumentClientException CreateTooManyRequestException(int retryAfterInMilliseconds) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.TraceData.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.TraceData.xml index 13be13cdef..3254c58ab8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.TraceData.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.TraceData.xml @@ -272,7 +272,8 @@ Exception ) ]]> - + ( id, - $@"{{""id"":""{id}"",""writableLocations"":[],""readableLocations"":[],""userConsistencyPolicy"":null,""addresses"":null,""userReplicationPolicy"":null,""systemReplicationPolicy"":null,""readPolicy"":null,""queryEngineConfiguration"":null,""enableMultipleWriteLocations"":false,""enablePerPartitionFailoverBehavior"":null}}"); + $@"{{""id"":""{id}"",""writableLocations"":[],""readableLocations"":[],""userConsistencyPolicy"":null,""addresses"":null,""userReplicationPolicy"":null,""systemReplicationPolicy"":null,""readPolicy"":null,""queryEngineConfiguration"":null,""enableMultipleWriteLocations"":false,""enablePerPartitionFailoverBehavior"":null,""enableNRegionSynchronousCommit"":false}}"); this.TestProperty( id, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosSerializerCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosSerializerCoreTests.cs index 7b18b25518..00f00156fc 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosSerializerCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosSerializerCoreTests.cs @@ -143,7 +143,7 @@ public void ValidateCustomSerializerNotUsedForInternalTypes() this.TestProperty( serializerCore, id, - $@"{{""id"":""{id}"",""writableLocations"":[],""readableLocations"":[],""userConsistencyPolicy"":null,""addresses"":null,""userReplicationPolicy"":null,""systemReplicationPolicy"":null,""readPolicy"":null,""queryEngineConfiguration"":null,""enableMultipleWriteLocations"":false,""enablePerPartitionFailoverBehavior"":null}}"); + $@"{{""id"":""{id}"",""writableLocations"":[],""readableLocations"":[],""userConsistencyPolicy"":null,""addresses"":null,""userReplicationPolicy"":null,""systemReplicationPolicy"":null,""readPolicy"":null,""queryEngineConfiguration"":null,""enableMultipleWriteLocations"":false,""enablePerPartitionFailoverBehavior"":null,""enableNRegionSynchronousCommit"":false}}"); this.TestProperty( serializerCore, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index cfca49d25b..8f5521c2a2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.Globalization; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.DocDBErrors; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; @@ -819,5 +820,181 @@ public void GlobalStrongConsistencyMockTest() Assert.IsTrue(nGlobalCommitedLSN == 90); } } + + private TransportClient GetMockTransportClientForNRegionSynchronousWrites( + AddressInformation[] addressInformation, bool globalNLsnNeverCatchesUp) + { + Mock mockTransportClient = new Mock(); + + // create mock store response object + StoreResponse mockStoreResponse1 = new StoreResponse(); + StoreResponse mockStoreResponse2 = new StoreResponse(); + StoreResponse mockStoreResponse3 = new StoreResponse(); + StoreResponse mockStoreResponse4 = new StoreResponse(); + StoreResponse mockStoreResponse5 = new StoreResponse(); + + + // set lsn and activityid on the store response. + mockStoreResponse1.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "100"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_1" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "90" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse2.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "100"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_2" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "95" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse3.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "103"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "98" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse4.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "103"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "99" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + mockStoreResponse5.Headers = new StoreResponseNameValueCollection() + { + { WFConstants.BackendHeaders.LSN, "106"}, + { WFConstants.BackendHeaders.ActivityId, "ACTIVITYID1_3" }, + { WFConstants.BackendHeaders.GlobalNRegionCommittedGLSN, "100" }, + { WFConstants.BackendHeaders.NumberOfReadRegions, "1" }, + }; + + for (int i = 0; i < addressInformation.Length; i++) + { + + if (globalNLsnNeverCatchesUp) + { + mockTransportClient.Setup(client => client.InvokeResourceOperationAsync( + new TransportAddressUri(new Uri(addressInformation[i].PhysicalUri)), It.IsAny())) + .Returns(Task.FromResult(mockStoreResponse1)); + + } + else + { + mockTransportClient.SetupSequence(client => client.InvokeResourceOperationAsync( + new TransportAddressUri(new Uri(addressInformation[i].PhysicalUri)), It.IsAny())) + .Returns(Task.FromResult(mockStoreResponse1)) // initial write response + .Returns(Task.FromResult(mockStoreResponse2)) // barrier retry, count 1 + .Returns(Task.FromResult(mockStoreResponse3)) // barrier retry, count 2 + .Returns(Task.FromResult(mockStoreResponse4)) // barrier retry, count 3 + .Returns(Task.FromResult(mockStoreResponse5)); // barrier retry, count 4 GlobalNRegionCommittedGLSN catches up. + } + + } + + return mockTransportClient.Object; + } + + /** + + Tests the feature called nregion synchronous commit. This is an account level feature enabled via the accountConfigProperties with property name "EnableNRegionSynchronousCommit" + + Business logic: We send single request to primary of the Write region,which will take care of replicating to its secondaries, one of which is XPPrimary. XPPrimary in this case will replicate this request to n read regions, which will ack from within their region. + In the write region where the original request was sent to , the request returns from the backend once write quorum number of replicas commits the write - but at this time, the response cannot be returned to caller, since linearizability guarantees will be violated. + ConsistencyWriter will continuously issue barrier head requests against the partition in question, until GlobalNRegionCommittedGLSN is at least as big as the lsn of the original response. + Sequence of steps: + 1. After receiving response from primary of write region, look at GlobalNRegionCommittedGLSN and LSN headers. + 2. If GlobalNRegionCommittedGLSN == LSN, return response to caller + 3. If GlobalNRegionCommittedGLSN < LSN && storeResponse.NumberOfReadRegions > 0 , cache LSN in request as SelectedGlobalNRegionCommittedGLSN, and issue barrier requests against any/all replicas. + 4. Each barrier response will contain its own LSN and GlobalNRegionCommittedGLSN, check for any response that satisfies GlobalNRegionCommittedGLSN >= SelectedGlobalNRegionCommittedGLSN + 5. Return to caller on success. + + **/ + [TestMethod] + public void TestWhenNRegionSynchronousCommitEnabledThenDoBarrierHead() + { + // create a real document service request (with auth token level = god) + DocumentServiceRequest entity = DocumentServiceRequest.Create(OperationType.Create, ResourceType.Document, AuthorizationTokenType.SystemAll); + + // set request charge tracker - this is referenced in store reader (ReadMultipleReplicaAsync) + DocumentServiceRequestContext requestContext = new DocumentServiceRequestContext + { + RequestChargeTracker = new RequestChargeTracker() + }; + entity.RequestContext = requestContext; + + // set a dummy resource id on the request. + entity.ResourceId = "1-MxAPlgMgA="; + + // set consistency level on the request to Bounded Staleness + entity.Headers[HttpConstants.HttpHeaders.ConsistencyLevel] = ConsistencyLevel.Session.ToString(); + + // also setup timeout helper, used in store reader + entity.RequestContext.TimeoutHelper = new TimeoutHelper(new TimeSpan(2, 2, 2)); + + // when the store reader throws Invalid Partition exception, the higher layer should + // clear this target identity. + entity.RequestContext.TargetIdentity = new ServiceIdentity("dummyTargetIdentity1", new Uri("http://dummyTargetIdentity1"), false); + entity.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange(); + + AddressInformation[] addressInformation = this.GetMockAddressInformationDuringUpgrade(); + Mock mockAddressCache = this.GetMockAddressCache(addressInformation); + + // validate that the mock works + PartitionAddressInformation partitionAddressInformation = mockAddressCache.Object.ResolveAsync(entity, false, new CancellationToken()).Result; + IReadOnlyList addressInfo = partitionAddressInformation.AllAddresses; + + Assert.IsTrue(addressInfo[0] == addressInformation[0]); + + AddressSelector addressSelector = new AddressSelector(mockAddressCache.Object, Protocol.Tcp); + Uri primaryAddress = addressSelector.ResolvePrimaryTransportAddressUriAsync(entity, false /*forceAddressRefresh*/).Result.Uri; + + // check if the address return from Address Selector matches the original address info + Assert.IsTrue(primaryAddress.Equals(addressInformation[0].PhysicalUri)); + + ISessionContainer sessionContainer = new SessionContainer(string.Empty); + + Mock mockServiceConfigReader = new Mock(); + mockServiceConfigReader.Setup(reader => reader.DefaultConsistencyLevel).Returns(Documents.ConsistencyLevel.Session); + + Mock mockAuthorizationTokenProvider = new Mock(); + mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(0)); + + TransportClient mockTransportClient = this.GetMockTransportClientForNRegionSynchronousWrites(addressInformation, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + + AccountConfigurationProperties accountConfigurationProperties = new AccountConfigurationProperties(true); + + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false, accountConfigurationProperties: accountConfigurationProperties); + StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(3000)), false).Result; + Assert.AreEqual(100, response.LSN); + + + try + { + mockTransportClient = this.GetMockTransportClientForNRegionSynchronousWrites(addressInformation, true); + storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false, accountConfigurationProperties: accountConfigurationProperties); + response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(3000)), false).Result; + Assert.Fail(); + } + catch (AggregateException ex) + { + if (ex.InnerException is DocumentClientException goneEx) + { + DefaultTrace.TraceInformation("Gone exception expected!"); + Assert.AreEqual(SubStatusCodes.Server_NRegionCommitWriteBarrierNotMet, goneEx.GetSubStatusCode()); + } + } + } } } \ No newline at end of file