Skip to content

Commit 6a435b0

Browse files
authored
Support enhanced fan-out feature (#90)
* Implement enhanced fan-out consumer Signed-off-by: Ilia Cimpoes <[email protected]> * Add test cases Signed-off-by: Ilia Cimpoes <[email protected]> * Small adjustments in fan-out consumer Signed-off-by: Ilia Cimpoes <[email protected]>
1 parent 022ec8d commit 6a435b0

16 files changed

+846
-384
lines changed

clientlibrary/checkpoint/dynamodb-checkpointer.go

+9-13
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@ import (
3131
"time"
3232

3333
"github.com/aws/aws-sdk-go/aws"
34-
"github.com/aws/aws-sdk-go/aws/awserr"
3534
"github.com/aws/aws-sdk-go/aws/client"
3635
"github.com/aws/aws-sdk-go/aws/session"
3736
"github.com/aws/aws-sdk-go/service/dynamodb"
3837
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
3938

4039
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
4140
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
41+
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
4242
"github.com/vmware/vmware-go-kcl/logger"
4343
)
4444

@@ -144,7 +144,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
144144
return ErrLeaseNotAcquired{"current lease timeout not yet expired"}
145145
}
146146

147-
checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
147+
checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s, newAssignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo, newAssignTo)
148148
conditionalExpression = "ShardID = :id AND AssignedTo = :assigned_to AND LeaseTimeout = :lease_timeout"
149149
expressionAttributeValues = map[string]*dynamodb.AttributeValue{
150150
":id": {
@@ -175,18 +175,16 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
175175
marshalledCheckpoint[ParentShardIdKey] = &dynamodb.AttributeValue{S: aws.String(shard.ParentShardId)}
176176
}
177177

178-
if shard.Checkpoint != "" {
178+
if shard.GetCheckpoint() != "" {
179179
marshalledCheckpoint[SequenceNumberKey] = &dynamodb.AttributeValue{
180-
S: aws.String(shard.Checkpoint),
180+
S: aws.String(shard.GetCheckpoint()),
181181
}
182182
}
183183

184184
err = checkpointer.conditionalUpdate(conditionalExpression, expressionAttributeValues, marshalledCheckpoint)
185185
if err != nil {
186-
if awsErr, ok := err.(awserr.Error); ok {
187-
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
188-
return ErrLeaseNotAcquired{dynamodb.ErrCodeConditionalCheckFailedException}
189-
}
186+
if utils.AWSErrCode(err) == dynamodb.ErrCodeConditionalCheckFailedException {
187+
return ErrLeaseNotAcquired{dynamodb.ErrCodeConditionalCheckFailedException}
190188
}
191189
return err
192190
}
@@ -207,7 +205,7 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus)
207205
S: aws.String(shard.ID),
208206
},
209207
SequenceNumberKey: {
210-
S: aws.String(shard.Checkpoint),
208+
S: aws.String(shard.GetCheckpoint()),
211209
},
212210
LeaseOwnerKey: {
213211
S: aws.String(shard.AssignedTo),
@@ -236,12 +234,10 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
236234
return ErrSequenceIDNotFound
237235
}
238236
checkpointer.log.Debugf("Retrieved Shard Iterator %s", *sequenceID.S)
239-
shard.Mux.Lock()
240-
defer shard.Mux.Unlock()
241-
shard.Checkpoint = aws.StringValue(sequenceID.S)
237+
shard.SetCheckpoint(aws.StringValue(sequenceID.S))
242238

243239
if assignedTo, ok := checkpoint[LeaseOwnerKey]; ok {
244-
shard.AssignedTo = aws.StringValue(assignedTo.S)
240+
shard.SetLeaseOwner(aws.StringValue(assignedTo.S))
245241
}
246242
return nil
247243
}

