Skip to content

Commit 431db91

Browse files
committed
Merge pull request #6 from postmates/rhettg-retry
Detect ProvisionedThroughputExceededException and retry
2 parents cd440b8 + 2d5c2ff commit 431db91

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

triton/stream.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package triton
22

33
import (
44
"fmt"
5+
"log"
56
"time"
67

78
"github.com/aws/aws-sdk-go/aws"
@@ -28,11 +29,13 @@ type ShardStreamReader struct {
2829
service KinesisService
2930
records []*kinesis.Record
3031
lastRequest *time.Time
32+
retries int64
3133
}
3234

3335
// Recommended minimum polling interval to keep from overloading a Kinesis
3436
// shard.
35-
const MIN_POLL_INTERVAL = 1.0 * time.Second
37+
const minPollInterval = 1.0 * time.Second
38+
const maxRetries = 3
3639

3740
func (s *ShardStreamReader) initIterator() (err error) {
3841
gsi := kinesis.GetShardIteratorInput{
@@ -56,7 +59,9 @@ func (s *ShardStreamReader) initIterator() (err error) {
5659

5760
func (s *ShardStreamReader) wait(minInterval time.Duration) {
5861
if s.lastRequest != nil {
59-
sleepTime := minInterval - time.Since(*s.lastRequest)
62+
retryDelay := time.Duration(s.retries*250) * time.Millisecond
63+
64+
sleepTime := (minInterval - time.Since(*s.lastRequest)) + retryDelay
6065
if sleepTime >= time.Duration(0) {
6166
time.Sleep(sleepTime)
6267
}
@@ -66,8 +71,44 @@ func (s *ShardStreamReader) wait(minInterval time.Duration) {
6671
s.lastRequest = &n
6772
}
6873

74+
// Documented Kinesis specific errors as well as common errors we
75+
// should probably just retry on.
76+
// http://docs.aws.amazon.com/kinesis/latest/APIReference/CommonErrors.html
77+
var retryErrorCodes = [...]string{
78+
"ProvisionedThroughputExceededException",
79+
"ServiceUnavailable",
80+
"InternalFailure",
81+
"Throttling",
82+
}
83+
84+
func (s *ShardStreamReader) isRetryError(err error) bool {
85+
if awsErr, ok := err.(awserr.Error); ok {
86+
retry := false
87+
88+
for _, code := range retryErrorCodes {
89+
if awsErr.Code() == code {
90+
retry = true
91+
break
92+
}
93+
}
94+
95+
if retry {
96+
s.retries += 1
97+
if s.retries <= maxRetries {
98+
log.Printf("%s: %s. Retrying", awsErr.Code(), awsErr.Message())
99+
return true
100+
} else {
101+
log.Printf("%s: %s. Max retries attempted", awsErr.Code(), awsErr.Message())
102+
return false
103+
}
104+
}
105+
}
106+
107+
return false
108+
}
109+
69110
func (s *ShardStreamReader) fetchMoreRecords() (err error) {
70-
s.wait(MIN_POLL_INTERVAL)
111+
s.wait(minPollInterval)
71112

72113
if s.NextIteratorValue == nil {
73114
err := s.initIterator()
@@ -83,9 +124,15 @@ func (s *ShardStreamReader) fetchMoreRecords() (err error) {
83124

84125
gro, err := s.service.GetRecords(gri)
85126
if err != nil {
86-
return err
127+
if s.isRetryError(err) {
128+
return nil
129+
} else {
130+
return err
131+
}
87132
}
88133

134+
s.retries = 0
135+
89136
s.records = gro.Records
90137
s.NextIteratorValue = gro.NextShardIterator
91138

triton/stream_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/aws/aws-sdk-go/aws"
9+
"github.com/aws/aws-sdk-go/aws/awserr"
910
"github.com/aws/aws-sdk-go/service/kinesis"
1011
)
1112

@@ -30,6 +31,22 @@ func (s *NullKinesisService) DescribeStream(input *kinesis.DescribeStreamInput)
3031
return nil, fmt.Errorf("Not Implemented")
3132
}
3233

34+
type FailingKinesisService struct{}
35+
36+
func (s *FailingKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
37+
return nil, fmt.Errorf("Not Implemented")
38+
}
39+
40+
func (s *FailingKinesisService) GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
41+
err := awserr.New("ProvisionedThroughputExceededException", "slow down dummy", fmt.Errorf("error"))
42+
return nil, err
43+
}
44+
45+
func (s *FailingKinesisService) GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) {
46+
gso := &kinesis.GetShardIteratorOutput{ShardIterator: aws.String("123")}
47+
return gso, nil
48+
}
49+
3350
func TestNewShardStreamReader(t *testing.T) {
3451
svc := NullKinesisService{}
3552

@@ -108,6 +125,26 @@ func TestFetchMoreRecords(t *testing.T) {
108125
}
109126
}
110127

128+
func TestRetryShardStreamReader(t *testing.T) {
129+
svc := FailingKinesisService{}
130+
131+
s := NewShardStreamReader(&svc, "test-stream", "shard-0001")
132+
133+
err := s.fetchMoreRecords()
134+
if err != nil {
135+
t.Errorf("Received error %v", err)
136+
return
137+
}
138+
139+
if len(s.records) != 0 {
140+
t.Errorf("Should have no records")
141+
}
142+
143+
if s.retries != 1 {
144+
t.Errorf("Should have a retry")
145+
}
146+
}
147+
111148
func TestRead(t *testing.T) {
112149
svc := newTestKinesisService()
113150
st := newTestKinesisStream("test-stream")

0 commit comments

Comments
 (0)