Skip to content

Commit e1abb97

Browse files
committed
dynamic enablement of ppaf
1 parent 6eb976b commit e1abb97

5 files changed

Lines changed: 233 additions & 7 deletions

File tree

Microsoft.Azure.Cosmos/src/DocumentClient.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
116116
/// <summary>
117117
/// Default thresholds for PPAF request hedging.
118118
/// </summary>
119-
private const int DefaultHedgingThresholdInMilliseconds = 1000;
120-
private const int DefaultHedgingThresholdStepInMilliseconds = 500;
119+
internal const int DefaultHedgingThresholdInMilliseconds = 1000;
120+
internal const int DefaultHedgingThresholdStepInMilliseconds = 500;
121121

122122
private static readonly char[] resourceIdOrFullNameSeparators = new char[] { '/' };
123123
private static readonly char[] resourceIdSeparators = new char[] { '/', '\\', '?', '#' };
@@ -6882,7 +6882,7 @@ internal void InitializePartitionLevelFailoverWithDefaultHedging()
68826882
DocumentClient.DefaultHedgingThresholdInMilliseconds,
68836883
this.ConnectionPolicy.RequestTimeout.TotalMilliseconds / 2);
68846884

6885-
this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
6885+
this.ConnectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
68866886
threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
68876887
thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
68886888
}

Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategy.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,26 @@ public static AvailabilityStrategy CrossRegionHedgingStrategy(
4646
{
4747
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge);
4848
}
49+
50+
/// <summary>
/// Creates the cross region hedging strategy flagged as the SDK default
/// (<c>isSDKDefaultStrategy</c> is always passed as <c>true</c>).
/// Once a request has been outstanding for longer than <paramref name="threshold"/>, the strategy
/// sends a hedged request to another region. It then sends a further hedged request every
/// <paramref name="thresholdStep"/> until the request completes or the regions are exhausted.
/// </summary>
/// <param name="threshold">How long the SDK waits before it begins hedging.</param>
/// <param name="thresholdStep">Period of time between the first hedge and each subsequent hedging attempt.</param>
/// <param name="enableMultiWriteRegionHedge">Whether hedging of write requests is enabled on accounts with multi-region writes.
/// Note that this comes with the caveat that more 409 / 412 errors will be thrown by the SDK.
/// This is expected, and applications that adopt this feature should be prepared to handle these exceptions.
/// An application might not be able to be deterministic on Create vs Replace in the case of Upsert operations.</param>
/// <returns>The cross region hedging availability strategy, marked as the SDK default.</returns>
internal static AvailabilityStrategy SDKDefaultCrossRegionHedgingStrategy(
    TimeSpan threshold,
    TimeSpan? thresholdStep,
    bool enableMultiWriteRegionHedge = false)
    => new CrossRegionHedgingAvailabilityStrategy(
        threshold: threshold,
        thresholdStep: thresholdStep,
        enableMultiWriteRegionHedge: enableMultiWriteRegionHedge,
        isSDKDefaultStrategy: true);
4970
}
5071
}

Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
4444
/// </summary>
4545
public bool EnableMultiWriteRegionHedge { get; private set; }
4646

47+
/// <summary>
48+
/// Internal flag indicating whether this is the default strategy used by the SDK when enabling
49+
/// PPAF for clients without a customer-defined availability strategy.
50+
/// </summary>
51+
public bool IsSDKDefaultStrategy { get; private set; }
52+
4753
private readonly string HedgeConfigText;
4854

