Skip to content

Commit b7e576b

Browse files
Vlad Volodkinvladem
authored andcommitted
Use armon/circbuf
Signed-off-by: Vlad Volodkin <vlaad@amazon.com>
1 parent ab5a882 commit b7e576b

6 files changed

Lines changed: 20 additions & 132 deletions

File tree

Dockerfile.local

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ COPY --from=builder /go/src/github.com/awslabs/mountpoint-s3-csi-driver/LICENSES
7676
COPY --from=builder /go/src/github.com/awslabs/mountpoint-s3-csi-driver/bin/aws-s3-csi-driver /bin/aws-s3-csi-driver
7777
COPY --from=builder /go/src/github.com/awslabs/mountpoint-s3-csi-driver/bin/aws-s3-csi-controller /bin/aws-s3-csi-controller
7878
COPY --from=builder /go/src/github.com/awslabs/mountpoint-s3-csi-driver/bin/aws-s3-csi-mounter /bin/aws-s3-csi-mounter
79+
COPY --from=builder /go/src/github.com/awslabs/mountpoint-s3-csi-driver/bin/aws-s3-csi-daemonset-mounter /bin/aws-s3-csi-daemonset-mounter
7980

8081
ENTRYPOINT ["/bin/aws-s3-csi-driver"]

cmd/aws-s3-csi-daemonset-mounter/process_manager.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const errorFileExt = ".error"
2727
// ProcessManager tracks and manages Mountpoint child processes.
2828
type ProcessManager struct {
2929
commDir string
30-
runner ProcessRunner
30+
runner ProcessRunner // interface for spawning processes; substituted in tests
3131
mu sync.Mutex
3232
processes map[string]ProcessHandle // mountId -> process handle
3333
wg sync.WaitGroup // tracks waiter goroutines
@@ -143,9 +143,15 @@ func (pw *prefixWriter) Write(p []byte) (int, error) {
143143
if len(line) == 0 && i == len(lines)-1 {
144144
break
145145
}
146-
pw.w.Write([]byte(pw.prefix))
147-
pw.w.Write(line)
148-
pw.w.Write([]byte("\n"))
146+
if _, err := pw.w.Write([]byte(pw.prefix)); err != nil {
147+
return 0, err
148+
}
149+
if _, err := pw.w.Write(line); err != nil {
150+
return 0, err
151+
}
152+
if _, err := pw.w.Write([]byte("\n")); err != nil {
153+
return 0, err
154+
}
149155
}
150156
return len(p), nil
151157
}

cmd/aws-s3-csi-daemonset-mounter/process_runner.go

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"io"
99
"os"
1010
"os/exec"
11-
"sync"
1211

12+
"github.com/armon/circbuf"
1313
"k8s.io/klog/v2"
1414
)
1515

@@ -31,7 +31,10 @@ type defaultProcessRunner struct {
3131
}
3232

3333
func (r *defaultProcessRunner) Start(cmd *exec.Cmd) (ProcessHandle, error) {
34-
stderrBuf := newTailBuf(r.stderrCapacity)
34+
stderrBuf, err := circbuf.NewBuffer(int64(r.stderrCapacity))
35+
if err != nil {
36+
return nil, err
37+
}
3538
if cmd.Stderr != nil {
3639
cmd.Stderr = io.MultiWriter(cmd.Stderr, stderrBuf)
3740
} else {
@@ -45,7 +48,7 @@ func (r *defaultProcessRunner) Start(cmd *exec.Cmd) (ProcessHandle, error) {
4548

4649
type defaultProcessHandle struct {
4750
cmd *exec.Cmd
48-
stderrBuf *tailBuf
51+
stderrBuf *circbuf.Buffer
4952
}
5053

5154
func (h *defaultProcessHandle) Pid() int { return h.cmd.Process.Pid }
@@ -69,53 +72,3 @@ func (h *defaultProcessHandle) Wait() (int, []byte) {
6972
func (h *defaultProcessHandle) Signal(sig os.Signal) error {
7073
return h.cmd.Process.Signal(sig)
7174
}
72-
73-
// tailBuf is a fixed-capacity ring buffer that retains the last N bytes written.
74-
type tailBuf struct {
75-
mu sync.Mutex
76-
buf []byte
77-
pos int
78-
full bool
79-
}
80-
81-
func newTailBuf(capacity uint) *tailBuf {
82-
return &tailBuf{buf: make([]byte, capacity)}
83-
}
84-
85-
func (tb *tailBuf) Write(p []byte) (int, error) {
86-
tb.mu.Lock()
87-
defer tb.mu.Unlock()
88-
n := len(p)
89-
cap := len(tb.buf)
90-
if cap == 0 {
91-
return n, nil
92-
}
93-
if len(p) >= cap {
94-
copy(tb.buf, p[len(p)-cap:])
95-
tb.pos = 0
96-
tb.full = true
97-
return n, nil
98-
}
99-
for len(p) > 0 {
100-
space := cap - tb.pos
101-
copied := copy(tb.buf[tb.pos:], p[:min(len(p), space)])
102-
tb.pos = (tb.pos + copied) % cap
103-
if tb.pos == 0 || copied == space {
104-
tb.full = true
105-
}
106-
p = p[copied:]
107-
}
108-
return n, nil
109-
}
110-
111-
func (tb *tailBuf) Bytes() []byte {
112-
tb.mu.Lock()
113-
defer tb.mu.Unlock()
114-
if !tb.full {
115-
return append([]byte(nil), tb.buf[:tb.pos]...)
116-
}
117-
out := make([]byte, len(tb.buf))
118-
n := copy(out, tb.buf[tb.pos:])
119-
copy(out[n:], tb.buf[:tb.pos])
120-
return out
121-
}

cmd/aws-s3-csi-daemonset-mounter/process_runner_test.go

Lines changed: 0 additions & 75 deletions
This file was deleted.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/awslabs/mountpoint-s3-csi-driver
33
go 1.26.2
44

55
require (
6+
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2
67
github.com/aws/aws-sdk-go-v2 v1.39.2
78
github.com/aws/aws-sdk-go-v2/config v1.31.12
89
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.9

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
3+
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs=
4+
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
35
github.com/aws/aws-sdk-go-v2 v1.39.2 h1:EJLg8IdbzgeD7xgvZ+I8M1e0fL0ptn/M47lianzth0I=
46
github.com/aws/aws-sdk-go-v2 v1.39.2/go.mod h1:sDioUELIUO9Znk23YVmIk86/9DOpkbyyVb1i/gUNFXY=
57
github.com/aws/aws-sdk-go-v2/config v1.31.12 h1:pYM1Qgy0dKZLHX2cXslNacbcEFMkDMl+Bcj5ROuS6p8=

0 commit comments

Comments
 (0)