Skip to content

Commit 952e8ec

Browse files
committed
Wait forever for fatal errors
1 parent 24c44b7 commit 952e8ec

File tree

4 files changed

+63
-32
lines changed

4 files changed

+63
-32
lines changed

pkg/common/helper.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,19 @@ func RetryFunc(ctx context.Context,
4141
attempts int,
4242
retryInterval *time.Duration,
4343
backoff *Backoff,
44-
fn func(int) (bool, error)) error {
44+
fn func(int) (bool, error, int)) error {
4545

4646
var err error
4747
var retry bool
48+
var addAttempts int
4849

49-
for attempt := 1; attempt <= attempts; attempt++ {
50-
retry, err = fn(attempt)
50+
var attempt = 0
51+
for attempt <= attempts {
52+
53+
attempt += 1
54+
// some errors might require more attempts than expected, so allow incrementing attempts from outside
55+
retry, err, addAttempts = fn(attempt)
56+
attempt += addAttempts
5157

5258
// if there's no need to retry - we're done
5359
if !retry {
@@ -178,9 +184,19 @@ func EngineErrorIsNonFatal(err error) bool {
178184
"timeout",
179185
"refused",
180186
}
187+
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
188+
}
189+
190+
func EngineErrorIsFatal(err error) bool {
191+
var fatalEngineErrorsPartialMatch = []string{
192+
"Failed to fetch record batches",
193+
}
194+
return errorMatches(err, fatalEngineErrorsPartialMatch)
195+
}
196+
func errorMatches(err error, substrings []string) bool {
181197
if err != nil && len(err.Error()) > 0 {
182-
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
183-
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
198+
for _, substring := range substrings {
199+
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
184200
return true
185201
}
186202
}

pkg/dataplane/streamconsumergroup/claim.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,31 +126,46 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
126126
c.logger,
127127
c.getShardLocationAttempts,
128128
nil,
129-
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
129+
&c.getShardLocationBackoff, func(attempt int) (bool, error, int) {
130130
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
131131
if err != nil {
132132
if common.EngineErrorIsNonFatal(err) {
133-
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
133+
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
134134
}
135+
136+
// if the error is fatal and requires external resolution,
137+
// we don't want to fail; instead, we will inform the user via a log
138+
if common.EngineErrorIsFatal(err) {
139+
// although RetryFunc already logs the error, it logs it as a warning
140+
// to emphasize the severity, we log it again as an error
141+
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
142+
"error", err,
143+
"shard", c.shardID)
144+
// for this type of error, we always increment the attempt counter
145+
// this ensures the smooth operation of other components in Nuclio
146+
// we avoid panicking and simply wait for the issue to be resolved
147+
return true, errors.Wrap(err, "Failed to get shard location"), 1
148+
}
149+
135150
// requested for an immediate stop
136151
if err == v3ioerrors.ErrStopped {
137-
return false, nil
152+
return false, nil, 0
138153
}
139154

140155
switch errors.RootCause(err).(type) {
141156

142157
// in case of a network error, retry to avoid transient errors
143158
case *net.OpError:
144-
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
159+
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
145160

146161
// unknown error, fail now
147162
default:
148-
return false, errors.Wrap(err, "Failed to get shard location")
163+
return false, errors.Wrap(err, "Failed to get shard location"), 0
149164
}
150165
}
151166

152167
// we have shard location
153-
return false, nil
168+
return false, nil, 0
154169
}); err != nil {
155170
return errors.Wrapf(err,
156171
"Failed to get shard location state, attempts exhausted. shard id: %d",

pkg/dataplane/streamconsumergroup/streamconsumergroup.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,19 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
113113
backoff := scg.config.State.ModifyRetry.Backoff
114114
attempts := scg.config.State.ModifyRetry.Attempts
115115

116-
err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) {
116+
err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error, int) {
117117
state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency()
118118
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
119-
return true, errors.Wrap(err, "Failed getting current state from persistency")
119+
return true, errors.Wrap(err, "Failed getting current state from persistency"), 0
120120
}
121121
if common.EngineErrorIsNonFatal(err) {
122-
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
122+
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error"), 0
123123
}
124124

125125
if state == nil {
126126
state, err = newState()
127127
if err != nil {
128-
return true, errors.Wrap(err, "Failed to create state")
128+
return true, errors.Wrap(err, "Failed to create state"), 0
129129
}
130130
}
131131

@@ -137,9 +137,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
137137
if errors.Is(errors.RootCause(err), errShardRetention) {
138138

139139
// if shard retention failed the member needs to be aborted, so we can stop retrying
140-
return false, errors.Wrap(err, "Failed modifying state")
140+
return false, errors.Wrap(err, "Failed modifying state"), 0
141141
}
142-
return true, errors.Wrap(err, "Failed modifying state")
142+
return true, errors.Wrap(err, "Failed modifying state"), 0
143143
}
144144

145145
// log only on change
@@ -157,14 +157,14 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
157157
"attempt", attempt,
158158
"err", errors.RootCause(err).Error())
159159
}
160-
return true, errors.Wrap(err, "Failed setting state in persistency state")
160+
return true, errors.Wrap(err, "Failed setting state in persistency state"), 0
161161
}
162162

