Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e1abb97
dynamic enablement of ppaf
NaluTripician Jul 22, 2025
7c5d519
Update GlobalEndpointManager.cs
NaluTripician Jul 22, 2025
b8d975c
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Jul 22, 2025
09b964a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Jul 25, 2025
bd5be79
updated approach
NaluTripician Jul 31, 2025
f74afc2
fixes
NaluTripician Jul 31, 2025
506663a
requested changes
NaluTripician Jul 31, 2025
a82b471
Gateway changes
NaluTripician Jul 31, 2025
4783f36
Update GlobalPartitionEndpointManager.cs
NaluTripician Jul 31, 2025
53a2c34
Update GlobalPartitionEndpointManagerCore.cs
NaluTripician Jul 31, 2025
2329880
Update CosmosItemIntegrationTests.cs
NaluTripician Jul 31, 2025
c79a7bf
fixed build
NaluTripician Jul 31, 2025
8feaaa8
Code changes to refactor behavior and tests.
kundadebdatta Aug 1, 2025
acff813
Code changes to fix thin client test
kundadebdatta Aug 1, 2025
6472394
Code changes to refactor tests.
kundadebdatta Aug 2, 2025
9a0564a
requested changes
NaluTripician Aug 4, 2025
f300d39
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 5, 2025
b73c17a
Code changes to fix test failures.
kundadebdatta Aug 5, 2025
843c666
Code changes to refactor global partition endpoint manager.
kundadebdatta Aug 6, 2025
aac3ece
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Aug 7, 2025
6e68326
Update Microsoft.Azure.Cosmos/src/UserAgentContainer.cs
NaluTripician Aug 7, 2025
18e7b67
Code changes to fix build
kundadebdatta Aug 7, 2025
bafa80a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 7, 2025
6abd41c
Code changes to fix code diff in GatewayStoreModel.
kundadebdatta Aug 8, 2025
2ee4a1f
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 8, 2025
da3ff36
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
4a272f5
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
f6dde28
change regex to static
NaluTripician Aug 12, 2025
a352d77
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed class ClientRetryPolicy : IDocumentClientRetryPolicy
private readonly IDocumentClientRetryPolicy throttlingRetry;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly bool enableEndpointDiscovery;
private readonly bool enableEndpointDiscovery;
private readonly bool isPartitionLevelFailoverEnabled;
private readonly bool isThinClientEnabled;
private int failoverRetryCount;
Expand Down
53 changes: 42 additions & 11 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
/// <summary>
/// Default thresholds for PPAF request hedging.
/// </summary>
private const int DefaultHedgingThresholdInMilliseconds = 1000;
private const int DefaultHedgingThresholdStepInMilliseconds = 500;
internal const int DefaultHedgingThresholdInMilliseconds = 1000;
internal const int DefaultHedgingThresholdStepInMilliseconds = 500;

private static readonly char[] resourceIdOrFullNameSeparators = new char[] { '/' };
private static readonly char[] resourceIdSeparators = new char[] { '/', '\\', '?', '#' };
Expand Down Expand Up @@ -1064,15 +1064,12 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());
this.InitializePartitionLevelFailoverWithDefaultHedging();

this.PartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.PartitionKeyRangeLocation =
new GlobalPartitionEndpointManagerCore(
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
: GlobalPartitionEndpointManagerNoOp.Instance;
this.isThinClientEnabled);

this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
Expand Down Expand Up @@ -1415,7 +1412,8 @@ public void Dispose()
}

if (this.GlobalEndpointManager != null)
{
{
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged -= this.SetPPAFOnRefresh;
this.GlobalEndpointManager.Dispose();
this.GlobalEndpointManager = null;
}
Expand Down Expand Up @@ -6846,7 +6844,8 @@ private async Task InitializeGatewayConfigurationReaderAsync()
await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations;
this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties);
this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties);
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged += this.SetPPAFOnRefresh;
}

internal string GetUserAgentFeatures()
Expand Down Expand Up @@ -6886,10 +6885,42 @@ internal void InitializePartitionLevelFailoverWithDefaultHedging()
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
}

