Skip to content

Commit 428b7ab

Browse files
authored
Merge branch 'main' into feat/ulimit
2 parents b0e9a6d + 66f54ac commit 428b7ab

File tree

3 files changed

+36
-0
lines changed

3 files changed

+36
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2323
### Fixed
2424

2525
- [#8211](https://github.com/thanos-io/thanos/pull/8211) Query: fix panic on nested partial response in distributed instant query
26+
- [#8216](https://github.com/thanos-io/thanos/pull/8216) Query/Receive: fix iter race between `next()` and `stop()` introduced in https://github.com/thanos-io/thanos/pull/7821.
2627

2728
## [v0.38.0](https://github.com/thanos-io/thanos/tree/release-0.38) - 03.04.2025
2829

pkg/store/storepb/inprocess.go

+7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"iter"
1111
"runtime/debug"
12+
"sync"
1213

1314
"google.golang.org/grpc"
1415
)
@@ -40,17 +41,21 @@ type inProcessClient struct {
4041
ctx context.Context
4142
next func() (*SeriesResponse, error, bool)
4243
stop func()
44+
mu sync.Mutex // protects next and stop
4345
}
4446

4547
func newInProcessClient(ctx context.Context, next func() (*SeriesResponse, error, bool), stop func()) *inProcessClient {
4648
return &inProcessClient{
4749
ctx: ctx,
4850
next: next,
4951
stop: stop,
52+
mu: sync.Mutex{},
5053
}
5154
}
5255

5356
func (c *inProcessClient) Recv() (*SeriesResponse, error) {
57+
c.mu.Lock()
58+
defer c.mu.Unlock()
5459
resp, err, ok := c.next()
5560
if err != nil {
5661
c.stop()
@@ -70,6 +75,8 @@ func (c *inProcessClient) Context() context.Context {
7075
}
7176

7277
func (c *inProcessClient) CloseSend() error {
78+
c.mu.Lock()
79+
defer c.mu.Unlock()
7380
c.stop()
7481
return nil
7582
}

pkg/store/storepb/inprocess_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package storepb
66
import (
77
"context"
88
"io"
9+
"sync"
910
"testing"
1011

1112
"github.com/thanos-io/thanos/pkg/testutil/custom"
@@ -152,6 +153,33 @@ func TestServerAsClient(t *testing.T) {
152153
s.seriesLastReq = nil
153154
}
154155
})
156+
t.Run("race", func(t *testing.T) {
157+
s.err = nil
158+
for i := 0; i < 20; i++ {
159+
r := &SeriesRequest{
160+
MinTime: -214,
161+
MaxTime: 213,
162+
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
163+
PartialResponseStrategy: PartialResponseStrategy_ABORT,
164+
}
165+
client, err := ServerAsClient(s).Series(ctx, r)
166+
testutil.Ok(t, err)
167+
var wg sync.WaitGroup
168+
wg.Add(1)
169+
go func() {
170+
defer wg.Done()
171+
for {
172+
_, err := client.Recv()
173+
if err != nil {
174+
break
175+
}
176+
}
177+
}()
178+
testutil.Ok(t, client.CloseSend())
179+
wg.Wait()
180+
s.seriesLastReq = nil
181+
}
182+
})
155183
})
156184
t.Run("LabelNames", func(t *testing.T) {
157185
s := &testStoreServer{}

0 commit comments

Comments
 (0)