Skip to content

Commit ff6f70d

Browse files
author
Tao Jiang
authored
Fix naming convention (#85)
Minor fix on constant naming convention. Signed-off-by: Tao Jiang <[email protected]>
1 parent 055382a commit ff6f70d

File tree

8 files changed

+46
-44
lines changed

8 files changed

+46
-44
lines changed

clientlibrary/checkpoint/checkpointer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ const (
4242
// We've completely processed all records in this shard.
4343
ShardEnd = "SHARD_END"
4444

45-
// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
46-
ErrLeaseNotAquired = "Lease is already held by another node"
45+
// ErrLeaseNotAcquired is returned when we failed to get a lock on the shard
46+
ErrLeaseNotAcquired = "lease is already held by another node"
4747
)
4848

4949
// Checkpointer handles checkpointing when a record has been processed

clientlibrary/checkpoint/dynamodb-checkpointer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
142142
}
143143

144144
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
145-
return errors.New(ErrLeaseNotAquired)
145+
return errors.New(ErrLeaseNotAcquired)
146146
}
147147

148148
checkpointer.log.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo)
@@ -186,7 +186,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
186186
if err != nil {
187187
if awsErr, ok := err.(awserr.Error); ok {
188188
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
189-
return errors.New(ErrLeaseNotAquired)
189+
return errors.New(ErrLeaseNotAcquired)
190190
}
191191
}
192192
return err

clientlibrary/checkpoint/dynamodb-checkpointer_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func TestGetLeaseNotAquired(t *testing.T) {
8585
Checkpoint: "",
8686
Mux: &sync.Mutex{},
8787
}, "ijkl-mnop")
88-
if err == nil || err.Error() != ErrLeaseNotAquired {
88+
if err == nil || err.Error() != ErrLeaseNotAcquired {
8989
t.Errorf("Got a lease when it was already held by abcd-efgh: %s", err)
9090
}
9191
}

clientlibrary/config/config.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -55,72 +55,72 @@ const (
5555

5656
// The location in the shard from which the KinesisClientLibrary will start fetching records from
5757
// when the application starts for the first time and there is no checkpoint for the shard.
58-
DEFAULT_INITIAL_POSITION_IN_STREAM = LATEST
58+
DefaultInitialPositionInStream = LATEST
5959

6060
// Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
6161
// will be regarded as having problems and it's shards will be assigned to other workers.
6262
// For applications that have a large number of shards, this may be set to a higher number to reduce
6363
// the number of DynamoDB IOPS required for tracking leases.
64-
DEFAULT_FAILOVER_TIME_MILLIS = 10000
64+
DefaultFailoverTimeMillis = 10000
6565

6666
// Period before the end of lease during which a lease is refreshed by the owner.
67-
DEFAULT_LEASE_REFRESH_PERIOD_MILLIS = 5000
67+
DefaultLeaseRefreshPeriodMillis = 5000
6868

6969
// Max records to fetch from Kinesis in a single GetRecords call.
70-
DEFAULT_MAX_RECORDS = 10000
70+
DefaultMaxRecords = 10000
7171

7272
// The default value for how long the {@link ShardConsumer} should sleep if no records are returned
7373
// from the call to
74-
DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000
74+
DefaultIdletimeBetweenReadsMillis = 1000
7575

7676
// Don't call processRecords() on the record processor for empty record lists.
77-
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false
77+
DefaultDontCallProcessRecordsForEmptyRecordList = false
7878

7979
// Interval in milliseconds between polling to check for parent shard completion.
8080
// Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
8181
// completion of parent shards).
82-
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000
82+
DefaultParentShardPollIntervalMillis = 10000
8383

8484
// Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
85-
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000
85+
DefaultShardSyncIntervalMillis = 60000
8686

8787
// Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
8888
// Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by
8989
// default we try to delete the ones we don't need any longer.
90-
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true
90+
DefaultCleanupLeasesUponShardsCompletion = true
9191

9292
// Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
93-
DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500
93+
DefaultTaskBackoffTimeMillis = 500
9494

9595
// KCL will validate client provided sequence numbers with a call to Amazon Kinesis before
9696
// checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
97-
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true
97+
DefaultValidateSequenceNumberBeforeCheckpointing = true
9898

9999
// The max number of leases (shards) this worker should process.
100100
// This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
101101
// or during deployment.
102102
// NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
103103
// stream due to the max limit.
104-
DEFAULT_MAX_LEASES_FOR_WORKER = math.MaxInt16
104+
DefaultMaxLeasesForWorker = math.MaxInt16
105105

106106
// Max leases to steal from another worker at one time (for load balancing).
107107
// Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
108108
// but can cause higher churn in the system.
109-
DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1
109+
DefaultMaxLeasesToStealAtOneTime = 1
110110

111111
// The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
112-
DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10
112+
DefaultInitialLeaseTableReadCapacity = 10
113113

114114
// The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
115-
DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10
115+
DefaultInitialLeaseTableWriteCapacity = 10
116116

117117
// The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
118118
// assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
119119
// during incremental deployments of an application).
120-
DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false
120+
DefaultSkipShardSyncAtStartupIfLeasesExist = false
121121

122122
// The amount of milliseconds to wait before graceful shutdown forcefully terminates.
123-
DEFAULT_SHUTDOWN_GRACE_MILLIS = 5000
123+
DefaultShutdownGraceMillis = 5000
124124
)
125125

