Skip to content

Commit 9ceea63

Browse files
authored
Use application name as default enhanced fan-out consumer name (#91)
* Use ApplicationName as default for EnhancedFanOutConsumerName Signed-off-by: Ilia Cimpoes <[email protected]> * Add tests Signed-off-by: Ilia Cimpoes <[email protected]>
1 parent 6a435b0 commit 9ceea63

File tree

5 files changed

+54
-13
lines changed

5 files changed

+54
-13
lines changed

clientlibrary/config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ type (
175175
// Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled.
176176
EnableEnhancedFanOutConsumer bool
177177

178-
// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create.
178+
// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used.
179179
EnhancedFanOutConsumerName string
180180

181181
// EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted

clientlibrary/config/config_test.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestConfig(t *testing.T) {
3434
WithIdleTimeBetweenReadsInMillis(20).
3535
WithCallProcessRecordsEvenForEmptyRecordList(true).
3636
WithTaskBackoffTimeMillis(10).
37-
WithEnhancedFanOutConsumer("fan-out-consumer")
37+
WithEnhancedFanOutConsumerName("fan-out-consumer")
3838

3939
assert.Equal(t, "appName", kclConfig.ApplicationName)
4040
assert.Equal(t, 500, kclConfig.FailoverTimeMillis)
@@ -47,9 +47,17 @@ func TestConfig(t *testing.T) {
4747
contextLogger.Infof("Default logger is awesome")
4848
}
4949

50+
func TestConfigDefaultEnhancedFanOutConsumerName(t *testing.T) {
51+
kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "us-west-2", "workerId")
52+
53+
assert.Equal(t, "appName", kclConfig.ApplicationName)
54+
assert.False(t, kclConfig.EnableEnhancedFanOutConsumer)
55+
assert.Equal(t, "appName", kclConfig.EnhancedFanOutConsumerName)
56+
}
57+
5058
func TestEmptyEnhancedFanOutConsumerName(t *testing.T) {
5159
assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerName, actual: ", func() {
52-
NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumer("")
60+
NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumerName("")
5361
})
5462
}
5563

clientlibrary/config/kcl-config.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
7373
KinesisCredentials: kiniesisCreds,
7474
DynamoDBCredentials: dynamodbCreds,
7575
TableName: applicationName,
76+
EnhancedFanOutConsumerName: applicationName,
7677
StreamName: streamName,
7778
RegionName: regionName,
7879
WorkerID: workerID,
@@ -213,10 +214,18 @@ func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.M
213214
return c
214215
}
215216

216-
// WithEnhancedFanOutConsumer enables enhanced fan-out consumer with the specified name
217+
// WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified ApplicationName is used as ConsumerName.
217218
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
218219
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
219-
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(consumerName string) *KinesisClientLibConfiguration {
220+
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration {
221+
c.EnableEnhancedFanOutConsumer = enable
222+
return c
223+
}
224+
225+
// WithEnhancedFanOutConsumerName enables enhanced fan-out consumer with the specified name
226+
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
227+
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
228+
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration {
220229
checkIsValueNotEmpty("EnhancedFanOutConsumerName", consumerName)
221230
c.EnhancedFanOutConsumerName = consumerName
222231
c.EnableEnhancedFanOutConsumer = true

clientlibrary/worker/worker.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -184,19 +184,14 @@ func (w *Worker) initialize() error {
184184

185185
if w.kclConfig.EnableEnhancedFanOutConsumer {
186186
log.Debugf("Enhanced fan-out is enabled")
187-
switch {
188-
case w.kclConfig.EnhancedFanOutConsumerARN != "":
189-
w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN
190-
case w.kclConfig.EnhancedFanOutConsumerName != "":
187+
w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN
188+
if w.consumerARN == "" {
191189
var err error
192190
w.consumerARN, err = w.fetchConsumerARNWithRetry()
193191
if err != nil {
194192
log.Errorf("Failed to fetch consumer ARN for: %s, %v", w.kclConfig.EnhancedFanOutConsumerName, err)
195193
return err
196194
}
197-
default:
198-
log.Errorf("Consumer Name or ARN were not specified with enhanced fan-out enabled")
199-
return errors.New("Consumer Name or ARN must be specified when enhanced fan-out is enabled")
200195
}
201196
}
202197

test/worker_test.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,36 @@ func TestEnhancedFanOutConsumer(t *testing.T) {
191191

192192
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
193193
WithInitialPositionInStream(cfg.LATEST).
194-
WithEnhancedFanOutConsumer(consumerName).
194+
WithEnhancedFanOutConsumerName(consumerName).
195+
WithMaxRecords(10).
196+
WithMaxLeasesForWorker(1).
197+
WithShardSyncIntervalMillis(5000).
198+
WithFailoverTimeMillis(300000).
199+
WithLogger(log)
200+
201+
runTest(kclConfig, false, t)
202+
}
203+
204+
func TestEnhancedFanOutConsumerDefaultConsumerName(t *testing.T) {
205+
// At miminal, use standard logrus logger
206+
// log := logger.NewLogrusLogger(logrus.StandardLogger())
207+
//
208+
// In order to have precise control over logging. Use logger with config
209+
config := logger.Configuration{
210+
EnableConsole: true,
211+
ConsoleLevel: logger.Debug,
212+
ConsoleJSONFormat: false,
213+
EnableFile: true,
214+
FileLevel: logger.Info,
215+
FileJSONFormat: true,
216+
Filename: "log.log",
217+
}
218+
// Use logrus logger
219+
log := logger.NewLogrusLoggerWithConfig(config)
220+
221+
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
222+
WithInitialPositionInStream(cfg.LATEST).
223+
WithEnhancedFanOutConsumer(true).
195224
WithMaxRecords(10).
196225
WithMaxLeasesForWorker(1).
197226
WithShardSyncIntervalMillis(5000).

0 commit comments

Comments
 (0)