/// <summary>
/// Handler for <c>GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged</c>.
/// Propagates the new per-partition automatic failover (PPAF) state to the connection policy,
/// the partition endpoint manager, the retry policy and the user agent, and then installs or
/// removes the SDK default hedging availability strategy accordingly.
/// </summary>
/// <param name="isEnabled">The PPAF enablement value that was passed to the event.</param>
private void SetPPAFOnRefresh(bool isEnabled)
{
    // PPAF and the per-partition circuit breaker are toggled together.
    this.ConnectionPolicy.EnablePartitionLevelFailover = isEnabled;
    this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker = isEnabled;
    this.PartitionKeyRangeLocation.SetIsPPAFEnabled(isEnabled);
    this.PartitionKeyRangeLocation.SetIsPPCBEnabled(isEnabled);
    this.retryPolicy.SetIsPartitionLevelFailoverEnabled(isEnabled);

    this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());

    if (isEnabled)
    {
        // Only install the SDK default when nothing is configured; an existing strategy
        // (customer supplied or a previously installed SDK default) is left untouched.
        if (this.ConnectionPolicy.AvailabilityStrategy == null)
        {
            // The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
            // the request timeout value provided by the end customer.
            double defaultThresholdInMillis = Math.Min(
                DocumentClient.DefaultHedgingThresholdInMilliseconds,
                this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

            this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
                threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
                thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
        }

        return;
    }

    // PPAF was turned off: remove the strategy only if the SDK installed it. The type pattern
    // is false for null and for any other strategy type, so neither a missing strategy nor a
    // customer strategy of a different type can throw here.
    if (this.ConnectionPolicy.AvailabilityStrategy is CrossRegionHedgingAvailabilityStrategy hedgingStrategy
        && hedgingStrategy.IsSDKDefaultStrategy)
    {
        this.ConnectionPolicy.AvailabilityStrategy = null;
    }
}

internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response)
Expand Down
12 changes: 9 additions & 3 deletions Microsoft.Azure.Cosmos/src/RetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ internal sealed class RetryPolicy : IRetryPolicyFactory
{
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly bool enableEndpointDiscovery;
private readonly bool isPartitionLevelFailoverEnabled;
private readonly bool enableEndpointDiscovery;
private readonly bool isThinClientEnabled;
private readonly RetryOptions retryOptions;
private readonly RetryOptions retryOptions;

private bool isPartitionLevelFailoverEnabled;

/// <summary>
/// Initialize the instance of the RetryPolicy class
Expand Down Expand Up @@ -48,6 +49,11 @@ public IDocumentClientRetryPolicy GetRequestPolicy()
this.isThinClientEnabled);

return clientRetryPolicy;
}

/// <summary>
/// Overwrites the partition level failover flag held by this retry policy factory.
/// </summary>
/// <param name="isEnabled">The new value for the partition level failover flag.</param>
public void SetIsPartitionLevelFailoverEnabled(bool isEnabled) => this.isPartitionLevelFailoverEnabled = isEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,26 @@ public static AvailabilityStrategy CrossRegionHedgingStrategy(
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge);
}

