Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class CosmosClientOptions
private const string ConnectionStringDisableServerCertificateValidation = "DisableServerCertificateValidation";

private const ApiType DefaultApiType = ApiType.None;

/// <summary>
/// Default threshold, in milliseconds, for PPAF request hedging. It is used as the upper bound of the
/// threshold when the default cross-region hedging strategy is initialized.
/// </summary>
private const int DefaultHedgingThresholdInMilliseconds = 1000;

/// <summary>
/// Default threshold step, in milliseconds, for PPAF request hedging. It is passed as the threshold step
/// when the default cross-region hedging strategy is initialized.
/// </summary>
private const int DefaultHedgingThresholdStepInMilliseconds = 500;

/// <summary>
/// Default request timeout
Expand All @@ -75,7 +81,7 @@ public class CosmosClientOptions
private Func<HttpClient> httpClientFactory;
private string applicationName;
private IFaultInjector faultInjector;
private bool isCustomSerializerProvided;
private bool isCustomSerializerProvided;

/// <summary>
/// Creates a new CosmosClientOptions
Expand All @@ -90,7 +96,7 @@ public CosmosClientOptions()
this.ApiType = CosmosClientOptions.DefaultApiType;
this.CustomHandlers = new Collection<RequestHandler>();
this.CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions();
this.SessionRetryOptions = new SessionRetryOptions();
this.SessionRetryOptions = new SessionRetryOptions();
}

/// <summary>
Expand Down Expand Up @@ -760,10 +766,14 @@ bool EnableRemoteRegionPreferredForSessionRetry
get => this.SessionRetryOptions.RemoteRegionPreferred;
set => this.SessionRetryOptions.RemoteRegionPreferred = value;
}

/// <summary>
/// Gets or sets a value indicating whether partition-level failover is enabled. When this feature is enabled
/// and no availability strategy has been set, the SDK by default applies a cross-region hedging strategy whose
/// threshold is the smaller of 1 second and half of the request timeout.
/// If an availability strategy is provided explicitly, then it will be honored, and the default policy won't be applied. Note that
/// the default availability strategy can be opted out by setting <see cref="DisabledAvailabilityStrategy"/> as the availability strategy in
/// cosmos client options.
/// </summary>
/// <remarks>
/// The initial value is read from ConfigurationManager.IsPartitionLevelFailoverEnabled, which is called with a default of false.
/// </remarks>
internal bool EnablePartitionLevelFailover { get; set; } = ConfigurationManager.IsPartitionLevelFailoverEnabled(defaultValue: false);

/// <summary>
Expand Down Expand Up @@ -1013,8 +1023,8 @@ internal CosmosClientOptions Clone()
internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
{
this.ValidateDirectTCPSettings();
this.ValidateLimitToEndpointSettings();
this.ValidatePartitionLevelFailoverSettings();
this.ValidateLimitToEndpointSettings();
this.InitializePartitionLevelFailoverWithDefaultHedging();

ConnectionPolicy connectionPolicy = new ConnectionPolicy()
{
Expand Down Expand Up @@ -1191,16 +1201,6 @@ private void ValidateLimitToEndpointSettings()
}
}

/// <summary>
/// Verifies that a region preference has been configured whenever partition-level failover is enabled.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when partition-level failover is enabled but neither the application region nor the
/// application preferred regions are set.
/// </exception>
private void ValidatePartitionLevelFailoverSettings()
{
    // Nothing to validate unless the feature is switched on.
    if (!this.EnablePartitionLevelFailover)
    {
        return;
    }

    // A single application region is enough to satisfy the requirement.
    if (!string.IsNullOrEmpty(this.ApplicationRegion))
    {
        return;
    }

    bool preferredRegionsMissing = this.ApplicationPreferredRegions is null
        || this.ApplicationPreferredRegions.Count == 0;

    if (preferredRegionsMissing)
    {
        throw new ArgumentException($"{nameof(this.ApplicationPreferredRegions)} or {nameof(this.ApplicationRegion)} is required when {nameof(this.EnablePartitionLevelFailover)} is enabled.");
    }
}

private void ValidateDirectTCPSettings()
{
string settingName = string.Empty;
Expand Down Expand Up @@ -1262,6 +1262,21 @@ internal UserAgentContainer CreateUserAgentContainerWithFeatures(int clientId)
suffix: this.GetUserAgentSuffix());
}

