Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e1abb97
dynamic enablement of ppaf
NaluTripician Jul 22, 2025
7c5d519
Update GlobalEndpointManager.cs
NaluTripician Jul 22, 2025
b8d975c
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Jul 22, 2025
09b964a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Jul 25, 2025
bd5be79
updated approach
NaluTripician Jul 31, 2025
f74afc2
fixes
NaluTripician Jul 31, 2025
506663a
requested changes
NaluTripician Jul 31, 2025
a82b471
Gateway changes
NaluTripician Jul 31, 2025
4783f36
Update GlobalPartitionEndpointManager.cs
NaluTripician Jul 31, 2025
53a2c34
Update GlobalPartitionEndpointManagerCore.cs
NaluTripician Jul 31, 2025
2329880
Update CosmosItemIntegrationTests.cs
NaluTripician Jul 31, 2025
c79a7bf
fixed build
NaluTripician Jul 31, 2025
8feaaa8
Code changes to refactor behavior and tests.
kundadebdatta Aug 1, 2025
acff813
Code changes to fix thin client test
kundadebdatta Aug 1, 2025
6472394
Code changes to refactor tests.
kundadebdatta Aug 2, 2025
9a0564a
requested changes
NaluTripician Aug 4, 2025
f300d39
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 5, 2025
b73c17a
Code changes to fix test failures.
kundadebdatta Aug 5, 2025
843c666
Code changes to refactor global partition endpoint manager.
kundadebdatta Aug 6, 2025
aac3ece
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
NaluTripician Aug 7, 2025
6e68326
Update Microsoft.Azure.Cosmos/src/UserAgentContainer.cs
NaluTripician Aug 7, 2025
18e7b67
Code changes to fix build
kundadebdatta Aug 7, 2025
bafa80a
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 7, 2025
6abd41c
Code changes to fix code diff in GatewayStoreModel.
kundadebdatta Aug 8, 2025
2ee4a1f
Merge branch 'master' into users/nalutripician/ppafDynamicEnable
kundadebdatta Aug 8, 2025
da3ff36
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
4a272f5
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
f6dde28
change regex to static
NaluTripician Aug 12, 2025
a352d77
Update UserAgentContainer.cs
NaluTripician Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
/// <summary>
/// Default thresholds for PPAF request hedging.
/// </summary>
private const int DefaultHedgingThresholdInMilliseconds = 1000;
private const int DefaultHedgingThresholdStepInMilliseconds = 500;
internal const int DefaultHedgingThresholdInMilliseconds = 1000;
internal const int DefaultHedgingThresholdStepInMilliseconds = 500;

private static readonly char[] resourceIdOrFullNameSeparators = new char[] { '/' };
private static readonly char[] resourceIdSeparators = new char[] { '/', '\\', '?', '#' };
Expand Down Expand Up @@ -6882,7 +6882,7 @@ internal void InitializePartitionLevelFailoverWithDefaultHedging()
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,26 @@ public static AvailabilityStrategy CrossRegionHedgingStrategy(
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge);
}

