Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 137 additions & 15 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,17 @@ public void Dispose()
this.initTaskCache = null;
}

if (this.accountServiceConfiguration != null)
{
this.accountServiceConfiguration.Dispose();
this.accountServiceConfiguration = null;
}

if (this.PartitionKeyRangeLocation is IDisposable disposablePartitionManager)
{
disposablePartitionManager.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -6843,6 +6854,9 @@ private async Task InitializeGatewayConfigurationReaderAsync()

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

// Subscribe to account properties changes for dynamic PPAF updates
this.accountServiceConfiguration.OnEnablePartitionLevelFailoverChanged += this.HandleEnablePartitionLevelFailoverChanged;

await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations;
Expand Down Expand Up @@ -6875,21 +6889,129 @@ internal string GetUserAgentFeatures()
return featureFlag == 0 ? string.Empty : $"F{featureFlag:X}";
}

internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
if (this.ConnectionPolicy.EnablePartitionLevelFailover
&& this.ConnectionPolicy.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
/// <summary>
/// Installs a default cross-region hedging availability strategy on the connection policy.
/// Nothing happens unless partition level failover is enabled on the policy and no
/// availability strategy has been configured yet.
/// </summary>
internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
    // Leave the policy untouched when PPAF is off or a strategy is already in place.
    if (!this.ConnectionPolicy.EnablePartitionLevelFailover
        || this.ConnectionPolicy.AvailabilityStrategy != null)
    {
        return;
    }

    // The threshold is the smaller of the SDK default hedging threshold and half of the
    // request timeout configured on the connection policy.
    double halfOfRequestTimeoutInMillis = this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2;
    double hedgingThresholdInMillis = Math.Min(
        DocumentClient.DefaultHedgingThresholdInMilliseconds,
        halfOfRequestTimeoutInMillis);

    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(hedgingThresholdInMillis);
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds);

    this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);
}

/// <summary>
/// Handles dynamic changes to the EnablePartitionLevelFailover flag from account properties refresh
/// </summary>
/// <param name="newEnablePartitionLevelFailover">The new value of the EnablePartitionLevelFailover flag</param>
private void HandleEnablePartitionLevelFailoverChanged(bool? newEnablePartitionLevelFailover)
{
try
{
// Only update if client-level override is not disabled
if (this.ConnectionPolicy.DisablePartitionLevelFailoverClientLevelOverride)
{
DefaultTrace.TraceInformation("DocumentClient: PPAF change ignored due to client-level override disabled");
return;
}

bool previousValue = this.ConnectionPolicy.EnablePartitionLevelFailover;
bool newValue = newEnablePartitionLevelFailover ?? false;

if (previousValue == newValue)
{
// No actual change in effective value
return;
}

DefaultTrace.TraceInformation(
"DocumentClient: Updating EnablePartitionLevelFailover from {0} to {1}",
previousValue,
newValue);

// Update the connection policy
this.ConnectionPolicy.EnablePartitionLevelFailover = newValue;

// Update circuit breaker enablement
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= newValue;

// Update availability strategy for read hedging
this.UpdateAvailabilityStrategyForPPAF(newValue);

// Update the GlobalPartitionEndpointManager
this.UpdateGlobalPartitionEndpointManager();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the user agent features here as well using this.ConnectionPolicy.UserAgentContainer.AppendFeatures

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the user agent features update using this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures()) when PPAF configuration changes dynamically. This ensures the user agent reflects the current PPAF state after dynamic updates. (commit: 49b3135)

DefaultTrace.TraceInformation("DocumentClient: Successfully updated PPAF configuration dynamically");
}
catch (Exception ex)
{
DefaultTrace.TraceError("DocumentClient: Error handling PPAF change: {0}", ex.Message);
}
}