/// <summary>
/// Installs the default cross-region hedging availability strategy when partition-level failover is
/// enabled and no availability strategy has been configured. Otherwise the options are left untouched.
/// </summary>
internal void InitializePartitionLevelFailoverWithDefaultHedging()
{
    // An explicitly configured strategy always wins, and nothing is installed when the feature is off.
    if (!this.EnablePartitionLevelFailover || this.AvailabilityStrategy != null)
    {
        return;
    }

    // Hedge after 1 second or after a fraction (currently half) of the request timeout provided by
    // the end customer, whichever is smaller.
    double halfOfRequestTimeoutInMillis = this.RequestTimeout.TotalMilliseconds / 2;
    TimeSpan hedgingThreshold = TimeSpan.FromMilliseconds(
        Math.Min(CosmosClientOptions.DefaultHedgingThresholdInMilliseconds, halfOfRequestTimeoutInMillis));
    TimeSpan hedgingThresholdStep = TimeSpan.FromMilliseconds(
        CosmosClientOptions.DefaultHedgingThresholdStepInMilliseconds);

    this.AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
        threshold: hedgingThreshold,
        thresholdStep: hedgingThresholdStep);
}

internal string GetUserAgentSuffix()
{
int featureFlag = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ internal sealed class GlobalPartitionEndpointManagerCore : GlobalPartitionEndpoi
private readonly int partitionUnavailabilityDurationInSeconds = ConfigurationManager.GetAllowedPartitionUnavailabilityDurationInSeconds(5);

/// <summary>
/// A readonly integer containing the partition failback refresh interval in seconds. The default value is 60 seconds.
/// A readonly integer containing the partition failback refresh interval in seconds. The default value is 5 minutes.
/// </summary>
private readonly int backgroundConnectionInitTimeIntervalInSeconds = ConfigurationManager.GetStalePartitionUnavailabilityRefreshIntervalInSeconds(60);
private readonly int backgroundConnectionInitTimeIntervalInSeconds = ConfigurationManager.GetStalePartitionUnavailabilityRefreshIntervalInSeconds(300);
Comment thread
kundadebdatta marked this conversation as resolved.

/// <summary>
/// A readonly boolean flag used to determine if partition level failover is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,123 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountA
}
}

