Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ae0aa5b
Adds retries in DTS when isRetriable is true and on timeout
Meghana-Palaparthi Mar 10, 2026
2eb6832
Adds tests on DTS retries
Meghana-Palaparthi Mar 11, 2026
3d69ceb
Remove server diagnostics from response deserialization
Meghana-Palaparthi Mar 11, 2026
bc5e52c
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Mar 16, 2026
d547817
Change to unbounded retries
Meghana-Palaparthi Mar 16, 2026
409b3e6
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Mar 17, 2026
a84a6a8
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Mar 19, 2026
414afb4
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Mar 24, 2026
ec997d1
address feedback on PR
Meghana-Palaparthi Mar 24, 2026
72c47cf
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 1, 2026
9f27546
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 2, 2026
ac7a3f6
Add retries of different scenarios
Meghana-Palaparthi Apr 7, 2026
b05917c
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 7, 2026
db03eb7
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 14, 2026
ac1e50c
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 14, 2026
0d67585
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 17, 2026
390b7d2
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 20, 2026
be2adfc
Refactor DTX retry policy into ClientRetryPolicy
Meghana-Palaparthi Apr 21, 2026
4605fec
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 21, 2026
27448e4
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 23, 2026
b9afa85
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 23, 2026
f1aba1a
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 27, 2026
98677f9
Merge branch 'master' into users/Meghana-Palaparthi/DTS_timeout_handling
Meghana-Palaparthi Apr 29, 2026
a971dfc
Update ClientRetryPolicyTests.cs
Meghana-Palaparthi Apr 29, 2026
713f673
Merge branch 'main' into users/Meghana-Palaparthi/DTS_timeout_handling
NaluTripician May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal sealed class ClientRetryPolicy : IDocumentClientRetryPolicy
private const int RetryIntervalInMS = 1000; // Once we detect failover wait for 1 second before retrying request.
private const int MaxRetryCount = 120;
private const int MaxServiceUnavailableRetryCount = 1;
private const int MaxDtxRetryCount = 100; // DTX commits carry an idempotency token, making them safe to retry regardless of account topology

private readonly IDocumentClientRetryPolicy throttlingRetry;
private readonly GlobalEndpointManager globalEndpointManager;
Expand All @@ -33,6 +34,7 @@ internal sealed class ClientRetryPolicy : IDocumentClientRetryPolicy

private int sessionTokenRetryCount;
private int serviceUnavailableRetryCount;
private int distributedTransactionRetryCount;
private bool isReadRequest;
private bool canUseMultipleWriteLocations;
private bool isMultiMasterWriteRequest;
Expand Down Expand Up @@ -117,7 +119,8 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(

ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
clientException?.StatusCode,
clientException?.GetSubStatus());
clientException?.GetSubStatus(),
clientException?.RetryAfter);
if (shouldRetryResult != null)
{
return shouldRetryResult;
Expand All @@ -131,7 +134,8 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
{
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosException.StatusCode,
cosmosException.Headers.SubStatusCode);
cosmosException.Headers.SubStatusCode,
cosmosException.RetryAfter);
if (shouldRetryResult != null)
{
return shouldRetryResult;
Expand Down Expand Up @@ -172,7 +176,8 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(

ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode);
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.Headers.RetryAfter);
if (shouldRetryResult != null)
{
return shouldRetryResult;
Expand Down Expand Up @@ -245,7 +250,8 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)

