Skip to content

Commit d544809

Browse files
authored
refactor(bigtable): Separate admin retry options from data retry options (#12076)
* refactor(bigtable): Separate admin retry options from data retry options * Add defaultBackoff
1 parent 0eee1f9 commit d544809

File tree

3 files changed

+100
-57
lines changed

3 files changed

+100
-57
lines changed

bigtable/admin.go

+57-21
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ import (
3838
"google.golang.org/api/option"
3939
gtransport "google.golang.org/api/transport/grpc"
4040
"google.golang.org/genproto/googleapis/rpc/status"
41+
"google.golang.org/grpc/codes"
4142
"google.golang.org/grpc/metadata"
43+
grpcstatus "google.golang.org/grpc/status"
4244
"google.golang.org/protobuf/types/known/durationpb"
4345
field_mask "google.golang.org/protobuf/types/known/fieldmaskpb"
4446
"google.golang.org/protobuf/types/known/timestamppb"
@@ -47,7 +49,42 @@ import (
4749
const adminAddr = "bigtableadmin.googleapis.com:443"
4850
const mtlsAdminAddr = "bigtableadmin.mtls.googleapis.com:443"
4951

50-
var errExpiryMissing = errors.New("WithExpiry is a required option")
52+
var (
53+
errExpiryMissing = errors.New("WithExpiry is a required option")
54+
adminRetryOptions = []gax.CallOption{
55+
gax.WithRetry(func() gax.Retryer {
56+
return &bigtableAdminRetryer{
57+
Backoff: defaultBackoff,
58+
}
59+
}),
60+
}
61+
)
62+
63+
// bigtableAdminRetryer extends the generic gax Retryer, but also checks
64+
// error messages to check if operation can be retried
65+
//
66+
// Retry is made if :
67+
// - error code is one of the `idempotentRetryCodes` OR
68+
// - error code is internal and error message is one of the `retryableInternalErrMsgs`
69+
type bigtableAdminRetryer struct {
70+
gax.Backoff
71+
}
72+
73+
func (r *bigtableAdminRetryer) Retry(err error) (time.Duration, bool) {
74+
// Similar to gax.OnCodes but shares the backoff with INTERNAL retry messages check
75+
st, ok := grpcstatus.FromError(err)
76+
if !ok {
77+
return 0, false
78+
}
79+
c := st.Code()
80+
_, isIdempotent := isIdempotentRetryCode[c]
81+
if isIdempotent ||
82+
(grpcstatus.Code(err) == codes.Internal && containsAny(err.Error(), retryableInternalErrMsgs)) {
83+
pause := r.Backoff.Pause()
84+
return pause, true
85+
}
86+
return 0, false
87+
}
5188

5289
// ErrPartiallyUnavailable is returned when some locations (clusters) are
5390
// unavailable. Both partial results (retrieved from available locations)
@@ -221,7 +258,7 @@ func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
221258
var err error
222259
res, err = ac.tClient.ListTables(ctx, req)
223260
return err
224-
}, retryOptions...)
261+
}, adminRetryOptions...)
225262
if err != nil {
226263
return nil, err
227264
}
@@ -660,7 +697,7 @@ func (ac *AdminClient) getTable(ctx context.Context, table string, view btapb.Ta
660697
var err error
661698
res, err = ac.tClient.GetTable(ctx, req)
662699
return err
663-
}, retryOptions...)
700+
}, adminRetryOptions...)
664701
if err != nil {
665702
return nil, err
666703
}
@@ -913,7 +950,7 @@ func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotI
913950
var err error
914951
resp, err = ac.tClient.ListSnapshots(ctx, req)
915952
return err
916-
}, retryOptions...)
953+
}, adminRetryOptions...)
917954
if err != nil {
918955
return "", err
919956
}
@@ -1018,7 +1055,7 @@ func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot strin
10181055
var err error
10191056
resp, err = ac.tClient.GetSnapshot(ctx, req)
10201057
return err
1021-
}, retryOptions...)
1058+
}, adminRetryOptions...)
10221059
if err != nil {
10231060
return nil, err
10241061
}
@@ -1070,7 +1107,7 @@ func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string
10701107
var err error
10711108
resp, err = ac.tClient.CheckConsistency(ctx, req)
10721109
return err
1073-
}, retryOptions...)
1110+
}, adminRetryOptions...)
10741111
if err != nil {
10751112
return false, err
10761113
}
@@ -1430,7 +1467,7 @@ func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo,
14301467
var err error
14311468
res, err = iac.iClient.ListInstances(ctx, req)
14321469
return err
1433-
}, retryOptions...)
1470+
}, adminRetryOptions...)
14341471
if err != nil {
14351472
return nil, err
14361473
}
@@ -1468,7 +1505,7 @@ func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID str
14681505
var err error
14691506
res, err = iac.iClient.GetInstance(ctx, req)
14701507
return err
1471-
}, retryOptions...)
1508+
}, adminRetryOptions...)
14721509
if err != nil {
14731510
return nil, err
14741511
}
@@ -1744,7 +1781,7 @@ func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string)
17441781
var err error
17451782
res, err = iac.iClient.ListClusters(ctx, req)
17461783
return err
1747-
}, retryOptions...)
1784+
}, adminRetryOptions...)
17481785
if err != nil {
17491786
return nil, err
17501787
}
@@ -1792,7 +1829,7 @@ func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clus
17921829
var err error
17931830
c, err = iac.iClient.GetCluster(ctx, req)
17941831
return err
1795-
}, retryOptions...)
1832+
}, adminRetryOptions...)
17961833
if err != nil {
17971834
return nil, err
17981835
}
@@ -2187,7 +2224,7 @@ func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, n
21872224
var err error
21882225
ap, err = iac.iClient.GetAppProfile(ctx, profileRequest)
21892226
return err
2190-
}, retryOptions...)
2227+
}, adminRetryOptions...)
21912228
if err != nil {
21922229
return nil, err
21932230
}
@@ -2209,7 +2246,7 @@ func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID
22092246
var err error
22102247
profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest)
22112248
return err
2212-
}, retryOptions...)
2249+
}, adminRetryOptions...)
22132250
if err != nil {
22142251
return "", err
22152252
}
@@ -2228,7 +2265,6 @@ func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID
22282265
// UpdateAppProfile updates an app profile within an instance.
22292266
// updateAttrs should be set. If unset, all fields will be replaced.
22302267
func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error {
2231-
fmt.Println("Entering UpdateAppProfile")
22322268
ctx = mergeOutgoingMetadata(ctx, iac.md)
22332269

22342270
profile := &btapb.AppProfile{
@@ -2595,7 +2631,7 @@ func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupItera
25952631
var err error
25962632
resp, err = ac.tClient.ListBackups(ctx, req)
25972633
return err
2598-
}, retryOptions...)
2634+
}, adminRetryOptions...)
25992635
if err != nil {
26002636
return "", err
26012637
}
@@ -2744,7 +2780,7 @@ func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (
27442780
var err error
27452781
resp, err = ac.tClient.GetBackup(ctx, req)
27462782
return err
2747-
}, retryOptions...)
2783+
}, adminRetryOptions...)
27482784
if err != nil {
27492785
return nil, err
27502786
}
@@ -2983,7 +3019,7 @@ func (ac *AdminClient) AuthorizedViewInfo(ctx context.Context, tableID, authoriz
29833019
var err error
29843020
res, err = ac.tClient.GetAuthorizedView(ctx, req)
29853021
return err
2986-
}, retryOptions...)
3022+
}, adminRetryOptions...)
29873023

