Skip to content

Commit a00403c

Browse files
Merge pull request #213 from Lorak-mmk/lwt-retry-policy
Add LWTRetryPolicy interface
2 parents c2e98cb + 44c91c7 commit a00403c

File tree

5 files changed

+76
-3
lines changed

5 files changed

+76
-3
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,4 @@ Dmitry Kropachev <[email protected]>
142142
Oliver Boyle <[email protected]>
143143
Jackson Fleming <[email protected]>
144144
Sylwia Szunejko <[email protected]>
145+
Karol Baryła <[email protected]>

doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@
321321
// execution.
322322
//
323323
// Idempotent queries are retried in case of errors based on the configured RetryPolicy.
324+
// If the query is LWT and the configured RetryPolicy additionally implements LWTRetryPolicy
325+
// interface, then the policy will be cast to LWTRetryPolicy and used this way.
324326
//
325327
// Queries can be retried even before they fail by setting a SpeculativeExecutionPolicy. The policy can
326328
// cause the driver to retry on a different node if the query is taking longer than a specified delay even before the

policies.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ type RetryPolicy interface {
156156
GetRetryType(error) RetryType
157157
}
158158

159+
// LWTRetryPolicy is a similar interface to RetryPolicy
160+
// If a query is recognized as an LWT query and its RetryPolicy satisfies this
161+
// interface, then this interface will be used instead of RetryPolicy.
162+
type LWTRetryPolicy interface {
163+
AttemptLWT(RetryableQuery) bool
164+
GetRetryTypeLWT(error) RetryType
165+
}
166+
159167
// SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
160168
//
161169
// See below for examples of usage:
@@ -175,10 +183,22 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
175183
return q.Attempts() <= s.NumRetries
176184
}
177185

186+
func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool {
187+
return s.Attempt(q)
188+
}
189+
178190
func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
179191
return RetryNextHost
180192
}
181193

194+
// Retrying on a different host is fine for normal (non-LWT) queries,
195+
// but in case of LWTs it will cause Paxos contention and possibly
196+
// even timeouts if other clients send statements touching the same
197+
// partition to the original node at the same time.
198+
func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType {
199+
return Retry
200+
}
201+
182202
// ExponentialBackoffRetryPolicy sleeps between attempts
183203
type ExponentialBackoffRetryPolicy struct {
184204
NumRetries int
@@ -193,6 +213,10 @@ func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool {
193213
return true
194214
}
195215

216+
func (e *ExponentialBackoffRetryPolicy) AttemptLWT(q RetryableQuery) bool {
217+
return e.Attempt(q)
218+
}
219+
196220
// used to calculate exponentially growing time
197221
func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration {
198222
if min <= 0 {
@@ -215,6 +239,14 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
215239
return RetryNextHost
216240
}
217241

242+
// Retrying on a different host is fine for normal (non-LWT) queries,
243+
// but in case of LWTs it will cause Paxos contention and possibly
244+
// even timeouts if other clients send statements touching the same
245+
// partition to the original node at the same time.
246+
func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType {
247+
return Retry
248+
}
249+
218250
// DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level
219251
// provided in the slice
220252
//

policies_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,14 @@ func TestSimpleRetryPolicy(t *testing.T) {
422422
}
423423
}
424424

425+
func TestLWTSimpleRetryPolicy(t *testing.T) {
426+
ebrp := &SimpleRetryPolicy{NumRetries: 2}
427+
// Verify that SimpleRetryPolicy implements both interfaces
428+
var _ RetryPolicy = ebrp
429+
var lwt_rt LWTRetryPolicy = ebrp
430+
assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry)
431+
}
432+
425433
func TestExponentialBackoffPolicy(t *testing.T) {
426434
// test with defaults
427435
sut := &ExponentialBackoffRetryPolicy{NumRetries: 2}
@@ -450,6 +458,14 @@ func TestExponentialBackoffPolicy(t *testing.T) {
450458
}
451459
}
452460

461+
func TestLWTExponentialBackoffPolicy(t *testing.T) {
462+
ebrp := &ExponentialBackoffRetryPolicy{NumRetries: 2}
463+
// Verify that ExponentialBackoffRetryPolicy implements both interfaces
464+
var _ RetryPolicy = ebrp
465+
var lwt_rt LWTRetryPolicy = ebrp
466+
assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry)
467+
}
468+
453469
func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
454470

455471
q := &Query{cons: LocalQuorum, routingInfo: &queryRoutingInfo{}}

query_executor.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
109109
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
110110
selectedHost := hostIter()
111111
rt := qry.retryPolicy()
112+
lwt_rt, use_lwt_rt := rt.(LWTRetryPolicy)
113+
// We only want to apply LWT policy to LWT queries
114+
use_lwt_rt = use_lwt_rt && qry.IsLWT()
112115

113116
var lastErr error
114117
var iter *Iter
@@ -145,14 +148,33 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
145148
}
146149

147150
// Exit if the query was successful
148-
// or no retry policy defined or retry attempts were reached
149-
if iter.err == nil || rt == nil || !rt.Attempt(qry) {
151+
// or no retry policy defined
152+
if iter.err == nil || rt == nil {
150153
return iter
151154
}
155+
156+
// or retry policy decides to not retry anymore
157+
if use_lwt_rt {
158+
if !lwt_rt.AttemptLWT(qry) {
159+
return iter
160+
}
161+
} else {
162+
if !rt.Attempt(qry) {
163+
return iter
164+
}
165+
}
166+
152167
lastErr = iter.err
153168

169+
var retry_type RetryType
170+
if use_lwt_rt {
171+
retry_type = lwt_rt.GetRetryTypeLWT(iter.err)
172+
} else {
173+
retry_type = rt.GetRetryType(iter.err)
174+
}
175+
154176
// If query is unsuccessful, check the error with RetryPolicy to retry
155-
switch rt.GetRetryType(iter.err) {
177+
switch retry_type {
156178
case Retry:
157179
// retry on the same host
158180
continue

0 commit comments

Comments
 (0)