/// <summary>
/// Verifies that, with the partition level circuit breaker enabled and no preferred regions configured,
/// reads that keep receiving an injected service unavailable error in the first region are retried on the
/// second region, and that the partition is failed over to the second region once the configured
/// consecutive failure count for reads is reached.
/// </summary>
[TestMethod]
[TestCategory("MultiRegion")]
[Owner("dkunda")]
[Timeout(70000)]
public async Task ReadItemAsync_WithNoPreferredRegionsAndCircuitBreakerEnabledAndSingleMasterAccountAndServiceUnavailableReceived_ShouldApplyPartitionLevelOverride()
{
    // Declared ahead of the try block because the finally block deletes these items.
    List<CosmosIntegrationTestObject> itemsList = new()
    {
        new() { Id = "smTestId1", Pk = "smpk1" },
    };

    try
    {
        // Arrange.
        // The environment variables are set inside the try block so that the finally block always
        // restores them, even when building the fault injection rule or the client options throws.
        Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, "True");
        Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, "10");

        // Enabling fault injection rule to simulate a 503 service unavailable scenario.
        string serviceUnavailableRuleId = "503-rule-" + Guid.NewGuid().ToString();
        FaultInjectionRule serviceUnavailableRule = new FaultInjectionRuleBuilder(
            id: serviceUnavailableRuleId,
            condition:
                new FaultInjectionConditionBuilder()
                    .WithOperationType(FaultInjectionOperationType.ReadItem)
                    .WithRegion(region1)
                    .Build(),
            result:
                FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
                    .WithDelay(TimeSpan.FromMilliseconds(10))
                    .Build())
            .Build();

        List<FaultInjectionRule> rules = new List<FaultInjectionRule> { serviceUnavailableRule };
        FaultInjector faultInjector = new FaultInjector(rules);

        CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
        {
            ConnectionMode = ConnectionMode.Direct,
            ConsistencyLevel = ConsistencyLevel.Session,
            FaultInjector = faultInjector,
            RequestTimeout = TimeSpan.FromSeconds(5),
        };

        using CosmosClient cosmosClient = new(connectionString: this.connectionString, clientOptions: cosmosClientOptions);
        Database database = cosmosClient.GetDatabase(MultiRegionSetupHelpers.dbName);
        Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

        // Act and Assert.
        await this.TryCreateItems(itemsList);

        // Must ensure the data is replicated to all regions.
        await Task.Delay(3000);

        // Mirrors the value written to CircuitBreakerConsecutiveFailureCountForReads above.
        const int consecutiveFailureCount = 10;
        const int totalIterations = 15;

        for (int attemptCount = 1; attemptCount <= totalIterations; attemptCount++)
        {
            try
            {
                ItemResponse<CosmosIntegrationTestObject> readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
                    id: itemsList[0].Id,
                    partitionKey: new PartitionKey(itemsList[0].Pk));

                IReadOnlyList<(string regionName, Uri uri)> contactedRegionMapping = readResponse.Diagnostics.GetContactedRegions();
                HashSet<string> contactedRegions = new(contactedRegionMapping.Select(r => r.regionName));

                Assert.AreEqual(
                    expected: HttpStatusCode.OK,
                    actual: readResponse.StatusCode);

                PartitionKeyRangeFailoverInfo failoverInfo = TestCommon.GetFailoverInfoForFirstPartitionUsingReflection(
                    globalPartitionEndpointManager: cosmosClient.ClientContext.DocumentClient.PartitionKeyRangeLocation,
                    isReadOnlyOrMultiMaster: true);

                if (attemptCount > consecutiveFailureCount + 1)
                {
                    // The partition has already failed over, so only the second region is contacted.
                    Assert.AreEqual(1, contactedRegions.Count, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent read request/s were successful on the next region.");
                    Assert.IsTrue(contactedRegions.Contains(region2));
                    Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current);
                }
                else
                {
                    // The first region is still tried, fails, and the read is retried on the second region.
                    Assert.AreEqual(2, contactedRegions.Count, "Asserting that when the read request succeeds before the consecutive failure count reaches the threshold, the partition didn't over to the next region, and the request was retried on the next region.");
                    Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2));

                    if (attemptCount > consecutiveFailureCount)
                    {
                        // This attempt is the one that trips the threshold, so the override is recorded now.
                        Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current);
                    }
                    else
                    {
                        Assert.AreEqual(this.readRegionsMapping[region1], failoverInfo.Current);
                    }
                }
            }
            catch (CosmosException ex)
            {
                // Surface the status code and message so that a failed run can be diagnosed from the test output.
                Assert.Fail($"Read Item operation should succeed. Status code: {ex.StatusCode}. Message: {ex.Message}");
            }
            catch (Exception ex) when (!(ex is UnitTestAssertException))
            {
                // Assertion failures raised above are excluded by the filter so that they propagate with
                // their original message and stack trace instead of being reported as unhandled exceptions.
                Assert.Fail($"Unhandled Exception was thrown during ReadItemAsync call. Message: {ex.Message}");
            }
        }
    }
    finally
    {
        Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, null);
        Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, null);

        await this.TryDeleteItems(itemsList);
    }
}

[TestMethod]
[Owner("dkunda")]
[TestCategory("MultiRegion")]
Expand Down Expand Up @@ -1120,6 +1237,105 @@ public async Task CreateAndReadItemAsync_WithCircuitBreakerEnabledAndMultiMaster
}
}

