Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: goroutine leak in publisher on close #198

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions internal/connectionmanager/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type ConnectionManager struct {
reconnectionCount uint
reconnectionCountMu *sync.Mutex
dispatcher *dispatcher.Dispatcher

// universalNotifyBlockingReceiver receives block signal from underlying
// connection which are broadcasted to all publisherNotifyBlockingReceivers
universalNotifyBlockingReceiver chan amqp.Blocking
universalNotifyBlockingReceiverUsed bool
publisherNotifyBlockingReceiversMu *sync.RWMutex
publisherNotifyBlockingReceivers []chan amqp.Blocking
}

type Resolver interface {
Expand Down Expand Up @@ -62,17 +69,20 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger
}

connManager := ConnectionManager{
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
universalNotifyBlockingReceiver: make(chan amqp.Blocking),
publisherNotifyBlockingReceiversMu: &sync.RWMutex{},
}
go connManager.startNotifyClose()
go connManager.readUniversalBlockReceiver()
return &connManager, nil
}

Expand Down
43 changes: 38 additions & 5 deletions internal/connectionmanager/safe_wraps.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,43 @@ import (
func (connManager *ConnectionManager) NotifyBlockedSafe(
receiver chan amqp.Blocking,
) chan amqp.Blocking {
connManager.connectionMu.RLock()
defer connManager.connectionMu.RUnlock()
connManager.connectionMu.Lock()
defer connManager.connectionMu.Unlock()

return connManager.connection.NotifyBlocked(
receiver,
)
// add receiver to connection manager.
connManager.publisherNotifyBlockingReceiversMu.Lock()
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers, receiver)
connManager.publisherNotifyBlockingReceiversMu.Unlock()

if !connManager.universalNotifyBlockingReceiverUsed {
connManager.connection.NotifyBlocked(
connManager.universalNotifyBlockingReceiver,
)
connManager.universalNotifyBlockingReceiverUsed = true
}

return receiver
}

// readUniversalBlockReceiver reads on universal blocking receiver and broadcasts event to all blocking receivers of
// connection manager.
func (connManager *ConnectionManager) readUniversalBlockReceiver() {
for b := range connManager.universalNotifyBlockingReceiver {
connManager.publisherNotifyBlockingReceiversMu.RLock()
for _, br := range connManager.publisherNotifyBlockingReceivers {
br <- b
}
connManager.publisherNotifyBlockingReceiversMu.RUnlock()
}
}

func (connManager *ConnectionManager) RemovePublisherBlockingReceiver(receiver chan amqp.Blocking) {
connManager.publisherNotifyBlockingReceiversMu.Lock()
for i, br := range connManager.publisherNotifyBlockingReceivers {
if br == receiver {
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers[:i], connManager.publisherNotifyBlockingReceivers[i+1:]...)
}
}
connManager.publisherNotifyBlockingReceiversMu.Unlock()
close(receiver)
}
3 changes: 3 additions & 0 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Publisher struct {
notifyPublishHandler func(p Confirmation)

options PublisherOptions

blockings chan amqp.Blocking
}

type PublisherConfirmation []*amqp.DeferredConfirmation
Expand Down Expand Up @@ -286,6 +288,7 @@ func (publisher *Publisher) Close() {
publisher.options.Logger.Warnf("error while closing the channel: %v", err)
}
publisher.options.Logger.Infof("closing publisher...")
publisher.connManager.RemovePublisherBlockingReceiver(publisher.blockings)
go func() {
publisher.closeConnectionToManagerCh <- struct{}{}
}()
Expand Down
1 change: 1 addition & 0 deletions publish_flow_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (publisher *Publisher) startNotifyFlowHandler() {
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMu.Lock()
publisher.blockings = blockings
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMu.Unlock()

Expand Down