Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,8 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient);
this.httpClient,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);

thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(
return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
protected static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
{
// For multi master write accounts, since all the regions are treated as write regions, the next locations to fail over
// will be the preferred read regions that are configured in the application preferred regions in the CosmosClientOptions.
ReadOnlyCollection<Uri> nextLocations = this.globalEndpointManager.ReadEndpoints;
ReadOnlyCollection<Uri> nextLocations = ConfigurationManager.IsThinClientEnabled(defaultValue: false) && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
Comment thread
kundadebdatta marked this conversation as resolved.
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.ReadEndpoints;

return this.TryAddOrUpdatePartitionFailoverInfoAndMoveToNextLocation(
partitionKeyRange,
Expand All @@ -181,7 +183,9 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
else if (this.IsRequestEligibleForPerPartitionAutomaticFailover(request))
{
// For any single master write accounts, the next locations to fail over will be the read regions configured at the account level.
ReadOnlyCollection<Uri> nextLocations = this.globalEndpointManager.AccountReadEndpoints;
ReadOnlyCollection<Uri> nextLocations = ConfigurationManager.IsThinClientEnabled(defaultValue: false) && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.AccountReadEndpoints;

return this.TryAddOrUpdatePartitionFailoverInfoAndMoveToNextLocation(
partitionKeyRange,
Expand Down
11 changes: 7 additions & 4 deletions Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Cosmos
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;
using static Microsoft.Azure.Cosmos.ThinClientTransportSerializer;
Expand All @@ -23,17 +22,21 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ThinClientStoreClient : GatewayStoreClient
{
private readonly bool isPartitionLevelFailoverEnabled;
private readonly ObjectPool<BufferProviderWrapper> bufferProviderWrapperPool;

public ThinClientStoreClient(
CosmosHttpClient httpClient,
ICommunicationEventSource eventSource,
JsonSerializerSettings serializerSettings = null)
JsonSerializerSettings serializerSettings = null,
bool isPartitionLevelFailoverEnabled = false)
: base(httpClient,
eventSource,
serializerSettings)
serializerSettings,
isPartitionLevelFailoverEnabled)
{
this.bufferProviderWrapperPool = new ObjectPool<BufferProviderWrapper>(() => new BufferProviderWrapper());
this.bufferProviderWrapperPool = new ObjectPool<BufferProviderWrapper>(() => new BufferProviderWrapper());
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
}

public override async Task<DocumentServiceResponse> InvokeAsync(
Expand Down
34 changes: 30 additions & 4 deletions Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ThinClientStoreModel : GatewayStoreModel
{
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
private readonly bool isPartitionLevelFailoverEnabled;
private ThinClientStoreClient thinClientStoreClient;

public ThinClientStoreModel(
Expand All @@ -28,19 +30,27 @@ public ThinClientStoreModel(
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
bool isPartitionLevelFailoverEnabled = false)
: base(endpointManager,
sessionContainer,
defaultConsistencyLevel,
eventSource,
serializerSettings,
httpClient,
globalPartitionEndpointManager)
globalPartitionEndpointManager,
isPartitionLevelFailoverEnabled)
{
this.thinClientStoreClient = new ThinClientStoreClient(
httpClient,
eventSource,
serializerSettings);
serializerSettings,
isPartitionLevelFailoverEnabled);

this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.globalPartitionEndpointManager = globalPartitionEndpointManager;
this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
base.MarkEndpointsToHealthyAsync);
}

public override async Task<DocumentServiceResponse> ProcessMessageAsync(
Expand All @@ -63,14 +73,30 @@ await GatewayStoreModel.ApplySessionTokenAsync(
DocumentServiceResponse response;
try
{
Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType) ? base.GetFeedUri(request) : base.GetEntityUri(request);
if (request.ResourceType.Equals(ResourceType.Document) && base.endpointManager.TryGetLocationForGatewayDiagnostics(
request.RequestContext.LocationEndpointToRoute,
out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
Comment thread
kundadebdatta marked this conversation as resolved.
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType) ? base.GetFeedUri(request) : base.GetEntityUri(request);
AccountProperties properties = await this.GetDatabaseAccountPropertiesAsync();
response = await this.thinClientStoreClient.InvokeAsync(
request,
Expand Down
Loading
Loading