29883024
if err != nil {
29893025
return nil, err
@@ -3017,7 +3053,7 @@ func (ac *AdminClient) AuthorizedViews(ctx context.Context, tableID string) ([]s
30173053
var err error
30183054
res, err = ac.tClient.ListAuthorizedViews(ctx, req)
30193055
return err
3020-
}, retryOptions...)
3056+
}, adminRetryOptions...)
30213057
if err != nil {
30223058
return nil, err
30233059
}
@@ -3124,7 +3160,7 @@ func (iac *InstanceAdminClient) LogicalViewInfo(ctx context.Context, instanceID,
31243160
var err error
31253161
res, err = iac.iClient.GetLogicalView(ctx, req)
31263162
return err
3127-
}, retryOptions...)
3163+
}, adminRetryOptions...)
31283164

31293165
if err != nil {
31303166
return nil, err
@@ -3144,7 +3180,7 @@ func (iac *InstanceAdminClient) LogicalViews(ctx context.Context, instanceID str
31443180
var err error
31453181
res, err = iac.iClient.ListLogicalViews(ctx, req)
31463182
return err
3147-
}, retryOptions...)
3183+
}, adminRetryOptions...)
31483184
if err != nil {
31493185
return nil, err
31503186
}
@@ -3253,7 +3289,7 @@ func (iac *InstanceAdminClient) MaterializedViewInfo(ctx context.Context, instan
32533289
var err error
32543290
res, err = iac.iClient.GetMaterializedView(ctx, req)
32553291
return err
3256-
}, retryOptions...)
3292+
}, adminRetryOptions...)
32573293