private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode)
SubStatusCodes? subStatusCode,
TimeSpan? retryAfter = null)
{
if (!statusCode.HasValue
&& (!subStatusCode.HasValue
Expand Down Expand Up @@ -356,6 +362,55 @@ private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
return this.ShouldRetryOnUnavailableEndpointStatusCodes();
}

// DTX-specific retriable codes. DTX commits carry an idempotency token making them safe
// to retry regardless of account topology (single-master, multi-master, single-region).
// 403.3 WriteForbidden and 429 throttling are already handled for all request types above.
if (this.documentServiceRequest != null
&& DistributedTransactionConstants.IsDistributedTransactionRequest(
this.documentServiceRequest.OperationType,
this.documentServiceRequest.ResourceType))
{
TimeSpan? dtxRetryDelay = null;

// 408 RequestTimeout: endpoint already marked unavailable above; retry (idempotency ensures safety).
if (statusCode == HttpStatusCode.RequestTimeout)
{
dtxRetryDelay = TimeSpan.Zero;
}
// 449/5352: coordinator race conflict — retry after server-specified backoff.
else if ((int?)statusCode == (int)StatusCodes.RetryWith
&& subStatusCode == (SubStatusCodes)DistributedTransactionConstants.DtcCoordinatorRaceConflict)
{
dtxRetryDelay = retryAfter ?? TimeSpan.Zero;
}
// 500/5411-5413: transient infrastructure failures — safe to retry for writes (idempotency guaranteed).
else if (statusCode == HttpStatusCode.InternalServerError
&& (subStatusCode == (SubStatusCodes)DistributedTransactionConstants.DtcLedgerFailure
|| subStatusCode == (SubStatusCodes)DistributedTransactionConstants.DtcAccountConfigFailure
|| subStatusCode == (SubStatusCodes)DistributedTransactionConstants.DtcDispatchFailure))
{
dtxRetryDelay = TimeSpan.Zero;
}

if (dtxRetryDelay.HasValue)
{
if (this.distributedTransactionRetryCount++ >= ClientRetryPolicy.MaxDtxRetryCount)
{
DefaultTrace.TraceInformation("ClientRetryPolicy: DTX retry budget exhausted. distributedTransactionRetryCount={0}, StatusCode={1}, SubStatusCode={2}.",
this.distributedTransactionRetryCount, statusCode, subStatusCode);
return ShouldRetryResult.NoRetry();
}

DefaultTrace.TraceWarning("ClientRetryPolicy: DTX retriable response (StatusCode={0}, SubStatusCode={1}, attempt={2}). Retrying. Failed Location: {3}",
statusCode,
subStatusCode,
this.distributedTransactionRetryCount,
this.documentServiceRequest?.RequestContext?.LocationEndpointToRoute?.ToString() ?? string.Empty);

return ShouldRetryResult.RetryAfter(dtxRetryDelay.Value);
}
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

Expand All @@ -17,15 +17,33 @@ namespace Microsoft.Azure.Cosmos

internal class DistributedTransactionCommitter
{
private static readonly TimeSpan DefaultRetryBaseDelay = TimeSpan.FromSeconds(1);

internal const int MaxIsRetriableRetryCount = 100;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious, if this retries 100 times, and inside ClientRetryPolicy we retry 100 times, wouldn't it be like 100k retries for a single request?
To avoid this confusion, maybe we can combine these two retry mechanisms into a single layer?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — I think the worst case is 10k retries on a single request (before throttling). If we want to combine them, maybe passing a response header to the client retry policy could work? If we want to keep the 2 retry layers and not add a new header, we can probably just limit it to 3-5 retries here to be safe.


private readonly IReadOnlyList<DistributedTransactionOperation> operations;
private readonly CosmosClientContext clientContext;
private readonly TimeSpan retryBaseDelay;
private readonly Random jitter = new Random();
Comment thread
Meghana-Palaparthi marked this conversation as resolved.
private readonly Func<TimeSpan, CancellationToken, Task> delayProvider;

/// <summary>
/// Creates a committer for the given transaction operations using the default
/// retry base delay (1 second, per <see cref="DefaultRetryBaseDelay"/>).
/// </summary>
/// <param name="operations">Operations that make up the distributed transaction; must not be null.</param>
/// <param name="clientContext">Client context used to execute the commit request; must not be null.</param>
public DistributedTransactionCommitter(
    IReadOnlyList<DistributedTransactionOperation> operations,
    CosmosClientContext clientContext)
    : this(operations, clientContext, DefaultRetryBaseDelay)
{
}

/// <summary>
/// Test-hook constructor: lets callers override the retry base delay and the
/// delay mechanism so unit tests can exercise the retry loop without real waits.
/// </summary>
/// <param name="operations">Operations that make up the distributed transaction; must not be null.</param>
/// <param name="clientContext">Client context used to execute the commit request; must not be null.</param>
/// <param name="retryBaseDelay">Base delay fed into the exponential backoff between retries.</param>
/// <param name="delayProvider">
/// Optional delay function; when null, <see cref="Task.Delay(TimeSpan, CancellationToken)"/> is used.
/// </param>
internal DistributedTransactionCommitter(
    IReadOnlyList<DistributedTransactionOperation> operations,
    CosmosClientContext clientContext,
    TimeSpan retryBaseDelay,
    Func<TimeSpan, CancellationToken, Task> delayProvider = null)
{
    if (operations == null)
    {
        throw new ArgumentNullException(nameof(operations));
    }

    if (clientContext == null)
    {
        throw new ArgumentNullException(nameof(clientContext));
    }

    this.operations = operations;
    this.clientContext = clientContext;
    this.retryBaseDelay = retryBaseDelay;
    this.delayProvider = delayProvider ?? Task.Delay;
}

public async Task<DistributedTransactionResponse> CommitTransactionAsync(CancellationToken cancellationToken)
Expand All @@ -43,24 +61,72 @@ await DistributedTransactionCommitterUtils.ResolveCollectionRidsAsync(
this.clientContext.SerializerCore,
cancellationToken);

return await this.ExecuteCommitAsync(serverRequest, cancellationToken);
return await this.ExecuteCommitWithRetryAsync(serverRequest, cancellationToken);
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
DefaultTrace.TraceError($"Distributed transaction failed: {ex.Message}");
// await this.AbortTransactionAsync(cancellationToken);
throw;
}
}

/// <summary>
/// Runs the commit under a single root trace and retries it while the server
/// marks a non-success response as retriable. Retries are capped at
/// <see cref="MaxIsRetriableRetryCount"/> attempts; the server request's
/// idempotency token makes resending the commit safe.
/// </summary>
/// <param name="serverRequest">The commit request; its idempotency token is reused on every attempt.</param>
/// <param name="cancellationToken">Checked before each attempt and honored during backoff delays.</param>
private async Task<DistributedTransactionResponse> ExecuteCommitWithRetryAsync(
    DistributedTransactionServerRequest serverRequest,
    CancellationToken cancellationToken)
{
    int attempt = 0;

    using (ITrace retryTrace = Trace.GetRootTrace("Distributed Transaction Commit", TraceComponent.Batch, TraceLevel.Info))
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            DistributedTransactionResponse response = await this.ExecuteCommitAsync(serverRequest, retryTrace, cancellationToken);

            // Success, or a failure the server does not flag as retriable: return it as-is.
            if (response.IsSuccessStatusCode || !response.IsRetriable)
            {
                return response;
            }

            // Retry budget exhausted: surface the last (undisposed) retriable response.
            if (attempt >= DistributedTransactionCommitter.MaxIsRetriableRetryCount)
            {
                DefaultTrace.TraceWarning(
                    $"Distributed transaction isRetriable retry budget exhausted after {attempt} attempts " +
                    $"(StatusCode={response.StatusCode}). Returning last response.");
                return response;
            }

            DefaultTrace.TraceWarning(
                $"Distributed transaction commit retriable (StatusCode={response.StatusCode}, " +
                $"IsRetriable={response.IsRetriable}, attempt {attempt + 1}). " +
                $"Retrying with idempotency token {serverRequest.IdempotencyToken}.");

            // Free the failed attempt's resources before backing off and retrying.
            response.Dispose();
            await this.delayProvider(this.GetRetryDelay(attempt++), cancellationToken);
        }
    }
}

