Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,8 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient);
this.httpClient,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);

thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(
return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
protected static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
Expand Down
13 changes: 9 additions & 4 deletions Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ThinClientStoreClient : GatewayStoreClient
{
private readonly ObjectPool<BufferProviderWrapper> bufferProviderWrapperPool;
private readonly ObjectPool<BufferProviderWrapper> bufferProviderWrapperPool;
private readonly bool isPartitionLevelFailoverEnabled;

public ThinClientStoreClient(
CosmosHttpClient httpClient,
ICommunicationEventSource eventSource,
JsonSerializerSettings serializerSettings = null)
JsonSerializerSettings serializerSettings = null,
bool isPartitionLevelFailoverEnabled = false)
: base(httpClient,
eventSource,
serializerSettings)
{
this.bufferProviderWrapperPool = new ObjectPool<BufferProviderWrapper>(() => new BufferProviderWrapper());
this.bufferProviderWrapperPool = new ObjectPool<BufferProviderWrapper>(() => new BufferProviderWrapper());
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
}

public override async Task<DocumentServiceResponse> InvokeAsync(
Expand Down Expand Up @@ -151,7 +154,9 @@ private Task<HttpResponseMessage> InvokeClientAsync(
return base.httpClient.SendHttpAsync(
() => this.PrepareRequestForProxyAsync(request, physicalAddress, thinClientEndpoint, globalDatabaseAccountName, clientCollectionCache),
resourceType,
HttpTimeoutPolicy.GetTimeoutPolicy(request),
HttpTimeoutPolicy.GetTimeoutPolicy(
request,
this.isPartitionLevelFailoverEnabled),
request.RequestContext.ClientRequestStatistics,
cancellationToken);
}
Expand Down
47 changes: 37 additions & 10 deletions Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;

Expand All @@ -18,8 +20,10 @@ namespace Microsoft.Azure.Cosmos
/// It applies session tokens, resolves partition key ranges, and delegates requests to ThinClientStoreClient.
/// </summary>
internal class ThinClientStoreModel : GatewayStoreModel
{
private ThinClientStoreClient thinClientStoreClient;
{
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
private readonly bool isPartitionLevelFailoverEnabled;
private ThinClientStoreClient thinClientStoreClient;

public ThinClientStoreModel(
GlobalEndpointManager endpointManager,
Expand All @@ -28,7 +32,8 @@ public ThinClientStoreModel(
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
bool isPartitionLevelFailoverEnabled = false)
: base(endpointManager,
sessionContainer,
defaultConsistencyLevel,
Expand All @@ -40,7 +45,13 @@ public ThinClientStoreModel(
this.thinClientStoreClient = new ThinClientStoreClient(
httpClient,
eventSource,
serializerSettings);
serializerSettings,
isPartitionLevelFailoverEnabled);

this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.globalPartitionEndpointManager = globalPartitionEndpointManager;
this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
base.MarkEndpointsToHealthyAsync);
}

public override async Task<DocumentServiceResponse> ProcessMessageAsync(
Expand All @@ -58,14 +69,30 @@ await GatewayStoreModel.ApplySessionTokenAsync(
DocumentServiceResponse response;
try
{
Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType) ? base.GetFeedUri(request) : base.GetEntityUri(request);
if (request.ResourceType.Equals(ResourceType.Document) && base.endpointManager.TryGetLocationForGatewayDiagnostics(
request.RequestContext.LocationEndpointToRoute,
out string regionName))
{
request.RequestContext.RegionName = regionName;
}

}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType) ? base.GetFeedUri(request) : base.GetEntityUri(request);
AccountProperties properties = await this.GetDatabaseAccountPropertiesAsync();
response = await this.thinClientStoreClient.InvokeAsync(
request,
Expand Down Expand Up @@ -100,7 +127,7 @@ await this.CaptureSessionTokenAndHandleSplitAsync(

return response;
}

private async Task<AccountProperties> GetDatabaseAccountPropertiesAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.IO;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -71,7 +74,7 @@ public void TestCleanup()
this.thinClientStoreModel?.Dispose();
}

[TestMethod]
[TestMethod]
public async Task ProcessMessageAsync_Success_ShouldReturnDocumentServiceResponse()
{
// Arrange
Expand Down Expand Up @@ -171,7 +174,7 @@ public void Dispose_ShouldDisposeThinClientStoreClient()

[TestMethod]
public async Task ProcessMessageAsync_404_ShouldThrowDocumentClientException()
{
{
// Arrange
MockThinClientStoreClient thinClientStoreClient = new MockThinClientStoreClient(
(request, resourceType, uri, endpoint, globalDatabaseAccountName, clientCollectionCache, cancellationToken) =>
Expand Down Expand Up @@ -233,12 +236,150 @@ public async Task ProcessMessageAsync_404_ShouldThrowDocumentClientException()

storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache);

ReplaceThinClientStoreClientField(storeModel, thinClientStoreClient);

// Act & Assert
ReplaceThinClientStoreClientField(storeModel, thinClientStoreClient);
// Act & Assert
await Assert.ThrowsExceptionAsync<DocumentClientException>(
async () => await storeModel.ProcessMessageAsync(request),
"Expected 404 DocumentClientException from the final thinClientStore call");
}

[TestMethod]
public async Task PartitionLevelFailoverEnabled_ResolvesPartitionKeyRangeAndCallsLocationOverride()
{
Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(c => c.ServiceEndpoint).Returns(new Uri("https://mock.proxy.com"));
mockDocumentClient
.Setup(c => c.GetDatabaseAccountInternalAsync(It.IsAny<Uri>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new AccountProperties());

ConnectionPolicy connectionPolicy = new ConnectionPolicy();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, connectionPolicy);

Mock<GlobalPartitionEndpointManager> globalPartitionEndpointManager = new Mock<GlobalPartitionEndpointManager>();
globalPartitionEndpointManager
.Setup(m => m.TryAddPartitionLevelLocationOverride(It.IsAny<DocumentServiceRequest>()))
.Returns(true)
.Verifiable();

ISessionContainer sessionContainer = new Mock<ISessionContainer>().Object;
DocumentClientEventSource eventSource = new Mock<DocumentClientEventSource>().Object;
Newtonsoft.Json.JsonSerializerSettings serializerSettings = new Newtonsoft.Json.JsonSerializerSettings();
CosmosHttpClient httpClient = new Mock<CosmosHttpClient>().Object;

ThinClientStoreModel storeModel = new ThinClientStoreModel(
endpointManager,
globalPartitionEndpointManager.Object,
sessionContainer,
Cosmos.ConsistencyLevel.Session,
eventSource,
serializerSettings,
httpClient,
isPartitionLevelFailoverEnabled: true);

Mock<ClientCollectionCache> mockCollectionCache = new Mock<ClientCollectionCache>(
sessionContainer,
storeModel,
null,
null,
null,
false);

ContainerProperties containerProperties = new ContainerProperties("test", "/pk");
typeof(ContainerProperties)
.GetProperty("ResourceId", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Public)
?.SetValue(containerProperties, "testCollectionRid");
containerProperties.PartitionKeyPath = "/pk";

mockCollectionCache
.Setup(c => c.ResolveCollectionAsync(It.IsAny<DocumentServiceRequest>(), It.IsAny<CancellationToken>(), It.IsAny<ITrace>()))
.ReturnsAsync(containerProperties);

Mock<PartitionKeyRangeCache> mockPartitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(
null,
storeModel,
mockCollectionCache.Object,
endpointManager,
false);

PartitionKeyRange pkRange = new PartitionKeyRange { Id = "0", MinInclusive = "", MaxExclusive = "FF" };
List<PartitionKeyRange> pkRanges = new List<PartitionKeyRange> { pkRange };
IEnumerable<Tuple<PartitionKeyRange, ServiceIdentity>> rangeTuples = pkRanges.Select(r => Tuple.Create(r, (ServiceIdentity)null));
CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap(rangeTuples, "testCollectionRid");

mockPartitionKeyRangeCache
.Setup(c => c.TryLookupAsync(It.IsAny<string>(), It.IsAny<CollectionRoutingMap>(), It.IsAny<DocumentServiceRequest>(), It.IsAny<ITrace>()))
.ReturnsAsync(routingMap);

storeModel.SetCaches(mockPartitionKeyRangeCache.Object, mockCollectionCache.Object);

DocumentServiceRequest request = CreatePartitionedDocumentRequest();

MockThinClientStoreClient mockThinClientStoreClient = new MockThinClientStoreClient(
(DocumentServiceRequest req, ResourceType resourceType, Uri uri, Uri endpoint, string globalDatabaseAccountName, ClientCollectionCache clientCollectionCache, CancellationToken cancellationToken) =>
{
MemoryStream stream = new MemoryStream(new byte[] { 1, 2, 3 });
INameValueCollection headers = new StoreResponseNameValueCollection();
return Task.FromResult(new DocumentServiceResponse(stream, headers, HttpStatusCode.OK));
});

ReplaceThinClientStoreClientField(storeModel, mockThinClientStoreClient);

// Act
await storeModel.ProcessMessageAsync(request);

// Assert
globalPartitionEndpointManager.Verify(m => m.TryAddPartitionLevelLocationOverride(It.IsAny<DocumentServiceRequest>()), Times.Once());
}

[TestMethod]
public void CircuitBreaker_MarksPartitionUnavailableOnRepeatedFailures()
{
Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(c => c.ServiceEndpoint).Returns(new Uri("https://mock.proxy.com"));
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, connectionPolicy);
Mock<GlobalPartitionEndpointManager> globalPartitionEndpointManager = new Mock<GlobalPartitionEndpointManager>();
globalPartitionEndpointManager
.Setup(m => m.TryAddPartitionLevelLocationOverride(It.IsAny<DocumentServiceRequest>()))
.Returns(true);

globalPartitionEndpointManager
.Setup(m => m.TryMarkEndpointUnavailableForPartitionKeyRange(It.IsAny<DocumentServiceRequest>()))
.Returns(true)
.Verifiable();

globalPartitionEndpointManager
.Setup(m => m.IncrementRequestFailureCounterAndCheckIfPartitionCanFailover(It.IsAny<DocumentServiceRequest>()))
.Returns(true);

ISessionContainer sessionContainer = new Mock<ISessionContainer>().Object;
DocumentClientEventSource eventSource = new Mock<DocumentClientEventSource>().Object;
Newtonsoft.Json.JsonSerializerSettings serializerSettings = new Newtonsoft.Json.JsonSerializerSettings();
CosmosHttpClient httpClient = new Mock<CosmosHttpClient>().Object;

ThinClientStoreModel storeModel = new ThinClientStoreModel(
endpointManager,
globalPartitionEndpointManager.Object,
sessionContainer,
Cosmos.ConsistencyLevel.Session,
eventSource,
serializerSettings,
httpClient,
isPartitionLevelFailoverEnabled: true);

TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager);

DocumentServiceRequest request = CreatePartitionedDocumentRequest();

for (int i = 0; i < 3; i++)
{
globalPartitionEndpointManager.Object.IncrementRequestFailureCounterAndCheckIfPartitionCanFailover(request);
}

globalPartitionEndpointManager.Object.TryMarkEndpointUnavailableForPartitionKeyRange(request);

globalPartitionEndpointManager.Verify(m => m.TryMarkEndpointUnavailableForPartitionKeyRange(It.IsAny<DocumentServiceRequest>()), Times.Once());
}

private static void ReplaceThinClientStoreClientField(ThinClientStoreModel model, ThinClientStoreClient newClient)
Expand All @@ -251,6 +392,18 @@ private static void ReplaceThinClientStoreClientField(ThinClientStoreModel model
field.SetValue(model, newClient);
}

private static DocumentServiceRequest CreatePartitionedDocumentRequest()
{
DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"/dbs/test/colls/test/docs/test",
AuthorizationTokenType.PrimaryMasterKey);
request.Headers[HttpConstants.HttpHeaders.PartitionKey] = "[\"test\"]";
request.RequestContext = new DocumentServiceRequestContext();
return request;
}

internal class MockThinClientStoreClient : ThinClientStoreClient
{
private readonly Func<DocumentServiceRequest, ResourceType, Uri, Uri, string, ClientCollectionCache, CancellationToken, Task<DocumentServiceResponse>> invokeAsyncFunc;
Expand Down
Loading