/// <summary>
/// Installs the default cross-region hedging strategy when PPAF is switched on and the
/// connection policy has no availability strategy yet.
/// </summary>
/// <param name="enablePPAF">Whether PPAF is enabled</param>
private void UpdateAvailabilityStrategyForPPAF(bool enablePPAF)
{
    // Hedging is never removed here: when PPAF is turned off an existing strategy is kept,
    // because it may have been set explicitly by the user.
    if (!enablePPAF || this.ConnectionPolicy.AvailabilityStrategy != null)
    {
        return;
    }

    // The threshold is the smaller of the SDK default hedging threshold and half of the
    // request timeout configured on the connection policy.
    double halfOfRequestTimeoutInMillis = this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2;
    double hedgingThresholdInMillis = Math.Min(
        DocumentClient.DefaultHedgingThresholdInMilliseconds,
        halfOfRequestTimeoutInMillis);

    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(hedgingThresholdInMillis);
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds);

    this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);

    DefaultTrace.TraceInformation("DocumentClient: Enabled default hedging strategy for PPAF");
}

/// <summary>
/// Updates the GlobalPartitionEndpointManager based on current PPAF and circuit breaker settings
/// </summary>
private void UpdateGlobalPartitionEndpointManager()
{
// Create new GlobalPartitionEndpointManager instance with updated settings
GlobalPartitionEndpointManager newPartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
: GlobalPartitionEndpointManagerNoOp.Instance;

// Dispose the old instance if it's disposable
if (this.PartitionKeyRangeLocation is IDisposable disposableOldManager)
{
disposableOldManager.Dispose();
}

// Update the partition key range location
this.PartitionKeyRangeLocation = newPartitionKeyRangeLocation;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this cause thread contention? Can we use Interlocked.CompareExchange to swap the object?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed thread contention issue by using Interlocked.Exchange for atomic updates to PartitionKeyRangeLocation. Changed from auto-property to field with property accessor and used Interlocked.Exchange(ref this.partitionKeyRangeLocation, newPartitionKeyRangeLocation) to atomically update the reference while avoiding race conditions during concurrent access. (commit: ec9fd26)


// Update retry policy with new partition key range location
this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
connectionPolicy: this.ConnectionPolicy,
partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation,
isThinClientEnabled: this.isThinClientEnabled);

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

DefaultTrace.TraceInformation("DocumentClient: Updated GlobalPartitionEndpointManager for dynamic PPAF change");
}

internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,44 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;

internal class CosmosAccountServiceConfiguration : IServiceConfigurationReader
internal class CosmosAccountServiceConfiguration : IServiceConfigurationReader, IDisposable
{
private Func<Task<AccountProperties>> accountPropertiesTaskFunc { get; }
private readonly Func<Task<AccountProperties>> accountPropertiesTaskFunc;
private readonly int refreshIntervalInSeconds;
private readonly object refreshLock = new object();
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

private AccountProperties accountProperties;
private bool isBackgroundRefreshActive = false;
private Task backgroundRefreshTask;

/// <summary>
/// The currently stored account properties. Reads and the private write both
/// take <c>refreshLock</c> around the access to the backing field.
/// </summary>
internal AccountProperties AccountProperties
{
    get
    {
        AccountProperties snapshot;

        lock (this.refreshLock)
        {
            snapshot = this.accountProperties;
        }

        return snapshot;
    }

    private set
    {
        lock (this.refreshLock)
        {
            this.accountProperties = value;
        }
    }
}

internal AccountProperties AccountProperties { get; private set; }
/// <summary>
/// Event that is raised when account properties are refreshed and PPAF enablement status changes
/// </summary>
internal event Action<bool?> OnEnablePartitionLevelFailoverChanged;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this event anymore? Can we not leverage the OnAccountPropertiesRefreshed event from GlobalEndpointManager directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right! I've removed the redundant OnEnablePartitionLevelFailoverChanged event and simplified the architecture to leverage the existing OnAccountPropertiesRefreshed event from GlobalEndpointManager directly.

Changes made:

  • Removed OnEnablePartitionLevelFailoverChanged event from CosmosAccountServiceConfiguration
  • Enhanced DocumentClient.HandleAccountPropertiesRefreshed() to directly compare previous and new PPAF values and handle changes
  • Simplified CosmosAccountServiceConfiguration.UpdateAccountProperties() to only update properties without event firing
  • Updated unit tests to reflect the new direct architecture

This eliminates the intermediate event layer and provides a cleaner, more direct flow:
GlobalEndpointManager.OnAccountPropertiesRefreshed → DocumentClient.HandleAccountPropertiesRefreshed() → direct PPAF change handling

The functionality remains the same but with reduced complexity and fewer event subscriptions. (commit: 395c4bd)


public CosmosAccountServiceConfiguration(Func<Task<AccountProperties>> accountPropertiesTaskFunc)
{
Expand All @@ -23,6 +53,7 @@ public CosmosAccountServiceConfiguration(Func<Task<AccountProperties>> accountPr
}

this.accountPropertiesTaskFunc = accountPropertiesTaskFunc;
this.refreshIntervalInSeconds = ConfigurationManager.GetAccountPropertiesRefreshIntervalInSeconds(300); // Default 5 minutes
}

public IDictionary<string, object> QueryEngineConfiguration => this.AccountProperties.QueryEngineConfiguration;
Expand Down Expand Up @@ -58,6 +89,135 @@ public async Task InitializeAsync()
if (this.AccountProperties == null)
{
this.AccountProperties = await this.accountPropertiesTaskFunc();
this.InitializeBackgroundRefresh();
}
}

/// <summary>
/// Initializes and starts the background account properties refresh task
/// </summary>
private void InitializeBackgroundRefresh()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a background task in GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh.

Let's try to leverage that instead of creating a new one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right! I've refactored the implementation to leverage the existing background task in GlobalEndpointManager.StartLocationBackgroundRefreshLoop() instead of creating a duplicate one.

Changes made:

  • Removed separate background refresh task from CosmosAccountServiceConfiguration
  • Enhanced existing GlobalEndpointManager.RefreshDatabaseAccountInternalAsync() to raise OnAccountPropertiesRefreshed events
  • Updated CosmosAccountServiceConfiguration to receive updates via UpdateAccountProperties() method
  • Modified DocumentClient to subscribe to GlobalEndpointManager events for PPAF changes

This approach is much cleaner as it:

  • ✅ Avoids duplicate background tasks
  • ✅ Reduces resource consumption
  • ✅ Leverages the existing, well-tested refresh mechanism
  • ✅ Keeps responsibilities clear

The functionality remains the same - dynamic PPAF updates when account properties change - but now uses a single, centralized background refresh task. (commit: 6941697)

{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}

if (this.isBackgroundRefreshActive)
{
return;
}

lock (this.refreshLock)
{
if (this.isBackgroundRefreshActive)
{
return;
}

this.isBackgroundRefreshActive = true;
}

try
{
this.backgroundRefreshTask = this.InitiateAccountPropertiesRefreshLoopAsync();
}
catch
{
this.isBackgroundRefreshActive = false;
throw;
}
}