4955
/// <summary>
@@ -52,10 +58,12 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
5258
/// <param name="threshold"></param>
5359
/// <param name="thresholdStep"></param>
5460
/// <param name="enableMultiWriteRegionHedge"></param>
61+
/// <param name="isSDKDefaultStrategy"></param>
5562
public CrossRegionHedgingAvailabilityStrategy(
5663
TimeSpan threshold,
5764
TimeSpan? thresholdStep,
58-
bool enableMultiWriteRegionHedge = false)
65+
bool enableMultiWriteRegionHedge = false,
66+
bool isSDKDefaultStrategy = false)
5967
{
6068
if (threshold <= TimeSpan.Zero)
6169
{
@@ -70,6 +78,7 @@ public CrossRegionHedgingAvailabilityStrategy(
7078
this.Threshold = threshold;
7179
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
7280
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
81+
this.IsSDKDefaultStrategy = isSDKDefaultStrategy;
7382

7483
this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
7584
}

Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ namespace Microsoft.Azure.Cosmos.Routing
1414
using System.Threading;
1515
using System.Threading.Tasks;
1616
using Microsoft.Azure.Cosmos.Common;
17-
using Microsoft.Azure.Cosmos.Core.Trace;
17+
using Microsoft.Azure.Cosmos.Core.Trace;
18+
using Microsoft.Azure.Cosmos.Linq;
1819
using Microsoft.Azure.Documents;
1920
using Newtonsoft.Json.Linq;
2021

@@ -762,7 +763,14 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
762763
try
763764
{
764765
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
765-
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
766+
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
767+
768+
if (!this.connectionPolicy.DisablePartitionLevelFailoverClientLevelOverride
769+
&& accountProperties.EnablePartitionLevelFailover.HasValue
770+
&& (this.connectionPolicy.EnablePartitionLevelFailover != accountProperties.EnablePartitionLevelFailover.Value))
771+
{
772+
this.SetPPAFOnRefresh(accountProperties.EnablePartitionLevelFailover.Value);
773+
}
766774

767775
GlobalEndpointManager.ParseThinClientLocationsFromAdditionalProperties(accountProperties);
768776

@@ -782,7 +790,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh)
782790
this.isAccountRefreshInProgress = false;
783791
}
784792
}
785-
}
793+
}
794+
786795
internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh = false)
787796
{
788797
#nullable disable // Needed because AsyncCache does not have nullable enabled
@@ -798,6 +807,39 @@ internal async Task<AccountProperties> GetDatabaseAccountAsync(bool forceRefresh
798807
cancellationToken: this.cancellationTokenSource.Token,
799808
forceRefresh: forceRefresh);
800809
#nullable enable
810+
}
811+
812+
/// <summary>
/// Applies a partition level failover (PPAF) setting to the connection policy.
/// Both <c>EnablePartitionLevelFailover</c> and <c>EnablePartitionLevelCircuitBreaker</c> are set to
/// <paramref name="enablePartitionLevelFailover"/>, and the SDK default hedging strategy is installed
/// or removed accordingly. An availability strategy that is not flagged as the SDK default is never
/// replaced or cleared.
/// </summary>
/// <param name="enablePartitionLevelFailover">
/// <c>true</c> to turn PPAF on (installing the SDK default hedging strategy when none is configured);
/// <c>false</c> to turn it off (removing the hedging strategy only if it is the SDK default one).
/// </param>
private void SetPPAFOnRefresh(bool enablePartitionLevelFailover)
{
    this.connectionPolicy.EnablePartitionLevelFailover = enablePartitionLevelFailover;
    this.connectionPolicy.EnablePartitionLevelCircuitBreaker = enablePartitionLevelFailover;

    if (enablePartitionLevelFailover)
    {
        if (this.connectionPolicy.AvailabilityStrategy == null)
        {
            // The default threshold is the minimum value of 1 second and a fraction (currently it's half) of
            // the request timeout value provided by the end customer.
            double defaultThresholdInMillis = Math.Min(
                DocumentClient.DefaultHedgingThresholdInMilliseconds,
                this.connectionPolicy.RequestTimeout.TotalMilliseconds / 2);

            this.connectionPolicy.AvailabilityStrategy = AvailabilityStrategy.SDKDefaultCrossRegionHedgingStrategy(
                threshold: TimeSpan.FromMilliseconds(defaultThresholdInMillis),
                thresholdStep: TimeSpan.FromMilliseconds(DocumentClient.DefaultHedgingThresholdStepInMilliseconds));
        }

        return;
    }

    // Use a type pattern instead of a hard cast: the strategy may be null (nothing was ever configured)
    // or a different AvailabilityStrategy implementation. A hard cast would throw in both cases.
    if (this.connectionPolicy.AvailabilityStrategy is CrossRegionHedgingAvailabilityStrategy hedgingStrategy
        && hedgingStrategy.IsSDKDefaultStrategy)
    {
        // The strategy was installed by the SDK rather than by the user, so reset it to null.
        this.connectionPolicy.AvailabilityStrategy = null;
    }
}
802844

803845
/// <summary>

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,6 +1576,160 @@ public async Task ReadItemAsync_WithPPAFEnabledAndSingleMasterAccountWithRespons
15761576
}
15771577
}
15781578

