forked from rewardStyle/kinetic
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathkinesis_writer.go
More file actions
270 lines (236 loc) · 8.58 KB
/
kinesis_writer.go
File metadata and controls
270 lines (236 loc) · 8.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package kinetic
import (
"context"
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
// AWS-imposed per-shard write limits. These serve both as the writer's
// defaults and as the upper bounds enforced by the rate-limit option
// functions below.
const (
	kinesisMsgCountRateLimit = 1000    // AWS Kinesis limit of 1000 records/sec
	kinesisMsgSizeRateLimit  = 1000000 // AWS Kinesis limit of 1 MB/sec
)
// kinesisWriterOptions is a struct that holds all of the KinesisWriter's configurable parameters.
// It is embedded in KinesisWriter, so its fields are read directly by the writer's methods.
type kinesisWriterOptions struct {
	responseReadTimeout time.Duration          // maximum time to wait for PutRecords API call before timing out
	msgCountRateLimit   int                    // maximum number of records to be sent per second
	msgSizeRateLimit    int                    // maximum (transmission) size of records to be sent per second
	logLevel            aws.LogLevelType       // log level for configuring the LogHelper's log level
	Stats               ProducerStatsCollector // stats collection mechanism (exported, presumably so callers can access it — TODO confirm)
}
// defaultKinesisWriterOptions instantiates a kinesisWriterOptions with default values:
// a one-second response read timeout, the AWS per-shard rate limits, logging off,
// and a no-op stats collector.
func defaultKinesisWriterOptions() *kinesisWriterOptions {
	options := new(kinesisWriterOptions)
	options.responseReadTimeout = time.Second
	options.msgCountRateLimit = kinesisMsgCountRateLimit
	options.msgSizeRateLimit = kinesisMsgSizeRateLimit
	options.logLevel = aws.LogOff
	options.Stats = &NilProducerStatsCollector{}
	return options
}
// KinesisWriterOptionsFn is a method signature for defining functional option methods for configuring
// the KinesisWriter. Each option mutates the writer and returns an error when given an invalid value.
type KinesisWriterOptionsFn func(*KinesisWriter) error
// KinesisWriterResponseReadTimeout is a functional option method for configuring the KinesisWriter's
// response read timeout (the maximum time to wait when reading the PutRecords response).
func KinesisWriterResponseReadTimeout(timeout time.Duration) KinesisWriterOptionsFn {
	return func(writer *KinesisWriter) error {
		writer.responseReadTimeout = timeout
		return nil
	}
}
// KinesisWriterMsgCountRateLimit is a functional option method for configuring the KinesisWriter's
// message count rate limit. The limit must be positive and no greater than the AWS per-shard
// maximum of kinesisMsgCountRateLimit records/sec, otherwise an error is returned.
func KinesisWriterMsgCountRateLimit(limit int) KinesisWriterOptionsFn {
	return func(o *KinesisWriter) error {
		if limit > 0 && limit <= kinesisMsgCountRateLimit {
			// Bug fix: this previously assigned o.msgSizeRateLimit (a
			// copy-paste from the size-limit option), so the count limit
			// was never actually configurable.
			o.msgCountRateLimit = limit
			return nil
		}
		return ErrInvalidMsgCountRateLimit
	}
}
// KinesisWriterMsgSizeRateLimit is a functional option method for configuring the KinesisWriter's
// message size rate limit. Values outside (0, kinesisMsgSizeRateLimit] are rejected.
func KinesisWriterMsgSizeRateLimit(limit int) KinesisWriterOptionsFn {
	return func(writer *KinesisWriter) error {
		// Guard clause: reject out-of-range limits up front.
		if limit <= 0 || limit > kinesisMsgSizeRateLimit {
			return ErrInvalidMsgSizeRateLimit
		}
		writer.msgSizeRateLimit = limit
		return nil
	}
}
// KinesisWriterLogLevel is a functional option method for configuring the KinesisWriter's log level.
func KinesisWriterLogLevel(ll aws.LogLevelType) KinesisWriterOptionsFn {
	return func(writer *KinesisWriter) error {
		// Only the upper 16 bits are kept; presumably the library reserves
		// these bits for its own log levels — TODO confirm against LogHelper.
		writer.logLevel = ll & 0xffff0000
		return nil
	}
}
// KinesisWriterStats is a functional option method for configuring the KinesisWriter's stats collector.
func KinesisWriterStats(sc ProducerStatsCollector) KinesisWriterOptionsFn {
	return func(writer *KinesisWriter) error {
		writer.Stats = sc
		return nil
	}
}
// KinesisWriter handles the API to send records to Kinesis.
type KinesisWriter struct {
	*kinesisWriterOptions // embedded configuration: timeouts, rate limits, log level, stats
	*LogHelper            // embedded leveled logging helper (set by NewKinesisWriter)
	stream string                  // name of the Kinesis stream written to
	client kinesisiface.KinesisAPI // Kinesis API client; interface type, presumably to allow test doubles — TODO confirm
}
// NewKinesisWriter creates a new stream writer to write records to a Kinesis.
// It applies the given functional options on top of the defaults and returns an
// error if the AWS session cannot be created or any option reports an invalid value.
func NewKinesisWriter(c *aws.Config, stream string, optionFns ...KinesisWriterOptionsFn) (*KinesisWriter, error) {
	sess, err := session.NewSession(c)
	if err != nil {
		return nil, err
	}
	kinesisWriter := &KinesisWriter{
		kinesisWriterOptions: defaultKinesisWriterOptions(),
		stream:               stream,
		client:               kinesis.New(sess),
	}
	for _, optionFn := range optionFns {
		// Bug fix: option errors (e.g. an out-of-range rate limit) were
		// previously discarded, silently leaving the default in place.
		if err := optionFn(kinesisWriter); err != nil {
			return nil, err
		}
	}
	kinesisWriter.LogHelper = &LogHelper{
		LogLevel: kinesisWriter.logLevel,
		Logger:   c.Logger,
	}
	return kinesisWriter, nil
}
// PutRecords sends a batch of records to Kinesis and returns a list of records that need to be retried.
// Nil entries in messages are skipped. Per-record failures are recorded on the corresponding message
// (ErrorCode, ErrorMessage, FailCount) and passed to fn for retry handling; a non-nil error is
// returned only for request-level failures.
func (w *KinesisWriter) PutRecords(ctx context.Context, messages []*Message, fn messageHandler) error {
	var startSendTime time.Time
	var startBuildTime time.Time

	start := time.Now()

	// Bug fix: nil messages are skipped when building the request, but the
	// response's Records slice is index-aligned with the REQUEST entries, not
	// with the caller's messages slice. Keep a parallel slice of the messages
	// actually sent so response entries map back to the right message.
	sent := make([]*Message, 0, len(messages))
	records := make([]*kinesis.PutRecordsRequestEntry, 0, len(messages))
	for _, msg := range messages {
		if msg != nil {
			sent = append(sent, msg)
			records = append(records, msg.ToRequestEntry())
		}
	}
	req, resp := w.client.PutRecordsRequest(&kinesis.PutRecordsInput{
		StreamName: aws.String(w.stream),
		Records:    records,
	})
	req.ApplyOptions(request.WithResponseReadTimeout(w.responseReadTimeout))

	// Instrument the build and send phases of the request lifecycle so the
	// stats collector can report where time is spent.
	req.Handlers.Build.PushFront(func(r *request.Request) {
		startBuildTime = time.Now()
		w.LogDebug("Start PutRecords Build, took", time.Since(start))
	})
	req.Handlers.Build.PushBack(func(r *request.Request) {
		w.Stats.UpdatePutRecordsBuildDuration(time.Since(startBuildTime))
		w.LogDebug("Finished PutRecords Build, took", time.Since(start))
	})
	req.Handlers.Send.PushFront(func(r *request.Request) {
		startSendTime = time.Now()
		w.LogDebug("Start PutRecords Send took", time.Since(start))
	})
	req.Handlers.Send.PushBack(func(r *request.Request) {
		w.Stats.UpdatePutRecordsSendDuration(time.Since(startSendTime))
		w.LogDebug("Finished PutRecords Send, took", time.Since(start))
	})

	w.LogDebug("Starting PutRecords Build/Sign request, took", time.Since(start))
	w.Stats.AddPutRecordsCalled(1)
	if err := req.Send(); err != nil {
		w.LogError("Error putting records:", err.Error())
		return err
	}
	w.Stats.UpdatePutRecordsDuration(time.Since(start))

	if resp == nil {
		return ErrNilPutRecordsResponse
	}
	if resp.FailedRecordCount == nil {
		return ErrNilFailedRecordCount
	}
	attempted := len(sent) // count of records actually sent, excluding nil messages
	failed := int(aws.Int64Value(resp.FailedRecordCount))
	succeeded := attempted - failed
	w.LogDebug(fmt.Sprintf("Finished PutRecords request, %d records attempted, %d records successful, %d records failed, took %v\n", attempted, succeeded, failed, time.Since(start)))

	for idx, record := range resp.Records {
		if idx >= len(sent) {
			// Defensive: the response should never contain more entries than
			// were sent, but never index out of range if it does.
			break
		}
		msg := sent[idx]
		if record.SequenceNumber != nil && record.ShardId != nil {
			// Record succeeded: both SequenceNumber and ShardId are populated.
			// TODO: per-shard metrics
			msg.SequenceNumber = record.SequenceNumber
			msg.ShardID = record.ShardId
			w.Stats.AddSentSuccess(1)
			continue
		}
		// Record failed: attach the error details, bump the failure count and
		// hand the message to the caller's handler for retry.
		switch aws.StringValue(record.ErrorCode) {
		case kinesis.ErrCodeProvisionedThroughputExceededException:
			w.Stats.AddWriteProvisionedThroughputExceeded(1)
		default:
			w.LogDebug("PutRecords record failed with error:", aws.StringValue(record.ErrorCode), aws.StringValue(record.ErrorMessage))
		}
		msg.ErrorCode = record.ErrorCode
		msg.ErrorMessage = record.ErrorMessage
		msg.FailCount++
		w.Stats.AddSentFailed(1)
		fn(msg)
	}
	return nil
}
// getMsgCountRateLimit reports the configured maximum number of records per second.
func (k *KinesisWriter) getMsgCountRateLimit() int {
	return k.msgCountRateLimit
}
// getMsgSizeRateLimit reports the configured maximum transmission size per second.
func (k *KinesisWriter) getMsgSizeRateLimit() int {
	return k.msgSizeRateLimit
}
// getConcurrencyMultiplier returns the writer's concurrency multiplier. For the kinesiswriter the
// multiplier is the number of open (active) shards for the Kinesis stream: shards that are listed
// as a parent or adjacent parent of another shard have been closed by a reshard and are excluded.
func (w *KinesisWriter) getConcurrencyMultiplier() (int, error) {
	// Bug fix: DescribeStream returns at most one page of shards per call and
	// sets HasMoreShards when more exist; the previous implementation read only
	// the first page, under-counting shards on large streams. Follow the
	// pagination via ExclusiveStartShardId to collect the full shard list.
	var shards []*kinesis.Shard
	input := &kinesis.DescribeStreamInput{
		StreamName: aws.String(w.stream),
	}
	for {
		resp, err := w.client.DescribeStream(input)
		if err != nil {
			w.LogError("Error describing kinesis stream: ", w.stream, err)
			return 0, err
		}
		if resp == nil {
			return 0, ErrNilDescribeStreamResponse
		}
		if resp.StreamDescription == nil {
			return 0, ErrNilStreamDescription
		}
		page := resp.StreamDescription.Shards
		shards = append(shards, page...)
		if len(page) == 0 || !aws.BoolValue(resp.StreamDescription.HasMoreShards) {
			break
		}
		// Resume the listing after the last shard of this page.
		input.ExclusiveStartShardId = page[len(page)-1].ShardId
	}
	// maps shardID to a boolean that indicates whether or not the shard is a parent shard or an
	// adjacent parent shard (i.e. has been closed by a reshard)
	shardMap := make(map[string]bool, len(shards))
	for _, shard := range shards {
		if shard.ShardId != nil {
			shardMap[aws.StringValue(shard.ShardId)] = false
		}
	}
	// Loop through all the shards and mark which ones are parents
	for _, shard := range shards {
		if shard.ParentShardId != nil {
			shardID := aws.StringValue(shard.ParentShardId)
			if _, ok := shardMap[shardID]; ok {
				shardMap[shardID] = true
			}
		}
		if shard.AdjacentParentShardId != nil {
			shardID := aws.StringValue(shard.AdjacentParentShardId)
			if _, ok := shardMap[shardID]; ok {
				shardMap[shardID] = true
			}
		}
	}
	// Determine the number of open shards by removing those shards that are reported as parents
	openShardCount := len(shardMap)
	for _, isParent := range shardMap {
		if isParent {
			openShardCount--
		}
	}
	return openShardCount, nil
}