Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal sealed class ClientRetryPolicy : IDocumentClientRetryPolicy
private readonly GlobalEndpointManager globalEndpointManager;
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly bool enableEndpointDiscovery;
private readonly bool isPartitionLevelFailoverEnabled;
private readonly bool isThinClientEnabled;
private int failoverRetryCount;

Expand All @@ -46,7 +45,6 @@ public ClientRetryPolicy(
GlobalPartitionEndpointManager partitionKeyRangeLocationCache,
RetryOptions retryOptions,
bool enableEndpointDiscovery,
bool isPartitionLevelFailoverEnabled,
bool isThinClientEnabled)
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
Expand All @@ -61,7 +59,6 @@ public ClientRetryPolicy(
this.serviceUnavailableRetryCount = 0;
this.canUseMultipleWriteLocations = false;
this.isMultiMasterWriteRequest = false;
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.isThinClientEnabled = isThinClientEnabled;
}

Expand Down Expand Up @@ -500,7 +497,7 @@ private ShouldRetryResult ShouldRetryOnUnavailableEndpointStatusCodes()

if (!this.canUseMultipleWriteLocations
&& !this.isReadRequest
&& !this.isPartitionLevelFailoverEnabled)
&& !this.partitionKeyRangeLocationCache.IsPerPartitionAutomaticFailoverEnabled())
{
// Write requests on single master cannot be retried if partition level failover is disabled.
// This means there are no other regions available to serve the writes.
Expand Down
206 changes: 176 additions & 30 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,9 @@ internal virtual async Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsy

internal GlobalEndpointManager GlobalEndpointManager { get; private set; }

internal GlobalPartitionEndpointManager PartitionKeyRangeLocation { get; private set; }
private GlobalPartitionEndpointManager partitionKeyRangeLocation;

internal GlobalPartitionEndpointManager PartitionKeyRangeLocation => this.partitionKeyRangeLocation;

/// <summary>
/// Open the connection to validate that the client initialization is successful in the Azure Cosmos DB service.
Expand Down Expand Up @@ -1064,14 +1066,14 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());
this.InitializePartitionLevelFailoverWithDefaultHedging();

this.PartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
this.partitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
: GlobalPartitionEndpointManagerNoOp.Instance;

this.retryPolicy = new RetryPolicy(
Expand All @@ -1089,8 +1091,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.eventSource,
this.serializerSettings,
this.httpClient,
this.PartitionKeyRangeLocation,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);
this.PartitionKeyRangeLocation);

this.GatewayStoreModel = gatewayStoreModel;

Expand Down Expand Up @@ -1118,8 +1119,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
this.eventSource,
this.serializerSettings,
this.httpClient,
this.ConnectionPolicy.UserAgentContainer,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);
this.ConnectionPolicy.UserAgentContainer);

thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

Expand Down Expand Up @@ -1416,6 +1416,8 @@ public void Dispose()

if (this.GlobalEndpointManager != null)
{
// Unsubscribe from PPAF config change events
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged -= this.HandleEnablePartitionLevelFailoverConfigChanged;
this.GlobalEndpointManager.Dispose();
this.GlobalEndpointManager = null;
}
Expand All @@ -1431,6 +1433,16 @@ public void Dispose()
this.initTaskCache = null;
}

if (this.accountServiceConfiguration != null)
{
this.accountServiceConfiguration = null;
}

if (this.PartitionKeyRangeLocation is IDisposable disposablePartitionManager)
{
disposablePartitionManager.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -6843,10 +6855,13 @@ private async Task InitializeGatewayConfigurationReaderAsync()

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations;
this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties);

// Subscribe to GlobalEndpointManager PPAF config change events to update CosmosAccountServiceConfiguration
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged += this.HandleEnablePartitionLevelFailoverConfigChanged;
}

internal string GetUserAgentFeatures()
Expand Down Expand Up @@ -6875,21 +6890,152 @@ internal string GetUserAgentFeatures()
return featureFlag == 0 ? string.Empty : $"F{featureFlag:X}";
}

internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
if (this.ConnectionPolicy.EnablePartitionLevelFailover
&& this.ConnectionPolicy.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
if (this.ConnectionPolicy.EnablePartitionLevelFailover
&& this.ConnectionPolicy.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
}

/// <summary>
/// Handles dynamic changes to the EnablePartitionLevelFailover flag from account properties refresh
/// </summary>
/// <param name="newEnablePartitionLevelFailover">The new value of the EnablePartitionLevelFailover flag</param>
private void HandleEnablePartitionLevelFailoverChanged(bool? newEnablePartitionLevelFailover)
{
try
{
// Only update if client-level override is not disabled
if (this.ConnectionPolicy.DisablePartitionLevelFailoverClientLevelOverride)
{
DefaultTrace.TraceInformation("DocumentClient: PPAF change ignored due to client-level override disabled");
return;
}

bool previousValue = this.ConnectionPolicy.EnablePartitionLevelFailover;
bool newValue = newEnablePartitionLevelFailover ?? false;

if (previousValue == newValue)
{
// No actual change in effective value
return;
}

DefaultTrace.TraceInformation(
"DocumentClient: Updating EnablePartitionLevelFailover from {0} to {1}",
previousValue,
newValue);

// Update the connection policy
this.ConnectionPolicy.EnablePartitionLevelFailover = newValue;

// Update circuit breaker enablement
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= newValue;

// Update availability strategy for read hedging
this.UpdateAvailabilityStrategyForPPAF(newValue);

// Update the GlobalPartitionEndpointManager
this.UpdateGlobalPartitionEndpointManager();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the user agent features here as well using this.ConnectionPolicy.UserAgentContainer.AppendFeatures

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the user agent features update using this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures()) when PPAF configuration changes dynamically. This ensures the user agent reflects the current PPAF state after dynamic updates. (commit: 49b3135)

// Update user agent features to reflect the new PPAF configuration
this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures());

DefaultTrace.TraceInformation("DocumentClient: Successfully updated PPAF configuration dynamically");
}
catch (Exception ex)
{
DefaultTrace.TraceError("DocumentClient: Error handling PPAF change: {0}", ex.Message);
}
}

