Skip to content

Commit ab5a882

Browse files
Vlad Volodkinvladem
authored andcommitted
Address review feedback
Signed-off-by: Vlad Volodkin <vlaad@amazon.com>
1 parent 5e4f0b1 commit ab5a882

3 files changed

Lines changed: 147 additions & 13 deletions

File tree

cmd/aws-s3-csi-daemonset-mounter/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package main
2828

2929
import (
30+
"errors"
3031
"flag"
3132
"net"
3233
"os"
@@ -42,6 +43,7 @@ var (
4243
commDir = flag.String("comm-dir", "/comm", "Directory for communication socket and error files")
4344
mountpointBinDir = flag.String("mountpoint-bin-dir", os.Getenv("MOUNTPOINT_BIN_DIR"), "Directory of mount-s3 binary")
4445
recvTimeout = flag.Duration("recv-timeout", 30*time.Second, "Timeout for receiving mount options from a connection")
46+
stderrCapacity = flag.Uint("stderr-capacity", 1024*1024, "Maximum bytes of stderr to retain per Mountpoint process (tail)")
4547
)
4648

4749
const (
@@ -67,7 +69,7 @@ func main() {
6769

6870
klog.Infof("Listening on %s, mountpoint binary: %s", sockPath, mountpointPath)
6971

70-
pm := NewProcessManager(*commDir, &defaultProcessRunner{})
72+
pm := NewProcessManager(*commDir, &defaultProcessRunner{stderrCapacity: *stderrCapacity})
7173

7274
// Handle shutdown signals: terminate all MP processes gracefully
7375
sigCh := make(chan os.Signal, 1)
@@ -86,7 +88,7 @@ func main() {
8688
conn, err := listener.Accept()
8789
if err != nil {
8890
// Check if listener was closed (shutdown)
89-
if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" {
91+
if errors.Is(err, net.ErrClosed) {
9092
klog.Info("Listener closed, exiting accept loop")
9193
break
9294
}

cmd/aws-s3-csi-daemonset-mounter/process_runner.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package main
66

77
import (
8-
"bytes"
98
"io"
109
"os"
1110
"os/exec"
11+
"sync"
12+
13+
"k8s.io/klog/v2"
1214
)
1315

1416
// ProcessHandle represents a started process that can be waited on.
@@ -24,41 +26,96 @@ type ProcessRunner interface {
2426
}
2527

2628
// defaultProcessRunner is the real implementation that starts OS processes.
27-
type defaultProcessRunner struct{}
29+
type defaultProcessRunner struct {
30+
stderrCapacity uint
31+
}
2832

2933
func (r *defaultProcessRunner) Start(cmd *exec.Cmd) (ProcessHandle, error) {
30-
var stderrBuf bytes.Buffer
34+
stderrBuf := newTailBuf(r.stderrCapacity)
3135
if cmd.Stderr != nil {
32-
cmd.Stderr = io.MultiWriter(cmd.Stderr, &stderrBuf)
36+
cmd.Stderr = io.MultiWriter(cmd.Stderr, stderrBuf)
3337
} else {
34-
cmd.Stderr = &stderrBuf
38+
cmd.Stderr = stderrBuf
3539
}
3640
if err := cmd.Start(); err != nil {
3741
return nil, err
3842
}
39-
return &realProcessHandle{cmd: cmd, stderrBuf: &stderrBuf}, nil
43+
return &defaultProcessHandle{cmd: cmd, stderrBuf: stderrBuf}, nil
4044
}
4145

42-
type realProcessHandle struct {
46+
type defaultProcessHandle struct {
4347
cmd *exec.Cmd
44-
stderrBuf *bytes.Buffer
48+
stderrBuf *tailBuf
4549
}
4650

47-
func (h *realProcessHandle) Pid() int { return h.cmd.Process.Pid }
51+
func (h *defaultProcessHandle) Pid() int { return h.cmd.Process.Pid }
4852

49-
func (h *realProcessHandle) Wait() (int, []byte) {
53+
func (h *defaultProcessHandle) Wait() (int, []byte) {
5054
err := h.cmd.Wait()
5155
exitCode := 0
5256
if err != nil {
5357
if exitErr, ok := err.(*exec.ExitError); ok {
5458
exitCode = exitErr.ExitCode()
59+
} else {
60+
klog.Errorf("Unexpected error waiting for process %d: %v", h.cmd.Process.Pid, err)
61+
exitCode = 1
5562
}
5663
} else {
5764
exitCode = h.cmd.ProcessState.ExitCode()
5865
}
5966
return exitCode, h.stderrBuf.Bytes()
6067
}
6168

62-
func (h *realProcessHandle) Signal(sig os.Signal) error {
69+
func (h *defaultProcessHandle) Signal(sig os.Signal) error {
6370
return h.cmd.Process.Signal(sig)
6471
}
72+
73+
// tailBuf is a fixed-capacity ring buffer that retains the last N bytes written.
74+
type tailBuf struct {
75+
mu sync.Mutex
76+
buf []byte
77+
pos int
78+
full bool
79+
}
80+
81+
func newTailBuf(capacity uint) *tailBuf {
82+
return &tailBuf{buf: make([]byte, capacity)}
83+
}
84+
85+
func (tb *tailBuf) Write(p []byte) (int, error) {
86+
tb.mu.Lock()
87+
defer tb.mu.Unlock()
88+
n := len(p)
89+
cap := len(tb.buf)
90+
if cap == 0 {
91+
return n, nil
92+
}
93+
if len(p) >= cap {
94+
copy(tb.buf, p[len(p)-cap:])
95+
tb.pos = 0
96+
tb.full = true
97+
return n, nil
98+
}
99+
for len(p) > 0 {
100+
space := cap - tb.pos
101+
copied := copy(tb.buf[tb.pos:], p[:min(len(p), space)])
102+
tb.pos = (tb.pos + copied) % cap
103+
if tb.pos == 0 || copied == space {
104+
tb.full = true
105+
}
106+
p = p[copied:]
107+
}
108+
return n, nil
109+
}
110+
111+
func (tb *tailBuf) Bytes() []byte {
112+
tb.mu.Lock()
113+
defer tb.mu.Unlock()
114+
if !tb.full {
115+
return append([]byte(nil), tb.buf[:tb.pos]...)
116+
}
117+
out := make([]byte, len(tb.buf))
118+
n := copy(out, tb.buf[tb.pos:])
119+
copy(out[n:], tb.buf[:tb.pos])
120+
return out
121+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestTailBuf_UnderCapacity(t *testing.T) {
8+
tb := newTailBuf(10)
9+
tb.Write([]byte("hello"))
10+
got := string(tb.Bytes())
11+
if got != "hello" {
12+
t.Fatalf("expected %q, got %q", "hello", got)
13+
}
14+
}
15+
16+
func TestTailBuf_ExactCapacity(t *testing.T) {
17+
tb := newTailBuf(5)
18+
tb.Write([]byte("abcde"))
19+
got := string(tb.Bytes())
20+
if got != "abcde" {
21+
t.Fatalf("expected %q, got %q", "abcde", got)
22+
}
23+
}
24+
25+
func TestTailBuf_OverflowSingleWrite(t *testing.T) {
26+
tb := newTailBuf(5)
27+
tb.Write([]byte("abcdefgh"))
28+
got := string(tb.Bytes())
29+
if got != "defgh" {
30+
t.Fatalf("expected %q, got %q", "defgh", got)
31+
}
32+
}
33+
34+
func TestTailBuf_OverflowMultipleWrites(t *testing.T) {
35+
tb := newTailBuf(5)
36+
tb.Write([]byte("abc"))
37+
tb.Write([]byte("defgh"))
38+
got := string(tb.Bytes())
39+
if got != "defgh" {
40+
t.Fatalf("expected %q, got %q", "defgh", got)
41+
}
42+
}
43+
44+
func TestTailBuf_WrapAround(t *testing.T) {
45+
tb := newTailBuf(5)
46+
tb.Write([]byte("abcd"))
47+
tb.Write([]byte("ef"))
48+
got := string(tb.Bytes())
49+
if got != "bcdef" {
50+
t.Fatalf("expected %q, got %q", "bcdef", got)
51+
}
52+
}
53+
54+
func TestTailBuf_ManySmallWrites(t *testing.T) {
55+
tb := newTailBuf(4)
56+
for _, b := range []byte("abcdefghij") {
57+
tb.Write([]byte{b})
58+
}
59+
got := string(tb.Bytes())
60+
if got != "ghij" {
61+
t.Fatalf("expected %q, got %q", "ghij", got)
62+
}
63+
}
64+
65+
func TestTailBuf_ZeroCapacity(t *testing.T) {
66+
tb := newTailBuf(0)
67+
n, err := tb.Write([]byte("data"))
68+
if err != nil || n != 4 {
69+
t.Fatalf("expected n=4 err=nil, got n=%d err=%v", n, err)
70+
}
71+
got := string(tb.Bytes())
72+
if got != "" {
73+
t.Fatalf("expected empty, got %q", got)
74+
}
75+
}

0 commit comments

Comments
 (0)