-
Notifications
You must be signed in to change notification settings - Fork 533
Expand file tree
/
Copy pathAbstractRetryHandler.cs
More file actions
132 lines (124 loc) · 5.85 KB
/
AbstractRetryHandler.cs
File metadata and controls
132 lines (124 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
internal abstract class AbstractRetryHandler : RequestHandler
{
internal abstract Task<IDocumentClientRetryPolicy> GetRetryPolicyAsync(RequestMessage request);
public override async Task<ResponseMessage> SendAsync(
RequestMessage request,
CancellationToken cancellationToken)
{
IDocumentClientRetryPolicy retryPolicyInstance = await this.GetRetryPolicyAsync(request);
request.OnBeforeSendRequestActions += retryPolicyInstance.OnBeforeSendRequest;
try
{
return await RetryHandler.ExecuteHttpRequestAsync(
callbackMethod: () => base.SendAsync(request, cancellationToken),
callShouldRetry: (cosmosResponseMessage, token) => retryPolicyInstance.ShouldRetryAsync(cosmosResponseMessage, cancellationToken),
callShouldRetryException: (exception, token) => retryPolicyInstance.ShouldRetryAsync(exception, cancellationToken),
cancellationToken: cancellationToken);
}
catch (DocumentClientException ex)
{
return ex.ToCosmosResponseMessage(request);
}
catch (CosmosException ex)
{
return ex.ToCosmosResponseMessage(request);
}
catch (AggregateException ex)
{
// TODO: because the SDK underneath this path uses ContinueWith or task.Result we need to catch AggregateExceptions here
// in order to ensure that underlying DocumentClientExceptions get propagated up correctly. Once all ContinueWith and .Result
// is removed this catch can be safely removed.
AggregateException innerExceptions = ex.Flatten();
Exception docClientException = innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is DocumentClientException);
if (docClientException != null)
{
return ((DocumentClientException)docClientException).ToCosmosResponseMessage(request);
}
throw;
}
catch (OperationCanceledException ex)
{
throw new CosmosOperationCanceledException(ex, request.Trace);
}
finally
{
request.OnBeforeSendRequestActions -= retryPolicyInstance.OnBeforeSendRequest;
}
}
private static async Task<ResponseMessage> ExecuteHttpRequestAsync(
Func<Task<ResponseMessage>> callbackMethod,
Func<ResponseMessage, CancellationToken, Task<ShouldRetryResult>> callShouldRetry,
Func<Exception, CancellationToken, Task<ShouldRetryResult>> callShouldRetryException,
CancellationToken cancellationToken)
{
while (true)
{
ShouldRetryResult result;
try
{
cancellationToken.ThrowIfCancellationRequested();
ResponseMessage cosmosResponseMessage = await callbackMethod();
if (cosmosResponseMessage.IsSuccessStatusCode)
{
return cosmosResponseMessage;
}
result = await callShouldRetry(cosmosResponseMessage, cancellationToken);
if (!result.ShouldRetry)
{
return cosmosResponseMessage;
}
}
catch (HttpRequestException httpRequestException)
{
result = await callShouldRetryException(httpRequestException, cancellationToken);
if (!result.ShouldRetry)
{
// Today we don't translate request exceptions into status codes since this was an error before
// making the request. TODO: Figure out how to pipe this as a response instead of throwing?
throw;
}
}
catch (OperationCanceledException oce)
{
result = await callShouldRetryException(oce, cancellationToken);
if (!result.ShouldRetry)
{
throw;
}
}
catch (CosmosException cosmosException)
{
// Metadata requests (e.g., pkranges) that fail with a regional
// error throw CosmosException from within the pipeline. Without
// this catch, the exception escapes the retry loop and is caught
// by the outer catch in SendAsync, which converts it to a
// ResponseMessage without consulting the retry policy.
// By catching it here, ClientRetryPolicy can evaluate the
// failure and retry the entire operation in another region.
result = await callShouldRetryException(cosmosException, cancellationToken);
if (!result.ShouldRetry)
{
throw;
}
}
TimeSpan backoffTime = result.BackoffTime;
if (backoffTime != TimeSpan.Zero)
{
await Task.Delay(result.BackoffTime, cancellationToken);
}
}
}
}
}