diff --git a/consume.go b/consume.go index ec1b4b0..805ae66 100644 --- a/consume.go +++ b/consume.go @@ -33,10 +33,10 @@ type Consumer struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions - handlerMux *sync.RWMutex + handlerMu *sync.RWMutex - isClosedMux *sync.RWMutex - isClosed bool + isClosedMu *sync.RWMutex + isClosed bool } // Delivery captures the fields for a previously delivered message resident in @@ -73,7 +73,7 @@ func NewConsumer( reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, options: *options, - isClosedMux: &sync.RWMutex{}, + isClosedMu: &sync.RWMutex{}, isClosed: false, } @@ -92,10 +92,10 @@ func (consumer *Consumer) Run(handler Handler) error { } handler = func(d Delivery) (action Action) { - if !consumer.handlerMux.TryRLock() { + if !consumer.handlerMu.TryRLock() { return NackRequeue } - defer consumer.handlerMux.RUnlock() + defer consumer.handlerMu.RUnlock() return handler(d) } @@ -134,8 +134,8 @@ func (consumer *Consumer) Close() { } func (consumer *Consumer) cleanupResources() { - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() + consumer.isClosedMu.Lock() + defer consumer.isClosedMu.Unlock() consumer.isClosed = true // close the channel so that rabbitmq server knows that the // consumer has been stopped. @@ -175,8 +175,8 @@ func (consumer *Consumer) startGoroutines( handler Handler, options ConsumerOptions, ) error { - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() + consumer.isClosedMu.Lock() + defer consumer.isClosedMu.Unlock() err := consumer.chanManager.QosSafe( options.QOSPrefetch, 0, @@ -221,8 +221,8 @@ func (consumer *Consumer) startGoroutines( } func (consumer *Consumer) getIsClosed() bool { - consumer.isClosedMux.RLock() - defer consumer.isClosedMux.RUnlock() + consumer.isClosedMu.RLock() + defer consumer.isClosedMu.RUnlock() return consumer.isClosed } @@ -259,15 +259,13 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti } func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { - if ctx == nil { - ctx = context.Background() - } else if ctx.Err() != nil { + if ctx.Err() != nil { return ctx.Err() } c := make(chan struct{}) go func() { - consumer.handlerMux.Lock() - defer consumer.handlerMux.Unlock() + consumer.handlerMu.Lock() + defer consumer.handlerMu.Unlock() close(c) }() select { diff --git a/go.mod b/go.mod index 5bee736..ec6f231 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.9.0 +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/go.sum b/go.sum index ee6e1e8..da51250 100644 --- a/go.sum +++ b/go.sum @@ -1,48 +1,3 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= -github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= -github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= -github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..f07ab9b 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -13,14 +13,14 @@ import ( // ChannelManager - type ChannelManager struct { - logger logger.Logger - channel *amqp.Channel - connManager *connectionmanager.ConnectionManager - channelMux *sync.RWMutex - reconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMu *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMu *sync.Mutex + dispatcher *dispatcher.Dispatcher } // NewChannelManager creates a new connection manager @@ -31,14 +31,14 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log } chanManager := ChannelManager{ - logger: log, - connManager: connManager, - channel: ch, - channelMux: &sync.RWMutex{}, - reconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + logger: log, + connManager: connManager, + channel: ch, + channelMu: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMu: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil @@ -84,14 +84,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { // GetReconnectionCount - func (chanManager *ChannelManager) GetReconnectionCount() uint { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCountMu.Lock() + defer chanManager.reconnectionCountMu.Unlock() return chanManager.reconnectionCount } func (chanManager *ChannelManager) incrementReconnectionCount() { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCountMu.Lock() + defer chanManager.reconnectionCountMu.Unlock() chanManager.reconnectionCount++ } @@ -113,8 +113,8 @@ func (chanManager *ChannelManager) reconnectLoop() { // reconnect safely closes the current channel and obtains a new one func (chanManager *ChannelManager) reconnect() error { - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() newChannel, err := getNewChannel(chanManager.connManager) if err != nil { return err @@ -131,8 +131,8 @@ func (chanManager *ChannelManager) reconnect() error { // Close safely closes the current channel and connection func (chanManager *ChannelManager) Close() error { chanManager.logger.Infof("closing channel manager...") - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() err := chanManager.channel.Close() if err != nil { diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index b75a5f5..8d02019 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -16,8 +16,8 @@ func (chanManager *ChannelManager) ConsumeSafe( noWait bool, args amqp.Table, ) (<-chan amqp.Delivery, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.Consume( queue, @@ -39,8 +39,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe( noWait bool, args amqp.Table, ) (amqp.Queue, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueDeclarePassive( name, @@ -56,8 +56,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe( func (chanManager *ChannelManager) QueueDeclareSafe( name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, ) (amqp.Queue, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueDeclare( name, @@ -73,8 +73,8 @@ func (chanManager *ChannelManager) QueueDeclareSafe( func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.ExchangeDeclarePassive( name, @@ -91,8 +91,8 @@ func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( func (chanManager *ChannelManager) ExchangeDeclareSafe( name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.ExchangeDeclare( name, @@ -109,8 +109,8 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe( func (chanManager *ChannelManager) QueueBindSafe( name string, key string, exchange string, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueBind( name, @@ -125,8 +125,8 @@ func (chanManager *ChannelManager) QueueBindSafe( func (chanManager *ChannelManager) QosSafe( prefetchCount int, prefetchSize int, global bool, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.Qos( prefetchCount, @@ -141,8 +141,8 @@ PublishSafe safely wraps the (*amqp.Channel).Publish method. func (chanManager *ChannelManager) PublishSafe( exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithContext( context.Background(), @@ -158,8 +158,8 @@ func (chanManager *ChannelManager) PublishSafe( func (chanManager *ChannelManager) PublishWithContextSafe( ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithContext( ctx, @@ -174,8 +174,8 @@ func (chanManager *ChannelManager) PublishWithContextSafe( func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) (*amqp.DeferredConfirmation, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithDeferredConfirmWithContext( ctx, @@ -191,8 +191,8 @@ func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( func (chanManager *ChannelManager) NotifyReturnSafe( c chan amqp.Return, ) chan amqp.Return { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyReturn( c, @@ -203,8 +203,8 @@ func (chanManager *ChannelManager) NotifyReturnSafe( func (chanManager *ChannelManager) ConfirmSafe( noWait bool, ) error { - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() return chanManager.channel.Confirm( noWait, @@ -215,8 +215,8 @@ func (chanManager *ChannelManager) ConfirmSafe( func (chanManager *ChannelManager) NotifyPublishSafe( confirm chan amqp.Confirmation, ) chan amqp.Confirmation { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyPublish( confirm, @@ -227,8 +227,8 @@ func (chanManager *ChannelManager) NotifyPublishSafe( func (chanManager *ChannelManager) NotifyFlowSafe( c chan bool, ) chan bool { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyFlow( c, diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 541c57f..8e659ea 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -13,15 +13,15 @@ import ( // ConnectionManager - type ConnectionManager struct { - logger logger.Logger - resolver Resolver - connection *amqp.Connection - amqpConfig amqp.Config - connectionMux *sync.RWMutex - ReconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + resolver Resolver + connection *amqp.Connection + amqpConfig amqp.Config + connectionMu *sync.RWMutex + ReconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMu *sync.Mutex + dispatcher *dispatcher.Dispatcher } type Resolver interface { @@ -56,15 +56,15 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger } connManager := ConnectionManager{ - logger: log, - resolver: resolver, - connection: conn, - amqpConfig: conf, - connectionMux: &sync.RWMutex{}, - ReconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + logger: log, + resolver: resolver, + connection: conn, + amqpConfig: conf, + connectionMu: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMu: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), } go connManager.startNotifyClose() return &connManager, nil @@ -73,8 +73,8 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger // Close safely closes the current channel and connection func (connManager *ConnectionManager) Close() error { connManager.logger.Infof("closing connection manager...") - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() + connManager.connectionMu.Lock() + defer connManager.connectionMu.Unlock() err := connManager.connection.Close() if err != nil { @@ -91,13 +91,13 @@ func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- st // CheckoutConnection - func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { - connManager.connectionMux.RLock() + connManager.connectionMu.RLock() return connManager.connection } // CheckinConnection - func (connManager *ConnectionManager) CheckinConnection() { - connManager.connectionMux.RUnlock() + connManager.connectionMu.RUnlock() } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -121,14 +121,14 @@ func (connManager *ConnectionManager) startNotifyClose() { // GetReconnectionCount - func (connManager *ConnectionManager) GetReconnectionCount() uint { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCountMu.Lock() + defer connManager.reconnectionCountMu.Unlock() return connManager.reconnectionCount } func (connManager *ConnectionManager) incrementReconnectionCount() { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCountMu.Lock() + defer connManager.reconnectionCountMu.Unlock() connManager.reconnectionCount++ } @@ -150,8 +150,8 @@ func (connManager *ConnectionManager) reconnectLoop() { // reconnect safely closes the current channel and obtains a new one func (connManager *ConnectionManager) reconnect() error { - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() + connManager.connectionMu.Lock() + defer connManager.connectionMu.Unlock() conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig)) if err != nil { diff --git a/internal/connectionmanager/safe_wraps.go b/internal/connectionmanager/safe_wraps.go index b6702af..6a6abbc 100644 --- a/internal/connectionmanager/safe_wraps.go +++ b/internal/connectionmanager/safe_wraps.go @@ -8,8 +8,8 @@ import ( func (connManager *ConnectionManager) NotifyBlockedSafe( receiver chan amqp.Blocking, ) chan amqp.Blocking { - connManager.connectionMux.RLock() - defer connManager.connectionMux.RUnlock() + connManager.connectionMu.RLock() + defer connManager.connectionMu.RUnlock() return connManager.connection.NotifyBlocked( receiver, diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 52385c6..a4592a1 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -10,8 +10,8 @@ import ( // Dispatcher - type Dispatcher struct { - subscribers map[int]dispatchSubscriber - subscribersMux *sync.Mutex + subscribers map[int]dispatchSubscriber + subscribersMu *sync.Mutex } type dispatchSubscriber struct { @@ -22,15 +22,15 @@ type dispatchSubscriber struct { // NewDispatcher - func NewDispatcher() *Dispatcher { return &Dispatcher{ - subscribers: make(map[int]dispatchSubscriber), - subscribersMux: &sync.Mutex{}, + subscribers: make(map[int]dispatchSubscriber), + subscribersMu: &sync.Mutex{}, } } // Dispatch - func (d *Dispatcher) Dispatch(err error) error { - d.subscribersMux.Lock() - defer d.subscribersMux.Unlock() + d.subscribersMu.Lock() + defer d.subscribersMu.Unlock() for _, subscriber := range d.subscribers { select { case <-time.After(time.Second * 5): @@ -50,17 +50,17 @@ func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}) { closeCh := make(chan struct{}) notifyCancelOrCloseChan := make(chan error) - d.subscribersMux.Lock() + d.subscribersMu.Lock() d.subscribers[id] = dispatchSubscriber{ notifyCancelOrCloseChan: notifyCancelOrCloseChan, closeCh: closeCh, } - d.subscribersMux.Unlock() + d.subscribersMu.Unlock() go func(id int) { <-closeCh - d.subscribersMux.Lock() - defer d.subscribersMux.Unlock() + d.subscribersMu.Lock() + defer d.subscribersMu.Unlock() sub, ok := d.subscribers[id] if !ok { return diff --git a/internal/dispatcher/dispatcher_test.go b/internal/dispatcher/dispatcher_test.go index afc5509..3e80dcb 100644 --- a/internal/dispatcher/dispatcher_test.go +++ b/internal/dispatcher/dispatcher_test.go @@ -10,8 +10,8 @@ func TestNewDispatcher(t *testing.T) { if d.subscribers == nil { t.Error("Dispatcher subscribers is nil") } - if d.subscribersMux == nil { - t.Error("Dispatcher subscribersMux is nil") + if d.subscribersMu == nil { + t.Error("Dispatcher subscribersMu is nil") } } diff --git a/publish.go b/publish.go index a58b48d..06f9cb0 100644 --- a/publish.go +++ b/publish.go @@ -47,13 +47,13 @@ type Publisher struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} - disablePublishDueToFlow bool - disablePublishDueToFlowMux *sync.RWMutex + disablePublishDueToFlow bool + disablePublishDueToFlowMu *sync.RWMutex - disablePublishDueToBlocked bool - disablePublishDueToBlockedMux *sync.RWMutex + disablePublishDueToBlocked bool + disablePublishDueToBlockedMu *sync.RWMutex - handlerMux *sync.Mutex + handlerMu *sync.Mutex notifyReturnHandler func(r Return) notifyPublishHandler func(p Confirmation) @@ -85,18 +85,18 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe reconnectErrCh, closeCh := chanManager.NotifyReconnect() publisher := &Publisher{ - chanManager: chanManager, - connManager: conn.connectionManager, - reconnectErrCh: reconnectErrCh, - closeConnectionToManagerCh: closeCh, - disablePublishDueToFlow: false, - disablePublishDueToFlowMux: &sync.RWMutex{}, - disablePublishDueToBlocked: false, - disablePublishDueToBlockedMux: &sync.RWMutex{}, - handlerMux: &sync.Mutex{}, - notifyReturnHandler: nil, - notifyPublishHandler: nil, - options: *options, + chanManager: chanManager, + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + disablePublishDueToFlow: false, + disablePublishDueToFlowMu: &sync.RWMutex{}, + disablePublishDueToBlocked: false, + disablePublishDueToBlockedMu: &sync.RWMutex{}, + handlerMu: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, + options: *options, } err = publisher.startup() @@ -155,14 +155,14 @@ func (publisher *Publisher) PublishWithContext( routingKeys []string, optionFuncs ...func(*PublishOptions), ) error { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() + publisher.disablePublishDueToFlowMu.RLock() + defer publisher.disablePublishDueToFlowMu.RUnlock() if publisher.disablePublishDueToFlow { return fmt.Errorf("publishing blocked due to high flow on the server") } - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() + publisher.disablePublishDueToBlockedMu.RLock() + defer publisher.disablePublishDueToBlockedMu.RUnlock() if publisher.disablePublishDueToBlocked { return fmt.Errorf("publishing blocked due to TCP block on the server") } @@ -219,14 +219,14 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( routingKeys []string, optionFuncs ...func(*PublishOptions), ) (PublisherConfirmation, error) { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() + publisher.disablePublishDueToFlowMu.RLock() + defer publisher.disablePublishDueToFlowMu.RUnlock() if publisher.disablePublishDueToFlow { return nil, fmt.Errorf("publishing blocked due to high flow on the server") } - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() + publisher.disablePublishDueToBlockedMu.RLock() + defer publisher.disablePublishDueToBlockedMu.RUnlock() if publisher.disablePublishDueToBlocked { return nil, fmt.Errorf("publishing blocked due to TCP block on the server") } @@ -296,10 +296,10 @@ func (publisher *Publisher) Close() { // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyReturn(handler func(r Return)) { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() start := publisher.notifyReturnHandler == nil publisher.notifyReturnHandler = handler - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() if start { publisher.startReturnHandler() @@ -310,10 +310,10 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) { // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() shouldStart := publisher.notifyPublishHandler == nil publisher.notifyPublishHandler = handler - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() if shouldStart { publisher.startPublishHandler() @@ -321,12 +321,12 @@ func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { } func (publisher *Publisher) startReturnHandler() { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() if publisher.notifyReturnHandler == nil { - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() return } - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() go func() { returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) @@ -337,12 +337,12 @@ func (publisher *Publisher) startReturnHandler() { } func (publisher *Publisher) startPublishHandler() { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() if publisher.notifyPublishHandler == nil { - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() return } - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() publisher.chanManager.ConfirmSafe(false) go func() { diff --git a/publish_flow_block.go b/publish_flow_block.go index 5033037..b978a21 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -6,12 +6,12 @@ import ( func (publisher *Publisher) startNotifyFlowHandler() { notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool)) - publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlowMu.Lock() publisher.disablePublishDueToFlow = false - publisher.disablePublishDueToFlowMux.Unlock() + publisher.disablePublishDueToFlowMu.Unlock() for ok := range notifyFlowChan { - publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlowMu.Lock() if ok { publisher.options.Logger.Warnf("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true @@ -19,18 +19,18 @@ func (publisher *Publisher) startNotifyFlowHandler() { publisher.disablePublishDueToFlow = false publisher.options.Logger.Warnf("resuming publishing due to flow request from server") } - publisher.disablePublishDueToFlowMux.Unlock() + publisher.disablePublishDueToFlowMu.Unlock() } } func (publisher *Publisher) startNotifyBlockedHandler() { blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking)) - publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlockedMu.Lock() publisher.disablePublishDueToBlocked = false - publisher.disablePublishDueToBlockedMux.Unlock() + publisher.disablePublishDueToBlockedMu.Unlock() for b := range blockings { - publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlockedMu.Lock() if b.Active { publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server") publisher.disablePublishDueToBlocked = true @@ -38,6 +38,6 @@ func (publisher *Publisher) startNotifyBlockedHandler() { publisher.disablePublishDueToBlocked = false publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server") } - publisher.disablePublishDueToBlockedMux.Unlock() + publisher.disablePublishDueToBlockedMu.Unlock() } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md index db633d4..fd03c1f 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -1,5 +1,73 @@ # Changelog +## [v1.10.0](https://github.com/rabbitmq/amqp091-go/tree/v1.10.0) (2024-05-08) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.9.0...v1.10.0) + +**Implemented enhancements:** + +- Undeprecate non-context publish functions [\#259](https://github.com/rabbitmq/amqp091-go/pull/259) ([Zerpet](https://github.com/Zerpet)) +- Update Go directive [\#257](https://github.com/rabbitmq/amqp091-go/pull/257) ([Zerpet](https://github.com/Zerpet)) + +**Fixed bugs:** + +- republishing on reconnect bug in the example [\#249](https://github.com/rabbitmq/amqp091-go/issues/249) +- Channel Notify Close not receive event when connection is closed by RMQ server. [\#241](https://github.com/rabbitmq/amqp091-go/issues/241) +- Inconsistent documentation [\#231](https://github.com/rabbitmq/amqp091-go/issues/231) +- Data race in the client example [\#72](https://github.com/rabbitmq/amqp091-go/issues/72) +- Fix string function of URI [\#258](https://github.com/rabbitmq/amqp091-go/pull/258) ([Zerpet](https://github.com/Zerpet)) + +**Closed issues:** + +- Documentation needed \(`PublishWithContext` does not use context\) [\#195](https://github.com/rabbitmq/amqp091-go/issues/195) +- concurrent dispatch data race [\#226](https://github.com/rabbitmq/amqp091-go/issues/226) + +**Merged pull requests:** + +- Fix data race in example [\#260](https://github.com/rabbitmq/amqp091-go/pull/260) ([Zerpet](https://github.com/Zerpet)) +- Address CodeQL warning [\#252](https://github.com/rabbitmq/amqp091-go/pull/252) ([lukebakken](https://github.com/lukebakken)) +- Add support for additional AMQP URI query parameters [\#251](https://github.com/rabbitmq/amqp091-go/pull/251) ([vilius-g](https://github.com/vilius-g)) +- Example fix [\#250](https://github.com/rabbitmq/amqp091-go/pull/250) ([Boris-Plato](https://github.com/Boris-Plato)) +- Increasing the code coverage [\#248](https://github.com/rabbitmq/amqp091-go/pull/248) ([edercarloscosta](https://github.com/edercarloscosta)) +- Use correct mutex to guard confirms.published [\#240](https://github.com/rabbitmq/amqp091-go/pull/240) ([hjr265](https://github.com/hjr265)) +- Documenting Publishing.Expiration usage [\#232](https://github.com/rabbitmq/amqp091-go/pull/232) ([niksteff](https://github.com/niksteff)) +- fix comment typo in example\_client\_test.go [\#228](https://github.com/rabbitmq/amqp091-go/pull/228) ([wisaTong](https://github.com/wisaTong)) +- Bump go.uber.org/goleak from 1.2.1 to 1.3.0 [\#227](https://github.com/rabbitmq/amqp091-go/pull/227) ([dependabot[bot]](https://github.com/apps/dependabot)) + +## [v1.9.0](https://github.com/rabbitmq/amqp091-go/tree/v1.9.0) (2023-10-02) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.1...v1.9.0) + +**Implemented enhancements:** + +- Use of buffered delivery channels when prefetch\_count is not null [\#200](https://github.com/rabbitmq/amqp091-go/issues/200) + +**Fixed bugs:** + +- connection block when write connection reset by peer [\#222](https://github.com/rabbitmq/amqp091-go/issues/222) +- Test failure on 32bit architectures [\#202](https://github.com/rabbitmq/amqp091-go/issues/202) + +**Closed issues:** + +- Add a constant to set consumer timeout as queue argument [\#201](https://github.com/rabbitmq/amqp091-go/issues/201) +- Add a constant for CQ version [\#199](https://github.com/rabbitmq/amqp091-go/issues/199) +- Examples may need to be updated after \#140 [\#153](https://github.com/rabbitmq/amqp091-go/issues/153) + +**Merged pull requests:** + +- Update spec091.go [\#224](https://github.com/rabbitmq/amqp091-go/pull/224) ([pinkfish](https://github.com/pinkfish)) +- Closes 222 [\#223](https://github.com/rabbitmq/amqp091-go/pull/223) ([yywing](https://github.com/yywing)) +- Update write.go [\#221](https://github.com/rabbitmq/amqp091-go/pull/221) ([pinkfish](https://github.com/pinkfish)) +- Bump versions [\#219](https://github.com/rabbitmq/amqp091-go/pull/219) ([lukebakken](https://github.com/lukebakken)) +- remove extra word 'accept' from ExchangeDeclare description [\#217](https://github.com/rabbitmq/amqp091-go/pull/217) ([a-sabzian](https://github.com/a-sabzian)) +- Misc Windows CI updates [\#216](https://github.com/rabbitmq/amqp091-go/pull/216) ([lukebakken](https://github.com/lukebakken)) +- Stop using deprecated Publish function [\#207](https://github.com/rabbitmq/amqp091-go/pull/207) ([Zerpet](https://github.com/Zerpet)) +- Constant for consumer timeout queue argument [\#206](https://github.com/rabbitmq/amqp091-go/pull/206) ([Zerpet](https://github.com/Zerpet)) +- Add a constant for CQ v2 queue argument [\#205](https://github.com/rabbitmq/amqp091-go/pull/205) ([Zerpet](https://github.com/Zerpet)) +- Fix example for 32-bit compatibility [\#204](https://github.com/rabbitmq/amqp091-go/pull/204) ([Zerpet](https://github.com/Zerpet)) +- Fix to increase timeout milliseconds since it's too tight [\#203](https://github.com/rabbitmq/amqp091-go/pull/203) ([t2y](https://github.com/t2y)) +- Add Channel.ConsumeWithContext to be able to cancel delivering [\#192](https://github.com/rabbitmq/amqp091-go/pull/192) ([t2y](https://github.com/t2y)) + ## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1) diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index 0dcec90..3dfd7fa 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -7,7 +7,6 @@ package amqp091 import ( "context" - "errors" "reflect" "sync" "sync/atomic" @@ -971,9 +970,6 @@ func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table /* QueueUnbind removes a binding between an exchange and queue matching the key and arguments. - -It is possible to send and empty string for the exchange name which means to -unbind the queue from the default exchange. */ func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error { if err := args.Validate(); err != nil { @@ -1487,17 +1483,17 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. - -Deprecated: Use PublishWithContext instead. */ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error { - _, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) + _, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) return err } /* PublishWithContext sends a Publishing from the client to an exchange on the server. +NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. + When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is because every declared queue gets an implicit route to the default exchange. @@ -1527,34 +1523,17 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. */ -func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { - _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) - return err +func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + return ch.Publish(exchange, key, mandatory, immediate, msg) } /* -PublishWithDeferredConfirm behaves identically to Publish but additionally returns a -DeferredConfirmation, allowing the caller to wait on the publisher confirmation -for this message. If the channel has not been put into confirm mode, -the DeferredConfirmation will be nil. - -Deprecated: Use PublishWithDeferredConfirmWithContext instead. +PublishWithDeferredConfirm behaves identically to Publish, but additionally +returns a DeferredConfirmation, allowing the caller to wait on the publisher +confirmation for this message. If the channel has not been put into confirm +mode, the DeferredConfirmation will be nil. */ func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) -} - -/* -PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a -DeferredConfirmation, allowing the caller to wait on the publisher confirmation -for this message. If the channel has not been put into confirm mode, -the DeferredConfirmation will be nil. -*/ -func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - if ctx == nil { - return nil, errors.New("amqp091-go: nil Context") - } - if err := msg.Headers.Validate(); err != nil { return nil, err } @@ -1598,6 +1577,19 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex return dc, nil } +/* +PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. + +NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed +to this function is not honoured. +*/ +func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +} + /* Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be @@ -1829,8 +1821,8 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error { // GetNextPublishSeqNo returns the sequence number of the next message to be // published, when in confirm mode. func (ch *Channel) GetNextPublishSeqNo() uint64 { - ch.confirms.m.Lock() - defer ch.confirms.m.Unlock() + ch.confirms.publishedMut.Lock() + defer ch.confirms.publishedMut.Unlock() return ch.confirms.published + 1 } diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index c8bb820..e167a23 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -28,11 +28,11 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "AMQP 0.9.1 Client" - buildVersion = "1.9.0" + buildVersion = "1.10.0" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. - defaultChannelMax = (2 << 10) - 1 + defaultChannelMax = uint16((2 << 10) - 1) defaultLocale = "en_US" ) @@ -49,7 +49,7 @@ type Config struct { // bindings on the server. Dial sets this to the path parsed from the URL. Vhost string - ChannelMax int // 0 max channels means 2^16 - 1 + ChannelMax uint16 // 0 max channels means 2^16 - 1 FrameSize int // 0 max bytes means unlimited Heartbeat time.Duration // less than 1s uses the server's interval @@ -157,8 +157,7 @@ func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (ne // scheme. It is equivalent to calling DialTLS(amqp, nil). func Dial(url string) (*Connection, error) { return DialConfig(url, Config{ - Heartbeat: defaultHeartbeat, - Locale: defaultLocale, + Locale: defaultLocale, }) } @@ -169,7 +168,6 @@ func Dial(url string) (*Connection, error) { // DialTLS uses the provided tls.Config when encountering an amqps:// scheme. func DialTLS(url string, amqps *tls.Config) (*Connection, error) { return DialConfig(url, Config{ - Heartbeat: defaultHeartbeat, TLSClientConfig: amqps, Locale: defaultLocale, }) @@ -186,7 +184,6 @@ func DialTLS(url string, amqps *tls.Config) (*Connection, error) { // amqps:// scheme. func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) { return DialConfig(url, Config{ - Heartbeat: defaultHeartbeat, TLSClientConfig: amqps, SASL: []Authentication{&ExternalAuth{}}, }) @@ -195,7 +192,9 @@ func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) { // DialConfig accepts a string in the AMQP URI format and a configuration for // the transport and connection setup, returning a new Connection. Defaults to // a server heartbeat interval of 10 seconds and sets the initial read deadline -// to 30 seconds. +// to 30 seconds. The heartbeat interval specified in the AMQP URI takes precedence +// over the value specified in the config. To disable heartbeats, you must use +// the AMQP URI and set heartbeat=0 there. func DialConfig(url string, config Config) (*Connection, error) { var err error var conn net.Conn @@ -206,18 +205,50 @@ func DialConfig(url string, config Config) (*Connection, error) { } if config.SASL == nil { - config.SASL = []Authentication{uri.PlainAuth()} + if uri.AuthMechanism != nil { + for _, identifier := range uri.AuthMechanism { + switch strings.ToUpper(identifier) { + case "PLAIN": + config.SASL = append(config.SASL, uri.PlainAuth()) + case "AMQPLAIN": + config.SASL = append(config.SASL, uri.AMQPlainAuth()) + case "EXTERNAL": + config.SASL = append(config.SASL, &ExternalAuth{}) + default: + return nil, fmt.Errorf("unsupported auth_mechanism: %v", identifier) + } + } + } else { + config.SASL = []Authentication{uri.PlainAuth()} + } } if config.Vhost == "" { config.Vhost = uri.Vhost } + if uri.Heartbeat.hasValue { + config.Heartbeat = uri.Heartbeat.value + } else { + if config.Heartbeat == 0 { + config.Heartbeat = defaultHeartbeat + } + } + + if config.ChannelMax == 0 { + config.ChannelMax = uri.ChannelMax + } + + connectionTimeout := defaultConnectionTimeout + if uri.ConnectionTimeout != 0 { + connectionTimeout = time.Duration(uri.ConnectionTimeout) * time.Millisecond + } + addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10)) dialer := config.Dial if dialer == nil { - dialer = DefaultDial(defaultConnectionTimeout) + dialer = DefaultDial(connectionTimeout) } conn, err = dialer("tcp", addr) @@ -991,13 +1022,13 @@ func (c *Connection) openTune(config Config, auth Authentication) error { // When the server and client both use default 0, then the max channel is // only limited by uint16. - c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax)) + c.Config.ChannelMax = pickUInt16(config.ChannelMax, tune.ChannelMax) if c.Config.ChannelMax == 0 { c.Config.ChannelMax = defaultChannelMax } - c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax) + c.Config.ChannelMax = minUInt16(c.Config.ChannelMax, maxChannelMax) - c.allocator = newAllocator(1, c.Config.ChannelMax) + c.allocator = newAllocator(1, int(c.Config.ChannelMax)) c.m.Unlock() @@ -1104,6 +1135,13 @@ func max(a, b int) int { return b } +func maxUInt16(a, b uint16) uint16 { + if a > b { + return a + } + return b +} + func min(a, b int) int { if a < b { return a @@ -1111,6 +1149,21 @@ func min(a, b int) int { return b } +func minUInt16(a, b uint16) uint16 { + if a < b { + return a + } + return b +} + +func pickUInt16(client, server uint16) uint16 { + if client == 0 || server == 0 { + return maxUInt16(client, server) + } else { + return minUInt16(client, server) + } +} + func pick(client, server int) int { if client == 0 || server == 0 { return max(client, server) diff --git a/vendor/github.com/rabbitmq/amqp091-go/doc.go b/vendor/github.com/rabbitmq/amqp091-go/doc.go index 8cb0b64..461173f 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/doc.go +++ b/vendor/github.com/rabbitmq/amqp091-go/doc.go @@ -95,12 +95,11 @@ prior to calling [Channel.PublishWithContext] or [Channel.Consume]. When Dial encounters an amqps:// scheme, it will use the zero value of a tls.Config. This will only perform server certificate and host verification. -Use DialTLS when you wish to provide a client certificate (recommended), -include a private certificate authority's certificate in the cert chain for -server validity, or run insecure by not verifying the server certificate dial -your own connection. DialTLS will use the provided tls.Config when it -encounters an amqps:// scheme and will dial a plain connection when it -encounters an amqp:// scheme. +Use DialTLS when you wish to provide a client certificate (recommended), include +a private certificate authority's certificate in the cert chain for server +validity, or run insecure by not verifying the server certificate. DialTLS will +use the provided tls.Config when it encounters an amqps:// scheme and will dial +a plain connection when it encounters an amqp:// scheme. SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html @@ -110,17 +109,18 @@ In order to be notified when a connection or channel gets closed, both structures offer the possibility to register channels using [Channel.NotifyClose] and [Connection.NotifyClose] functions: - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error)) + notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) No errors will be sent in case of a graceful connection close. In case of a non-graceful closure due to e.g. network issue, or forced connection closure from the Management UI, the error will be notified synchronously by the library. -The error is sent synchronously to the channel, so that the flow will wait until -the receiver consumes from the channel. To avoid deadlocks in the library, it is -necessary to consume from the channels. This could be done inside a -different goroutine with a select listening on the two channels inside a for -loop like: +The library sends to notification channels just once. After sending a +notification to all channels, the library closes all registered notification +channels. After receiving a notification, the application should create and +register a new channel. To avoid deadlocks in the library, it is necessary to +consume from the channels. This could be done inside a different goroutine with +a select listening on the two channels inside a for loop like: go func() { for notifyConnClose != nil || notifyChanClose != nil { @@ -141,13 +141,8 @@ loop like: } }() -Another approach is to use buffered channels: - - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) - -The library sends to notification channels just once. After sending a notification -to all channels, the library closes all registered notification channels. After -receiving a notification, the application should create and register a new channel. +It is strongly recommended to use buffered channels to avoid deadlocks inside +the library. # Best practises for NotifyPublish notifications: diff --git a/vendor/github.com/rabbitmq/amqp091-go/gen.ps1 b/vendor/github.com/rabbitmq/amqp091-go/gen.ps1 new file mode 100644 index 0000000..c933543 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/gen.ps1 @@ -0,0 +1,14 @@ +$DebugPreference = 'Continue' +$ErrorActionPreference = 'Stop' + +Set-PSDebug -Off +Set-StrictMode -Version 'Latest' -ErrorAction 'Stop' -Verbose + +New-Variable -Name curdir -Option Constant -Value $PSScriptRoot + +$specDir = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec') +$amqpSpecXml = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'amqp0-9-1.stripped.extended.xml') +$gen = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'gen.go') +$spec091 = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec091.go') + +Get-Content -LiteralPath $amqpSpecXml | go run $gen | gofmt | Set-Content -Force -Path $spec091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 8f43a72..1e15ed0 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -144,6 +144,19 @@ const ( flagReserved1 = 0x0004 ) +// Expiration. These constants can be used to set a messages expiration TTL. +// They should be viewed as a clarification of the expiration functionality in +// messages and their usage is not enforced by this pkg. +// +// The server requires a string value that is interpreted by the server as +// milliseconds. If no value is set, which translates to the nil value of +// string, the message will never expire by itself. This does not influence queue +// configured TTL configurations. +const ( + NeverExpire string = "" // empty value means never expire + ImmediatelyExpire string = "0" // 0 means immediately expire +) + // Queue captures the current server state of the queue on the server returned // from Channel.QueueDeclare or Channel.QueueInspect. type Queue struct { @@ -162,18 +175,25 @@ type Publishing struct { Headers Table // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) - Priority uint8 // 0 to 9 - CorrelationId string // correlation identifier - ReplyTo string // address to to reply to (ex: RPC) - Expiration string // message expiration spec - MessageId string // message identifier - Timestamp time.Time // message timestamp - Type string // message type name - UserId string // creating user id - ex: "guest" - AppId string // creating application id + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + Priority uint8 // 0 to 9 + CorrelationId string // correlation identifier + ReplyTo string // address to to reply to (ex: RPC) + // Expiration represents the message TTL in milliseconds. A value of "0" + // indicates that the message will immediately expire if the message arrives + // at its destination and the message is not directly handled by a consumer + // that currently has the capacatity to do so. If you wish the message to + // not expire on its own, set this value to any ttl value, empty string or + // use the corresponding constants NeverExpire and ImmediatelyExpire. This + // does not influence queue configured TTL values. + Expiration string + MessageId string // message identifier + Timestamp time.Time // message timestamp + Type string // message type name + UserId string // creating user id - ex: "guest" + AppId string // creating application id // The application specific payload of the message Body []byte @@ -533,3 +553,16 @@ type bodyFrame struct { } func (f *bodyFrame) channel() uint16 { return f.ChannelId } + +type heartbeatDuration struct { + value time.Duration + hasValue bool +} + +func newHeartbeatDurationFromSeconds(s int) heartbeatDuration { + v := time.Duration(s) * time.Second + return heartbeatDuration{ + value: v, + hasValue: true, + } +} diff --git a/vendor/github.com/rabbitmq/amqp091-go/uri.go b/vendor/github.com/rabbitmq/amqp091-go/uri.go index 87ef09e..ddc4b1a 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/uri.go +++ b/vendor/github.com/rabbitmq/amqp091-go/uri.go @@ -7,6 +7,7 @@ package amqp091 import ( "errors" + "fmt" "net" "net/url" "strconv" @@ -32,16 +33,20 @@ var defaultURI = URI{ // URI represents a parsed AMQP URI string. type URI struct { - Scheme string - Host string - Port int - Username string - Password string - Vhost string - CertFile string // client TLS auth - path to certificate (PEM) - CACertFile string // client TLS auth - path to CA certificate (PEM) - KeyFile string // client TLS auth - path to private key (PEM) - ServerName string // client TLS auth - server name + Scheme string + Host string + Port int + Username string + Password string + Vhost string + CertFile string // client TLS auth - path to certificate (PEM) + CACertFile string // client TLS auth - path to CA certificate (PEM) + KeyFile string // client TLS auth - path to private key (PEM) + ServerName string // client TLS auth - server name + AuthMechanism []string + Heartbeat heartbeatDuration + ConnectionTimeout int + ChannelMax uint16 } // ParseURI attempts to parse the given AMQP URI according to the spec. @@ -62,6 +67,10 @@ type URI struct { // keyfile: // cacertfile: // server_name_indication: +// auth_mechanism: +// heartbeat: +// connection_timeout: +// channel_max: // // If cacertfile is not provided, system CA certificates will be used. // Mutual TLS (client auth) will be enabled only in case keyfile AND certfile provided. @@ -134,6 +143,31 @@ func ParseURI(uri string) (URI, error) { builder.KeyFile = params.Get("keyfile") builder.CACertFile = params.Get("cacertfile") builder.ServerName = params.Get("server_name_indication") + builder.AuthMechanism = params["auth_mechanism"] + + if params.Has("heartbeat") { + value, err := strconv.Atoi(params.Get("heartbeat")) + if err != nil { + return builder, fmt.Errorf("heartbeat is not an integer: %v", err) + } + builder.Heartbeat = newHeartbeatDurationFromSeconds(value) + } + + if params.Has("connection_timeout") { + value, err := strconv.Atoi(params.Get("connection_timeout")) + if err != nil { + return builder, fmt.Errorf("connection_timeout is not an integer: %v", err) + } + builder.ConnectionTimeout = value + } + + if params.Has("channel_max") { + value, err := strconv.ParseUint(params.Get("channel_max"), 10, 16) + if err != nil { + return builder, fmt.Errorf("connection_timeout is not an integer: %v", err) + } + builder.ChannelMax = uint16(value) + } return builder, nil } @@ -192,5 +226,29 @@ func (uri URI) String() string { authority.Path = "/" } + if uri.CertFile != "" || uri.KeyFile != "" || uri.CACertFile != "" || uri.ServerName != "" { + rawQuery := strings.Builder{} + if uri.CertFile != "" { + rawQuery.WriteString("certfile=") + rawQuery.WriteString(uri.CertFile) + rawQuery.WriteRune('&') + } + if uri.KeyFile != "" { + rawQuery.WriteString("keyfile=") + rawQuery.WriteString(uri.KeyFile) + rawQuery.WriteRune('&') + } + if uri.CACertFile != "" { + rawQuery.WriteString("cacertfile=") + rawQuery.WriteString(uri.CACertFile) + rawQuery.WriteRune('&') + } + if uri.ServerName != "" { + rawQuery.WriteString("server_name_indication=") + rawQuery.WriteString(uri.ServerName) + } + authority.RawQuery = rawQuery.String() + } + return authority.String() } diff --git a/vendor/modules.txt b/vendor/modules.txt index c57d426..0286e1c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v1.9.0 -## explicit; go 1.16 +# github.com/rabbitmq/amqp091-go v1.10.0 +## explicit; go 1.20 github.com/rabbitmq/amqp091-go