clientlibrary/checkpoint/dynamodb-checkpointer_test.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,11 @@ import (
3333
"testing"
3434
"time"
3535

36-
"github.com/stretchr/testify/assert"
37-
3836
"github.com/aws/aws-sdk-go/aws"
3937
"github.com/aws/aws-sdk-go/aws/awserr"
4038
"github.com/aws/aws-sdk-go/service/dynamodb"
4139
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
40+
"github.com/stretchr/testify/assert"
4241

4342
cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
4443
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
@@ -75,7 +74,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
7574
err := checkpoint.GetLease(&par.ShardStatus{
7675
ID: "0001",
7776
Checkpoint: "",
78-
Mux: &sync.Mutex{},
77+
Mux: &sync.RWMutex{},
7978
}, "abcd-efgh")
8079
if err != nil {
8180
t.Errorf("Error getting lease %s", err)
@@ -84,7 +83,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
8483
err = checkpoint.GetLease(&par.ShardStatus{
8584
ID: "0001",
8685
Checkpoint: "",
87-
Mux: &sync.Mutex{},
86+
Mux: &sync.RWMutex{},
8887
}, "ijkl-mnop")
8988
if err == nil || !errors.As(err, &ErrLeaseNotAcquired{}) {
9089
t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
@@ -124,7 +123,7 @@ func TestGetLeaseAquired(t *testing.T) {
124123
shard := &par.ShardStatus{
125124
ID: "0001",
126125
Checkpoint: "deadbeef",
127-
Mux: &sync.Mutex{},
126+
Mux: &sync.RWMutex{},
128127
}
129128
err := checkpoint.GetLease(shard, "ijkl-mnop")
130129

@@ -145,7 +144,7 @@ func TestGetLeaseAquired(t *testing.T) {
145144

146145
status := &par.ShardStatus{
147146
ID: shard.ID,
148-
Mux: &sync.Mutex{},
147+
Mux: &sync.RWMutex{},
149148
}
150149
checkpoint.FetchCheckpoint(status)
151150

clientlibrary/config/config.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141

4242
"github.com/aws/aws-sdk-go/aws"
4343
creds "github.com/aws/aws-sdk-go/aws/credentials"
44+
4445
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
4546
"github.com/vmware/vmware-go-kcl/logger"
4647
)
@@ -169,13 +170,24 @@ type (
169170
// StreamName is the name of Kinesis stream
170171
StreamName string
171172

173+
// EnableEnhancedFanOutConsumer enables enhanced fan-out consumer
174+
// See: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
175+
// Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled.
176+
EnableEnhancedFanOutConsumer bool
177+
178+
// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create.
179+
EnhancedFanOutConsumerName string
180+
181+
// EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted
182+
EnhancedFanOutConsumerARN string
183+
172184
// WorkerID used to distinguish different workers/processes of a Kinesis application
173185
WorkerID string
174186

175187
// InitialPositionInStream specifies the Position in the stream where a new application should start from
176188
InitialPositionInStream InitialPositionInStream
177189

178-
// InitialPositionInStreamExtended provides actual AT_TMESTAMP value
190+
// InitialPositionInStreamExtended provides actual AT_TIMESTAMP value
179191
InitialPositionInStreamExtended InitialPositionInStreamExtended
180192

181193
// credentials to access Kinesis/Dynamo: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/
@@ -262,18 +274,18 @@ func empty(s string) bool {
262274
return len(strings.TrimSpace(s)) == 0
263275
}
264276

265-
// checkIsValuePositive make sure the value is possitive.
277+
// checkIsValueNotEmpty makes sure the value is not empty.
266278
func checkIsValueNotEmpty(key string, value string) {
267279
if empty(value) {
268280
// There is no point to continue for incorrect configuration. Fail fast!
269-
log.Panicf("Non-empty value exepected for %v, actual: %v", key, value)
281+
log.Panicf("Non-empty value expected for %v, actual: %v", key, value)
270282
}
271283
}
272284

273-
// checkIsValuePositive make sure the value is possitive.
285+
// checkIsValuePositive makes sure the value is possitive.
274286
func checkIsValuePositive(key string, value int) {
275287
if value <= 0 {
276288
// There is no point to continue for incorrect configuration. Fail fast!
277-
log.Panicf("Positive value exepected for %v, actual: %v", key, value)
289+
log.Panicf("Positive value expected for %v, actual: %v", key, value)
278290
}
279291
}

clientlibrary/config/config_test.go

+26-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
package config
2020

2121
import (
22-
"github.com/vmware/vmware-go-kcl/logger"
2322
"testing"
2423

2524
"github.com/stretchr/testify/assert"
25+
26+
"github.com/vmware/vmware-go-kcl/logger"
2627
)
2728

2829
func TestConfig(t *testing.T) {
@@ -32,13 +33,36 @@ func TestConfig(t *testing.T) {
3233
WithInitialPositionInStream(TRIM_HORIZON).
3334
WithIdleTimeBetweenReadsInMillis(20).
3435
WithCallProcessRecordsEvenForEmptyRecordList(true).
35-
WithTaskBackoffTimeMillis(10)
36+
WithTaskBackoffTimeMillis(10).
37+
WithEnhancedFanOutConsumer("fan-out-consumer")
3638

3739
assert.Equal(t, "appName", kclConfig.ApplicationName)
3840
assert.Equal(t, 500, kclConfig.FailoverTimeMillis)
3941
assert.Equal(t, 10, kclConfig.TaskBackoffTimeMillis)
42+
assert.True(t, kclConfig.EnableEnhancedFanOutConsumer)
43+
assert.Equal(t, "fan-out-consumer", kclConfig.EnhancedFanOutConsumerName)
4044

4145
contextLogger := kclConfig.Logger.WithFields(logger.Fields{"key1": "value1"})
4246
contextLogger.Debugf("Starting with default logger")
4347
contextLogger.Infof("Default logger is awesome")
4448
}
49+
50+
func TestEmptyEnhancedFanOutConsumerName(t *testing.T) {
51+
assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerName, actual: ", func() {
52+
NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumer("")
53+
})
54+
}
55+
56+
func TestConfigWithEnhancedFanOutConsumerARN(t *testing.T) {
57+
kclConfig := NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").
58+
WithEnhancedFanOutConsumerARN("consumer:arn")
59+
60+
assert.True(t, kclConfig.EnableEnhancedFanOutConsumer)
61+
assert.Equal(t, "consumer:arn", kclConfig.EnhancedFanOutConsumerARN)
62+
}
63+
64+
func TestEmptyEnhancedFanOutConsumerARN(t *testing.T) {
65+
assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerARN, actual: ", func() {
66+
NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumerARN("")
67+
})
68+
}

clientlibrary/config/kcl-config.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ import (
3737
"log"
3838
"time"
3939

40-
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
41-
4240
"github.com/aws/aws-sdk-go/aws/credentials"
41+
42+
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
4343
"github.com/vmware/vmware-go-kcl/clientlibrary/utils"
4444
"github.com/vmware/vmware-go-kcl/logger"
4545
)
@@ -212,3 +212,23 @@ func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.M
212212
c.MonitoringService = mService
213213
return c
214214
}
215+
216+
// WithEnhancedFanOutConsumer enables enhanced fan-out consumer with the specified name
217+
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
218+
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
219+
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(consumerName string) *KinesisClientLibConfiguration {
220+
checkIsValueNotEmpty("EnhancedFanOutConsumerName", consumerName)
221+
c.EnhancedFanOutConsumerName = consumerName
222+
c.EnableEnhancedFanOutConsumer = true
223+
return c
224+
}
225+
226+
// WithEnhancedFanOutConsumerARN enables enhanced fan-out consumer with the specified consumer ARN
227+
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
228+
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
229+
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerARN string) *KinesisClientLibConfiguration {
230+
checkIsValueNotEmpty("EnhancedFanOutConsumerARN", consumerARN)
231+
c.EnhancedFanOutConsumerARN = consumerARN
232+
c.EnableEnhancedFanOutConsumer = true
233+
return c
234+
}

