Skip to content

Commit 31cb345

Browse files
authored
Merge pull request #2179 from dgageot/fix-races-buffer
Fix data races with shared bytes.Buffer using concurrent.Buffer
2 parents da45428 + 79e843e commit 31cb345

File tree

3 files changed

+53
-10
lines changed

3 files changed

+53
-10
lines changed

pkg/agent/agent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package agent
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"log/slog"
@@ -11,6 +10,7 @@ import (
1110
"github.com/stretchr/testify/require"
1211

1312
"github.com/docker/docker-agent/pkg/chat"
13+
"github.com/docker/docker-agent/pkg/concurrent"
1414
"github.com/docker/docker-agent/pkg/model/provider/base"
1515
"github.com/docker/docker-agent/pkg/tools"
1616
)
@@ -144,7 +144,7 @@ func TestModelOverride(t *testing.T) {
144144
func TestModel_LogsSelection(t *testing.T) {
145145
t.Parallel()
146146

147-
var buf bytes.Buffer
147+
var buf concurrent.Buffer
148148
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo})
149149
prev := slog.Default()
150150
slog.SetDefault(slog.New(handler))

pkg/concurrent/buffer.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package concurrent
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
)
7+
8+
// Buffer is a concurrency-safe [bytes.Buffer].
9+
// It implements [io.Writer] so it can be used anywhere a plain buffer would,
10+
// e.g. as the output target for a log handler or as subprocess stderr.
11+
type Buffer struct {
12+
mu sync.Mutex
13+
buf bytes.Buffer
14+
}
15+
16+
// Write appends p to the buffer.
17+
func (b *Buffer) Write(p []byte) (int, error) {
18+
b.mu.Lock()
19+
defer b.mu.Unlock()
20+
return b.buf.Write(p)
21+
}
22+
23+
// String returns the buffered content.
24+
func (b *Buffer) String() string {
25+
b.mu.Lock()
26+
defer b.mu.Unlock()
27+
return b.buf.String()
28+
}
29+
30+
// Reset clears the buffer.
31+
func (b *Buffer) Reset() {
32+
b.mu.Lock()
33+
defer b.mu.Unlock()
34+
b.buf.Reset()
35+
}
36+
37+
// Drain returns the buffered content and resets the buffer atomically.
38+
func (b *Buffer) Drain() string {
39+
b.mu.Lock()
40+
defer b.mu.Unlock()
41+
s := b.buf.String()
42+
b.buf.Reset()
43+
return s
44+
}

pkg/tools/builtin/lsp.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package builtin
22

33
import (
44
"bufio"
5-
"bytes"
65
"cmp"
76
"context"
87
"encoding/json"
@@ -20,6 +19,7 @@ import (
2019
"sync/atomic"
2120
"time"
2221

22+
"github.com/docker/docker-agent/pkg/concurrent"
2323
"github.com/docker/docker-agent/pkg/tools"
2424
)
2525

@@ -492,8 +492,8 @@ func (h *lspHandler) startLocked() error {
492492
return fmt.Errorf("failed to create stdout pipe: %w", err)
493493
}
494494

495-
var stderrBuf bytes.Buffer
496-
cmd.Stderr = &stderrBuf
495+
stderrBuf := &concurrent.Buffer{}
496+
cmd.Stderr = stderrBuf
497497

498498
if err := cmd.Start(); err != nil {
499499
stdin.Close()
@@ -506,7 +506,7 @@ func (h *lspHandler) startLocked() error {
506506
h.stdin = stdin
507507
h.stdout = bufio.NewReader(stdout)
508508

509-
go h.readNotifications(processCtx, &stderrBuf)
509+
go h.readNotifications(processCtx, stderrBuf)
510510

511511
slog.Debug("LSP server started successfully")
512512
return nil
@@ -1432,7 +1432,7 @@ func (h *lspHandler) readMessageLocked() ([]byte, error) {
14321432
return body, nil
14331433
}
14341434

1435-
func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *bytes.Buffer) {
1435+
func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *concurrent.Buffer) {
14361436
ticker := time.NewTicker(100 * time.Millisecond)
14371437
defer ticker.Stop()
14381438

@@ -1441,9 +1441,8 @@ func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *bytes.Buf
14411441
case <-ctx.Done():
14421442
return
14431443
case <-ticker.C:
1444-
if stderrBuf.Len() > 0 {
1445-
slog.Debug("LSP stderr", "content", stderrBuf.String())
1446-
stderrBuf.Reset()
1444+
if content := stderrBuf.Drain(); content != "" {
1445+
slog.Debug("LSP stderr", "content", content)
14471446
}
14481447
}
14491448
}

0 commit comments

Comments
 (0)