diff --git a/balancer.go b/balancer.go index 4136fce7..ee3a2588 100644 --- a/balancer.go +++ b/balancer.go @@ -7,7 +7,6 @@ import ( "math/rand" "sort" "sync" - "sync/atomic" ) // The Balancer interface provides an abstraction of the message distribution @@ -42,8 +41,10 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int { type RoundRobin struct { ChunkSize int // Use a 32 bits integer so RoundRobin values don't need to be aligned to - // apply atomic increments. + // apply increments. counter uint32 + + mutex sync.Mutex } // Balance satisfies the Balancer interface. @@ -52,14 +53,17 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int { } func (rr *RoundRobin) balance(partitions []int) int { + rr.mutex.Lock() + defer rr.mutex.Unlock() + if rr.ChunkSize < 1 { rr.ChunkSize = 1 } length := len(partitions) - counterNow := atomic.LoadUint32(&rr.counter) + counterNow := rr.counter offset := int(counterNow / uint32(rr.ChunkSize)) - atomic.AddUint32(&rr.counter, 1) + rr.counter++ return partitions[offset%length] }