32583294
if err != nil {
32593295
return nil, err
@@ -3279,7 +3315,7 @@ func (iac *InstanceAdminClient) MaterializedViews(ctx context.Context, instanceI
32793315
var err error
32803316
res, err = iac.iClient.ListMaterializedViews(ctx, req)
32813317
return err
3282-
}, retryOptions...)
3318+
}, adminRetryOptions...)
32833319
if err != nil {
32843320
return nil, err
32853321
}

bigtable/bigtable.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,16 @@ func (c *Client) Close() error {
178178
var (
179179
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
180180
isIdempotentRetryCode = make(map[codes.Code]bool)
181-
retryOptions = []gax.CallOption{
181+
182+
defaultBackoff = gax.Backoff{
183+
Initial: 100 * time.Millisecond,
184+
Max: 2 * time.Second,
185+
Multiplier: 1.2,
186+
}
187+
retryOptions = []gax.CallOption{
182188
gax.WithRetry(func() gax.Retryer {
183-
backoff := gax.Backoff{
184-
Initial: 100 * time.Millisecond,
185-
Max: 2 * time.Second,
186-
Multiplier: 1.2,
187-
}
188189
return &bigtableRetryer{
189-
Backoff: backoff,
190+
Backoff: defaultBackoff,
190191
}
191192
}),
192193
}
@@ -196,11 +197,7 @@ var (
196197

197198
executeQueryRetryOptions = []gax.CallOption{
198199
gax.WithRetry(func() gax.Retryer {
199-
backoff := gax.Backoff{
200-
Initial: 100 * time.Millisecond,
201-
Max: 2 * time.Second,
202-
Multiplier: 1.2,
203-
}
200+
backoff := defaultBackoff
204201
return &bigtableRetryer{
205202
alternateRetryCondition: isQueryExpiredViolation,
206203
Backoff: backoff,

bigtable/integration_test.go

+34-24
Original file line numberDiff line numberDiff line change
@@ -3821,6 +3821,7 @@ func TestIntegration_InstanceAdminClient_UpdateAppProfile(t *testing.T) {
38213821
uattrs ProfileAttrsToUpdate
38223822
wantProfile *btapb.AppProfile
38233823
wantErrMsg string
3824+
skip bool
38243825
}{
38253826
{
38263827
desc: "empty update",
@@ -3865,51 +3866,55 @@ func TestIntegration_InstanceAdminClient_UpdateAppProfile(t *testing.T) {
38653866
},
38663867
},
38673868
{
3868-
desc: "isolation only update DataBoost",
3869+
desc: "routing only update MultiClusterRoutingUseAnyConfig",
38693870
uattrs: ProfileAttrsToUpdate{
3870-
Isolation: &DataBoostIsolationReadOnly{
3871-
ComputeBillingOwner: HostPays,
3871+
RoutingConfig: &MultiClusterRoutingUseAnyConfig{
3872+
ClusterIDs: []string{testEnv.Config().Cluster},
38723873
},
38733874
},
38743875
wantProfile: &btapb.AppProfile{
38753876
Name: gotProfile.Name,
38763877
Etag: gotProfile.Etag,
3877-
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
3878-
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
3879-
ClusterId: testEnv.Config().Cluster,
3878+
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
3879+
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{
3880+
ClusterIds: []string{testEnv.Config().Cluster},
38803881
},
38813882
},
3882-
Isolation: &btapb.AppProfile_DataBoostIsolationReadOnly_{
3883-
DataBoostIsolationReadOnly: &btapb.AppProfile_DataBoostIsolationReadOnly{
3884-
ComputeBillingOwner: ptr(btapb.AppProfile_DataBoostIsolationReadOnly_HOST_PAYS),
3883+
Isolation: &btapb.AppProfile_StandardIsolation_{
3884+
StandardIsolation: &btapb.AppProfile_StandardIsolation{
3885+
Priority: btapb.AppProfile_PRIORITY_HIGH,
38853886
},
38863887
},
38873888
},
38883889
},
38893890
{
3890-
desc: "routing only update MultiClusterRoutingUseAnyConfig",
3891+
desc: "routing only update SingleClusterRoutingConfig",
38913892
uattrs: ProfileAttrsToUpdate{
3892-
RoutingConfig: &MultiClusterRoutingUseAnyConfig{},
3893+
RoutingConfig: &SingleClusterRoutingConfig{
3894+
ClusterID: testEnv.Config().Cluster,
3895+
},
38933896
},
38943897
wantProfile: &btapb.AppProfile{
38953898
Name: gotProfile.Name,
38963899
Etag: gotProfile.Etag,
3897-
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
3898-
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{
3899-
ClusterIds: []string{testEnv.Config().Cluster},
3900+
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
3901+
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
3902+
ClusterId: testEnv.Config().Cluster,
39003903
},
39013904
},
3902-
Isolation: &btapb.AppProfile_DataBoostIsolationReadOnly_{
3903-
DataBoostIsolationReadOnly: &btapb.AppProfile_DataBoostIsolationReadOnly{
3904-
ComputeBillingOwner: ptr(btapb.AppProfile_DataBoostIsolationReadOnly_HOST_PAYS),
3905+
Isolation: &btapb.AppProfile_StandardIsolation_{
3906+
StandardIsolation: &btapb.AppProfile_StandardIsolation{
3907+
Priority: btapb.AppProfile_PRIORITY_HIGH,
39053908
},
39063909
},
39073910
},
39083911
},
39093912
{
3910-
desc: "routing only update SingleClusterRoutingConfig",
3913+
desc: "isolation only update DataBoost",
39113914
uattrs: ProfileAttrsToUpdate{
3912-
RoutingConfig: &SingleClusterRoutingConfig{},
3915+
Isolation: &DataBoostIsolationReadOnly{
3916+
ComputeBillingOwner: HostPays,
3917+
},
39133918
},
39143919
wantProfile: &btapb.AppProfile{
39153920
Name: gotProfile.Name,
@@ -3919,14 +3924,19 @@ func TestIntegration_InstanceAdminClient_UpdateAppProfile(t *testing.T) {
39193924
ClusterId: testEnv.Config().Cluster,
39203925
},
39213926
},
3922-
Isolation: &btapb.AppProfile_StandardIsolation_{
3923-
StandardIsolation: &btapb.AppProfile_StandardIsolation{
3924-
Priority: btapb.AppProfile_PRIORITY_HIGH,
3927+
Isolation: &btapb.AppProfile_DataBoostIsolationReadOnly_{
3928+
DataBoostIsolationReadOnly: &btapb.AppProfile_DataBoostIsolationReadOnly{
3929+
ComputeBillingOwner: ptr(btapb.AppProfile_DataBoostIsolationReadOnly_HOST_PAYS),
39253930
},
39263931
},
39273932
},
3933+
skip: true,
39283934
},
39293935
} {
3936+
if test.skip {
3937+
t.Logf("skipping test: %s", test.desc)
3938+
continue
3939+
}
39303940
gotErr = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, profileID, test.uattrs)
39313941
if gotErr == nil && test.wantErrMsg != "" {
39323942
t.Fatalf("%s: UpdateAppProfile: got: nil, want: error: %v", test.desc, test.wantErrMsg)
@@ -3935,13 +3945,13 @@ func TestIntegration_InstanceAdminClient_UpdateAppProfile(t *testing.T) {
39353945
t.Fatalf("%s: UpdateAppProfile: got: %v, want: nil", test.desc, gotErr)
39363946
}
39373947
if gotErr != nil {
3938-
return
3948+
continue
39393949
}
39403950
// Retry to see if the update has been completed
39413951
testutil.Retry(t, 10, 10*time.Second, func(r *testutil.R) {
39423952
got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
39433953
if !proto.Equal(got, test.wantProfile) {
3944-
r.Errorf("%s: got profile: %v, want profile: %v", test.desc, gotProfile, test.wantProfile)
3954+
r.Errorf("%s: got profile: %v,\n want profile: %v", test.desc, gotProfile, test.wantProfile)
39453955
}
39463956
})
39473957
}

0 commit comments

Comments
 (0)