Skip to content
Merged
6 changes: 5 additions & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,11 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)

// Resolve the endpoint for the request and pin the resolution to the resolved endpoint
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);
this.locationEndpoint = ConfigurationManager.IsThinClientEnabled(defaultValue: false)
&& ThinClientStoreModel.IsOperationSupportedByThinClient(request)
? this.globalEndpointManager.ResolveThinClientEndpoint(request)
Comment thread
kundadebdatta marked this conversation as resolved.
: this.globalEndpointManager.ResolveServiceEndpoint(request);

request.RequestContext.RouteToLocation(this.locationEndpoint);
}

Expand Down
23 changes: 17 additions & 6 deletions Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ internal abstract class HttpTimeoutPolicy

public static HttpTimeoutPolicy GetTimeoutPolicy(
DocumentServiceRequest documentServiceRequest,
bool isPartitionLevelFailoverEnabled = false)
bool isPartitionLevelFailoverEnabled = false,
bool isThinClientEnabled = false)
{
//Query Plan Requests
if (documentServiceRequest.ResourceType == ResourceType.Document
Expand All @@ -43,12 +44,22 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Data Plane Read
if (!HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest)
//Data Plane Operations
if (!HttpTimeoutPolicy.IsMetaData(documentServiceRequest))
{
return isPartitionLevelFailoverEnabled
? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeout
: HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
if (isThinClientEnabled)
{
return documentServiceRequest.IsReadOnlyRequest
? HttpTimeoutPolicyForThinClient.InstanceShouldRetryAndThrow503OnTimeout
: HttpTimeoutPolicyForThinClient.InstanceShouldNotRetryAndThrow503OnTimeout;
}
// Data Plane Reads.
else if (documentServiceRequest.IsReadOnlyRequest)
{
return isPartitionLevelFailoverEnabled
? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeout
: HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
}
}

//Meta Data Read
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Net.Http;

internal sealed class HttpTimeoutPolicyForThinClient : HttpTimeoutPolicy
{
public bool shouldRetry;
public bool shouldThrow503OnTimeout;
private static readonly string Name = nameof(HttpTimeoutPolicyForThinClient);
public static readonly HttpTimeoutPolicy InstanceShouldRetryAndThrow503OnTimeout = new HttpTimeoutPolicyForThinClient(true, true);
public static readonly HttpTimeoutPolicy InstanceShouldNotRetryAndThrow503OnTimeout = new HttpTimeoutPolicyForThinClient(true, false);

private HttpTimeoutPolicyForThinClient(
bool shouldThrow503OnTimeout,
bool shouldRetry)
{
this.shouldThrow503OnTimeout = shouldThrow503OnTimeout;
this.shouldRetry = shouldRetry;
}

private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelays = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
{
(TimeSpan.FromSeconds(.5), TimeSpan.Zero),
(TimeSpan.FromSeconds(1), TimeSpan.Zero),
(TimeSpan.FromSeconds(5), TimeSpan.Zero),
Comment thread
kundadebdatta marked this conversation as resolved.
};

public override string TimeoutPolicyName => HttpTimeoutPolicyForThinClient.Name;

public override int TotalRetryCount => this.TimeoutsAndDelays.Count;

public override IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> GetTimeoutEnumerator()
{
return this.TimeoutsAndDelays.GetEnumerator();
}

public override bool IsSafeToRetry(HttpMethod httpMethod)
{
return this.shouldRetry;
}

public override bool ShouldRetryBasedOnResponse(HttpMethod requestHttpMethod, HttpResponseMessage responseMessage)
{
if (responseMessage == null)
{
return false;
}

if (responseMessage.StatusCode != System.Net.HttpStatusCode.RequestTimeout)
{
return false;
}

if (!this.IsSafeToRetry(requestHttpMethod))
{
return false;
}

return true;
}

public override bool ShouldThrow503OnTimeout => this.shouldThrow503OnTimeout;
}
}
4 changes: 4 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public GlobalEndpointManager(

public ReadOnlyCollection<Uri> WriteEndpoints => this.locationCache.WriteEndpoints;

public ReadOnlyCollection<Uri> ThinClientReadEndpoints => this.locationCache.ThinClientReadEndpoints;

public ReadOnlyCollection<Uri> ThinClientWriteEndpoints => this.locationCache.ThinClientWriteEndpoints;

public int PreferredLocationCount
{
get
Expand Down
4 changes: 4 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/IGlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ internal interface IGlobalEndpointManager : IDisposable

ReadOnlyCollection<Uri> WriteEndpoints { get; }

ReadOnlyCollection<Uri> ThinClientReadEndpoints { get; }

ReadOnlyCollection<Uri> ThinClientWriteEndpoints { get; }

int PreferredLocationCount { get; }

Uri ResolveServiceEndpoint(DocumentServiceRequest request);
Expand Down
45 changes: 43 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,42 @@ public ReadOnlyCollection<Uri> WriteEndpoints
return this.locationInfo.WriteEndpoints;
}
}

/// <summary>
/// Gets the list of thin client read endpoints.
/// </summary>
public ReadOnlyCollection<Uri> ThinClientReadEndpoints
{
get
{
// Hot-path: avoid ConcurrentDictionary methods which acquire locks
if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
&& this.locationUnavailablityInfoByEndpoint.Any())
{
this.UpdateLocationCache();
}

return this.locationInfo.ThinClientReadEndpoints;
}
}

