Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
// Resolve the endpoint for the request and pin the resolution to the resolved endpoint
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoint = this.isThinClientEnabled
&& ThinClientStoreModel.IsOperationSupportedByThinClient(request)
&& GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ResolveThinClientEndpoint(request)
: this.globalEndpointManager.ResolveServiceEndpoint(request);

Expand Down
46 changes: 14 additions & 32 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,8 +1058,8 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.ConnectionPolicy.EnablePartitionLevelFailover = this.accountServiceConfiguration.AccountProperties.EnablePartitionLevelFailover.Value;
}

this.isThinClientEnabled = (this.accountServiceConfiguration.AccountProperties?.ThinClientWritableLocationsInternal?.Count ?? 0) > 0;

this.isThinClientEnabled = (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway) &&
(this.accountServiceConfiguration.AccountProperties?.ThinClientWritableLocationsInternal?.Count ?? 0) > 0;
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= this.ConnectionPolicy.EnablePartitionLevelFailover;
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());
this.InitializePartitionLevelFailoverWithDefaultHedging();
Expand All @@ -1082,15 +1082,17 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient,
this.PartitionKeyRangeLocation,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);
GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
endpointManager: this.GlobalEndpointManager,
sessionContainer: this.sessionContainer,
defaultConsistencyLevel: (Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
eventSource: this.eventSource,
serializerSettings: this.serializerSettings,
httpClient: this.httpClient,
globalPartitionEndpointManager: this.PartitionKeyRangeLocation,
isThinClientEnabled: this.isThinClientEnabled,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
userAgentContainer: this.ConnectionPolicy.UserAgentContainer);

this.GatewayStoreModel = gatewayStoreModel;

Expand All @@ -1108,27 +1110,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli

if (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway)
{
if (this.isThinClientEnabled)
{
ThinClientStoreModel thinClientStoreModel = new (
endpointManager: this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.sessionContainer,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient,
this.ConnectionPolicy.UserAgentContainer,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);

thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

this.StoreModel = thinClientStoreModel;
}
else
{
this.StoreModel = this.GatewayStoreModel;
}
this.StoreModel = this.GatewayStoreModel;
}
else
{
Expand Down
254 changes: 167 additions & 87 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,93 +30,136 @@ internal class GatewayStoreModel : IStoreModelExtension, IDisposable

internal readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
internal readonly ConsistencyLevel defaultConsistencyLevel;
internal readonly ConsistencyLevel defaultConsistencyLevel;

private ThinClientStoreClient thinClientStoreClient;

private GatewayStoreClient gatewayStoreClient;

// Caches to resolve the PartitionKeyRange from request. For Session Token Optimization.
protected PartitionKeyRangeCache partitionKeyRangeCache;
protected ClientCollectionCache clientCollectionCache;
protected ISessionContainer sessionContainer;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isPartitionLevelFailoverEnabled = false)
protected ISessionContainer sessionContainer;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isThinClientEnabled,
bool isPartitionLevelFailoverEnabled = false,
UserAgentContainer userAgentContainer = null)
{
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;
this.globalPartitionEndpointManager = globalPartitionEndpointManager;
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings,
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings,
isPartitionLevelFailoverEnabled);

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(

if (isThinClientEnabled)
{
this.thinClientStoreClient = new ThinClientStoreClient(
Comment thread
aavasthy marked this conversation as resolved.
Comment thread
kundadebdatta marked this conversation as resolved.
httpClient,
userAgentContainer,
this.eventSource,
isPartitionLevelFailoverEnabled,
serializerSettings);
}

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
this.MarkEndpointsToHealthyAsync);
}

public virtual async Task<DocumentServiceResponse> ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default)
{
await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache,
this.endpointManager);

