From 28863cc450f1875e8ae137a5db2147bf68f73e85 Mon Sep 17 00:00:00 2001 From: wagslane Date: Mon, 4 Mar 2024 09:39:56 -0700 Subject: [PATCH] docs and multiple exchanges --- README.md | 4 +-- consume.go | 8 +++-- consumer_options.go | 86 +++++++++++++++++++++++++++++--------------- declare.go | 28 ++++++++------- exchange_options.go | 1 + publisher_options.go | 2 +- 6 files changed, 81 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 00e9ce9..f23b704 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # go-rabbitmq -Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ +A wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supported by [Boot.dev](https://boot.dev) @@ -103,7 +103,7 @@ See the [examples](examples) directory for more ideas. * By default, queues are declared if they didn't already exist by new consumers * By default, routing-key bindings are declared by consumers if you're using `WithConsumerOptionsRoutingKey` -* By default, exchanges are *not* declared by publishers or consumers if they didn't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`. +* By default, exchanges are *not* declared by publishers or consumers if they don't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`. Read up on all the options in the GoDoc, there are quite a few of them. I try to pick sane and simple defaults. diff --git a/consume.go b/consume.go index d1f802f..3d05a77 100644 --- a/consume.go +++ b/consume.go @@ -140,9 +140,11 @@ func (consumer *Consumer) startGoroutines( if err != nil { return fmt.Errorf("declare qos failed: %w", err) } - err = declareExchange(consumer.chanManager, options.ExchangeOptions) - if err != nil { - return fmt.Errorf("declare exchange failed: %w", err) + for _, exchangeOption := range options.ExchangeOptions { + err = declareExchange(consumer.chanManager, exchangeOption) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } } err = declareQueue(consumer.chanManager, options.QueueOptions) if err != nil { diff --git a/consumer_options.go b/consumer_options.go index 80f2979..7de85cb 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -26,22 +26,26 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { Args: Table{}, Declare: true, }, - ExchangeOptions: ExchangeOptions{ - Name: "", - Kind: amqp.ExchangeDirect, - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: false, - }, - Bindings: []Binding{}, - Concurrency: 1, - Logger: stdDebugLogger{}, - QOSPrefetch: 10, - QOSGlobal: false, + ExchangeOptions: []ExchangeOptions{}, + Concurrency: 1, + Logger: stdDebugLogger{}, + QOSPrefetch: 10, + QOSGlobal: false, + } +} + +func getDefaultExchangeOptions() ExchangeOptions { + return ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + Bindings: []Binding{}, } } @@ -60,8 +64,7 @@ func getDefaultBindingOptions() BindingOptions { type ConsumerOptions struct { RabbitConsumerOptions RabbitConsumerOptions QueueOptions QueueOptions - ExchangeOptions ExchangeOptions - Bindings []Binding + ExchangeOptions []ExchangeOptions Concurrency int Logger logger.Logger QOSPrefetch int @@ -144,61 +147,77 @@ func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) { } } +func ensureExchangeOptions(options *ConsumerOptions) { + if len(options.ExchangeOptions) == 0 { + options.ExchangeOptions = append(options.ExchangeOptions, getDefaultExchangeOptions()) + } +} + // WithConsumerOptionsExchangeName sets the exchange name func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.ExchangeOptions.Name = name + ensureExchangeOptions(options) + options.ExchangeOptions[0].Name = name } } // WithConsumerOptionsExchangeKind ensures the queue is a durable queue func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.ExchangeOptions.Kind = kind + ensureExchangeOptions(options) + options.ExchangeOptions[0].Kind = kind } } // WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) { - options.ExchangeOptions.Durable = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].Durable = true } // WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) { - options.ExchangeOptions.AutoDelete = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].AutoDelete = true } // WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) { - options.ExchangeOptions.Internal = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].Internal = true } // WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) { - options.ExchangeOptions.NoWait = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].NoWait = true } // WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) { - options.ExchangeOptions.Declare = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].Declare = true } // WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange func WithConsumerOptionsExchangePassive(options *ConsumerOptions) { - options.ExchangeOptions.Passive = true + ensureExchangeOptions(options) + options.ExchangeOptions[0].Passive = true } // WithConsumerOptionsExchangeArgs adds optional args to the exchange func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.ExchangeOptions.Args = args + ensureExchangeOptions(options) + options.ExchangeOptions[0].Args = args } } // WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.Bindings = append(options.Bindings, Binding{ + ensureExchangeOptions(options) + options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{ RoutingKey: routingKey, BindingOptions: getDefaultBindingOptions(), }) @@ -210,7 +229,16 @@ func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { // the zero value. If you want to declare your bindings for example, be sure to set Declare=true func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.Bindings = append(options.Bindings, binding) + ensureExchangeOptions(options) + options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, binding) + } +} + +// WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be +// used if you want to to consume from multiple exchanges on the same consumer +func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions = append(options.ExchangeOptions, exchangeOptions) } } diff --git a/declare.go b/declare.go index 86abe85..dabd344 100644 --- a/declare.go +++ b/declare.go @@ -71,19 +71,21 @@ func declareExchange(chanManager *channelmanager.ChannelManager, options Exchang } func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error { - for _, binding := range options.Bindings { - if !binding.Declare { - continue - } - err := chanManager.QueueBindSafe( - options.QueueOptions.Name, - binding.RoutingKey, - options.ExchangeOptions.Name, - binding.NoWait, - tableToAMQPTable(binding.Args), - ) - if err != nil { - return err + for _, exchangeOption := range options.ExchangeOptions { + for _, binding := range exchangeOption.Bindings { + if !binding.Declare { + continue + } + err := chanManager.QueueBindSafe( + options.QueueOptions.Name, + binding.RoutingKey, + exchangeOption.Name, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err + } } } return nil diff --git a/exchange_options.go b/exchange_options.go index cf5e255..4cc7ad9 100644 --- a/exchange_options.go +++ b/exchange_options.go @@ -13,4 +13,5 @@ type ExchangeOptions struct { Passive bool // if false, a missing exchange will be created on the server Args Table Declare bool + Bindings []Binding } diff --git a/publisher_options.go b/publisher_options.go index eb283e4..17d42cf 100644 --- a/publisher_options.go +++ b/publisher_options.go @@ -77,7 +77,7 @@ func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) { options.ExchangeOptions.NoWait = true } -// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance +// WithPublisherOptionsExchangeDeclare will create the exchange if it doesn't exist func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) { options.ExchangeOptions.Declare = true }