Skip to content

Commit 17f99b2

Browse files
authored
Merge pull request #6 from asymmetric-research/marctrem/make-concurrent-writing-to-ringbuf-racefree
make concurrent writing to ringbuf racefree
2 parents 8695358 + 2320c75 commit 17f99b2

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

collections/ringbuffer/ringbuffer.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package ringbuffer
22

33
import (
44
"fmt"
5+
"sync/atomic"
56

67
armath "github.com/asymmetric-research/go-commons/math"
78
)
89

910
type T[C any] struct {
10-
buflen uint
11+
buflen uint64
1112
buf []C
1213

1314
// head points to the next free slot
14-
head uint
15+
head atomic.Uint64
1516
}
1617

1718
func New[C any](size int) (*T[C], error) {
@@ -26,29 +27,29 @@ func NewInto[C any](dst *T[C], buf []C) error {
2627
return fmt.Errorf("backing buffer must have a greater than zero")
2728
}
2829
*dst = T[C]{
29-
buflen: uint(len(buf)),
30+
buflen: uint64(len(buf)),
3031
buf: buf,
31-
head: 0,
3232
}
33+
dst.head.Store(0)
3334
return nil
3435
}
3536

3637
func (r *T[C]) Push(item C) {
37-
r.buf[r.head%r.buflen] = item
38-
r.head += 1
38+
nextSlot := r.head.Add(1)
39+
r.buf[(nextSlot-1)%r.buflen] = item
3940
}
4041

4142
func (r *T[C]) Last(dst []C) int {
4243
// how many entries can we write?
43-
maxWritable := armath.Min(r.head, r.buflen)
44+
maxWritable := armath.Min(r.head.Load(), r.buflen)
4445

4546
// if the dst is larger than the amount of entries we can write, let's clamp it.
4647
if len(dst) > int(maxWritable) {
4748
// only consider the first available slots of dst
4849
dst = dst[:maxWritable]
4950
}
5051

51-
headmod := int(r.head % r.buflen)
52+
headmod := int(r.head.Load() % r.buflen)
5253

5354
// we must do at most 2 copies
5455
n := 0
@@ -80,8 +81,8 @@ func (r *T[C]) Last(dst []C) int {
8081
return n
8182
}
8283

83-
func (r *T[C]) Len() uint {
84-
used := armath.Min(r.buflen, r.head)
84+
func (r *T[C]) Len() uint64 {
85+
used := armath.Min(r.buflen, r.head.Load())
8586
return used
8687
}
8788

@@ -92,17 +93,19 @@ const (
9293
SEQ_MODE_FILO
9394
)
9495

95-
func (r *T[C]) Seq(seqMode SeqMode) func(yield func(uint, C) bool) {
96-
return func(yield func(uint, C) bool) {
96+
func (r *T[C]) Seq(seqMode SeqMode) func(yield func(uint64, C) bool) {
97+
return func(yield func(uint64, C) bool) {
9798
if r.buflen == 0 {
9899
return
99100
}
100101

102+
head := r.head.Load()
103+
101104
// how many entries can we write?
102-
maxWritable := armath.Min(r.head, r.buflen)
105+
maxWritable := armath.Min(head, r.buflen)
103106

104107
if seqMode == SEQ_MODE_FIFO {
105-
start := (((r.head - 1) % r.buflen) - maxWritable) % r.buflen
108+
start := (((head - 1) % r.buflen) - maxWritable) % r.buflen
106109

107110
for i := range maxWritable {
108111
idx := (start + i) % r.buflen
@@ -113,7 +116,7 @@ func (r *T[C]) Seq(seqMode SeqMode) func(yield func(uint, C) bool) {
113116
return
114117
}
115118
if seqMode == SEQ_MODE_FILO {
116-
start := r.head - 1
119+
start := head - 1
117120
for i := range maxWritable {
118121
idx := (start - i) % r.buflen
119122
if !yield(i, r.buf[idx]) {

0 commit comments

Comments
 (0)