Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 160 additions & 17 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,8 @@ public void Dispose()

if (this.GlobalEndpointManager != null)
{
// Unsubscribe from PPAF config change events
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged -= this.HandleEnablePartitionLevelFailoverConfigChanged;
this.GlobalEndpointManager.Dispose();
this.GlobalEndpointManager = null;
}
Expand All @@ -1431,6 +1433,16 @@ public void Dispose()
this.initTaskCache = null;
}

if (this.accountServiceConfiguration != null)
{
this.accountServiceConfiguration = null;
}

if (this.PartitionKeyRangeLocation is IDisposable disposablePartitionManager)
{
disposablePartitionManager.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -6843,10 +6855,13 @@ private async Task InitializeGatewayConfigurationReaderAsync()

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
await this.accountServiceConfiguration.InitializeAsync();
AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties;
this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations;
this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties);

// Subscribe to GlobalEndpointManager PPAF config change events to update CosmosAccountServiceConfiguration
this.GlobalEndpointManager.OnEnablePartitionLevelFailoverConfigChanged += this.HandleEnablePartitionLevelFailoverConfigChanged;
}

internal string GetUserAgentFeatures()
Expand Down Expand Up @@ -6875,21 +6890,149 @@ internal string GetUserAgentFeatures()
return featureFlag == 0 ? string.Empty : $"F{featureFlag:X}";
}

internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
if (this.ConnectionPolicy.EnablePartitionLevelFailover
&& this.ConnectionPolicy.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
/// <summary>
/// Assigns a default cross-region hedging availability strategy to the connection policy
/// when partition level failover is enabled and no availability strategy has been set.
/// Does nothing otherwise.
/// </summary>
internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
    // Leave the policy untouched unless PPAF is on and no strategy is already configured.
    if (!this.ConnectionPolicy.EnablePartitionLevelFailover
        || this.ConnectionPolicy.AvailabilityStrategy != null)
    {
        return;
    }

    // The threshold is the smaller of the default hedging threshold and half of the
    // request timeout configured on the connection policy.
    double halfRequestTimeoutInMillis = this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2;
    double hedgingThresholdInMillis = Math.Min(
        DocumentClient.DefaultHedgingThresholdInMilliseconds,
        halfRequestTimeoutInMillis);

    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(hedgingThresholdInMillis);
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds);

    this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);
}

/// <summary>
/// Handles dynamic changes to the EnablePartitionLevelFailover flag from account properties refresh
/// </summary>
/// <param name="newEnablePartitionLevelFailover">The new value of the EnablePartitionLevelFailover flag</param>
private void HandleEnablePartitionLevelFailoverChanged(bool? newEnablePartitionLevelFailover)
{
try
{
// Only update if client-level override is not disabled
if (this.ConnectionPolicy.DisablePartitionLevelFailoverClientLevelOverride)
{
DefaultTrace.TraceInformation("DocumentClient: PPAF change ignored due to client-level override disabled");
return;
}

bool previousValue = this.ConnectionPolicy.EnablePartitionLevelFailover;
bool newValue = newEnablePartitionLevelFailover ?? false;

if (previousValue == newValue)
{
// No actual change in effective value
return;
}

DefaultTrace.TraceInformation(
"DocumentClient: Updating EnablePartitionLevelFailover from {0} to {1}",
previousValue,
newValue);

// Update the connection policy
this.ConnectionPolicy.EnablePartitionLevelFailover = newValue;

// Update circuit breaker enablement
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= newValue;

// Update availability strategy for read hedging
this.UpdateAvailabilityStrategyForPPAF(newValue);

// Update the GlobalPartitionEndpointManager
this.UpdateGlobalPartitionEndpointManager();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the user agent features here as well using this.ConnectionPolicy.UserAgentContainer.AppendFeatures

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the user agent features update using this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures()) when PPAF configuration changes dynamically. This ensures the user agent reflects the current PPAF state after dynamic updates. (commit: 49b3135)

DefaultTrace.TraceInformation("DocumentClient: Successfully updated PPAF configuration dynamically");
}
catch (Exception ex)
{
DefaultTrace.TraceError("DocumentClient: Error handling PPAF change: {0}", ex.Message);
}
}

/// <summary>
/// Installs the default cross-region hedging strategy when PPAF is being enabled and the
/// connection policy has no availability strategy yet.
/// </summary>
/// <param name="enablePPAF">True when PPAF is enabled; false leaves the policy unchanged.</param>
private void UpdateAvailabilityStrategyForPPAF(bool enablePPAF)
{
    // Hedging is never removed here: when PPAF is disabled the existing strategy is kept,
    // because it may have been configured explicitly by the user.
    if (!enablePPAF || this.ConnectionPolicy.AvailabilityStrategy != null)
    {
        return;
    }

    // Threshold is the smaller of the default hedging threshold and half the request timeout.
    double halfRequestTimeoutInMillis = this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2;
    double hedgingThresholdInMillis = Math.Min(
        DocumentClient.DefaultHedgingThresholdInMilliseconds,
        halfRequestTimeoutInMillis);

    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(hedgingThresholdInMillis);
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds);

    this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);

    DefaultTrace.TraceInformation("DocumentClient: Enabled default hedging strategy for PPAF");
}

