Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename mux->mu and update amqp #168

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type Consumer struct {
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
options ConsumerOptions
handlerMux *sync.RWMutex
handlerMu *sync.RWMutex

isClosedMux *sync.RWMutex
isClosed bool
isClosedMu *sync.RWMutex
isClosed bool
}

// Delivery captures the fields for a previously delivered message resident in
Expand Down Expand Up @@ -73,7 +73,7 @@ func NewConsumer(
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
isClosedMux: &sync.RWMutex{},
isClosedMu: &sync.RWMutex{},
isClosed: false,
}

Expand All @@ -92,10 +92,10 @@ func (consumer *Consumer) Run(handler Handler) error {
}

handler = func(d Delivery) (action Action) {
if !consumer.handlerMux.TryRLock() {
if !consumer.handlerMu.TryRLock() {
return NackRequeue
}
defer consumer.handlerMux.RUnlock()
defer consumer.handlerMu.RUnlock()
return handler(d)
}

Expand Down Expand Up @@ -134,8 +134,8 @@ func (consumer *Consumer) Close() {
}

func (consumer *Consumer) cleanupResources() {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosedMu.Lock()
defer consumer.isClosedMu.Unlock()
consumer.isClosed = true
// close the channel so that rabbitmq server knows that the
// consumer has been stopped.
Expand Down Expand Up @@ -175,8 +175,8 @@ func (consumer *Consumer) startGoroutines(
handler Handler,
options ConsumerOptions,
) error {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosedMu.Lock()
defer consumer.isClosedMu.Unlock()
err := consumer.chanManager.QosSafe(
options.QOSPrefetch,
0,
Expand Down Expand Up @@ -221,8 +221,8 @@ func (consumer *Consumer) startGoroutines(
}

func (consumer *Consumer) getIsClosed() bool {
consumer.isClosedMux.RLock()
defer consumer.isClosedMux.RUnlock()
consumer.isClosedMu.RLock()
defer consumer.isClosedMu.RUnlock()
return consumer.isClosed
}

Expand Down Expand Up @@ -259,15 +259,13 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
}

func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
} else if ctx.Err() != nil {
if ctx.Err() != nil {
return ctx.Err()
}
c := make(chan struct{})
go func() {
consumer.handlerMux.Lock()
defer consumer.handlerMux.Unlock()
consumer.handlerMu.Lock()
defer consumer.handlerMu.Unlock()
close(c)
}()
select {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq

go 1.20

require github.com/rabbitmq/amqp091-go v1.9.0
require github.com/rabbitmq/amqp091-go v1.10.0
51 changes: 3 additions & 48 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
48 changes: 24 additions & 24 deletions internal/channelmanager/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (

// ChannelManager -
type ChannelManager struct {
logger logger.Logger
channel *amqp.Channel
connManager *connectionmanager.ConnectionManager
channelMux *sync.RWMutex
reconnectInterval time.Duration
reconnectionCount uint
reconnectionCountMux *sync.Mutex
dispatcher *dispatcher.Dispatcher
logger logger.Logger
channel *amqp.Channel
connManager *connectionmanager.ConnectionManager
channelMu *sync.RWMutex
reconnectInterval time.Duration
reconnectionCount uint
reconnectionCountMu *sync.Mutex
dispatcher *dispatcher.Dispatcher
}

// NewChannelManager creates a new connection manager
Expand All @@ -31,14 +31,14 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log
}

chanManager := ChannelManager{
logger: log,
connManager: connManager,
channel: ch,
channelMux: &sync.RWMutex{},
reconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMux: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
logger: log,
connManager: connManager,
channel: ch,
channelMu: &sync.RWMutex{},
reconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
}
go chanManager.startNotifyCancelOrClosed()
return &chanManager, nil
Expand Down Expand Up @@ -84,14 +84,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {

// GetReconnectionCount -
func (chanManager *ChannelManager) GetReconnectionCount() uint {
chanManager.reconnectionCountMux.Lock()
defer chanManager.reconnectionCountMux.Unlock()
chanManager.reconnectionCountMu.Lock()
defer chanManager.reconnectionCountMu.Unlock()
return chanManager.reconnectionCount
}

func (chanManager *ChannelManager) incrementReconnectionCount() {
chanManager.reconnectionCountMux.Lock()
defer chanManager.reconnectionCountMux.Unlock()
chanManager.reconnectionCountMu.Lock()
defer chanManager.reconnectionCountMu.Unlock()
chanManager.reconnectionCount++
}

Expand All @@ -113,8 +113,8 @@ func (chanManager *ChannelManager) reconnectLoop() {

// reconnect safely closes the current channel and obtains a new one
func (chanManager *ChannelManager) reconnect() error {
chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock()
chanManager.channelMu.Lock()
defer chanManager.channelMu.Unlock()
newChannel, err := getNewChannel(chanManager.connManager)
if err != nil {
return err
Expand All @@ -131,8 +131,8 @@ func (chanManager *ChannelManager) reconnect() error {
// Close safely closes the current channel and connection
func (chanManager *ChannelManager) Close() error {
chanManager.logger.Infof("closing channel manager...")
chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock()
chanManager.channelMu.Lock()
defer chanManager.channelMu.Unlock()

err := chanManager.channel.Close()
if err != nil {
Expand Down
56 changes: 28 additions & 28 deletions internal/channelmanager/safe_wraps.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func (chanManager *ChannelManager) ConsumeSafe(
noWait bool,
args amqp.Table,
) (<-chan amqp.Delivery, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.Consume(
queue,
Expand All @@ -39,8 +39,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
noWait bool,
args amqp.Table,
) (amqp.Queue, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.QueueDeclarePassive(
name,
Expand All @@ -56,8 +56,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
func (chanManager *ChannelManager) QueueDeclareSafe(
name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table,
) (amqp.Queue, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.QueueDeclare(
name,
Expand All @@ -73,8 +73,8 @@ func (chanManager *ChannelManager) QueueDeclareSafe(
func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.ExchangeDeclarePassive(
name,
Expand All @@ -91,8 +91,8 @@ func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
func (chanManager *ChannelManager) ExchangeDeclareSafe(
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.ExchangeDeclare(
name,
Expand All @@ -109,8 +109,8 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe(
func (chanManager *ChannelManager) QueueBindSafe(
name string, key string, exchange string, noWait bool, args amqp.Table,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.QueueBind(
name,
Expand All @@ -125,8 +125,8 @@ func (chanManager *ChannelManager) QueueBindSafe(
func (chanManager *ChannelManager) QosSafe(
prefetchCount int, prefetchSize int, global bool,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.Qos(
prefetchCount,
Expand All @@ -141,8 +141,8 @@ PublishSafe safely wraps the (*amqp.Channel).Publish method.
func (chanManager *ChannelManager) PublishSafe(
exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.PublishWithContext(
context.Background(),
Expand All @@ -158,8 +158,8 @@ func (chanManager *ChannelManager) PublishSafe(
func (chanManager *ChannelManager) PublishWithContextSafe(
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.PublishWithContext(
ctx,
Expand All @@ -174,8 +174,8 @@ func (chanManager *ChannelManager) PublishWithContextSafe(
func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) (*amqp.DeferredConfirmation, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.PublishWithDeferredConfirmWithContext(
ctx,
Expand All @@ -191,8 +191,8 @@ func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
func (chanManager *ChannelManager) NotifyReturnSafe(
c chan amqp.Return,
) chan amqp.Return {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.NotifyReturn(
c,
Expand All @@ -203,8 +203,8 @@ func (chanManager *ChannelManager) NotifyReturnSafe(
func (chanManager *ChannelManager) ConfirmSafe(
noWait bool,
) error {
chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock()
chanManager.channelMu.Lock()
defer chanManager.channelMu.Unlock()

return chanManager.channel.Confirm(
noWait,
Expand All @@ -215,8 +215,8 @@ func (chanManager *ChannelManager) ConfirmSafe(
func (chanManager *ChannelManager) NotifyPublishSafe(
confirm chan amqp.Confirmation,
) chan amqp.Confirmation {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.NotifyPublish(
confirm,
Expand All @@ -227,8 +227,8 @@ func (chanManager *ChannelManager) NotifyPublishSafe(
func (chanManager *ChannelManager) NotifyFlowSafe(
c chan bool,
) chan bool {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
chanManager.channelMu.RLock()
defer chanManager.channelMu.RUnlock()

return chanManager.channel.NotifyFlow(
c,
Expand Down
Loading
Loading