/// <summary>
/// Verifies that a read whose response is delayed in the first preferred region is hedged to another
/// region when partition level failover (PPAF) is enabled at the client level, and that no hedging
/// takes place when it is disabled.
/// </summary>
/// <param name="enablePartitionLevelFailover">Whether partition level failover is switched on through the environment variable for this run.</param>
[TestMethod]
[Owner("dkunda")]
[TestCategory("MultiRegion")]
[Timeout(70000)]
[DataRow(true, DisplayName = "Test scenario when PPAF is enabled at client level.")]
[DataRow(false, DisplayName = "Test scenario when PPAF is disabled at client level.")]
public async Task ReadItemAsync_WithPPAFEnabledAndSingleMasterAccountWithResponseDelay_ShouldHedgeRequestToMultipleRegions(
    bool enablePartitionLevelFailover)
{
    // Declared ahead of the try block because the finally block deletes these items.
    List<CosmosIntegrationTestObject> itemsList = new()
    {
        new() { Id = "smTestId1", Pk = "smpk1" },
    };

    try
    {
        // Arrange.
        // The switch is set inside the try block so that the finally block always restores it, and it is
        // cleared explicitly for the disabled scenario so that a value left in the environment cannot
        // turn the feature on.
        Environment.SetEnvironmentVariable(
            ConfigurationManager.PartitionLevelFailoverEnabled,
            enablePartitionLevelFailover ? "True" : null);

        // Enabling fault injection rule to simulate a delayed response from the first region.
        // The "503-rule-" id prefix is the same one the service unavailable tests in this file use.
        string responseDelayRuleId = "503-rule-" + Guid.NewGuid().ToString();
        FaultInjectionRule responseDelayRule = new FaultInjectionRuleBuilder(
            id: responseDelayRuleId,
            condition:
                new FaultInjectionConditionBuilder()
                    .WithOperationType(FaultInjectionOperationType.ReadItem)
                    .WithRegion(region1)
                    .Build(),
            result:
                FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
                    .WithDelay(TimeSpan.FromMilliseconds(3000))
                    .Build())
            .Build();

        List<FaultInjectionRule> rules = new List<FaultInjectionRule> { responseDelayRule };
        FaultInjector faultInjector = new FaultInjector(rules);

        List<string> preferredRegions = new List<string> { region1, region2, region3 };
        CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
        {
            ConsistencyLevel = ConsistencyLevel.Session,
            FaultInjector = faultInjector,
            RequestTimeout = TimeSpan.FromSeconds(5),
            ApplicationPreferredRegions = preferredRegions,
        };

        // Guards against a false pass: without this check the enabled scenario would silently take the
        // "no hedging" branch below if the options did not pick up the environment variable.
        Assert.AreEqual(
            enablePartitionLevelFailover,
            cosmosClientOptions.EnablePartitionLevelFailover,
            "The client options should reflect the partition level failover switch set for this scenario.");

        using CosmosClient cosmosClient = new(connectionString: this.connectionString, clientOptions: cosmosClientOptions);
        Database database = cosmosClient.GetDatabase(MultiRegionSetupHelpers.dbName);
        Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

        // Act and Assert.
        await this.TryCreateItems(itemsList);

        // Must ensure the data is replicated to all regions.
        await Task.Delay(3000);

        ItemResponse<CosmosIntegrationTestObject> readResponse = await container.ReadItemAsync<CosmosIntegrationTestObject>(
            id: itemsList[0].Id,
            partitionKey: new PartitionKey(itemsList[0].Pk));

        IReadOnlyList<(string regionName, Uri uri)> contactedRegionMapping = readResponse.Diagnostics.GetContactedRegions();
        HashSet<string> contactedRegions = new(contactedRegionMapping.Select(r => r.regionName));

        Assert.AreEqual(
            expected: HttpStatusCode.OK,
            actual: readResponse.StatusCode);

        CosmosTraceDiagnostics traceDiagnostic = readResponse.Diagnostics as CosmosTraceDiagnostics;
        Assert.IsNotNull(traceDiagnostic);

        // hedgeContext stays null when the diagnostics carry no "Hedge Context" entry.
        traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);

        if (cosmosClientOptions.EnablePartitionLevelFailover)
        {
            Assert.IsNotNull(hedgeContext);
            List<string> hedgedRegions = ((IEnumerable<string>)hedgeContext).ToList();

            Assert.IsTrue(hedgedRegions.Count > 1, "Since the first region is not available, the request should atleast hedge to the next region.");
            Assert.IsTrue(hedgedRegions.Contains(region1) && (hedgedRegions.Contains(region2) || hedgedRegions.Contains(region3)));
        }
        else
        {
            Assert.IsNull(hedgeContext);
        }

        Assert.AreEqual(1, contactedRegions.Count, "Asserting that when the read request succeeds on any region, given that there were no availability loss.");
    }
    finally
    {
        Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelFailoverEnabled, null);

        await this.TryDeleteItems(itemsList);
    }
}

private async Task TryCreateItems(List<CosmosIntegrationTestObject> testItems)
{
foreach (CosmosIntegrationTestObject item in testItems)
Expand Down
Loading
Loading