Skip to content

Commit 7896c14

Browse files
committed
Support infinite Task.Spec.retryLimit
Instead, Task.Status.History now only records the limited recent TaskRecords. The number varies on backend type. 10 for Redis.
1 parent 5599568 commit 7896c14

File tree

4 files changed

+26
-14
lines changed

4 files changed

+26
-14
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,10 @@ name: "this is just an display name"
319319
# redis: 1KB
320320
payload: |
321321
You can define any task information in payload
322-
# retryLimit max value varies on backend type to prevent from overloading backend.
323-
# redis: 10
322+
# retryLimit is the maximum number of retry (negative number means infinite)
323+
# NOTE: only the limited number of recent task records will be recorded in its status.
324+
# so, if you set large value or infinite here, you will loose old task records.
325+
# please see the description of status.history field in the next section.
324326
retryLimit: 3
325327
# timeoutSeconds is for task handler timeout.
326328
# If set positive value, task handler timeout for processing this task.
@@ -361,7 +363,12 @@ status:
361363
# timestamps
362364
receivedAt: 2020-02-12T20:20:39.350631+09:00
363365
startedAt: 2020-02-12T20:20:39.351479+09:00
364-
# history of processing the task.
366+
# history of recent records of processing the task.
367+
# the limited number of recent records are recorded in this field.
368+
# the value varies on backend types to prvent overloading backends:
369+
# - redis: 10 entries
370+
# NOTE: so, if you set larger value than this limit in spec.rertryLimit,
371+
# you will loose old task records.
365372
history:
366373
# TaskRecord:
367374
# this represents a record of task handler invocation

pkg/apis/task/task.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (t *Task) IsWorkerLost(defaultTimeoutSeconds int) bool {
178178
return t.Status.CurrentRecord.ReceivedAt.Add(timeout).Before(time.Now())
179179
}
180180

181-
func (t *Task) SetSuccess(payload *string, message *string) error {
181+
func (t *Task) SetSuccess(payload *string, message *string, historyLengthLimit int) error {
182182
if t.Status.Phase != TaskPhaseProcessing {
183183
return errors.Errorf("invalid status: actual=%s expected=%s", t.Status.Phase, TaskPhaseProcessing)
184184
}
@@ -205,10 +205,14 @@ func (t *Task) SetSuccess(payload *string, message *string) error {
205205
}
206206
t.Status.History = append(t.Status.History, *current)
207207

208+
if len(t.Status.History) > historyLengthLimit {
209+
t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
210+
}
211+
208212
return nil
209213
}
210214

211-
func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *string) (bool, error) {
215+
func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *string, historyLengthLimit int) (bool, error) {
212216
if t.Status.Phase != TaskPhaseProcessing && t.Status.Phase != TaskPhaseReceived {
213217
return false, errors.Errorf("invalid status: actual=%s expected=[%s,%s]", t.Status.Phase, TaskPhaseProcessing, TaskPhaseReceived)
214218
}
@@ -233,13 +237,16 @@ func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *
233237
t.Status.History = []TaskRecord{}
234238
}
235239
t.Status.History = append(t.Status.History, *current)
240+
if len(t.Status.History) > historyLengthLimit {
241+
t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
242+
}
243+
236244
t.Status.FailureCount = t.Status.FailureCount + 1
237245

238246
requeue := true
239247
t.Status.Phase = TaskPhasePending
240-
241248
// no requeue because retry exceeded
242-
if t.Status.FailureCount > t.Spec.RetryLimit {
249+
if t.Spec.RetryLimit >= 0 && t.Status.FailureCount > t.Spec.RetryLimit {
243250
requeue = false
244251
t.Status.Phase = TaskPhaseFailed
245252
}

pkg/backend/redis/redis_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ var (
8989
TimeoutSeconds: 60,
9090
}
9191
SampleInvalidTaskSpec = task.TaskSpec{
92+
Name: strings.Repeat("a", MaxNameLength+1),
9293
Payload: strings.Repeat("x", PayloadMaxSizeInKB*KB+1),
9394
RetryLimit: 100,
9495
TimeoutSeconds: 0,
@@ -712,8 +713,8 @@ var _ = Describe("Backend", func() {
712713
vErr, ok := err.(*util.ValidationError)
713714
Expect(ok).To(Equal(true))
714715
Expect(len(vErr.Errors)).To(Equal(2))
716+
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Name max length"))
715717
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Payload max size is"))
716-
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.retryLimit max is"))
717718
})
718719
})
719720
When("Spec is valid", func() {

pkg/backend/redis/task.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ const (
4141
KB = 1 << 10
4242
PayloadMaxSizeInKB = 1
4343
MessageMaxSizeInKB = 1
44-
RetryLimitMax = 10
44+
HistoryLengthMax = 10
4545
MaxNameLength = 1024
4646
)
4747

@@ -650,6 +650,7 @@ func (b *Backend) SetSucceeded(ctx context.Context, queueUID, workerUID uuid.UUI
650650
err = t.SetSuccess(
651651
util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
652652
util.Truncate(message, MessageMaxSizeInKB*KB),
653+
HistoryLengthMax,
653654
)
654655
if err != nil {
655656
dlerr := b.invalidMessageDLError(
@@ -791,6 +792,7 @@ func (b *Backend) RecordFailure(ctx context.Context, queueUID, workerUID uuid.UU
791792
reason,
792793
util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
793794
util.Truncate(message, MessageMaxSizeInKB*KB),
795+
HistoryLengthMax,
794796
)
795797
if err != nil {
796798
dlerr := b.invalidMessageDLError(
@@ -924,11 +926,6 @@ func (b *Backend) validateTaskSpec(s task.TaskSpec) error {
924926
errors.Errorf("TaskSpec.Payload max size is %d Bytes (actual=%d Bytes)", maxBytes, len(s.Payload)),
925927
)
926928
}
927-
if s.RetryLimit > RetryLimitMax {
928-
vErrors = multierror.Append(vErrors,
929-
errors.Errorf("TaskSpec.retryLimit max is %d (actual=%d)", RetryLimitMax, s.RetryLimit),
930-
)
931-
}
932929
if vErrors != nil {
933930
return (*util.ValidationError)(vErrors)
934931
}

0 commit comments

Comments
 (0)