/// <summary>
/// After a request's duration passes a threshold, this strategy will send out
/// hedged request to other regions. The first hedge request will be sent after the threshold.
/// After that, the strategy will send out a request every thresholdStep
/// until the request is completed or regions are exausted
/// </summary>
/// <param name="threshold"> how long before SDK begins hedging</param>
/// <param name="thresholdStep">Period of time between first hedge and next hedging attempts</param>
/// <param name="enableMultiWriteRegionHedge">Whether hedging for write requests on accounts with multi-region writes are enabled
/// Note that this does come with the caveat that there will be more 409 / 412 errors thrown by the SDK.
/// This is expected and applications that adopt this feature should be prepared to handle these exceptions.
/// Application might not be able to be deterministic on Create vs Replace in the case of Upsert Operations</param>
/// <returns>the cross region hedging availability</returns>
internal static AvailabilityStrategy SDKDefaultCrossRegionHedgingStrategy(
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
TimeSpan threshold,
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge, isSDKDefaultStrategy: true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// </summary>
public bool EnableMultiWriteRegionHedge { get; private set; }

/// <summary>
/// Intenal flag to indicate if this is the default strategy used by the SDK when ebabling
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
/// PPAF for clients without customer defined availability strategy.
/// </summary>
public bool IsSDKDefaultStrategy { get; private set; }
Comment thread
NaluTripician marked this conversation as resolved.
Outdated

private readonly string HedgeConfigText;

/// <summary>
Expand All @@ -52,10 +58,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// <param name="threshold"></param>
/// <param name="thresholdStep"></param>
/// <param name="enableMultiWriteRegionHedge"></param>
/// <param name="isSDKDefaultStrategy"></param>
public CrossRegionHedgingAvailabilityStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
bool enableMultiWriteRegionHedge = false,
bool isSDKDefaultStrategy = false)
{
if (threshold <= TimeSpan.Zero)
{
Expand All @@ -70,6 +78,7 @@ public CrossRegionHedgingAvailabilityStrategy(
this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
this.IsSDKDefaultStrategy = isSDKDefaultStrategy;

this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
}
Expand Down
47 changes: 44 additions & 3 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Core.Trace;
Comment thread
NaluTripician marked this conversation as resolved.
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -762,7 +762,14 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);

if (!this.connectionPolicy.DisablePartitionLevelFailoverClientLevelOverride
&& accountProperties.EnablePartitionLevelFailover.HasValue
&& (this.connectionPolicy.EnablePartitionLevelFailover != accountProperties.EnablePartitionLevelFailover.Value))
{
this.SetPPAFOnRefresh(accountProperties.EnablePartitionLevelFailover.Value);
}

GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);

Expand All @@ -782,7 +789,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
this.isAccountRefreshInProgress = false;
}
}
}
}

internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh = false)
{
#nullable disable // Needed because AsyncCache does not have nullable enabled
Expand All @@ -798,6 +806,39 @@ internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh
cancellationToken: this.cancellationTokenSource.Token,
forceRefresh: forceRefresh);
#nullable enable
}

private void SetPPAFOnRefresh(bool enablePartitionLevelFailover)
{
if (enablePartitionLevelFailover)
{
this.connectionPolicy.EnablePartitionLevelFailover = true;
Comment thread
NaluTripician marked this conversation as resolved.
Outdated
this.connectionPolicy.EnablePartitionLevelCircuitBreaker = true;
Comment thread
kundadebdatta marked this conversation as resolved.
Outdated

if (this.connectionPolicy.AvailabilityStrategy == null)
{
// The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
// the request timeout value provided by the end customer.
double defaultThresholdInMillis = Math.Min(
DocumentClient.DefaultHedgingThresholdInMilliseconds,
this.connectionPolicy.RequestTimeout.TotalMilliseconds / 2);

this.connectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
}
}
else
{
this.connectionPolicy.EnablePartitionLevelFailover = false;
this.connectionPolicy.EnablePartitionLevelCircuitBreaker = false;

if (((CrossRegionHedgingAvailabilityStrategy)this.connectionPolicy.AvailabilityStrategy).IsSDKDefaultStrategy)
{
// If the user has not set a custom availability strategy, then we will reset it to null.
this.connectionPolicy.AvailabilityStrategy = null;
}
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,160 @@ public async Task ReadItemAsync_WithPPAFEnabledAndSingleMasterAccountWithRespons
}
}

[TestMethod]
[Owner("ntripician")]
[TestCategory("MultiRegion")]
[Timeout(70000)]
public async Task ReadItemAsync_WithPPAFDynamicOverride()
{
// Arrange.
// Enabling fault injection rule to simulate a 503 service unavailable scenario.
string serviceUnavailableRuleId = "503-rule-" + Guid.NewGuid().ToString();
FaultInjectionRule serviceUnavailableRule = new FaultInjectionRuleBuilder(
id: serviceUnavailableRuleId,
condition:
new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadItem)
.WithRegion(region1)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
.WithDelay(TimeSpan.FromMilliseconds(3000))
.Build())
.Build();

List<FaultInjectionRule> rules = new List<FaultInjectionRule> { serviceUnavailableRule };
FaultInjector faultInjector = new FaultInjector(rules);

bool enablePPAF = true;

