Skip to content

Commit 34efbd4

Browse files
committed
fix: if the pool is saturated, execute the job in the current goroutine
1 parent dd1a0ab commit 34efbd4

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

pool.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,18 @@ func (p *Pool) Module() interface{} {
120120
// from http/grpc handler, and asyncContext is the context for async job
121121
// handling. The asyncContext contains all values from requestContext, but it's
122122
// cancellation has nothing to do with the request, but is determined the timeout
123-
// set in pool constructor.
123+
// set in pool constructor. If the pool has reached max concurrency, the job will
124+
// be executed in the current goroutine. In other word, the job will be executed
125+
// synchronously.
124126
func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) {
125-
p.ch <- job{ctx: requestContext, function: function}
127+
select {
128+
case p.ch <- job{ctx: requestContext, function: function}:
129+
default:
130+
cancelCtx, cancel := context.WithTimeout(context.Background(), p.timeout)
131+
defer cancel()
132+
newCtx := asyncContext{valueCtx: requestContext, cancelCtx: cancelCtx}
133+
function(newCtx)
134+
}
126135
}
127136

128137
// Run starts the async worker pool and block until it finishes.

pool_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,29 @@ func TestPool_Timeout(t *testing.T) {
4141
p.Run(ctx)
4242
}
4343

44+
func TestPool_FallbackToSyncMode(t *testing.T) {
45+
t.Parallel()
46+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
47+
defer cancel()
48+
49+
dispatcher := events.SyncDispatcher{}
50+
p := newPool(WithTimeout(time.Second), WithConcurrency(1), WithShutdownEvents())(&dispatcher)
51+
p.Run(ctx)
52+
53+
var executed = make(chan struct{})
54+
go func() {
55+
// saturate the pool
56+
p.Go(ctx, func(asyncContext context.Context) {
57+
time.Sleep(time.Second)
58+
})
59+
// fallback to sync mode
60+
p.Go(ctx, func(asyncContext context.Context) {
61+
close(executed)
62+
})
63+
}()
64+
<-executed
65+
}
66+
4467
func TestPool_contextValue(t *testing.T) {
4568
t.Parallel()
4669
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)

0 commit comments

Comments
 (0)