Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e1abb97
dynamic enablement of ppaf
NaluTripician Jul 22, 2025
7c5d519
Update GlobalEndpointManager.cs
NaluTripician Jul 22, 2025
b8d975c
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Jul 22, 2025
09b964a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Jul 25, 2025
bd5be79
updated approach
NaluTripician Jul 31, 2025
f74afc2
fixes
NaluTripician Jul 31, 2025
506663a
requested changes
NaluTripician Jul 31, 2025
a82b471
Gateway changes
NaluTripician Jul 31, 2025
4783f36
Update GlobalPartitionEndpointManager.cs
NaluTripician Jul 31, 2025
53a2c34
Update GlobalPartitionEndpointManagerCore.cs
NaluTripician Jul 31, 2025
2329880
Update CosmosItemIntegrationTests.cs
NaluTripician Jul 31, 2025
c79a7bf
fixed build
NaluTripician Jul 31, 2025
8feaaa8
Code changes to refactor behavior and tests.
kundadebdatta Aug 1, 2025
acff813
Code changes to fix thin client test
kundadebdatta Aug 1, 2025
6472394
Code changes to refactor tests.
kundadebdatta Aug 2, 2025
9a0564a
requested changes
NaluTripician Aug 4, 2025
f300d39
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 5, 2025
b73c17a
Code changes to fix test failures.
kundadebdatta Aug 5, 2025
843c666
Code changes to refactor global partition endpoint manager.
kundadebdatta Aug 6, 2025
aac3ece
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Aug 7, 2025
6e68326
Update Microsoft.Azure.Cosmos/src/UserAgentContainer.cs
NaluTripician Aug 7, 2025
18e7b67
Code changes to fix build
kundadebdatta Aug 7, 2025
bafa80a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 7, 2025
6abd41c
Code changes to fix code diff in GatewayStoreModel.
kundadebdatta Aug 8, 2025
2ee4a1f
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 8, 2025
da3ff36
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
4a272f5
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
f6dde28
change regex to static
NaluTripician Aug 12, 2025
a352d77
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ internal sealed class ClientRetryPolicy : IDocumentClientRetryPolicy
private readonly IDocumentClientRetryPolicy throttlingRetry;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly bool enableEndpointDiscovery;
private readonly bool isPartitionLevelFailoverEnabled;
private readonly bool enableEndpointDiscovery;
private readonly bool isThinClientEnabled;
private int failoverRetryCount;

Expand All @@ -45,8 +44,7 @@ public ClientRetryPolicy(
GlobalEndpointManager globalEndpointManager,
GlobalPartitionEndpointManager partitionKeyRangeLocationCache,
RetryOptions retryOptions,
bool enableEndpointDiscovery,
bool isPartitionLevelFailoverEnabled,
bool enableEndpointDiscovery,
bool isThinClientEnabled)
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
Expand All @@ -60,8 +58,7 @@ public ClientRetryPolicy(
this.sessionTokenRetryCount = 0;
this.serviceUnavailableRetryCount = 0;
this.canUseMultipleWriteLocations = false;
this.isMultiMasterWriteRequest = false;
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.isMultiMasterWriteRequest = false;
this.isThinClientEnabled = isThinClientEnabled;
}

Expand Down Expand Up @@ -500,7 +497,7 @@ private ShouldRetryResult ShouldRetryOnUnavailableEndpointStatusCodes()

if (!this.canUseMultipleWriteLocations
&& !this.isReadRequest
&& !this.isPartitionLevelFailoverEnabled)
&& !this.partitionKeyRangeLocationCache.IsPartitionLevelAutomaticFailoverEnabled())
{
// Write requests on single master cannot be retried if partition level failover is disabled.
// This means there are no other regions available to serve the writes.
Expand Down
234 changes: 137 additions & 97 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs

Large diffs are not rendered by default.

16 changes: 11 additions & 5 deletions Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ namespace Microsoft.Azure.Cosmos

internal class GatewayStoreClient : TransportClient
{
private readonly bool isPartitionLevelFailoverEnabled;
private readonly ICommunicationEventSource eventSource;
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
protected readonly CosmosHttpClient httpClient;
protected readonly JsonSerializerSettings SerializerSettings;

Expand All @@ -32,13 +32,13 @@ internal class GatewayStoreClient : TransportClient
public GatewayStoreClient(
CosmosHttpClient httpClient,
ICommunicationEventSource eventSource,
JsonSerializerSettings serializerSettings = null,
bool isPartitionLevelFailoverEnabled = false)
GlobalPartitionEndpointManager globalPartitionEndpointManager,
JsonSerializerSettings serializerSettings = null)
{
this.httpClient = httpClient;
this.SerializerSettings = serializerSettings;
this.eventSource = eventSource;
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.globalPartitionEndpointManager = globalPartitionEndpointManager;
}

public async Task<DocumentServiceResponse> InvokeAsync(
Expand Down Expand Up @@ -386,10 +386,16 @@ private Task<HttpResponseMessage> InvokeClientAsync(
resourceType,
HttpTimeoutPolicy.GetTimeoutPolicy(
request,
this.isPartitionLevelFailoverEnabled),
this.IsPartitionLevelFailoverEnabled()),
request.RequestContext.ClientRequestStatistics,
cancellationToken,
request);
}