// Now that the ppaf enablement flag is returned from gateway, we need to intercept the response and remove the flag from the response, so that
// the environment variable set above is honored.
HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper()
{
ResponseIntercepter = async (response, request) =>
{
string json = await response?.Content?.ReadAsStringAsync();
if (json.Length > 0 && json.Contains("enablePerPartitionFailoverBehavior"))
{
if (enablePPAF)
{
JObject parsedDatabaseAccountResponse = JObject.Parse(json);
parsedDatabaseAccountResponse.Property("enablePerPartitionFailoverBehavior").Value = true.ToString();
Comment thread
NaluTripician marked this conversation as resolved.
Outdated

HttpResponseMessage interceptedResponse = new()
{
StatusCode = response.StatusCode,
Content = new StringContent(parsedDatabaseAccountResponse.ToString()),
Version = response.Version,
ReasonPhrase = response.ReasonPhrase,
RequestMessage = response.RequestMessage,
};

return interceptedResponse;
}
else
{
JObject parsedDatabaseAccountResponse = JObject.Parse(json);
parsedDatabaseAccountResponse.Property("enablePerPartitionFailoverBehavior").Value = false.ToString();
Comment thread
NaluTripician marked this conversation as resolved.
Outdated

HttpResponseMessage interceptedResponse = new()
{
StatusCode = response.StatusCode,
Content = new StringContent(parsedDatabaseAccountResponse.ToString()),
Version = response.Version,
ReasonPhrase = response.ReasonPhrase,
RequestMessage = response.RequestMessage,
};

return interceptedResponse;
}

}

return response;
},
Comment thread
NaluTripician marked this conversation as resolved.
};

List<string> preferredRegions = new List<string> { region1, region2, region3 };
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
{
ConsistencyLevel = ConsistencyLevel.Session,
FaultInjector = faultInjector,
RequestTimeout = TimeSpan.FromSeconds(5),
ApplicationPreferredRegions = preferredRegions,
HttpClientFactory = () => new HttpClient(httpClientHandlerHelper),
};

List<CosmosIntegrationTestObject> itemsList = new()
{
new() { Id = "smTestId1", Pk = "smpk1" },
};

try
{
using CosmosClient cosmosClient = new(connectionString: this.connectionString, clientOptions: cosmosClientOptions);
Database database = cosmosClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

// Act and Assert.
await this.TryCreateItems(itemsList);

//Must Ensure the data is replicated to all regions
await Task.Delay(3000);

ItemResponse<CosmosIntegrationTestObject> readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
id: itemsList[0].Id,
partitionKey: new PartitionKey(itemsList[0].Pk));

IReadOnlyList<(string regionName, Uri uri)> contactedRegionMapping = readResponse.Diagnostics.GetContactedRegions();
HashSet<string> contactedRegions = new(contactedRegionMapping.Select(r => r.regionName));

Assert.AreEqual(
expected: HttpStatusCode.OK,
actual: readResponse.StatusCode);

CosmosTraceDiagnostics traceDiagnostic = readResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);

traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);

Assert.IsNotNull(hedgeContext);
List<string> hedgedRegions = ((IEnumerable<string>)hedgeContext).ToList();

Assert.IsTrue(hedgedRegions.Count > 1, "Since the first region is not available, the request should atleast hedge to the next region.");
Assert.IsTrue(hedgedRegions.Contains(region1) && (hedgedRegions.Contains(region2) || hedgedRegions.Contains(region3)));

enablePPAF = false;

//force database account refresh
await cosmosClient.DocumentClient.GlobalEndpointManager.RefreshLocationAsync(true);

readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
id: itemsList[0].Id,
partitionKey: new PartitionKey(itemsList[0].Pk));

contactedRegionMapping = readResponse.Diagnostics.GetContactedRegions();
contactedRegions = new(contactedRegionMapping.Select(r => r.regionName));

Assert.AreEqual(
expected: HttpStatusCode.OK,
actual: readResponse.StatusCode);

traceDiagnostic = readResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);

traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContextNoPPAF);

Assert.IsNull(hedgeContextNoPPAF);
Assert.IsNull(cosmosClient.DocumentClient.ConnectionPolicy.AvailabilityStrategy);
}
finally
{
await this.TryDeleteItems(itemsList);
}
}

[TestMethod]
[Owner("nalutripician")]
[TestCategory("MultiRegion")]
Expand Down
Loading