/// <summary>
/// Builds a cross region hedging strategy that is flagged as the SDK default
/// (constructed with <c>isSDKDefaultStrategy: true</c>), as opposed to one supplied by the customer.
/// After a request's duration passes the threshold, the strategy sends a hedged request to another
/// region, and then one more request every thresholdStep until the request completes or the
/// regions are exhausted.
/// </summary>
/// <param name="threshold">How long before the SDK begins hedging.</param>
/// <param name="thresholdStep">Period of time between the first hedge and subsequent hedging attempts.</param>
/// <param name="enableMultiWriteRegionHedge">Whether hedging for write requests on accounts with multi-region writes is enabled.
/// Note that this comes with the caveat that there will be more 409 / 412 errors thrown by the SDK.
/// This is expected, and applications that adopt this feature should be prepared to handle these exceptions.
/// Applications might not be able to be deterministic on Create vs Replace in the case of Upsert operations.</param>
/// <returns>The cross region hedging availability strategy, marked as the SDK default.</returns>
internal static AvailabilityStrategy SDKDefaultCrossRegionHedgingStrategy(
    TimeSpan threshold,
    TimeSpan? thresholdStep,
    bool enableMultiWriteRegionHedge = false)
{
    AvailabilityStrategy sdkDefaultStrategy = new CrossRegionHedgingAvailabilityStrategy(
        threshold: threshold,
        thresholdStep: thresholdStep,
        enableMultiWriteRegionHedge: enableMultiWriteRegionHedge,
        isSDKDefaultStrategy: true);

    return sdkDefaultStrategy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// </summary>
public bool EnableMultiWriteRegionHedge { get; private set; }

/// <summary>
/// Internal flag to indicate if this is the default strategy used by the SDK when enabling
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
/// PPAF for clients without customer defined availability strategy.
/// </summary>
public bool IsSDKDefaultStrategy { get; private set; }
Comment thread
NaluTripician marked this conversation as resolved.
Outdated

private readonly string HedgeConfigText;

/// <summary>
Expand All @@ -52,10 +58,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// <param name="threshold"></param>
/// <param name="thresholdStep"></param>
/// <param name="enableMultiWriteRegionHedge"></param>
/// <param name="isSDKDefaultStrategy"></param>
public CrossRegionHedgingAvailabilityStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
bool enableMultiWriteRegionHedge = false,
bool isSDKDefaultStrategy = false)
{
if (threshold <= TimeSpan.Zero)
{
Expand All @@ -70,6 +78,7 @@ public CrossRegionHedgingAvailabilityStrategy(
this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
this.IsSDKDefaultStrategy = isSDKDefaultStrategy;

this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
}
Expand Down
21 changes: 17 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Core.Trace;
Comment thread
NaluTripician marked this conversation as resolved.
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Linq;

Expand All @@ -41,7 +41,12 @@ internal class GlobalEndpointManager : IGlobalEndpointManager
private readonly object isAccountRefreshInProgressLock = new object();
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;

/// <summary>
/// Event that is raised when PPAF (Per Partition Automatic Failover) enablement status changes
/// </summary>
internal event Action<bool>? OnEnablePartitionLevelFailoverConfigChanged;

public GlobalEndpointManager(
IDocumentClientInternal owner,
Expand Down Expand Up @@ -762,7 +767,14 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);

if (!this.connectionPolicy.DisablePartitionLevelFailoverClientLevelOverride
&& accountProperties.EnablePartitionLevelFailover.HasValue
&& (this.connectionPolicy.EnablePartitionLevelFailover != accountProperties.EnablePartitionLevelFailover.Value))
{
this.OnEnablePartitionLevelFailoverConfigChanged?.Invoke(accountProperties.EnablePartitionLevelFailover.Value);
}

GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);

Expand All @@ -782,7 +794,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
this.isAccountRefreshInProgress = false;
}
}
}
}

internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh = false)
{
#nullable disable // Needed because AsyncCache does not have nullable enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,17 @@ public abstract bool IsRequestEligibleForPartitionLevelCircuitBreaker(
/// </summary>
public abstract void SetBackgroundConnectionPeriodicRefreshTask(
Func<Dictionary<PartitionKeyRange, Tuple<string, Uri, TransportAddressHealthState.HealthStatus>>, Task> backgroundConnectionInitTask);

/// <summary>
/// Sets whether per-partition automatic failover is enabled.
/// </summary>
/// <param name="isEnabled">True to turn per-partition automatic failover on, false to turn it off.</param>
public abstract void SetIsPPAFEnabled(bool isEnabled);

/// <summary>
/// Sets whether per-partition circuit breaker is enabled.
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
/// </summary>
/// <param name="isEnabled">True to turn the per-partition circuit breaker on, false to turn it off.</param>
public abstract void SetIsPPCBEnabled(bool isEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,11 @@ internal sealed class GlobalPartitionEndpointManagerCore : GlobalPartitionEndpoi
/// </summary>
private readonly int backgroundConnectionInitTimeIntervalInSeconds = ConfigurationManager.GetStalePartitionUnavailabilityRefreshIntervalInSeconds(300);

/// <summary>
/// A readonly boolean flag used to determine if partition level failover is enabled.
/// </summary>
private readonly bool isPartitionLevelFailoverEnabled;

/// <summary>
/// A readonly boolean flag used to determine if thinclient is enabled.
/// </summary>
private readonly bool isThinClientEnabled;

/// <summary>
/// A readonly boolean flag used to determine if partition level circuit breaker is enabled.
/// </summary>
private readonly bool isPartitionLevelCircuitBreakerEnabled;

/// <summary>
/// A <see cref="Lazy{T}"/> instance of <see cref="ConcurrentDictionary{K,V}"/> containing the partition key range to failover info mapping.
/// This mapping is primarily used for writes in a single master account.
Expand All @@ -85,7 +75,17 @@ internal sealed class GlobalPartitionEndpointManagerCore : GlobalPartitionEndpoi
/// <summary>
/// A boolean flag indicating if the background connection initialization recursive task is active.
/// </summary>
private bool isBackgroundConnectionInitActive = false;
private bool isBackgroundConnectionInitActive = false;

/// <summary>
/// A boolean flag used to determine if partition level failover is enabled. It can be changed after construction.
/// </summary>
private bool isPartitionLevelFailoverEnabled;

/// <summary>
/// A boolean flag used to determine if partition level circuit breaker is enabled. It can be changed after construction.
/// </summary>
private bool isPartitionLevelCircuitBreakerEnabled;

/// <summary>
/// A callback func delegate used by the background connection refresh recursive task to establish rntbd connections to backend replicas.
Expand Down Expand Up @@ -282,6 +282,16 @@ public override bool IsRequestEligibleForPartitionLevelCircuitBreaker(
|| (!request.IsReadOnlyRequest && this.globalEndpointManager.CanSupportMultipleWriteLocations(request.ResourceType, request.OperationType)));
}

/// <summary>
/// Stores the supplied value as this manager's partition level failover flag.
/// </summary>
/// <param name="isPPAFEnabled">The new value of the partition level failover flag.</param>
public override void SetIsPPAFEnabled(bool isPPAFEnabled) => this.isPartitionLevelFailoverEnabled = isPPAFEnabled;

/// <summary>
/// Stores the supplied value as this manager's partition level circuit breaker flag.
/// </summary>
/// <param name="isPLCBEnabled">The new value of the partition level circuit breaker flag.</param>
public override void SetIsPPCBEnabled(bool isPLCBEnabled) => this.isPartitionLevelCircuitBreakerEnabled = isPLCBEnabled;

/// <summary>
/// Disposes the <see cref="GlobalPartitionEndpointManagerCore"/> class.
/// Usage of the disposeCounter was used to make the operation atomic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,15 @@ public override bool IncrementRequestFailureCounterAndCheckIfPartitionCanFailove
{
return false;
}

/// <summary>
/// No-op implementation: the supplied value is ignored and no state is changed.
/// </summary>
/// <param name="isEnabled">Ignored.</param>
public override void SetIsPPAFEnabled(bool isEnabled)
{
    // Intentionally empty.
}

/// <summary>
/// No-op implementation: the supplied value is ignored and no state is changed.
/// </summary>
/// <param name="isEnabled">Ignored.</param>
public override void SetIsPPCBEnabled(bool isEnabled)
{
    // Intentionally empty.
}
}
}
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/UserAgentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal class UserAgentContainer : Documents.UserAgentContainer
private const int MaxClientId = 10;
private readonly string cosmosBaseUserAgent;
private readonly string clientId;
private readonly string nonFeatureSuffix;

public UserAgentContainer(
int clientId,
Expand All @@ -27,16 +28,17 @@ public UserAgentContainer(
features: features,
regionConfiguration: regionConfiguration);
this.Suffix = suffix ?? string.Empty;
this.nonFeatureSuffix = this.Suffix ?? string.Empty;
}

public void AppendFeatures(
string features)
{
if (!string.IsNullOrEmpty(features))
{
this.Suffix = string.IsNullOrEmpty(this.Suffix)
this.Suffix = string.IsNullOrEmpty(this.nonFeatureSuffix)
? features
: $"{features}|{this.Suffix}";
: $"{features}|{this.nonFeatureSuffix}";
}
}

Expand Down
Loading
Loading