/// <summary>
/// Runs a loop that waits <c>refreshIntervalInSeconds</c> seconds and then refreshes the
/// account properties, until cancellation is requested on <c>cancellationTokenSource</c>.
/// A failing iteration is traced and the loop carries on with the next delay.
/// </summary>
/// <returns>A task that completes once the loop has stopped.</returns>
private async Task InitiateAccountPropertiesRefreshLoopAsync()
{
    // No VSTHRD100 suppression is needed here: the method returns Task, it is not async void.
    // NOTE(review): a non-positive refresh interval would make this loop spin (zero delay) or
    // fail on every iteration (negative delay) - confirm the configuration reader validates it.
    while (!this.cancellationTokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(
                TimeSpan.FromSeconds(this.refreshIntervalInSeconds),
                this.cancellationTokenSource.Token);

            // Cancellation can arrive right after the delay completes; do not start a refresh then.
            if (this.cancellationTokenSource.IsCancellationRequested)
            {
                break;
            }

            DefaultTrace.TraceInformation("CosmosAccountServiceConfiguration: Refreshing account properties.");
            await this.RefreshAccountPropertiesAsync();
        }
        catch (Exception ex) when (
            this.cancellationTokenSource.IsCancellationRequested
            && (ex is OperationCanceledException || ex is ObjectDisposedException))
        {
            // Expected during shutdown: the delay was cancelled, or the token source was
            // already disposed when its Token was read.
            break;
        }
        catch (Exception ex)
        {
            DefaultTrace.TraceCritical("CosmosAccountServiceConfiguration: Failed to refresh account properties. Exception: {0}", ex.Message);
        }
    }
}

/// <summary>
/// Fetches the account properties through <c>accountPropertiesTaskFunc</c>, stores them, and
/// raises <c>OnEnablePartitionLevelFailoverChanged</c> when the EnablePartitionLevelFailover
/// value differs from the previously stored one.
/// </summary>
/// <remarks>
/// Any exception (from the fetch or from an event handler) is traced and swallowed.
/// A null fetch result is ignored and the previous snapshot is kept, because other members of
/// this class dereference <c>AccountProperties</c> without a null check.
/// </remarks>
/// <returns>A task that completes when the refresh attempt has finished.</returns>
private async Task RefreshAccountPropertiesAsync()
{
    try
    {
        bool? previousEnablePartitionLevelFailover = this.AccountProperties?.EnablePartitionLevelFailover;
        AccountProperties newProperties = await this.accountPropertiesTaskFunc();

        if (newProperties == null)
        {
            // Never replace a usable snapshot with null.
            DefaultTrace.TraceWarning("CosmosAccountServiceConfiguration: Account properties refresh returned null; keeping the previous account properties.");
            return;
        }

        bool? newEnablePartitionLevelFailover = newProperties.EnablePartitionLevelFailover;

        // Always store the new snapshot: properties other than PPAF may have changed. It is
        // stored before listeners are notified so they observe the refreshed values.
        this.AccountProperties = newProperties;

        if (previousEnablePartitionLevelFailover != newEnablePartitionLevelFailover)
        {
            DefaultTrace.TraceInformation(
                "CosmosAccountServiceConfiguration: EnablePartitionLevelFailover changed from {0} to {1}",
                previousEnablePartitionLevelFailover,
                newEnablePartitionLevelFailover);

            this.OnEnablePartitionLevelFailoverChanged?.Invoke(newEnablePartitionLevelFailover);
        }
    }
    catch (Exception ex)
    {
        DefaultTrace.TraceError("CosmosAccountServiceConfiguration: Error refreshing account properties: {0}", ex.Message);
    }
}

/// <summary>
/// Stops the background refresh loop and releases the cancellation token source.
/// The loop is given up to five seconds to finish. Calling this method again after the
/// token source has been disposed does nothing.
/// </summary>
public void Dispose()
{
    try
    {
        // Cancel() throws ObjectDisposedException once the source has been disposed;
        // that is how a repeated Dispose() call is recognised.
        this.cancellationTokenSource.Cancel();
    }
    catch (ObjectDisposedException)
    {
        return;
    }

    try
    {
        // Bounded wait so Dispose cannot hang if the loop is stuck in a refresh call.
        this.backgroundRefreshTask?.Wait(TimeSpan.FromSeconds(5));
    }
    catch (Exception ex)
    {
        DefaultTrace.TraceError("CosmosAccountServiceConfiguration: Error disposing background refresh task: {0}", ex.Message);
    }
    finally
    {
        // Dispose only after the wait: the refresh loop reads the source's Token on every
        // iteration, so the source should stay usable while the loop winds down.
        this.cancellationTokenSource.Dispose();
    }
}
}
Expand Down
Loading