163163
if err := handlePostSetStateInPersistency(); err != nil {
164-
return false, errors.Wrap(err, "Failed handling post set state in persistency")
164+
return false, errors.Wrap(err, "Failed handling post set state in persistency"), 0
165165
}
166166

167-
return false, nil
167+
return false, nil, 0
168168
})
169169

170170
if err != nil {

pkg/dataplane/test/streamconsumergroup_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -214,20 +214,20 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
214214
30,
215215
&duration,
216216
nil,
217-
func(attempt int) (bool, error) {
217+
func(attempt int) (bool, error, int) {
218218
observedState, err := streamConsumerGroup.GetState()
219219
suite.Require().NoError(err)
220220
for _, sessionState := range observedState.SessionStates {
221221
if sessionState.MemberID == member.streamConsumerGroupMember.GetID() {
222222
suite.logger.DebugWith("Session state was not removed just yet")
223-
return true, nil
223+
return true, nil, 0
224224
}
225225
}
226226

227227
suite.logger.DebugWith("Session state was removed",
228228
"observedState", observedState,
229229
"memberID", member.id)
230-
return false, nil
230+
return false, nil, 0
231231
})
232232
})
233233

@@ -270,7 +270,7 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
270270
30,
271271
&duration,
272272
nil,
273-
func(attempt int) (bool, error) {
273+
func(attempt int) (bool, error, int) {
274274
observedState, err := streamConsumerGroup.GetState()
275275
suite.Require().NoError(err)
276276

@@ -279,14 +279,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
279279
suite.logger.DebugWith("retained shards",
280280
"shards", sessionState.Shards,
281281
"memberID", sessionState.MemberID)
282-
return false, nil
282+
return false, nil, 0
283283
}
284284
}
285285

286286
suite.logger.DebugWith("Session state shards were no retained just yet",
287287
"sessionStates", observedState.SessionStates,
288288
"memberID", member.streamConsumerGroupMember.GetID())
289-
return true, nil
289+
return true, nil, 0
290290

291291
})
292292
})
@@ -368,14 +368,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
368368
10,
369369
&duration,
370370
nil,
371-
func(attempt int) (bool, error) {
371+
func(attempt int) (bool, error, int) {
372372
state, err = suite.getStateFromPersistency(suite.streamPath, consumerGroupName)
373373
if err != nil {
374374
suite.logger.DebugWith("State was not retrieved from persistency",
375375
"err", err)
376-
return true, err
376+
return true, err, 0
377377
}
378-
return false, nil
378+
return false, nil, 0
379379
})
380380
suite.Require().NoError(err)
381381

@@ -391,14 +391,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
391391
10,
392392
&duration,
393393
nil,
394-
func(attempt int) (bool, error) {
394+
func(attempt int) (bool, error, int) {
395395
err = suite.setStateInPersistency(suite.streamPath, consumerGroupName, state)
396396
if err != nil {
397397
suite.logger.DebugWith("State was not set in persistency yet",
398398
"err", err)
399-
return true, err
399+
return true, err, 0
400400
}
401-
return false, nil
401+
return false, nil, 0
402402
})
403403
suite.Require().NoError(err)
404404

0 commit comments

Comments
 (0)