/// <summary>
/// Gets the list of thin client write endpoints.
/// </summary>
public ReadOnlyCollection<Uri> ThinClientWriteEndpoints
{
get
{
// Hot-path: avoid ConcurrentDictionary methods which acquire locks
if (DateTime.UtcNow - this.lastCacheUpdateTimestamp > this.unavailableLocationsExpirationTime
&& this.locationUnavailablityInfoByEndpoint.Any())
{
this.UpdateLocationCache();
}

return this.locationInfo.ThinClientWriteEndpoints;
}
}

public ReadOnlyCollection<string> EffectivePreferredLocations => this.locationInfo.EffectivePreferredLocations;

Expand Down Expand Up @@ -216,7 +252,7 @@ public void OnLocationPreferenceChanged(ReadOnlyCollection<string> preferredLoca
preferenceList: preferredLocations);
}

public bool IsMetaData(DocumentServiceRequest request)
public static bool IsMetaData(DocumentServiceRequest request)
{
return (request.OperationType != Documents.OperationType.ExecuteJavaScript && request.ResourceType == ResourceType.StoredProcedure) ||
request.ResourceType != ResourceType.Document;
Expand All @@ -225,7 +261,7 @@ public bool IsMetaData(DocumentServiceRequest request)
public bool IsMultimasterMetadataWriteRequest(DocumentServiceRequest request)
{
return !request.IsReadOnlyRequest && this.locationInfo.AvailableWriteLocations.Count > 1
&& this.IsMetaData(request)
&& LocationCache.IsMetaData(request)
&& this.CanUseMultipleWriteLocations();

}
Expand Down Expand Up @@ -896,6 +932,11 @@ internal bool CanUseMultipleWriteLocations()

internal Uri ResolveThinClientEndpoint(DocumentServiceRequest request, bool isReadRequest)
{
if (request.RequestContext != null && request.RequestContext.LocationEndpointToRoute != null)
{
return request.RequestContext.LocationEndpointToRoute;
}

DatabaseAccountLocationsInfo snapshot = this.locationInfo;
ReadOnlyCollection<Uri> endpoints = isReadRequest
? snapshot.ThinClientReadEndpoints
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ThinClientStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private Task<HttpResponseMessage> InvokeClientAsync(
return base.httpClient.SendHttpAsync(
() => this.PrepareRequestForProxyAsync(request, physicalAddress, thinClientEndpoint, globalDatabaseAccountName, clientCollectionCache),
resourceType,
HttpTimeoutPolicy.GetTimeoutPolicy(request),
HttpTimeoutPolicy.GetTimeoutPolicy(request, isThinClientEnabled: true),
request.RequestContext.ClientRequestStatistics,
cancellationToken,
request);
Expand Down
20 changes: 20 additions & 0 deletions Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public override async Task<DocumentServiceResponse> ProcessMessageAsync(
DocumentServiceRequest request,
CancellationToken cancellationToken = default)
{
if (!ThinClientStoreModel.IsOperationSupportedByThinClient(request))
{
return await base.ProcessMessageAsync(request, cancellationToken);
}

await GatewayStoreModel.ApplySessionTokenAsync(
request,
base.defaultConsistencyLevel,
Expand Down Expand Up @@ -101,6 +106,21 @@ await this.CaptureSessionTokenAndHandleSplitAsync(
return response;
}

public static bool IsOperationSupportedByThinClient(
DocumentServiceRequest request)
{
// Thin proxy supports the following operations for Document resources.
return request.ResourceType == ResourceType.Document
&& (request.OperationType == OperationType.Batch
|| request.OperationType == OperationType.Patch
|| request.OperationType == OperationType.Create
|| request.OperationType == OperationType.Read
|| request.OperationType == OperationType.Upsert
|| request.OperationType == OperationType.Replace
|| request.OperationType == OperationType.Delete
|| request.OperationType == OperationType.Query);
}

private async Task<AccountProperties> GetDatabaseAccountPropertiesAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,115 @@ public void CreateHttpClientHandlerCreatesCorrectValueType()
Assert.IsFalse(clientHandler.ServerCertificateCustomValidationCallback.Invoke(new HttpRequestMessage(), x509Certificate2, x509Chain, sslPolicyErrors));
}

[TestMethod]
public async Task HttpTimeoutPolicyForThinClientOn503TestAsync()
{

async Task TestScenarioAsync(HttpMethod method, ResourceType resourceType, HttpTimeoutPolicy timeoutPolicy, Type expectedException, int expectedNumberOfRetrys)
{
int count = 0;
Task<HttpResponseMessage> sendFunc(HttpRequestMessage request, CancellationToken cancellationToken)
{
count++;

throw new OperationCanceledException("API with exception");

}

DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
using CosmosHttpClient cosmoshttpClient = MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler));

try
{
using (ITrace trace = Trace.GetRootTrace(nameof(NoRetryOnNoRetryPolicyTestAsync)))
{
HttpResponseMessage responseMessage1 = await cosmoshttpClient.SendHttpAsync(() =>
new ValueTask<HttpRequestMessage>(
result: new HttpRequestMessage(method, new Uri("http://localhost"))),
resourceType: resourceType,
timeoutPolicy: timeoutPolicy,
clientSideRequestStatistics: new ClientSideRequestStatisticsTraceDatum(DateTime.UtcNow, trace),
cancellationToken: default);
}
}
catch (Exception e)
{
Assert.AreEqual(expectedNumberOfRetrys, count, "Should retry 3 times for read methods, for writes should only be tried once");
Assert.AreEqual(e.GetType(), expectedException);

if (e.GetType() == typeof(CosmosException))
{
CosmosException cosmosException = (CosmosException)e;
Assert.AreEqual(cosmosException.StatusCode, System.Net.HttpStatusCode.ServiceUnavailable);
Assert.AreEqual((int)cosmosException.SubStatusCode, (int)SubStatusCodes.TransportGenerated503);

Assert.IsNotNull(cosmosException.Trace);
Assert.AreNotEqual(cosmosException.Trace, NoOpTrace.Singleton);
}
}

}

//Data plane read
await TestScenarioAsync(
method: HttpMethod.Get,
resourceType: ResourceType.Document,
timeoutPolicy: HttpTimeoutPolicy.GetTimeoutPolicy(
documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Document, OperationType.Read),
isPartitionLevelFailoverEnabled: false,
isThinClientEnabled: true),
expectedException: typeof(CosmosException),
expectedNumberOfRetrys: 3);

//Data plane query
await TestScenarioAsync(
method: HttpMethod.Get,
resourceType: ResourceType.Document,
timeoutPolicy: HttpTimeoutPolicy.GetTimeoutPolicy(
documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Document, OperationType.Query),
isPartitionLevelFailoverEnabled: false,
isThinClientEnabled: true),
expectedException: typeof(CosmosException),
expectedNumberOfRetrys: 3);

////Data plane write
await TestScenarioAsync(
method: HttpMethod.Post,
resourceType: ResourceType.Document,
timeoutPolicy: HttpTimeoutPolicy.GetTimeoutPolicy(
documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Document, OperationType.Create),
isPartitionLevelFailoverEnabled: false,
isThinClientEnabled: true),
expectedException: typeof(CosmosException),
expectedNumberOfRetrys: 1);

////Meta data read
await TestScenarioAsync(
method: HttpMethod.Get,
resourceType: ResourceType.Database,
timeoutPolicy: HttpTimeoutPolicy.GetTimeoutPolicy(
documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Database, OperationType.Read),
isPartitionLevelFailoverEnabled: false,
isThinClientEnabled: true),
expectedException: typeof(CosmosException),
expectedNumberOfRetrys: 3);
}

private static DocumentServiceRequest CreateDocumentServiceRequestByOperation(
ResourceType resourceType,
OperationType operationType)
{
string path = $"dbs/dummy_db_id/colls/dummy_ct_id";
return new DocumentServiceRequest(
operationType,
resourceType,
path,
body: null,
AuthorizationTokenType.PrimaryMasterKey,
headers: null);
}

private class MockMessageHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> sendFunc;
Expand Down
Loading
Loading