/// <summary>
/// Computes the backoff delay for a retry attempt using capped exponential
/// growth with "equal jitter": half the capped delay is deterministic and half
/// is uniformly random, which decorrelates concurrent clients and avoids
/// synchronized retry storms. The result lies in [0.5b, 1.5b) where
/// b = retryBaseDelay * 2^min(attempt, 5).
/// </summary>
/// <param name="attempt">Zero-based retry attempt number.</param>
private TimeSpan GetRetryDelay(int attempt)
{
    // Cap the exponent so the backoff never grows beyond 32x the base delay.
    const int maxExponent = 5;
    double cappedDelayMs = this.retryBaseDelay.TotalMilliseconds * Math.Pow(2, Math.Min(attempt, maxExponent));

    // Equal jitter: fixed half plus a uniformly random half.
    double totalDelayMs = (cappedDelayMs * 0.5) + (cappedDelayMs * this.jitter.NextDouble());
    return TimeSpan.FromMilliseconds(totalDelayMs);
}

private async Task<DistributedTransactionResponse> ExecuteCommitAsync(
DistributedTransactionServerRequest serverRequest,
ITrace parentTrace,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
using (ITrace trace = Trace.GetRootTrace("Execute Distributed Transaction Commit", TraceComponent.Batch, TraceLevel.Info))
using (ITrace attemptTrace = parentTrace.StartChild("Execute Distributed Transaction Commit", TraceComponent.Batch, TraceLevel.Info))
{
using (MemoryStream bodyStream = serverRequest.TransferBodyStream())
using (MemoryStream bodyStream = serverRequest.CreateBodyStream())
{
ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: DistributedTransactionCommitter.GetResourceUri(),
Expand All @@ -72,25 +138,25 @@ private async Task<DistributedTransactionResponse> ExecuteCommitAsync(
itemId: null,
streamPayload: bodyStream,
requestEnricher: requestMessage => DistributedTransactionCommitter.EnrichRequestMessage(requestMessage, serverRequest),
trace: trace,
trace: attemptTrace,
cancellationToken: cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore,
serverRequest.IdempotencyToken,
trace,
cancellationToken);

DistributedTransactionCommitter.MergeSessionTokens(
response,
serverRequest,
this.clientContext.DocumentClient.sessionContainer);

return response;
using (responseMessage)
{
DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore,
parentTrace,
cancellationToken);

DistributedTransactionCommitter.MergeSessionTokens(
response,
serverRequest,
this.clientContext.DocumentClient?.sessionContainer);

return response;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,25 @@ namespace Microsoft.Azure.Cosmos

internal static class DistributedTransactionConstants
{
public static bool IsDistributedTransactionRequest(OperationType operationType, ResourceType resourceType)
// Sub-status codes returned on the envelope response for distributed transactions.
// Source: dtx-sdk-response-status-codes.md — Part A, Section 1.

/// <summary>449/5352 — Coordinator race conflict (ETag contention on the ledger exhausted).</summary>
internal const int DtcCoordinatorRaceConflict = 5352;

/// <summary>429/3200 — Ledger RU throttled and coordinator exhausted its internal retry budget.</summary>
internal const int DtcLedgerThrottled = 3200;

/// <summary>500/5411 — Ledger infrastructure failure.</summary>
internal const int DtcLedgerFailure = 5411;

/// <summary>500/5412 — Account configuration failure.</summary>
internal const int DtcAccountConfigFailure = 5412;

/// <summary>500/5413 — Coordinator dispatch failure.</summary>
internal const int DtcDispatchFailure = 5413;

internal static bool IsDistributedTransactionRequest(OperationType operationType, ResourceType resourceType)
{
return operationType == OperationType.CommitDistributedTransaction
&& resourceType == ResourceType.DistributedTransactionBatch;
Expand Down
Loading
Loading