/// <summary>
/// Reports whether either partition-level availability feature is switched on,
/// as answered by the global partition endpoint manager at the time of the call.
/// </summary>
/// <returns>
/// <c>true</c> when the per-partition circuit breaker or the per-partition automatic failover
/// is enabled; otherwise <c>false</c>.
/// </returns>
private bool IsPartitionLevelFailoverEnabled()
{
    // The circuit breaker flag is consulted first; automatic failover is only queried when it is off.
    if (this.globalPartitionEndpointManager.IsPartitionLevelCircuitBreakerEnabled())
    {
        return true;
    }

    return this.globalPartitionEndpointManager.IsPartitionLevelAutomaticFailoverEnabled();
}
}
}
32 changes: 17 additions & 15 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
Expand All @@ -24,22 +23,21 @@ namespace Microsoft.Azure.Cosmos
// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
{
private readonly bool isPartitionLevelFailoverEnabled;
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
private readonly ISessionContainer sessionContainer;
private readonly DocumentClientEventSource eventSource;

internal readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
internal readonly ConsistencyLevel defaultConsistencyLevel;

// Store Clients to send requests to the gateway and/ or thin client endpoints.
private ThinClientStoreClient thinClientStoreClient;

private GatewayStoreClient gatewayStoreClient;

// Caches to resolve the PartitionKeyRange from request. For Session Token Optimization.
protected PartitionKeyRangeCache partitionKeyRangeCache;
protected ClientCollectionCache clientCollectionCache;
protected ISessionContainer sessionContainer;
private PartitionKeyRangeCache partitionKeyRangeCache;
private ClientCollectionCache clientCollectionCache;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
Expand All @@ -50,10 +48,8 @@ public GatewayStoreModel(
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isThinClientEnabled,
bool isPartitionLevelFailoverEnabled = false,
UserAgentContainer userAgentContainer = null)
{
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
Expand All @@ -62,16 +58,16 @@ public GatewayStoreModel(
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings,
isPartitionLevelFailoverEnabled);
globalPartitionEndpointManager,
serializerSettings);

if (isThinClientEnabled)
{
this.thinClientStoreClient = new ThinClientStoreClient(
httpClient,
userAgentContainer,
this.eventSource,
isPartitionLevelFailoverEnabled,
globalPartitionEndpointManager,
serializerSettings);
}

Expand Down Expand Up @@ -99,7 +95,7 @@ await GatewayStoreModel.ApplySessionTokenAsync(
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
if (this.IsPartitionLevelFailoverEnabled()
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
Expand Down Expand Up @@ -410,8 +406,14 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(

return new Tuple<bool, string>(false, null);
}

/// <summary>
/// Evaluates, on every call, whether the per-partition circuit breaker or the per-partition
/// automatic failover is enabled according to the global partition endpoint manager.
/// </summary>
/// <returns><c>true</c> if at least one of the two features is enabled; otherwise <c>false</c>.</returns>
private bool IsPartitionLevelFailoverEnabled()
{
    // Same evaluation order as a short-circuiting OR: circuit breaker first, then automatic failover.
    if (this.globalPartitionEndpointManager.IsPartitionLevelCircuitBreakerEnabled())
    {
        return true;
    }

    return this.globalPartitionEndpointManager.IsPartitionLevelAutomaticFailoverEnabled();
}

protected static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
Expand Down
3 changes: 1 addition & 2 deletions Microsoft.Azure.Cosmos/src/RetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public IDocumentClientRetryPolicy GetRequestPolicy()
this.globalEndpointManager,
this.partitionKeyRangeLocationCache,
this.retryOptions,
this.enableEndpointDiscovery,
this.isPartitionLevelFailoverEnabled,
this.enableEndpointDiscovery,
this.isThinClientEnabled);

return clientRetryPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,26 @@ public static AvailabilityStrategy CrossRegionHedgingStrategy(
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge);
}

/// <summary>
/// Creates a cross region hedging strategy that is flagged as the SDK default strategy for PPAF.
/// After a request's duration passes the threshold, the strategy sends a hedged request to another region.
/// From then on it sends one more request every thresholdStep until the request is completed
/// or the regions are exhausted.
/// </summary>
/// <param name="threshold">How long the SDK waits before it begins hedging.</param>
/// <param name="thresholdStep">Period of time between the first hedge and each subsequent hedging attempt.</param>
/// <param name="enableMultiWriteRegionHedge">Whether hedging is enabled for write requests on accounts with multi-region writes.
/// Note that this comes with the caveat that more 409 / 412 errors will be thrown by the SDK.
/// This is expected, and applications that adopt this feature should be prepared to handle these exceptions.
/// Applications might not be able to be deterministic on Create vs Replace in the case of Upsert operations.</param>
/// <returns>The cross region hedging availability strategy, created with <c>isSDKDefaultStrategy</c> set to <c>true</c>.</returns>
internal static AvailabilityStrategy SDKDefaultCrossRegionHedgingStrategyForPPAF(
    TimeSpan threshold,
    TimeSpan? thresholdStep,
    bool enableMultiWriteRegionHedge = false)
{
    AvailabilityStrategy sdkDefaultStrategy = new CrossRegionHedgingAvailabilityStrategy(
        threshold: threshold,
        thresholdStep: thresholdStep,
        enableMultiWriteRegionHedge: enableMultiWriteRegionHedge,
        isSDKDefaultStrategy: true);

    return sdkDefaultStrategy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// </summary>
public bool EnableMultiWriteRegionHedge { get; private set; }

/// <summary>
/// Internal flag indicating whether this instance is the default strategy used by the SDK when
/// PPAF is enabled for a client that has no customer-defined availability strategy.
/// It is assigned from the constructor's <c>isSDKDefaultStrategy</c> argument.
/// </summary>
public bool IsSDKDefaultStrategyForPPAF { get; private set; }

private readonly string HedgeConfigText;

/// <summary>
Expand All @@ -52,10 +58,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// <param name="threshold"></param>
/// <param name="thresholdStep"></param>
/// <param name="enableMultiWriteRegionHedge"></param>
/// <param name="isSDKDefaultStrategy"></param>
public CrossRegionHedgingAvailabilityStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
bool enableMultiWriteRegionHedge = false,
bool isSDKDefaultStrategy = false)
{
if (threshold <= TimeSpan.Zero)
{
Expand All @@ -70,6 +78,7 @@ public CrossRegionHedgingAvailabilityStrategy(
this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
this.IsSDKDefaultStrategyForPPAF = isSDKDefaultStrategy;

this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
}
Expand Down
21 changes: 17 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Core.Trace;
Comment thread
NaluTripician marked this conversation as resolved.
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Linq;

Expand All @@ -41,7 +41,12 @@ internal class GlobalEndpointManager : IGlobalEndpointManager
private readonly object isAccountRefreshInProgressLock = new object();
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;

/// <summary>
/// Event that is raised when the PPAF (Per Partition Automatic Failover) enablement status changes.
/// The boolean argument passed to subscribers is the EnablePartitionLevelFailover value read from
/// the refreshed account properties.
/// </summary>
internal event Action<bool>? OnEnablePartitionLevelFailoverConfigChanged;

public GlobalEndpointManager(
IDocumentClientInternal owner,
Expand Down Expand Up @@ -762,7 +767,14 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);

if (!this.connectionPolicy.DisablePartitionLevelFailoverClientLevelOverride
&& accountProperties.EnablePartitionLevelFailover.HasValue
&& (this.connectionPolicy.EnablePartitionLevelFailover != accountProperties.EnablePartitionLevelFailover.Value))
{
this.OnEnablePartitionLevelFailoverConfigChanged?.Invoke(accountProperties.EnablePartitionLevelFailover.Value);
}

GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);

Expand All @@ -782,7 +794,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
this.isAccountRefreshInProgress = false;
}
}
}
}

internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh = false)
{
#nullable disable // Needed because AsyncCache does not have nullable enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,31 @@ public abstract bool IsRequestEligibleForPartitionLevelCircuitBreaker(
/// </summary>
public abstract void SetBackgroundConnectionPeriodicRefreshTask(
Func<Dictionary<PartitionKeyRange, Tuple<string, Uri, TransportAddressHealthState.HealthStatus>>, Task> backgroundConnectionInitTask);

/// <summary>
/// Enables or disables per-partition automatic failover (PPAF) in a thread-safe manner.
/// This method sets the internal flag controlling whether automatic failover is allowed for partition key ranges.
/// </summary>
/// <param name="isEnabled"><c>true</c> to enable per-partition automatic failover; <c>false</c> to disable it.</param>
public abstract void SetIsPPAFEnabled(bool isEnabled);

/// <summary>
/// Enables or disables per-partition circuit breaker (PPCB) in a thread-safe manner.
/// This method sets the internal flag controlling whether circuit breaker logic is allowed for partition key ranges.
/// </summary>
/// <param name="isEnabled"><c>true</c> to enable the per-partition circuit breaker; <c>false</c> to disable it.</param>
public abstract void SetIsPPCBEnabled(bool isEnabled);

/// <summary>
/// Gets a value indicating whether per-partition automatic failover is currently enabled.
/// </summary>
/// <returns><c>true</c> if automatic failover for partition key ranges is active; otherwise <c>false</c>.</returns>
public abstract bool IsPartitionLevelAutomaticFailoverEnabled();

/// <summary>
/// Gets a value indicating whether per-partition circuit breaker is currently enabled.
/// </summary>
/// <returns><c>true</c> if circuit breaker logic for partition key ranges is active; otherwise <c>false</c>.</returns>
public abstract bool IsPartitionLevelCircuitBreakerEnabled();
}
}
Loading
Loading