clientlibrary/partition/partition.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type ShardStatus struct {
3737
ParentShardId string
3838
Checkpoint string
3939
AssignedTo string
40-
Mux *sync.Mutex
40+
Mux *sync.RWMutex
4141
LeaseTimeout time.Time
4242
// Shard Range
4343
StartingSequenceNumber string
@@ -46,8 +46,8 @@ type ShardStatus struct {
4646
}
4747

4848
func (ss *ShardStatus) GetLeaseOwner() string {
49-
ss.Mux.Lock()
50-
defer ss.Mux.Unlock()
49+
ss.Mux.RLock()
50+
defer ss.Mux.RUnlock()
5151
return ss.AssignedTo
5252
}
5353

@@ -56,3 +56,15 @@ func (ss *ShardStatus) SetLeaseOwner(owner string) {
5656
defer ss.Mux.Unlock()
5757
ss.AssignedTo = owner
5858
}
59+
60+
func (ss *ShardStatus) GetCheckpoint() string {
61+
ss.Mux.RLock()
62+
defer ss.Mux.RUnlock()
63+
return ss.Checkpoint
64+
}
65+
66+
func (ss *ShardStatus) SetCheckpoint(c string) {
67+
ss.Mux.Lock()
68+
defer ss.Mux.Unlock()
69+
ss.Checkpoint = c
70+
}

clientlibrary/utils/awserr.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2021 VMware, Inc.
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
5+
* associated documentation files (the "Software"), to deal in the Software without restriction, including
6+
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
* copies of the Software, and to permit persons to whom the Software is furnished to do
8+
* so, subject to the following conditions:
9+
*
10+
* The above copyright notice and this permission notice shall be included in all copies or substantial
11+
* portions of the Software.
12+
*
13+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
14+
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
15+
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
16+
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
17+
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18+
*/
19+
package utils
20+
21+
import (
22+
"github.com/aws/aws-sdk-go/aws/awserr"
23+
)
24+
25+
func AWSErrCode(err error) string {
26+
awsErr, _ := err.(awserr.Error)
27+
if awsErr != nil {
28+
return awsErr.Code()
29+
}
30+
return ""
31+
}

0 commit comments

Comments
 (0)