DocumentServiceResponse response;
try
{
// Collect region name only for document resources
if (request.ResourceType.Equals(ResourceType.Document) && this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request);
response = await this.gatewayStoreClient.InvokeAsync(request, request.ResourceType, physicalAddress, cancellationToken);
}
catch (DocumentClientException exception)
{
if ((!ReplicatedResourceClient.IsMasterResource(request.ResourceType)) &&
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
DocumentServiceResponse response;

await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache,
this.endpointManager);
try
{
if (request.ResourceType.Equals(ResourceType.Document) &&
this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

bool canUseThinClient =
this.thinClientStoreClient != null &&
GatewayStoreModel.IsOperationSupportedByThinClient(request);

Uri physicalAddress = ThinClientStoreClient.IsFeedRequest(request.OperationType)
? this.GetFeedUri(request)
: this.GetEntityUri(request);

if (canUseThinClient)
{
Uri thinClientEndpoint = this.endpointManager.ResolveThinClientEndpoint(request);

Comment thread
aavasthy marked this conversation as resolved.
AccountProperties account = await this.GetDatabaseAccountPropertiesAsync();

response = await this.thinClientStoreClient.InvokeAsync(
request,
request.ResourceType,
physicalAddress,
thinClientEndpoint,
account.Id,
this.clientCollectionCache,
cancellationToken);
}
else
{
response = await this.gatewayStoreClient.InvokeAsync(
request,
request.ResourceType,
physicalAddress,
cancellationToken);
}
}
catch (DocumentClientException exception)
{
if ((!ReplicatedResourceClient.IsMasterResource(request.ResourceType)) &&
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
await this.CaptureSessionTokenAndHandleSplitAsync(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
}

throw;
}

}
throw;
}
await this.CaptureSessionTokenAndHandleSplitAsync(response.StatusCode, response.SubStatusCode, request, response.Headers);
return response;
return response;
}

public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueTask<HttpRequestMessage>> requestMessage,
Expand Down Expand Up @@ -492,27 +535,64 @@ internal static bool IsStoredProcedureCrudOperation(
{
return resourceType == ResourceType.StoredProcedure &&
operationType != Documents.OperationType.ExecuteJavaScript;
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (this.gatewayStoreClient != null)
{
try
{
this.gatewayStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.gatewayStoreClient = null;
}
}
}

internal static bool IsOperationSupportedByThinClient(DocumentServiceRequest request)
{
return request.ResourceType == ResourceType.Document
&& (request.OperationType == OperationType.Batch
|| request.OperationType == OperationType.Patch
|| request.OperationType == OperationType.Create
|| request.OperationType == OperationType.Read
|| request.OperationType == OperationType.Upsert
|| request.OperationType == OperationType.Replace
|| request.OperationType == OperationType.Delete
|| request.OperationType == OperationType.Query);
}
private async Task<AccountProperties> GetDatabaseAccountPropertiesAsync()
Comment thread
aavasthy marked this conversation as resolved.
{
AccountProperties accountProperties = await this.endpointManager.GetDatabaseAccountAsync();
if (accountProperties != null)
{
return accountProperties;
}

throw new InvalidOperationException("Failed to retrieve AccountProperties. The response was null.");
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (this.thinClientStoreClient != null)
{
try
{
this.thinClientStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.thinClientStoreClient = null;
}
if (this.gatewayStoreClient != null)
{
try
{
this.gatewayStoreClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception.Message);
}

this.gatewayStoreClient = null;
}
}
}

internal Uri GetEntityUri(DocumentServiceRequest entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
{
// For multi master write accounts, since all the regions are treated as write regions, the next locations to fail over
// will be the preferred read regions that are configured in the application preferred regions in the CosmosClientOptions.
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.ReadEndpoints;

Expand All @@ -191,7 +191,7 @@ public override bool TryMarkEndpointUnavailableForPartitionKeyRange(
else if (this.IsRequestEligibleForPerPartitionAutomaticFailover(request))
{
// For any single master write accounts, the next locations to fail over will be the read regions configured at the account level.
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && ThinClientStoreModel.IsOperationSupportedByThinClient(request)
ReadOnlyCollection<Uri> nextLocations = this.isThinClientEnabled && GatewayStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ThinClientReadEndpoints
: this.globalEndpointManager.AccountReadEndpoints;

Expand Down
Loading