/// <summary>
/// Updates the availability strategy based on PPAF enablement
/// </summary>
/// <param name="enablePPAF">Whether PPAF is enabled</param>
private void UpdateAvailabilityStrategyForPPAF(bool enablePPAF)
{
if (enablePPAF && this.ConnectionPolicy.AvailabilityStrategy == null)
{
// Enable default hedging when PPAF is enabled and no explicit strategy is set
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));

DefaultTrace.TraceInformation("DocumentClient: Enabled default hedging strategy for PPAF");
}
// Note: We don't disable hedging when PPAF is disabled, as the user might have set it explicitly
}

/// <summary>
/// Updates the GlobalPartitionEndpointManager based on current PPAF and circuit breaker settings
/// </summary>
private void UpdateGlobalPartitionEndpointManager()
{
// Create new GlobalPartitionEndpointManager instance with updated settings
GlobalPartitionEndpointManager newPartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
: GlobalPartitionEndpointManagerNoOp.Instance;

// Atomically update the partition key range location to avoid thread contention
GlobalPartitionEndpointManager oldPartitionKeyRangeLocation = Interlocked.Exchange(ref this.partitionKeyRangeLocation, newPartitionKeyRangeLocation);

// Dispose the old instance if it's disposable
if (oldPartitionKeyRangeLocation is IDisposable disposableOldManager)
{
disposableOldManager.Dispose();
}

// Update retry policy with new partition key range location
this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
connectionPolicy: this.ConnectionPolicy,
partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation,
isThinClientEnabled: this.isThinClientEnabled);

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

DefaultTrace.TraceInformation("DocumentClient: Updated GlobalPartitionEndpointManager for dynamic PPAF change");
}

/// <summary>
/// Handles PPAF config change events from GlobalEndpointManager
/// </summary>
/// <param name="accountProperties">The refreshed account properties</param>
private void HandleEnablePartitionLevelFailoverConfigChanged(AccountProperties accountProperties)
{
try
{
DefaultTrace.TraceInformation("DocumentClient: Received PPAF config change from GlobalEndpointManager");

// Handle the PPAF enablement change (comparison already done in GlobalEndpointManager)
bool? newEnablePartitionLevelFailover = accountProperties?.EnablePartitionLevelFailover;
this.HandleEnablePartitionLevelFailoverChanged(newEnablePartitionLevelFailover);
}
catch (Exception ex)
{
DefaultTrace.TraceError("DocumentClient: Error handling PPAF config change: {0}", ex.Message);
}
}

internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response)
Expand Down
9 changes: 3 additions & 6 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace Microsoft.Azure.Cosmos
// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
{
private readonly bool isPartitionLevelFailoverEnabled;
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;

Expand All @@ -46,10 +45,8 @@ public GatewayStoreModel(
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isPartitionLevelFailoverEnabled = false)
GlobalPartitionEndpointManager globalPartitionEndpointManager)
{
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
Expand All @@ -59,7 +56,7 @@ public GatewayStoreModel(
httpClient,
this.eventSource,
serializerSettings,
isPartitionLevelFailoverEnabled);
globalPartitionEndpointManager.IsPerPartitionAutomaticFailoverEnabled());

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
this.MarkEndpointsToHealthyAsync);
Expand All @@ -85,7 +82,7 @@ await GatewayStoreModel.ApplySessionTokenAsync(
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
if (this.globalPartitionEndpointManager.IsPerPartitionAutomaticFailoverEnabled()
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
Expand Down
3 changes: 0 additions & 3 deletions Microsoft.Azure.Cosmos/src/RetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ internal sealed class RetryPolicy : IRetryPolicyFactory
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly bool enableEndpointDiscovery;
private readonly bool isPartitionLevelFailoverEnabled;
private readonly bool isThinClientEnabled;
private readonly RetryOptions retryOptions;

Expand All @@ -27,7 +26,6 @@ public RetryPolicy(
bool isThinClientEnabled)
{
this.enableEndpointDiscovery = connectionPolicy.EnableEndpointDiscovery;
this.isPartitionLevelFailoverEnabled = connectionPolicy.EnablePartitionLevelFailover;
this.globalEndpointManager = globalEndpointManager;
this.retryOptions = connectionPolicy.RetryOptions;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
Expand All @@ -44,7 +42,6 @@ public IDocumentClientRetryPolicy GetRequestPolicy()
this.partitionKeyRangeLocationCache,
this.retryOptions,
this.enableEndpointDiscovery,
this.isPartitionLevelFailoverEnabled,
this.isThinClientEnabled);

return clientRetryPolicy;
Expand Down
Loading
Loading