Skip to content

Commit 3de6499

Browse files
authored
modify test: close while sending (#8)
1 parent 03a158f commit 3de6499

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ output:
2121
linters:
2222
enable-all: true
2323
disable:
24-
- exportloopref # deprecated
24+
- tenv # deprecated
2525
- nlreturn
2626
- wsl
2727
- lll

workpool_test.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ func TestMain(m *testing.M) {
2020
}
2121

2222
func TestWorkerPoolLifeCycle(t *testing.T) {
23+
safeWait := func(initCTX context.Context, d time.Duration) func(context.Context, int) {
24+
return func(ctx context.Context, _ int) {
25+
tm := time.NewTicker(d)
26+
select {
27+
case <-tm.C:
28+
case <-ctx.Done():
29+
case <-initCTX.Done():
30+
}
31+
}
32+
}
33+
2334
t.Run("noop", func(t *testing.T) {
2435
ctx := context.Background()
2536
cb := func(ctx context.Context, item int) {}
@@ -121,28 +132,34 @@ func TestWorkerPoolLifeCycle(t *testing.T) {
121132
ErrorIs(ErrWorkerPoolStopped)(t, err)
122133
})
123134

124-
t.Run("close while sending", func(t *testing.T) {
135+
t.Run("close while 20 senders submitting", func(t *testing.T) {
125136
ctx := context.Background()
126-
cb := func(_ context.Context, it int) {
127-
time.Sleep(60 * time.Millisecond)
128-
}
129-
subject := NewWorkerPool(cb)
130-
subject.Start(ctx, 10)
137+
ctx2, ctx2Cancel := context.WithCancel(ctx)
138+
cb := safeWait(ctx2, 30*time.Millisecond)
139+
subject := NewWorkerPool(cb, WithChannelBufferSize(0))
140+
subject.Start(ctx, 2)
131141

142+
failedSubmits := atomic.Int64{}
143+
const senders = 20
132144
wg := sync.WaitGroup{}
133-
wg.Add(1)
134-
go func() {
135-
defer wg.Done()
136-
for i := range 100 {
137-
err := subject.Submit(ctx, i)
138-
if err != nil {
139-
ErrorIs(ErrWorkerPoolStopped)(t, err)
145+
wg.Add(senders)
146+
for range senders {
147+
go func() {
148+
defer wg.Done()
149+
for i := range 400 {
150+
err := subject.Submit(ctx, i)
151+
if err != nil {
152+
ErrorIs(ErrWorkerPoolStopped)(t, err)
153+
failedSubmits.Add(1)
154+
}
140155
}
141-
}
142-
}()
156+
}()
157+
}
143158

159+
ctx2Cancel()
144160
subject.Stop(ctx)
145161
wg.Wait()
162+
require.Positive(t, failedSubmits.Load(), "at least one submit should fail")
146163
})
147164

148165
t.Run("close with ctx cancel while sending", func(t *testing.T) {

0 commit comments

Comments
 (0)