126126
type (

clientlibrary/config/kcl-config.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -76,24 +76,24 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
7676
StreamName: streamName,
7777
RegionName: regionName,
7878
WorkerID: workerID,
79-
InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM,
80-
InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM),
81-
FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS,
82-
LeaseRefreshPeriodMillis: DEFAULT_LEASE_REFRESH_PERIOD_MILLIS,
83-
MaxRecords: DEFAULT_MAX_RECORDS,
84-
IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
85-
CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
86-
ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
87-
ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
88-
CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
89-
TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS,
90-
ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
91-
ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS,
92-
MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER,
93-
MaxLeasesToStealAtOneTime: DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
94-
InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
95-
InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
96-
SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
79+
InitialPositionInStream: DefaultInitialPositionInStream,
80+
InitialPositionInStreamExtended: *newInitialPosition(DefaultInitialPositionInStream),
81+
FailoverTimeMillis: DefaultFailoverTimeMillis,
82+
LeaseRefreshPeriodMillis: DefaultLeaseRefreshPeriodMillis,
83+
MaxRecords: DefaultMaxRecords,
84+
IdleTimeBetweenReadsInMillis: DefaultIdletimeBetweenReadsMillis,
85+
CallProcessRecordsEvenForEmptyRecordList: DefaultDontCallProcessRecordsForEmptyRecordList,
86+
ParentShardPollIntervalMillis: DefaultParentShardPollIntervalMillis,
87+
ShardSyncIntervalMillis: DefaultShardSyncIntervalMillis,
88+
CleanupTerminatedShardsBeforeExpiry: DefaultCleanupLeasesUponShardsCompletion,
89+
TaskBackoffTimeMillis: DefaultTaskBackoffTimeMillis,
90+
ValidateSequenceNumberBeforeCheckpointing: DefaultValidateSequenceNumberBeforeCheckpointing,
91+
ShutdownGraceMillis: DefaultShutdownGraceMillis,
92+
MaxLeasesForWorker: DefaultMaxLeasesForWorker,
93+
MaxLeasesToStealAtOneTime: DefaultMaxLeasesToStealAtOneTime,
94+
InitialLeaseTableReadCapacity: DefaultInitialLeaseTableReadCapacity,
95+
InitialLeaseTableWriteCapacity: DefaultInitialLeaseTableWriteCapacity,
96+
SkipShardSyncAtWorkerInitializationIfLeasesExist: DefaultSkipShardSyncAtStartupIfLeasesExist,
9797
Logger: logger.GetDefaultLogger(),
9898
}
9999
}

clientlibrary/worker/shard-consumer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
162162
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
163163
err = sc.checkpointer.GetLease(shard, sc.consumerID)
164164
if err != nil {
165-
if err.Error() == chk.ErrLeaseNotAquired {
165+
if err.Error() == chk.ErrLeaseNotAcquired {
166166
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
167167
return nil
168168
}
@@ -225,7 +225,7 @@ func (sc *ShardConsumer) getRecords(shard *par.ShardStatus) error {
225225

226226
recordLength := len(input.Records)
227227
recordBytes := int64(0)
228-
log.Debugf("Received %d records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
228+
log.Debugf("Received %d de-aggregated records, MillisBehindLatest: %v", recordLength, input.MillisBehindLatest)
229229

230230
for _, r := range dars {
231231
recordBytes += int64(len(r.Data))

clientlibrary/worker/worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func (w *Worker) eventLoop() {
277277
err = w.checkpointer.GetLease(shard, w.workerID)
278278
if err != nil {
279279
// cannot get lease on the shard
280-
if err.Error() != chk.ErrLeaseNotAquired {
280+
if err.Error() != chk.ErrLeaseNotAcquired {
281281
log.Errorf("Cannot get lease: %+v", err)
282282
}
283283
continue

test/record_processor_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
7171
// Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch
7272
// because de-aggregated records share the same sequence number.
7373
lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
74-
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest)
74+
// Calculate the time taken from polling records and delivering to record processor for a batch.
75+
diff := input.CacheExitTime.Sub(*input.CacheEntryTime)
76+
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v, KCLProcessTime = %v", lastRecordSequenceNumber, input.MillisBehindLatest, diff)
7577
input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
7678
}
7779

0 commit comments

Comments
 (0)