Skip to content

Commit 745ca8f

Browse files
simorenoh and ealsur authored
[Cosmos] Implements Client Retry policy (Azure#22394)
* implementation of client retry policy * ignore N-2 on ci * Update ci.yml * changes to pass ci * Update go.mod * Update go.sum * make method private, add test * enableEndpointDiscovery->enableCrossRegionRetries, remove public area change, remove duplicates * saved constants, moved logic around in policy for non-duplicity * added partial tests, missing 503s/ connectivity issues handling * finalizing behavior and tests * revert pipeline useragent, return non-retryable errors to skip Core retries * mark create/delete management plane operations as writes * force refresh ability added, delete/replace operations marked as write * remove print statements * refactor * missing comma * detecting dns failures * missing update * deal with errors fetching initial account information * linter * more linter * Update cosmos_client_retry_policy_test.go * add DNS test * fix error handling logic for dns * small fix to ensure no wrong index is called * fix new locking logic * override header for response on write metadata operations --------- Co-authored-by: Matias Quaranta <[email protected]>
1 parent f4a9a18 commit 745ca8f

16 files changed

+857
-55
lines changed

sdk/data/azcosmos/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ stages:
2525
parameters:
2626
ServiceDirectory: 'data/azcosmos'
2727
UsePipelineProxy: false
28+
ExcludeGoNMinus2: true
2829
- stage: Emulator
2930
displayName: 'Cosmos Emulator'
3031
variables:
@@ -38,7 +39,7 @@ stages:
3839
Windows_Go120:
3940
pool.name: azsdk-pool-mms-win-2022-general
4041
image.name: MMS2022
41-
go.version: '1.21.1'
42+
go.version: '1.22.0'
4243
pool:
4344
name: $(pool.name)
4445
vmImage: $(image.name)

sdk/data/azcosmos/cosmos_client.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1717
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
18-
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
1918
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
2019
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
2120
)
@@ -42,10 +41,11 @@ func (c *Client) Endpoint() string {
4241
// options - Optional Cosmos client options. Pass nil to accept default values.
4342
func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*Client, error) {
4443
preferredRegions := []string{}
44+
enableCrossRegionRetries := true
4545
if o != nil {
4646
preferredRegions = o.PreferredRegions
4747
}
48-
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0)
48+
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0, enableCrossRegionRetries)
4949
if err != nil {
5050
return nil, err
5151
}
@@ -62,10 +62,11 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
6262
return nil, err
6363
}
6464
preferredRegions := []string{}
65+
enableCrossRegionRetries := true
6566
if o != nil {
6667
preferredRegions = o.PreferredRegions
6768
}
68-
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o), preferredRegions, 0)
69+
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o), preferredRegions, 0, enableCrossRegionRetries)
6970
if err != nil {
7071
return nil, err
7172
}
@@ -124,6 +125,7 @@ func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *
124125
},
125126
PerRetry: []policy.Policy{
126127
authPolicy,
128+
&clientRetryPolicy{gem: gem},
127129
},
128130
},
129131
&options.ClientOptions)
@@ -193,10 +195,17 @@ func (c *Client) CreateDatabase(
193195
if o == nil {
194196
o = &CreateDatabaseOptions{}
195197
}
198+
returnResponse := true
199+
h := &headerOptionsOverride{
200+
enableContentResponseOnWrite: &returnResponse,
201+
}
196202

197203
operationContext := pipelineRequestOptions{
198-
resourceType: resourceTypeDatabase,
199-
resourceAddress: ""}
204+
resourceType: resourceTypeDatabase,
205+
resourceAddress: "",
206+
isWriteOperation: true,
207+
headerOptionsOverride: h,
208+
}
200209

