Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
// Resolve the endpoint for the request and pin the resolution to the resolved endpoint
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoint = this.isThinClientEnabled
&& ThinClientStoreModel.IsOperationSupportedByThinClient(request)
&& GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ResolveThinClientEndpoint(request)
: this.globalEndpointManager.ResolveServiceEndpoint(request);

Expand Down
46 changes: 14 additions & 32 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,8 +1058,8 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.ConnectionPolicy.EnablePartitionLevelFailover = this.accountServiceConfiguration.AccountProperties.EnablePartitionLevelFailover.Value;
}

this.isThinClientEnabled = (this.accountServiceConfiguration.AccountProperties?.ThinClientWritableLocationsInternal?.Count ?? 0) > 0;

this.isThinClientEnabled = (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway) &&
(this.accountServiceConfiguration.AccountProperties?.ThinClientWritableLocationsInternal?.Count ?? 0) > 0;
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= this.ConnectionPolicy.EnablePartitionLevelFailover;
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());
this.InitializePartitionLevelFailoverWithDefaultHedging();
Expand All @@ -1082,15 +1082,17 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient,
this.PartitionKeyRangeLocation,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);
GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
endpointManager: this.GlobalEndpointManager,
sessionContainer: this.sessionContainer,
defaultConsistencyLevel: (Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
eventSource: this.eventSource,
serializerSettings: this.serializerSettings,
httpClient: this.httpClient,
globalPartitionEndpointManager: this.PartitionKeyRangeLocation,
isThinClientEnabled: this.isThinClientEnabled,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
userAgentContainer: this.ConnectionPolicy.UserAgentContainer);

this.GatewayStoreModel = gatewayStoreModel;

Expand All @@ -1108,27 +1110,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli

if (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway)
{
if (this.isThinClientEnabled)
{
ThinClientStoreModel thinClientStoreModel = new (
endpointManager: this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.sessionContainer,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient,
this.ConnectionPolicy.UserAgentContainer,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);

thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

this.StoreModel = thinClientStoreModel;
}
else
{
this.StoreModel = this.GatewayStoreModel;
}
this.StoreModel = this.GatewayStoreModel;
}
else
{
Expand Down
255 changes: 168 additions & 87 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,93 +30,136 @@ internal class GatewayStoreModel : IStoreModelExtension, IDisposable

internal readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
internal readonly ConsistencyLevel defaultConsistencyLevel;
internal readonly ConsistencyLevel defaultConsistencyLevel;

private ThinClientStoreClient thinClientStoreClient;

private GatewayStoreClient gatewayStoreClient;

// Caches to resolve the PartitionKeyRange from request. For Session Token Optimization.
protected PartitionKeyRangeCache partitionKeyRangeCache;
protected ClientCollectionCache clientCollectionCache;
protected ISessionContainer sessionContainer;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isPartitionLevelFailoverEnabled = false)
protected ISessionContainer sessionContainer;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isThinClientEnabled,
bool isPartitionLevelFailoverEnabled = false,
UserAgentContainer userAgentContainer = null)
{
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;
this.globalPartitionEndpointManager = globalPartitionEndpointManager;
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings,
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings,
isPartitionLevelFailoverEnabled);

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(

if (isThinClientEnabled)
{
this.thinClientStoreClient = new ThinClientStoreClient(
Comment thread
aavasthy marked this conversation as resolved.
Comment thread
kundadebdatta marked this conversation as resolved.
httpClient,
userAgentContainer,
this.eventSource,
isPartitionLevelFailoverEnabled,
serializerSettings);
}

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
this.MarkEndpointsToHealthyAsync);
}

public virtual async Task<DocumentServiceResponse> ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default)
{
await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache,
this.endpointManager);

DocumentServiceResponse response;
try
{
// Collect region name only for document resources
if (request.ResourceType.Equals(ResourceType.Document) && this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request);
response = await this.gatewayStoreClient.InvokeAsync(request, request.ResourceType, physicalAddress, cancellationToken);
}
catch (DocumentClientException exception)
{
if ((!ReplicatedResourceClient.IsMasterResource(request.ResourceType)) &&
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
DocumentServiceResponse response;

await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache,
this.endpointManager);
try
{
if (request.ResourceType.Equals(ResourceType.Document) &&
this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

bool canUseThinClient =
this.thinClientStoreClient != null &&
GatewayStoreModel.IsOperationSupportedByThinClient(request);

Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType)
? this.GetFeedUri(request)
: this.GetEntityUri(request);

if (canUseThinClient)
{
Uri thinClientEndpoint = this.endpointManager.ResolveThinClientEndpoint(request);

Comment thread
aavasthy marked this conversation as resolved.
AccountProperties account = await this.GetDatabaseAccountPropertiesAsync();

response = await this.thinClientStoreClient.InvokeAsync(
request,
request.ResourceType,
physicalAddress,
thinClientEndpoint,
account.Id,
this.clientCollectionCache,
cancellationToken);
}
else
{
response = await this.gatewayStoreClient.InvokeAsync(
request,
request.ResourceType,
physicalAddress,
cancellationToken);
}
}
catch (DocumentClientException exception)
{
if ((!ReplicatedResourceClient.IsMasterResource(request.ResourceType)) &&
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
await this.CaptureSessionTokenAndHandleSplitAsync(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
}

throw;
}

}
throw;
}
await this.CaptureSessionTokenAndHandleSplitAsync(response.StatusCode, response.SubStatusCode, request, response.Headers);
return response;
return response;
}

public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueTask<HttpRequestMessage>> requestMessage,
Expand Down Expand Up @@ -492,27 +535,65 @@ internal static bool IsStoredProcedureCrudOperation(
{
return resourceType == ResourceType.StoredProcedure &&
operationType != Documents.OperationType.ExecuteJavaScript;
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (this.gatewayStoreClient != null)
{
try
{
this.gatewayStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.gatewayStoreClient = null;
}
}
}

internal static bool IsOperationSupportedByThinClient(DocumentServiceRequest request)
{
return request.ResourceType == ResourceType.Document
&& (request.OperationType == OperationType.Batch
|| request.OperationType == OperationType.Patch
|| request.OperationType == OperationType.Create
|| request.OperationType == OperationType.Read
|| request.OperationType == OperationType.Upsert
|| request.OperationType == OperationType.Replace
|| request.OperationType == OperationType.Delete
|| request.OperationType == OperationType.Query);
}
private async Task<AccountProperties> GetDatabaseAccountPropertiesAsync()
Comment thread
aavasthy marked this conversation as resolved.
{
AccountProperties accountProperties = await this.endpointManager.GetDatabaseAccountAsync();
if (accountProperties != null)
{
return accountProperties;
}

throw new InvalidOperationException("Failed to retrieve AccountProperties. The response was null.");
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (this.gatewayStoreClient != null)
{
try
{
this.gatewayStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.gatewayStoreClient = null;
}

if (this.thinClientStoreClient != null)
Comment thread
kundadebdatta marked this conversation as resolved.
Outdated
{
try
{
this.thinClientStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.thinClientStoreClient = null;
}
}
}

internal Uri GetEntityUri(DocumentServiceRequest entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
{
// For multi master write accounts, since all the regions are treated as write regions, the next locations to fail over
// will be the preferred read regions that are configured in the application preferred regions in the CosmosClientOptions.
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.ReadEndpoints;

Expand All @@ -191,7 +191,7 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
else if (this.IsRequestEligibleForPerPartitionAutomaticFailover(request))
{
// For any single master write accounts, the next locations to fail over will be the read regions configured at the account level.
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.AccountReadEndpoints;

Expand Down
Loading
Loading