Skip to content

Commit 32689e8

Browse files
alxtkr77Alex
andauthored
Add retries to itemscursor (#138)
* Add retries to itemscursor * Assaf's comments Co-authored-by: Alex <alext@iguaz.io>
1 parent 599eeea commit 32689e8

File tree

2 files changed

+129
-4
lines changed

2 files changed

+129
-4
lines changed

pkg/dataplane/itemscursor.go

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@ such restriction.
1919
*/
2020
package v3io
2121

22-
import "time"
22+
import (
23+
"context"
24+
"reflect"
25+
"runtime"
26+
"time"
27+
28+
"github.com/nuclio/errors"
29+
"github.com/nuclio/logger"
30+
)
2331

2432
type ItemsCursor struct {
2533
currentItem Item
@@ -32,15 +40,126 @@ type ItemsCursor struct {
3240
getItemsInput *GetItemsInput
3341
container Container
3442
scattered bool
43+
44+
logger logger.Logger
45+
retryAttempts int
46+
retryInterval time.Duration
47+
}
48+
49+
func getFunctionName(fn interface{}) string {
50+
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
51+
}
52+
53+
func retryFuncWithResult(ctx context.Context,
54+
loggerInstance logger.Logger,
55+
attempts int,
56+
retryInterval time.Duration,
57+
fn func() (interface{}, bool, error)) (interface{}, error) {
58+
59+
// If retries are not defined, execute the function once
60+
if attempts <= 0 {
61+
attempts = 1
62+
}
63+
64+
var err error
65+
var retry bool
66+
var result interface{}
67+
68+
for attempt := 1; attempt <= attempts; attempt++ {
69+
result, retry, err = fn()
70+
71+
// if there's no need to retry - we're done
72+
if !retry {
73+
return result, err
74+
}
75+
76+
if ctx.Err() != nil {
77+
loggerInstance.WarnWithCtx(ctx,
78+
"Context error detected during retries",
79+
"ctxErr", ctx.Err(),
80+
"previousErr", err,
81+
"function", getFunctionName(fn),
82+
"attempt", attempt)
83+
84+
// return the error if one was provided
85+
if err != nil {
86+
return result, err
87+
}
88+
89+
return result, ctx.Err()
90+
}
91+
92+
// not final attempt
93+
if attempt < attempts {
94+
95+
// don't over log, no output
96+
loggerInstance.DebugWithCtx(ctx,
97+
"Failed an attempt to invoke function",
98+
"function", getFunctionName(fn),
99+
"err", errors.GetErrorStackString(err, 10),
100+
"attempt", attempt)
101+
}
102+
103+
time.Sleep(retryInterval)
104+
}
105+
106+
// attempts exhausted and we're unsuccessful
107+
// Return the original error for later checking
108+
loggerInstance.WarnWithCtx(ctx,
109+
"Failed final attempt to invoke function",
110+
"function", getFunctionName(fn),
111+
"err", errors.GetErrorStackString(err, 10),
112+
"attempts", attempts)
113+
114+
// this shouldn't happen
115+
if err == nil {
116+
loggerInstance.ErrorWithCtx(ctx,
117+
"Failed final attempt to invoke function, but error is nil. This shouldn't happen",
118+
"function", getFunctionName(fn),
119+
"err", errors.GetErrorStackString(err, 10),
120+
"attempts", attempts)
121+
return result, errors.New("Failed final attempt to invoke function without proper error supplied")
122+
}
123+
return result, err
124+
}
125+
126+
func executeV3ioRequestWithRetriesAndResult(ctx context.Context,
127+
logger logger.Logger,
128+
retries int,
129+
interval time.Duration,
130+
fn func() (interface{}, error)) (interface{}, error) {
131+
132+
result, err := retryFuncWithResult(ctx,
133+
logger,
134+
retries,
135+
interval,
136+
func() (interface{}, bool, error) {
137+
result, err := fn()
138+
return result, err != nil, err
139+
})
140+
return result, err
141+
}
142+
func (ic *ItemsCursor) getItemsSync(container Container, getItemsInput *GetItemsInput) (*Response, error) {
143+
if getItemsInput.RetryAttempts < 2 {
144+
return container.GetItemsSync(getItemsInput)
145+
}
146+
responseInterface, err := executeV3ioRequestWithRetriesAndResult(context.TODO(),
147+
getItemsInput.Logger,
148+
getItemsInput.RetryAttempts,
149+
getItemsInput.RetryInterval,
150+
func() (interface{}, error) {
151+
response, err := container.GetItemsSync(getItemsInput)
152+
return response, err
153+
})
154+
return responseInterface.(*Response), err
35155
}
36156

37157
func NewItemsCursor(container Container, getItemsInput *GetItemsInput) (*ItemsCursor, error) {
38158
newItemsCursor := &ItemsCursor{
39159
container: container,
40160
getItemsInput: getItemsInput,
41161
}
42-
43-
response, err := container.GetItemsSync(getItemsInput)
162+
response, err := newItemsCursor.getItemsSync(container, getItemsInput)
44163
if err != nil {
45164
return nil, err
46165
}
@@ -101,7 +220,7 @@ func (ic *ItemsCursor) NextItemSync() (Item, error) {
101220
}
102221

103222
// invoke get items
104-
newResponse, err := ic.container.GetItemsSync(ic.getItemsInput)
223+
newResponse, err := ic.getItemsSync(ic.container, ic.getItemsInput)
105224
if err != nil {
106225
ic.currentError = err
107226
return nil, err

pkg/dataplane/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"strconv"
2424
"strings"
2525
"time"
26+
27+
"github.com/nuclio/logger"
2628
)
2729

2830
//
@@ -301,6 +303,10 @@ type GetItemsInput struct {
301303
DataMaxSize int
302304
RequestJSONResponse bool `json:"RequestJsonResponse"`
303305
ChokeGetItemsMS int
306+
307+
Logger logger.Logger
308+
RetryAttempts int
309+
RetryInterval time.Duration
304310
}
305311

306312
type GetItemsOutput struct {

0 commit comments

Comments
 (0)