diff --git a/async_producer.go b/async_producer.go index a6fa3d4a2..b359832cf 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1,9 +1,11 @@ package sarama import ( + "context" "encoding/binary" "errors" "fmt" + "golang.org/x/sync/semaphore" "math" "sync" "time" @@ -777,6 +779,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { bridge = make(chan *produceSet) pending = make(chan *brokerProducerResponse) responses = make(chan *brokerProducerResponse) + weighted = semaphore.NewWeighted(int64(p.conf.Net.MaxOpenRequests)) ) bp := &brokerProducer{ @@ -790,13 +793,14 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { } go withRecover(bp.run) + ctx := context.Background() // minimal bridge to make the network response `select`able go withRecover(func() { // Use a wait group to know if we still have in flight requests var wg sync.WaitGroup - for set := range bridge { request := set.buildRequest() + _ = weighted.Acquire(ctx, 1) // Count the in flight requests to know when we can close the pending channel safely wg.Add(1) @@ -810,6 +814,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { res: response, } wg.Done() + weighted.Release(1) } }(set)