Skip to content

Commit 2d21b06

Browse files
committed
introduce ringbuffer and refactor repo structure
1 parent 3857dc3 commit 2d21b06

File tree

7 files changed

+306
-50
lines changed

7 files changed

+306
-50
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ A kitchen sink of data structures and algorithms in Go
33

44
# Contents
55

6-
## io
7-
### LineReader
8-
Read lines from a reader while truncating lines that exceed the destination buffer's size.
6+
* collections
7+
* io
8+
* math

collections/ringbuffer/ringbuffer.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package ringbuffer
2+
3+
import (
4+
"fmt"
5+
6+
armath "github.com/asymmetric-research/go-commons/math"
7+
)
8+
9+
type T[C any] struct {
10+
buflen uint
11+
buf []C
12+
13+
// head points to the next free slot
14+
head uint
15+
}
16+
17+
func New[C any](size int) (*T[C], error) {
18+
ret := &T[C]{}
19+
return ret, NewInto(ret, make([]C, size))
20+
21+
}
22+
23+
// NewInto assumes that
24+
func NewInto[C any](dst *T[C], buf []C) error {
25+
if len(buf) <= 0 {
26+
return fmt.Errorf("backing buffer must have a greater than zero")
27+
}
28+
*dst = T[C]{
29+
buflen: uint(len(buf)),
30+
buf: buf,
31+
head: 0,
32+
}
33+
return nil
34+
}
35+
36+
func (r *T[C]) Push(item C) {
37+
r.buf[r.head%r.buflen] = item
38+
r.head += 1
39+
}
40+
41+
func (r *T[C]) Last(dst []C) int {
42+
// how many entries can we write?
43+
maxWritable := armath.Min(r.head, r.buflen)
44+
45+
// if the dst is larger than the amount of entries we can write, let's clamp it.
46+
if len(dst) > int(maxWritable) {
47+
// only consider the first available slots of dst
48+
dst = dst[:maxWritable]
49+
}
50+
51+
headmod := int(r.head % r.buflen)
52+
53+
// we must do at most 2 copies
54+
n := 0
55+
// copy the head of our internal buffer to the tail of dst
56+
{
57+
// end of src is the head slot
58+
srcend := headmod
59+
60+
srcstart := armath.Max(0, headmod-len(dst))
61+
src := r.buf[srcstart:srcend]
62+
63+
dststart := armath.Max(0, len(dst)-headmod)
64+
dst := dst[dststart:]
65+
66+
n += copy(dst, src)
67+
}
68+
69+
// if we haven't filled the buffer, copy the tail of our internal buffer to the head of dst
70+
if n != len(dst) {
71+
// copy start of src to end of dst
72+
dst := dst[:len(dst)-n]
73+
74+
srcstart := int(maxWritable) - len(dst)
75+
src := r.buf[srcstart:]
76+
77+
n += copy(dst, src)
78+
}
79+
80+
return n
81+
}
82+
83+
func (r *T[C]) Len() uint {
84+
used := armath.Min(r.buflen, r.head)
85+
return used
86+
}
87+
88+
type SeqMode int
89+
90+
const (
91+
SEQ_MODE_FIFO SeqMode = iota
92+
SEQ_MODE_FILO
93+
)
94+
95+
func (r *T[C]) Seq(seqMode SeqMode) func(yield func(uint, C) bool) {
96+
return func(yield func(uint, C) bool) {
97+
if r.buflen == 0 {
98+
return
99+
}
100+
101+
// how many entries can we write?
102+
maxWritable := armath.Min(r.head, r.buflen)
103+
104+
if seqMode == SEQ_MODE_FIFO {
105+
start := (((r.head - 1) % r.buflen) - maxWritable) % r.buflen
106+
107+
for i := range maxWritable {
108+
idx := (start + i) % r.buflen
109+
if !yield(i, r.buf[idx]) {
110+
return
111+
}
112+
}
113+
return
114+
}
115+
if seqMode == SEQ_MODE_FILO {
116+
start := r.head - 1
117+
for i := range maxWritable {
118+
idx := (start - i) % r.buflen
119+
if !yield(i, r.buf[idx]) {
120+
return
121+
}
122+
}
123+
return
124+
}
125+
}
126+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package ringbuffer
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestRange(t *testing.T) {
11+
rb, err := New[string](5)
12+
require.NoError(t, err)
13+
14+
for i := range 7 {
15+
rb.Push(fmt.Sprintf("%d", i))
16+
}
17+
18+
expected := []string{"2", "3", "4", "5", "6"}
19+
for i, v := range rb.Seq(SEQ_MODE_FIFO) {
20+
require.Equal(t, expected[i], v)
21+
}
22+
23+
expected = []string{"6", "5", "4", "3", "2"}
24+
for i, v := range rb.Seq(SEQ_MODE_FILO) {
25+
require.Equal(t, expected[i], v)
26+
}
27+
}
28+
29+
func TestCycledRingBuffer(t *testing.T) {
30+
rb, err := New[string](5)
31+
require.NoError(t, err)
32+
33+
for i := range 7 {
34+
rb.Push(fmt.Sprintf("%d", i))
35+
}
36+
37+
// ask for 3 last items
38+
lastN := make([]string, 3)
39+
n := rb.Last(lastN)
40+
lastN = lastN[:n]
41+
require.Equal(t, []string{"4", "5", "6"}, lastN)
42+
43+
// ask for 5 last
44+
lastN = make([]string, 5)
45+
n = rb.Last(lastN)
46+
lastN = lastN[:n]
47+
require.Equal(t, []string{"2", "3", "4", "5", "6"}, lastN)
48+
49+
// ask for 2 last
50+
lastN = make([]string, 2)
51+
n = rb.Last(lastN)
52+
lastN = lastN[:n]
53+
require.Equal(t, []string{"5", "6"}, lastN)
54+
55+
// ask for 1 last
56+
lastN = make([]string, 1)
57+
n = rb.Last(lastN)
58+
lastN = lastN[:n]
59+
require.Equal(t, []string{"6"}, lastN)
60+
}
61+
62+
func TestNotFilledRingBuffer(t *testing.T) {
63+
rb, err := New[string](5)
64+
require.NoError(t, err)
65+
66+
for i := range 3 {
67+
rb.Push(fmt.Sprintf("%d", i))
68+
}
69+
70+
// ask for 3 last items
71+
lastN := make([]string, 3)
72+
n := rb.Last(lastN)
73+
lastN = lastN[:n]
74+
require.Equal(t, []string{"0", "1", "2"}, lastN)
75+
76+
// ask for 5 last
77+
lastN = make([]string, 5)
78+
n = rb.Last(lastN)
79+
lastN = lastN[:n]
80+
require.Equal(t, []string{"0", "1", "2"}, lastN)
81+
82+
// ask for 2 last
83+
lastN = make([]string, 2)
84+
n = rb.Last(lastN)
85+
lastN = lastN[:n]
86+
require.Equal(t, []string{"1", "2"}, lastN)
87+
88+
// ask for 1 last
89+
lastN = make([]string, 1)
90+
n = rb.Last(lastN)
91+
lastN = lastN[:n]
92+
require.Equal(t, []string{"2"}, lastN)
93+
94+
expected := []string{"0", "1", "2"}
95+
for i, v := range rb.Seq(SEQ_MODE_FIFO) {
96+
require.Equal(t, expected[i], v)
97+
}
98+
}
99+
100+
func TestEmptyRingBuffer(t *testing.T) {
101+
rb, err := New[string](5)
102+
require.NoError(t, err)
103+
104+
// ask for 3 last items
105+
lastN := make([]string, 3)
106+
n := rb.Last(lastN)
107+
lastN = lastN[:n]
108+
require.Equal(t, []string{}, lastN)
109+
110+
// ask for 5 last
111+
lastN = make([]string, 5)
112+
n = rb.Last(lastN)
113+
lastN = lastN[:n]
114+
require.Equal(t, []string{}, lastN)
115+
116+
// ask for 2 last
117+
lastN = make([]string, 2)
118+
n = rb.Last(lastN)
119+
lastN = lastN[:n]
120+
require.Equal(t, []string{}, lastN)
121+
122+
// ask for 1 last
123+
lastN = make([]string, 1)
124+
n = rb.Last(lastN)
125+
lastN = lastN[:n]
126+
require.Equal(t, []string{}, lastN)
127+
}
128+
129+
func TestRingbufferWithoutRoom(t *testing.T) {
130+
_, err := New[string](0)
131+
require.Error(t, err)
132+
}

io/README.md

Lines changed: 0 additions & 32 deletions
This file was deleted.

io/linereader/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# LineReader
2+
## Usage
3+
```go
4+
import "github.com/asymmetric-research/go-commons/io/linereader"
5+
6+
lr := linereader.New(reader, 4096 /* blockSize */)
7+
line := [12288]byte{}
8+
9+
var err error
10+
for err == nil {
11+
n, ntrunc, err := lr.Read(line[:])
12+
lastline := line[:n]
13+
fmt.Println("%d bytes didn't fit", ntrunc)
14+
}
15+
```
16+
17+
## Benchmarks
18+
```
19+
go test -benchmem -benchtime=5s -bench=. ./io/linereader/...
20+
goos: linux
21+
goarch: amd64
22+
pkg: github.com/asymmetric-research/go-commons/io/linereader
23+
cpu: AMD Ryzen 9 5950X 16-Core Processor
24+
BenchmarkLineReaderUnbuffered-32 1241234 4680 ns/op 22560 B/op 5 allocs/op
25+
BenchmarkHashicorpsUnbuffered-32 4722 1349399 ns/op 2295410 B/op 29602 allocs/op
26+
BenchmarkGoCmdUnbuffered-32 234085 24217 ns/op 41636 B/op 289 allocs/op
27+
BenchmarkLineReaderLargeReads-32 2210827 2713 ns/op 12328 B/op 4 allocs/op
28+
BenchmarkHashicorpsLargeReads-32 4208 1406119 ns/op 2285073 B/op 29601 allocs/op
29+
BenchmarkGoCmdLargeReads-32 274774 21724 ns/op 31563 B/op 292 allocs/op
30+
```

io/line_reader.go renamed to io/linereader/linereader.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io
1+
package linereader
22

33
import (
44
"bytes"
@@ -7,21 +7,21 @@ import (
77
armath "github.com/asymmetric-research/go-commons/math"
88
)
99

10-
type LineReader struct {
10+
type T struct {
1111
reader io.Reader
1212
readbufbase []byte
1313
readbuf []byte
1414
blocksize uint
1515
}
1616

17-
func NewLineReader(reader io.Reader, blockSize uint) *LineReader {
18-
lr := &LineReader{}
19-
NewlineReaderInto(lr, reader, blockSize)
17+
func New(reader io.Reader, blockSize uint) *T {
18+
lr := &T{}
19+
NewInto(lr, reader, blockSize)
2020
return lr
2121
}
2222

23-
func NewlineReaderInto(dst *LineReader, reader io.Reader, blockSize uint) {
24-
*dst = LineReader{
23+
func NewInto(dst *T, reader io.Reader, blockSize uint) {
24+
*dst = T{
2525
reader: reader,
2626
readbufbase: make([]byte, blockSize),
2727
blocksize: blockSize,
@@ -30,7 +30,7 @@ func NewlineReaderInto(dst *LineReader, reader io.Reader, blockSize uint) {
3030

3131
// Read reads as much as possible into p, until the next newline or EOF is reached.
3232
// Every new call to read starts on a new line. The remainder of the previous line will be discarted.
33-
func (lr *LineReader) Read(dst []byte) (nread int, ndiscarted int, err error) {
33+
func (lr *T) Read(dst []byte) (nread int, ndiscarted int, err error) {
3434
// copy as much of read buffer as possible to dst
3535
if len(lr.readbuf) > 0 {
3636
// fast path: can we get a new line from the read buffer?
@@ -98,7 +98,7 @@ func (lr *LineReader) Read(dst []byte) (nread int, ndiscarted int, err error) {
9898
}
9999
}
100100

101-
func (lr *LineReader) discardRestOfLine() int {
101+
func (lr *T) discardRestOfLine() int {
102102
// discard the rest of the line in the read buffer
103103

104104
if len(lr.readbuf) > 0 {

0 commit comments

Comments
 (0)