1579+
/// <summary>
/// Verifies that PPAF can be toggled dynamically through the database account response:
/// while the account reports PPAF as enabled, a delayed read hedges to another region; after the
/// account reports PPAF as disabled and the account is refreshed, the read no longer hedges and
/// the availability strategy on the connection policy is reset to null.
/// </summary>
[TestMethod]
[Owner("ntripician")]
[TestCategory("MultiRegion")]
[Timeout(70000)]
public async Task ReadItemAsync_WithPPAFDynamicOverride()
{
    // Arrange.
    // Fault injection rule that delays every ReadItem response from the first preferred region by 3 seconds.
    string responseDelayRuleId = "503-rule-" + Guid.NewGuid().ToString();
    FaultInjectionRule responseDelayRule = new FaultInjectionRuleBuilder(
        id: responseDelayRuleId,
        condition:
            new FaultInjectionConditionBuilder()
                .WithOperationType(FaultInjectionOperationType.ReadItem)
                .WithRegion(region1)
                .Build(),
        result:
            FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
                .WithDelay(TimeSpan.FromMilliseconds(3000))
                .Build())
        .Build();

    List<FaultInjectionRule> rules = new List<FaultInjectionRule> { responseDelayRule };
    FaultInjector faultInjector = new FaultInjector(rules);

    // Captured by the interceptor below; flipping it changes what later responses report.
    bool enablePPAF = true;

    // Intercept every response that carries the PPAF enablement flag and overwrite the flag with the
    // current value of enablePPAF, so the test controls the setting the client observes.
    HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper()
    {
        ResponseIntercepter = async (response, request) =>
        {
            // Nothing to rewrite; also avoids awaiting a null task.
            if (response?.Content == null)
            {
                return response;
            }

            string json = await response.Content.ReadAsStringAsync();
            if (json.Length == 0 || !json.Contains("enablePerPartitionFailoverBehavior"))
            {
                return response;
            }

            JObject parsedDatabaseAccountResponse = JObject.Parse(json);
            parsedDatabaseAccountResponse.Property("enablePerPartitionFailoverBehavior").Value = enablePPAF.ToString();

            return new HttpResponseMessage
            {
                StatusCode = response.StatusCode,
                Content = new StringContent(parsedDatabaseAccountResponse.ToString()),
                Version = response.Version,
                ReasonPhrase = response.ReasonPhrase,
                RequestMessage = response.RequestMessage,
            };
        },
    };

    List<string> preferredRegions = new List<string> { region1, region2, region3 };
    CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
    {
        ConsistencyLevel = ConsistencyLevel.Session,
        FaultInjector = faultInjector,
        RequestTimeout = TimeSpan.FromSeconds(5),
        ApplicationPreferredRegions = preferredRegions,
        HttpClientFactory = () => new HttpClient(httpClientHandlerHelper),
    };

    List<CosmosIntegrationTestObject> itemsList = new()
    {
        new() { Id = "smTestId1", Pk = "smpk1" },
    };

    try
    {
        using CosmosClient cosmosClient = new(connectionString: this.connectionString, clientOptions: cosmosClientOptions);
        Database database = cosmosClient.GetDatabase(MultiRegionSetupHelpers.dbName);
        Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

        // Act and Assert.
        await this.TryCreateItems(itemsList);

        // Must ensure the data is replicated to all regions.
        await Task.Delay(3000);

        // Phase 1: the account reports PPAF as enabled, so the delayed read is expected to hedge.
        ItemResponse<CosmosIntegrationTestObject> readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
            id: itemsList[0].Id,
            partitionKey: new PartitionKey(itemsList[0].Pk));

        Assert.AreEqual(
            expected: HttpStatusCode.OK,
            actual: readResponse.StatusCode);

        CosmosTraceDiagnostics traceDiagnostic = readResponse.Diagnostics as CosmosTraceDiagnostics;
        Assert.IsNotNull(traceDiagnostic);

        traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);

        Assert.IsNotNull(hedgeContext);
        List<string> hedgedRegions = ((IEnumerable<string>)hedgeContext).ToList();

        Assert.IsTrue(hedgedRegions.Count > 1, "Since the first region is not available, the request should atleast hedge to the next region.");
        Assert.IsTrue(hedgedRegions.Contains(region1) && (hedgedRegions.Contains(region2) || hedgedRegions.Contains(region3)));

        // Phase 2: the account now reports PPAF as disabled.
        enablePPAF = false;

        // Force a database account refresh so the client picks up the new flag.
        await cosmosClient.DocumentClient.GlobalEndpointManager.RefreshLocationAsync(true);

        readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
            id: itemsList[0].Id,
            partitionKey: new PartitionKey(itemsList[0].Pk));

        Assert.AreEqual(
            expected: HttpStatusCode.OK,
            actual: readResponse.StatusCode);

        traceDiagnostic = readResponse.Diagnostics as CosmosTraceDiagnostics;
        Assert.IsNotNull(traceDiagnostic);

        traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContextNoPPAF);

        Assert.IsNull(hedgeContextNoPPAF);
        Assert.IsNull(cosmosClient.DocumentClient.ConnectionPolicy.AvailabilityStrategy);
    }
    finally
    {
        await this.TryDeleteItems(itemsList);
    }
}
1732+
15791733
[TestMethod]
15801734
[Owner("nalutripician")]
15811735
[TestCategory("MultiRegion")]

0 commit comments

Comments
 (0)