Skip to content

Commit

Permalink
use semaphore to control the call to async produce
Browse files Browse the repository at this point in the history
Signed-off-by: 3AceShowHand <[email protected]>
  • Loading branch information
3AceShowHand committed Dec 4, 2024
1 parent 4178837 commit 2f00b62
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package sarama

import (
"context"
"encoding/binary"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"math"
"sync"
"time"
Expand Down Expand Up @@ -777,6 +779,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
bridge = make(chan *produceSet)
pending = make(chan *brokerProducerResponse)
responses = make(chan *brokerProducerResponse)
weighted = semaphore.NewWeighted(int64(p.conf.Net.MaxOpenRequests))
)

bp := &brokerProducer{
Expand All @@ -790,13 +793,14 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
}
go withRecover(bp.run)

ctx := context.Background()
// minimal bridge to make the network response `select`able
go withRecover(func() {
// Use a wait group to know if we still have in flight requests
var wg sync.WaitGroup

for set := range bridge {
request := set.buildRequest()
_ = weighted.Acquire(ctx, 1)

// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)
Expand All @@ -810,6 +814,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
res: response,
}
wg.Done()
weighted.Release(1)
}
}(set)

Expand Down

0 comments on commit 2f00b62

Please sign in to comment.