@@ -16,6 +16,8 @@
  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  */
+
+// Package worker
 // The implementation is derived from https://github.com/patrobinson/gokini
 //
 // Copyright 2018 Patrick robinson
@@ -28,8 +30,9 @@
 package worker
 
 import (
+	"crypto/rand"
 	"errors"
-	"math/rand"
+	"math/big"
 	"sync"
 	"time"
 
@@ -45,11 +48,9 @@ import (
 	par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
 )
 
-/**
- * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
- * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
- * the shards).
- */
+// Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
+// different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
+// the shards).
 type Worker struct {
 	streamName string
 	regionName string
@@ -66,7 +67,7 @@ type Worker struct {
 	waitGroup *sync.WaitGroup
 	done bool
 
-	rng *rand.Rand
+	randomSeed int64
 
 	shardStatus map[string]*par.ShardStatus
 	shardStealInProgress bool
@@ -80,9 +81,6 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
 		mService = metrics.NoopMonitoringService{}
 	}
 
-	// Create a pseudo-random number generator and seed it.
-	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-
 	return &Worker{
 		streamName: kclConfig.StreamName,
 		regionName: kclConfig.RegionName,
@@ -91,7 +89,7 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli
 		kclConfig: kclConfig,
 		mService: mService,
 		done: false,
-		rng: rng,
+		randomSeed: time.Now().UTC().UnixNano(),
 	}
 }
 
@@ -108,7 +106,7 @@ func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker {
 	return w
 }
 
-// Run starts consuming data from the stream, and pass it to the application record processors.
+// Start starts consuming data from the stream and passes it to the application record processors.
 func (w *Worker) Start() error {
 	log := w.kclConfig.Logger
 	if err := w.initialize(); err != nil {
@@ -133,7 +131,7 @@ func (w *Worker) Start() error {
 	return nil
 }
 
-// Shutdown signals worker to shutdown. Worker will try initiating shutdown of all record processors.
+// Shutdown signals worker to shut down. Worker will try initiating shutdown of all record processors.
 func (w *Worker) Shutdown() {
 	log := w.kclConfig.Logger
 	log.Infof("Worker shutdown in requested.")
@@ -258,7 +256,8 @@ func (w *Worker) eventLoop() {
 		// starts at the same time, this decreases the probability of them calling
 		// kinesis.DescribeStream at the same time, and hit the hard-limit on aws API calls.
 		// On average the period remains the same so that doesn't affect behavior.
-		shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + w.rng.Intn(w.kclConfig.ShardSyncIntervalMillis)
+		rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
+		shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
 
 		err := w.syncShard()
 		if err != nil {
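
The jitter in this hunk draws a uniform value in [0, ShardSyncIntervalMillis) from crypto/rand and adds it to half the interval, so sleeps fall in [T/2, 3T/2) with mean T. Because crypto/rand needs no seeding, the rng field and its time-based seed can be dropped from Worker. A minimal, self-contained sketch of the same computation (the jitteredInterval helper name and its fallback behaviour are illustrative, not part of vmware-go-kcl):

package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
	"time"
)

// jitteredInterval returns a sleep duration drawn uniformly from
// [intervalMillis/2, intervalMillis*3/2), so the mean period stays at
// intervalMillis while concurrent workers rarely wake at the same instant.
func jitteredInterval(intervalMillis int) time.Duration {
	n, err := rand.Int(rand.Reader, big.NewInt(int64(intervalMillis)))
	if err != nil {
		// crypto/rand reads should not fail on supported platforms; fall
		// back to the unjittered interval rather than aborting.
		return time.Duration(intervalMillis) * time.Millisecond
	}
	return time.Duration(intervalMillis/2+int(n.Int64())) * time.Millisecond
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredInterval(1000)) // somewhere between 500ms and 1.5s
	}
}
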
@@ -290,7 +289,7 @@ func (w *Worker) eventLoop() {
 
 		err := w.checkpointer.FetchCheckpoint(shard)
 		if err != nil {
-			// checkpoint may not existed yet is not an error condition.
+			// a checkpoint that does not exist yet is not an error condition.
 			if err != chk.ErrSequenceIDNotFound {
 				log.Warnf("Couldn't fetch checkpoint: %+v", err)
 				// move on to next shard
@@ -371,7 +370,7 @@ func (w *Worker) rebalance() error {
 		return err
 	}
 
-	// Only attempt to steal one shard at at time, to allow for linear convergence
+	// Only attempt to steal one shard at a time, to allow for linear convergence
 	if w.shardStealInProgress {
 		shardInfo := make(map[string]bool)
 		err := w.getShardIDs("", shardInfo)
@@ -418,12 +417,12 @@ func (w *Worker) rebalance() error {
 		log.Debugf("We have enough shards, not attempting to steal any. workerID: %s", w.workerID)
 		return nil
 	}
-	maxShards := int(optimalShards)
+
 	var workerSteal string
 	for worker, shards := range workers {
-		if worker != w.workerID && len(shards) > maxShards {
+		if worker != w.workerID && len(shards) > optimalShards {
 			workerSteal = worker
-			maxShards = len(shards)
+			optimalShards = len(shards)
 		}
 	}
 	// Not all shards are allocated so fallback to default shard allocation mechanisms
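
With maxShards removed, optimalShards now doubles as the running threshold: the loop settles on the worker holding the most shards, and only when that worker holds more than the optimal share. A short sketch of the same selection logic, assuming (as elsewhere in this function) that workers maps worker IDs to the shard IDs they hold; pickWorkerToStealFrom and the example IDs are illustrative names, not part of the library:

package main

import "fmt"

// pickWorkerToStealFrom returns the worker holding the most shards, but only
// if it holds more than optimalShards; an empty result means no worker
// exceeds its fair share, so there is nothing to steal.
func pickWorkerToStealFrom(workers map[string][]string, self string, optimalShards int) string {
	var workerSteal string
	for worker, shards := range workers {
		if worker != self && len(shards) > optimalShards {
			workerSteal = worker
			optimalShards = len(shards)
		}
	}
	return workerSteal
}

func main() {
	workers := map[string][]string{
		"worker-a": {"shard-0", "shard-1", "shard-2"},
		"worker-b": {"shard-3"},
		"worker-c": {},
	}
	// With an optimal share of 1 shard per worker, worker-a is the candidate.
	fmt.Println(pickWorkerToStealFrom(workers, "worker-c", 1))
}
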
@@ -434,7 +433,8 @@ func (w *Worker) rebalance() error {
 
 	// Steal a random shard from the worker with the most shards
 	w.shardStealInProgress = true
-	randIndex := rand.Intn(len(workers[workerSteal]))
+	rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(len(workers[workerSteal]))))
+	randIndex := int(rnd.Int64())
 	shardToSteal := workers[workerSteal][randIndex]
 	log.Debugf("Stealing shard %s from %s", shardToSteal, workerSteal)
 
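The shard-steal path uses the same crypto/rand pattern to pick a uniformly random shard from the chosen worker's list. A small sketch of that selection in isolation (randomElement is an illustrative helper, not part of the library; unlike the diff above it surfaces the error from rand.Int instead of discarding it):

package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
)

// randomElement returns one element of items chosen uniformly at random via
// crypto/rand, mirroring the randIndex computation in the hunk above.
func randomElement(items []string) (string, error) {
	if len(items) == 0 {
		return "", errors.New("randomElement: empty slice")
	}
	n, err := rand.Int(rand.Reader, big.NewInt(int64(len(items))))
	if err != nil {
		return "", err
	}
	return items[n.Int64()], nil
}

func main() {
	shard, err := randomElement([]string{"shard-0", "shard-1", "shard-2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(shard)
}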