-
Notifications
You must be signed in to change notification settings - Fork 533
[Per Partition Automatic Failover] Enable PPAF Dynamically Using Targeted Event-Based Updates with Thread-Safe Operations #5326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a7d02e0
45ae5f2
6941697
395c4bd
47dcb90
526d0d8
5dc92c8
9f329f8
e610a29
49b3135
ec9fd26
2cd6627
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1416,6 +1416,8 @@ public void Dispose() | |
|
|
||
| if (this.GlobalEndpointManager != null) | ||
| { | ||
| // Unsubscribe from account properties refresh events | ||
| this.GlobalEndpointManager.OnAccountPropertiesRefreshed -= this.HandleAccountPropertiesRefreshed; | ||
| this.GlobalEndpointManager.Dispose(); | ||
| this.GlobalEndpointManager = null; | ||
| } | ||
|
|
@@ -1431,6 +1433,16 @@ public void Dispose() | |
| this.initTaskCache = null; | ||
| } | ||
|
|
||
| if (this.accountServiceConfiguration != null) | ||
| { | ||
| this.accountServiceConfiguration = null; | ||
| } | ||
|
|
||
| if (this.PartitionKeyRangeLocation is IDisposable disposablePartitionManager) | ||
| { | ||
| disposablePartitionManager.Dispose(); | ||
| } | ||
|
|
||
| DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId); | ||
| DefaultTrace.Flush(); | ||
|
|
||
|
|
@@ -6843,10 +6855,16 @@ private async Task InitializeGatewayConfigurationReaderAsync() | |
|
|
||
| this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync); | ||
|
|
||
| await this.accountServiceConfiguration.InitializeAsync(); | ||
| AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties; | ||
| // Subscribe to account properties changes for dynamic PPAF updates | ||
| this.accountServiceConfiguration.OnEnablePartitionLevelFailoverChanged += this.HandleEnablePartitionLevelFailoverChanged; | ||
|
|
||
| await this.accountServiceConfiguration.InitializeAsync(); | ||
| AccountProperties accountProperties = this.accountServiceConfiguration.AccountProperties; | ||
| this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && accountProperties.EnableMultipleWriteLocations; | ||
| this.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(accountProperties); | ||
|
|
||
| // Subscribe to GlobalEndpointManager account properties refresh events to update CosmosAccountServiceConfiguration | ||
| this.GlobalEndpointManager.OnAccountPropertiesRefreshed += this.HandleAccountPropertiesRefreshed; | ||
| } | ||
|
|
||
| internal string GetUserAgentFeatures() | ||
|
|
@@ -6875,21 +6893,150 @@ internal string GetUserAgentFeatures() | |
| return featureFlag == 0 ? string.Empty : $"F{featureFlag:X}"; | ||
| } | ||
|
|
||
| internal void InitializePartitionLevelFailoverWithDefaultHedging() | ||
| { | ||
| if (this.ConnectionPolicy.EnablePartitionLevelFailover | ||
| && this.ConnectionPolicy.AvailabilityStrategy == null) | ||
| { | ||
| // The default threshold is the minimum value of 1 second and a fraction (currently it's half) of | ||
| // the request timeout value provided by the end customer. | ||
| double defaultThresholdInMillis = Math.Min( | ||
| DocumentClient.DefaultHedgingThresholdInMilliseconds, | ||
| this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2); | ||
|
|
||
| this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy( | ||
| threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis), | ||
| thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds)); | ||
| } | ||
| internal void InitializePartitionLevelFailoverWithDefaultHedging() | ||
| { | ||
| if (this.ConnectionPolicy.EnablePartitionLevelFailover | ||
| && this.ConnectionPolicy.AvailabilityStrategy == null) | ||
| { | ||
| // The default threshold is the minimum value of 1 second and a fraction (currently it's half) of | ||
| // the request timeout value provided by the end customer. | ||
| double defaultThresholdInMillis = Math.Min( | ||
| DocumentClient.DefaultHedgingThresholdInMilliseconds, | ||
| this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2); | ||
|
|
||
| this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy( | ||
| threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis), | ||
| thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds)); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Handles dynamic changes to the EnablePartitionLevelFailover flag from account properties refresh | ||
| /// </summary> | ||
| /// <param name="newEnablePartitionLevelFailover">The new value of the EnablePartitionLevelFailover flag</param> | ||
| private void HandleEnablePartitionLevelFailoverChanged(bool? newEnablePartitionLevelFailover) | ||
| { | ||
| try | ||
| { | ||
| // Only update if client-level override is not disabled | ||
| if (this.ConnectionPolicy.DisablePartitionLevelFailoverClientLevelOverride) | ||
| { | ||
| DefaultTrace.TraceInformation("DocumentClient: PPAF change ignored due to client-level override disabled"); | ||
| return; | ||
| } | ||
|
|
||
| bool previousValue = this.ConnectionPolicy.EnablePartitionLevelFailover; | ||
| bool newValue = newEnablePartitionLevelFailover ?? false; | ||
|
|
||
| if (previousValue == newValue) | ||
| { | ||
| // No actual change in effective value | ||
| return; | ||
| } | ||
|
|
||
| DefaultTrace.TraceInformation( | ||
| "DocumentClient: Updating EnablePartitionLevelFailover from {0} to {1}", | ||
| previousValue, | ||
| newValue); | ||
|
|
||
| // Update the connection policy | ||
| this.ConnectionPolicy.EnablePartitionLevelFailover = newValue; | ||
|
|
||
| // Update circuit breaker enablement | ||
| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker |= newValue; | ||
|
|
||
| // Update availability strategy for read hedging | ||
| this.UpdateAvailabilityStrategyForPPAF(newValue); | ||
|
|
||
| // Update the GlobalPartitionEndpointManager | ||
| this.UpdateGlobalPartitionEndpointManager(); | ||
|
|
||
| DefaultTrace.TraceInformation("DocumentClient: Successfully updated PPAF configuration dynamically"); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| DefaultTrace.TraceError("DocumentClient: Error handling PPAF change: {0}", ex.Message); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Updates the availability strategy based on PPAF enablement | ||
| /// </summary> | ||
| /// <param name="enablePPAF">Whether PPAF is enabled</param> | ||
| private void UpdateAvailabilityStrategyForPPAF(bool enablePPAF) | ||
| { | ||
| if (enablePPAF && this.ConnectionPolicy.AvailabilityStrategy == null) | ||
| { | ||
| // Enable default hedging when PPAF is enabled and no explicit strategy is set | ||
| double defaultThresholdInMillis = Math.Min( | ||
| DocumentClient.DefaultHedgingThresholdInMilliseconds, | ||
| this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2); | ||
|
|
||
| this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy( | ||
| threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis), | ||
| thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds)); | ||
|
|
||
| DefaultTrace.TraceInformation("DocumentClient: Enabled default hedging strategy for PPAF"); | ||
| } | ||
| // Note: We don't disable hedging when PPAF is disabled, as the user might have set it explicitly | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Updates the GlobalPartitionEndpointManager based on current PPAF and circuit breaker settings | ||
| /// </summary> | ||
| private void UpdateGlobalPartitionEndpointManager() | ||
| { | ||
| // Create new GlobalPartitionEndpointManager instance with updated settings | ||
| GlobalPartitionEndpointManager newPartitionKeyRangeLocation = | ||
| this.ConnectionPolicy.EnablePartitionLevelFailover | ||
| || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker | ||
| ? new GlobalPartitionEndpointManagerCore( | ||
| this.GlobalEndpointManager, | ||
| this.ConnectionPolicy.EnablePartitionLevelFailover, | ||
| this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker, | ||
| this.isThinClientEnabled) | ||
| : GlobalPartitionEndpointManagerNoOp.Instance; | ||
|
|
||
| // Dispose the old instance if it's disposable | ||
| if (this.PartitionKeyRangeLocation is IDisposable disposableOldManager) | ||
| { | ||
| disposableOldManager.Dispose(); | ||
| } | ||
|
|
||
| // Update the partition key range location | ||
| this.PartitionKeyRangeLocation = newPartitionKeyRangeLocation; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could it cause any thread contention? Can we use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed thread contention issue by using |
||
|
|
||
| // Update retry policy with new partition key range location | ||
| this.retryPolicy = new RetryPolicy( | ||
| globalEndpointManager: this.GlobalEndpointManager, | ||
| connectionPolicy: this.ConnectionPolicy, | ||
| partitionKeyRangeLocationCache: this.PartitionKeyRangeLocation, | ||
| isThinClientEnabled: this.isThinClientEnabled); | ||
|
|
||
| this.ResetSessionTokenRetryPolicy = this.retryPolicy; | ||
|
|
||
| DefaultTrace.TraceInformation("DocumentClient: Updated GlobalPartitionEndpointManager for dynamic PPAF change"); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Handles account properties refresh events from GlobalEndpointManager | ||
| /// Updates the CosmosAccountServiceConfiguration with new account properties | ||
| /// </summary> | ||
| /// <param name="accountProperties">The refreshed account properties</param> | ||
| private void HandleAccountPropertiesRefreshed(AccountProperties accountProperties) | ||
| { | ||
| try | ||
| { | ||
| DefaultTrace.TraceInformation("DocumentClient: Received account properties refresh from GlobalEndpointManager"); | ||
|
|
||
| // Update the CosmosAccountServiceConfiguration with the new account properties | ||
| // This will trigger the OnEnablePartitionLevelFailoverChanged event if PPAF value changed | ||
| this.accountServiceConfiguration?.UpdateAccountProperties(accountProperties); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| DefaultTrace.TraceError("DocumentClient: Error handling account properties refresh: {0}", ex.Message); | ||
| } | ||
| } | ||
|
|
||
| internal void CaptureSessionToken(DocumentServiceRequest request, DocumentServiceResponse response) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,11 @@ internal class CosmosAccountServiceConfiguration : IServiceConfigurationReader | |
|
|
||
| internal AccountProperties AccountProperties { get; private set; } | ||
|
|
||
| /// <summary> | ||
| /// Event that is raised when account properties are refreshed and PPAF enablement status changes | ||
| /// </summary> | ||
| internal event Action<bool?> OnEnablePartitionLevelFailoverChanged; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we really need this event anymore? Can we not leverage the event
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're absolutely right! I've removed the redundant Changes made:
This eliminates the intermediate event layer and provides a cleaner, more direct flow: The functionality remains the same but with reduced complexity and fewer event subscriptions. (commit: 395c4bd) |
||
|
|
||
| public CosmosAccountServiceConfiguration(Func<Task<AccountProperties>> accountPropertiesTaskFunc) | ||
| { | ||
| if (accountPropertiesTaskFunc == null) | ||
|
|
@@ -60,5 +65,29 @@ public async Task InitializeAsync() | |
| this.AccountProperties = await this.accountPropertiesTaskFunc(); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Updates the account properties and notifies listeners if EnablePartitionLevelFailover changes | ||
| /// This method is called by GlobalEndpointManager when account properties are refreshed | ||
| /// </summary> | ||
| internal void UpdateAccountProperties(AccountProperties newProperties) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not needed. Let's get this removed.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the |
||
| { | ||
| if (newProperties == null) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| bool? previousEnablePartitionLevelFailover = this.AccountProperties?.EnablePartitionLevelFailover; | ||
| bool? newEnablePartitionLevelFailover = newProperties.EnablePartitionLevelFailover; | ||
|
|
||
| // Update the properties first | ||
| this.AccountProperties = newProperties; | ||
|
|
||
| // Check if PPAF enablement status has changed and notify listeners | ||
| if (previousEnablePartitionLevelFailover != newEnablePartitionLevelFailover) | ||
| { | ||
| this.OnEnablePartitionLevelFailoverChanged?.Invoke(newEnablePartitionLevelFailover); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,11 @@ internal class GlobalEndpointManager : IGlobalEndpointManager | |
| private bool isBackgroundAccountRefreshActive = false; | ||
| private DateTime LastBackgroundRefreshUtc = DateTime.MinValue; | ||
|
|
||
| /// <summary> | ||
| /// Event that is raised each time account properties are successfully refreshed, carrying the refreshed properties; subscribers determine whether the PPAF enablement status changed | ||
| /// </summary> | ||
| internal event Action<AccountProperties>? OnAccountPropertiesRefreshed; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's call this as Update the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've renamed the event to |
||
|
|
||
| public GlobalEndpointManager( | ||
| IDocumentClientInternal owner, | ||
| ConnectionPolicy connectionPolicy, | ||
|
|
@@ -763,17 +768,20 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) | |
| { | ||
| this.LastBackgroundRefreshUtc = DateTime.UtcNow; | ||
| AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true); | ||
| GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties); | ||
|
|
||
| GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this really a change? Please fix the code diff.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed the code diff by reverting the file to original state and reapplying only the necessary changes. Removed the unnecessary whitespace-only change that was appearing in the diff. (commit: 526d0d8) |
||
|
|
||
| this.locationCache.OnDatabaseAccountRead(accountProperties); | ||
|
|
||
| } | ||
| catch (Exception ex) | ||
| { | ||
| DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", | ||
| ex.Message, | ||
| System.Diagnostics.Trace.CorrelationManager.ActivityId); | ||
| // Raise event to notify about account properties refresh | ||
| this.OnAccountPropertiesRefreshed?.Invoke(accountProperties); | ||
|
|
||
| } | ||
| catch (Exception ex) | ||
| { | ||
| DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'", | ||
| ex.Message, | ||
| System.Diagnostics.Trace.CorrelationManager.ActivityId); | ||
| } | ||
| finally | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,13 @@ internal static class ConfigurationManager | |
| /// <summary> | ||
| /// Environment variable name to enable thin client mode. | ||
| /// </summary> | ||
| internal static readonly string ThinClientModeEnabled = "AZURE_COSMOS_THIN_CLIENT_ENABLED"; | ||
| internal static readonly string ThinClientModeEnabled = "AZURE_COSMOS_THIN_CLIENT_ENABLED"; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are not diffs. There are no changes. Please fix these diffs and any other places applicable within the scope of this PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed the spurious diffs in ConfigurationManager.cs and GlobalEndpointManager.cs by reverting to original state and reapplying only the necessary changes without whitespace modifications. (commit: 5dc92c8) |
||
|
|
||
| /// <summary> | ||
| /// A read-only string containing the environment variable name for capturing the account properties refresh interval | ||
| /// in seconds. The default value for this interval is 300 seconds (5 minutes). | ||
| /// </summary> | ||
| internal static readonly string AccountPropertiesRefreshIntervalInSeconds = "AZURE_COSMOS_ACCOUNT_PROPERTIES_REFRESH_INTERVAL_IN_SECONDS"; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the empty line that was left after removing the AccountPropertiesRefreshIntervalInSeconds constant. (commit: 526d0d8) |
||
|
|
||
| /// <summary> | ||
| /// Environment variable to override AAD scope. | ||
|
|
@@ -297,6 +303,22 @@ public static double GetCircuitBreakerTimeoutCounterResetWindowInMinutes( | |
| defaultValue: defaultValue); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the account properties refresh interval in seconds. | ||
| /// The user can set the respective environment variable 'AZURE_COSMOS_ACCOUNT_PROPERTIES_REFRESH_INTERVAL_IN_SECONDS' | ||
| /// to override the value. | ||
| /// </summary> | ||
| /// <param name="defaultValue">An integer containing the default value for the refresh interval in seconds.</param> | ||
| /// <returns>An integer representing the refresh interval in seconds.</returns> | ||
| public static int GetAccountPropertiesRefreshIntervalInSeconds( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no usage as we got rid of the redundant code for account refresh. Please remove this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're correct! I've removed the unused |
||
| int defaultValue) | ||
| { | ||
| return ConfigurationManager | ||
| .GetEnvironmentVariable( | ||
| variable: ConfigurationManager.AccountPropertiesRefreshIntervalInSeconds, | ||
| defaultValue: defaultValue); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the boolean value indicating whether optimistic direct execution is enabled based on the environment variable override. | ||
| /// </summary> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the user agent features here as well using
`this.ConnectionPolicy.UserAgentContainer.AppendFeatures`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the user agent features update using
`this.ConnectionPolicy.UserAgentContainer.AppendFeatures(this.GetUserAgentFeatures())` when PPAF configuration changes dynamically. This ensures the user agent reflects the current PPAF state after dynamic updates. (commit: 49b3135)