/// <summary>
/// Updates the GlobalPartitionEndpointManager based on current PPAF and circuit breaker settings
/// </summary>
private void UpdateGlobalPartitionEndpointManager()
{
// Create new GlobalPartitionEndpointManager instance with updated settings
GlobalPartitionEndpointManager newPartitionKeyRangeLocation =
this.ConnectionPolicy.EnablePartitionLevelFailover
|| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
this.ConnectionPolicy.EnablePartitionLevelFailover,
this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker,
this.isThinClientEnabled)
: GlobalPartitionEndpointManagerNoOp.Instance;

// Dispose the old instance if it's disposable
if (this.PartitionKeyRangeLocation is IDisposable disposableOldManager)
{
disposableOldManager.Dispose();
}

// Update the partition key range location
this.PartitionKeyRangeLocation = newPartitionKeyRangeLocation;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it cause any thread contention? Can we use Interlocked.CompareExchange to alter the object?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed thread contention issue by using Interlocked.Exchange for atomic updates to PartitionKeyRangeLocation. Changed from auto-property to field with property accessor and used Interlocked.Exchange(ref this.partitionKeyRangeLocation, newPartitionKeyRangeLocation) to atomically update the reference while avoiding race conditions during concurrent access. (commit: ec9fd26)


// Update retry policy with new partition key range location
this.retryPolicy = new RetryPolicy(
globalEndpointManager: this.GlobalEndpointManager,
connectionPolicy: this.ConnectionPolicy,
partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation,
isThinClientEnabled: this.isThinClientEnabled);

this.ResetSessionTokenRetryPolicy = this.retryPolicy;

DefaultTrace.TraceInformation("DocumentClient: Updated GlobalPartitionEndpointManager for dynamic PPAF change");
}

/// <summary>
/// Callback for the GlobalEndpointManager PPAF config change event. Reads the
/// EnablePartitionLevelFailover flag from the refreshed account properties and forwards it
/// to <see cref="HandleEnablePartitionLevelFailoverChanged"/>. Any exception is traced and
/// not rethrown.
/// </summary>
/// <param name="accountProperties">The refreshed account properties; a null value is forwarded as a null flag.</param>
private void HandleEnablePartitionLevelFailoverConfigChanged(AccountProperties accountProperties)
{
    try
    {
        DefaultTrace.TraceInformation("DocumentClient: Received PPAF config change from GlobalEndpointManager");

        // The change detection itself already happened in GlobalEndpointManager;
        // here the flag is only extracted (null-safe) and handed on.
        this.HandleEnablePartitionLevelFailoverChanged(accountProperties?.EnablePartitionLevelFailover);
    }
    catch (Exception handlerFailure)
    {
        DefaultTrace.TraceError("DocumentClient: Error handling PPAF config change: {0}", handlerFailure.Message);
    }
}

internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response)
Expand Down
26 changes: 24 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ internal class GlobalEndpointManager : IGlobalEndpointManager
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
private DateTime LastBackgroundRefreshUtc = DateTime.MinValue;
private bool? previousEnablePartitionLevelFailover = null;

/// <summary>
/// Event that is raised when PPAF (Per Partition Automatic Failover) enablement status changes
/// </summary>
internal event Action<AccountProperties>? OnEnablePartitionLevelFailoverConfigChanged;

public GlobalEndpointManager(
IDocumentClientInternal owner,
Expand Down Expand Up @@ -763,11 +769,27 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);

GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a change? Please fix the code diff.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the code diff by reverting the file to original state and reapplying only the necessary changes. Removed the unnecessary whitespace-only change that was appearing in the diff. (commit: 526d0d8)


this.locationCache.OnDatabaseAccountRead(accountProperties);

// Check if PPAF enablement status has changed and raise event only when it changes
bool? currentEnablePartitionLevelFailover = accountProperties?.EnablePartitionLevelFailover;
if (this.previousEnablePartitionLevelFailover != currentEnablePartitionLevelFailover)
{
DefaultTrace.TraceInformation("GlobalEndpointManager: PPAF enablement status changed from {0} to {1}",
this.previousEnablePartitionLevelFailover, currentEnablePartitionLevelFailover);

this.previousEnablePartitionLevelFailover = currentEnablePartitionLevelFailover;

// Only invoke the event if accountProperties is not null
if (accountProperties != null)
{
this.OnEnablePartitionLevelFailoverConfigChanged?.Invoke(accountProperties);
}
}

}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

/// <summary>
/// Unit tests for the account service configuration used by the dynamic PPAF feature.
/// </summary>
[TestClass]
public class DynamicPpafTests
{
    /// <summary>
    /// Verifies that, after <c>InitializeAsync</c> completes, the configuration exposes the
    /// account properties returned by the delegate it was constructed with.
    /// </summary>
    [TestMethod]
    [Timeout(10000)]
    public async Task CosmosAccountServiceConfiguration_ShouldInitializeCorrectly()
    {
        // Arrange: account properties with PPAF turned off.
        AccountProperties seededProperties = new AccountProperties();
        seededProperties.Id = "testAccount";
        seededProperties.EnablePartitionLevelFailover = false;

        // The configuration reads the account through this delegate.
        CosmosAccountServiceConfiguration serviceConfiguration = new CosmosAccountServiceConfiguration(
            () => Task.FromResult(seededProperties));

        // Act
        await serviceConfiguration.InitializeAsync();

        // Assert
        AccountProperties loadedProperties = serviceConfiguration.AccountProperties;
        Assert.AreEqual(false, loadedProperties.EnablePartitionLevelFailover);
        Assert.AreEqual("testAccount", loadedProperties.Id);
    }
}
}
Loading