201210
path, err := generatePathForNameBased(resourceTypeDatabase, "", true)
202211
if err != nil {
@@ -220,7 +229,7 @@ func (c *Client) CreateDatabase(
220229
// NewQueryDatabasesPager executes query for databases.
221230
// query - The SQL query to execute.
222231
// o - Options for the operation.
223-
func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions) *runtime.Pager[QueryDatabasesResponse] {
232+
func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions) *azruntime.Pager[QueryDatabasesResponse] {
224233
queryOptions := &QueryDatabasesOptions{}
225234
if o != nil {
226235
originalOptions := *o
@@ -234,7 +243,7 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)
234243

235244
path, _ := generatePathForNameBased(resourceTypeDatabase, operationContext.resourceAddress, true)
236245

237-
return runtime.NewPager(runtime.PagingHandler[QueryDatabasesResponse]{
246+
return azruntime.NewPager(azruntime.PagingHandler[QueryDatabasesResponse]{
238247
More: func(page QueryDatabasesResponse) bool {
239248
return page.ContinuationToken != ""
240249
},
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azcosmos
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"net"
10+
"net/http"
11+
"time"
12+
13+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
14+
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
15+
"github.com/Azure/azure-sdk-for-go/sdk/internal/errorinfo"
16+
)
17+
18+
type clientRetryPolicy struct {
19+
gem *globalEndpointManager
20+
useWriteEndpoint bool
21+
retryCount int
22+
sessionRetryCount int
23+
preferredLocationIndex int
24+
}
25+
26+
const (
	// maxRetryCount is the threshold the retry counter is compared against
	// before another network-error or endpoint-failure retry is attempted.
	maxRetryCount = 120
	// defaultBackoff is the pause, in seconds, taken before such a retry.
	defaultBackoff = 1
)
28+
29+
func (p *clientRetryPolicy) Do(req *policy.Request) (*http.Response, error) {
30+
p.resetPolicyCounters()
31+
o := pipelineRequestOptions{}
32+
if !req.OperationValue(&o) {
33+
return nil, fmt.Errorf("failed to obtain request options, please check request being sent: %s", req.Body())
34+
}
35+
for {
36+
resolvedEndpoint := p.gem.ResolveServiceEndpoint(p.retryCount, o.isWriteOperation, p.useWriteEndpoint)
37+
req.Raw().Host = resolvedEndpoint.Host
38+
req.Raw().URL.Host = resolvedEndpoint.Host
39+
response, err := req.Next() // err can happen in weird scenarios (connectivity, etc)
40+
if err != nil {
41+
if p.isNetworkConnectionError(err) {
42+
shouldRetry, errRetry := p.attemptRetryOnNetworkError(req)
43+
if errRetry != nil {
44+
return nil, errRetry
45+
}
46+
if !shouldRetry {
47+
return nil, err
48+
}
49+
err = req.RewindBody()
50+
if err != nil {
51+
return nil, err
52+
}
53+
p.retryCount += 1
54+
continue
55+
}
56+
return nil, err
57+
}
58+
subStatus := response.Header.Get(cosmosHeaderSubstatus)
59+
if p.shouldRetryStatus(response.StatusCode, subStatus) {
60+
p.useWriteEndpoint = false
61+
if response.StatusCode == http.StatusForbidden {
62+
shouldRetry, err := p.attemptRetryOnEndpointFailure(req, o.isWriteOperation)
63+
if err != nil {
64+
return nil, err
65+
}
66+
if !shouldRetry {
67+
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
68+
}
69+
} else if response.StatusCode == http.StatusNotFound {
70+
if !p.attemptRetryOnSessionUnavailable(req, o.isWriteOperation) {
71+
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
72+
}
73+
} else if response.StatusCode == http.StatusServiceUnavailable {
74+
if !p.attemptRetryOnServiceUnavailable(req, o.isWriteOperation) {
75+
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
76+
}
77+
}
78+
err = req.RewindBody()
79+
if err != nil {
80+
return response, err
81+
}
82+
p.retryCount += 1
83+
continue
84+
}
85+
86+
return response, err
87+
}
88+
89+
}
90+
91+
func (p *clientRetryPolicy) shouldRetryStatus(status int, subStatus string) (shouldRetry bool) {
92+
if (status == http.StatusForbidden && (subStatus == subStatusWriteForbidden || subStatus == subStatusDatabaseAccountNotFound)) ||
93+
(status == http.StatusNotFound && subStatus == subStatusReadSessionNotAvailable) ||
94+
(status == http.StatusServiceUnavailable) {
95+
return true
96+
}
97+
return false
98+
}
99+
100+
func (p *clientRetryPolicy) attemptRetryOnNetworkError(req *policy.Request) (bool, error) {
101+
if (p.retryCount > maxRetryCount) || !p.gem.locationCache.enableCrossRegionRetries {
102+
return false, nil
103+
}
104+
105+
err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
106+
if err != nil {
107+
return false, err
108+
}
109+
err = p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
110+
if err != nil {
111+
return false, err
112+
}
113+
err = p.gem.Update(req.Raw().Context(), false)
114+
if err != nil {
115+
return false, err
116+
}
117+
118+
time.Sleep(defaultBackoff * time.Second)
119+
return true, nil
120+
}
121+
122+
func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, isWriteOperation bool) (bool, error) {
123+
if (p.retryCount > maxRetryCount) || !p.gem.locationCache.enableCrossRegionRetries {
124+
return false, nil
125+
}
126+
if isWriteOperation {
127+
err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
128+
if err != nil {
129+
return false, err
130+
}
131+
} else {
132+
err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
133+
if err != nil {
134+
return false, err
135+
}
136+
}
137+
138+
err := p.gem.Update(req.Raw().Context(), isWriteOperation)
139+
if err != nil {
140+
return false, err
141+
}
142+
143+
time.Sleep(defaultBackoff * time.Second)
144+
return true, nil
145+
}
146+
147+
func (p *clientRetryPolicy) attemptRetryOnSessionUnavailable(req *policy.Request, isWriteOperation bool) bool {
148+
if p.gem.CanUseMultipleWriteLocations() {
149+
endpoints := p.gem.locationCache.locationInfo.availReadLocations
150+
if isWriteOperation {
151+
endpoints = p.gem.locationCache.locationInfo.availWriteLocations
152+
}
153+
if p.sessionRetryCount >= len(endpoints) {
154+
return false
155+
}
156+
} else {
157+
if p.sessionRetryCount > 0 {
158+
return false
159+
}
160+
p.useWriteEndpoint = true
161+
}
162+
p.sessionRetryCount += 1
163+
return true
164+
}
165+
166+
func (p *clientRetryPolicy) attemptRetryOnServiceUnavailable(req *policy.Request, isWriteOperation bool) bool {
167+
if !p.gem.locationCache.enableCrossRegionRetries || p.preferredLocationIndex >= len(p.gem.preferredLocations) {
168+
return false
169+
}
170+
if isWriteOperation && !p.gem.CanUseMultipleWriteLocations() {
171+
return false
172+
}
173+
p.preferredLocationIndex += 1
174+
return true
175+
}
176+
177+
func (p *clientRetryPolicy) resetPolicyCounters() {
178+
p.retryCount = 0
179+
p.sessionRetryCount = 0
180+
p.preferredLocationIndex = 0
181+
}
182+
183+
// isNetworkConnectionError checks if the error is related to failure to connect / resolve DNS
184+
func (p *clientRetryPolicy) isNetworkConnectionError(err error) bool {
185+
var dnserror *net.DNSError
186+
return errors.As(err, &dnserror)
187+
}

0 commit comments

Comments
 (0)