Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs and multiple exchanges #154

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# go-rabbitmq

Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐
A wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by [Boot.dev](https://boot.dev)

Expand Down Expand Up @@ -103,7 +103,7 @@ See the [examples](examples) directory for more ideas.

* By default, queues are declared if they didn't already exist by new consumers
* By default, routing-key bindings are declared by consumers if you're using `WithConsumerOptionsRoutingKey`
* By default, exchanges are *not* declared by publishers or consumers if they didn't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`.
* By default, exchanges are *not* declared by publishers or consumers if they don't already exist, hence `WithPublisherOptionsExchangeDeclare` and `WithConsumerOptionsExchangeDeclare`.

Read up on all the options in the GoDoc, there are quite a few of them. I try to pick sane and simple defaults.

Expand Down
8 changes: 5 additions & 3 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ func (consumer *Consumer) startGoroutines(
if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
for _, exchangeOption := range options.ExchangeOptions {
err = declareExchange(consumer.chanManager, exchangeOption)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {
Expand Down
86 changes: 57 additions & 29 deletions consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
Args: Table{},
Declare: true,
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
},
Bindings: []Binding{},
Concurrency: 1,
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
ExchangeOptions: []ExchangeOptions{},
Concurrency: 1,
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
}
}

func getDefaultExchangeOptions() ExchangeOptions {
return ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
Bindings: []Binding{},
}
}

Expand All @@ -60,8 +64,7 @@ func getDefaultBindingOptions() BindingOptions {
type ConsumerOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
Bindings []Binding
ExchangeOptions []ExchangeOptions
Concurrency int
Logger logger.Logger
QOSPrefetch int
Expand Down Expand Up @@ -144,61 +147,77 @@ func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
}
}

func ensureExchangeOptions(options *ConsumerOptions) {
if len(options.ExchangeOptions) == 0 {
options.ExchangeOptions = append(options.ExchangeOptions, getDefaultExchangeOptions())
}
}

// WithConsumerOptionsExchangeName sets the exchange name
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Name = name
ensureExchangeOptions(options)
options.ExchangeOptions[0].Name = name
}
}

// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Kind = kind
ensureExchangeOptions(options)
options.ExchangeOptions[0].Kind = kind
}
}

// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
options.ExchangeOptions.Durable = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Durable = true
}

// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
options.ExchangeOptions.AutoDelete = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].AutoDelete = true
}

// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
options.ExchangeOptions.Internal = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Internal = true
}

// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
options.ExchangeOptions.NoWait = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].NoWait = true
}

// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
options.ExchangeOptions.Declare = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Declare = true
}

// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
options.ExchangeOptions.Passive = true
ensureExchangeOptions(options)
options.ExchangeOptions[0].Passive = true
}

// WithConsumerOptionsExchangeArgs adds optional args to the exchange
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Args = args
ensureExchangeOptions(options)
options.ExchangeOptions[0].Args = args
}
}

// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, Binding{
ensureExchangeOptions(options)
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
Expand All @@ -210,7 +229,16 @@ func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, binding)
ensureExchangeOptions(options)
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, binding)
}
}

// WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be
// used if you want to to consume from multiple exchanges on the same consumer
func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions = append(options.ExchangeOptions, exchangeOptions)
}
}

Expand Down
28 changes: 15 additions & 13 deletions declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,21 @@ func declareExchange(chanManager *channelmanager.ChannelManager, options Exchang
}

func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error {
for _, binding := range options.Bindings {
if !binding.Declare {
continue
}
err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
options.ExchangeOptions.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
for _, exchangeOption := range options.ExchangeOptions {
for _, binding := range exchangeOption.Bindings {
if !binding.Declare {
continue
}
err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
exchangeOption.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
}
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions exchange_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type ExchangeOptions struct {
Passive bool // if false, a missing exchange will be created on the server
Args Table
Declare bool
Bindings []Binding
}
2 changes: 1 addition & 1 deletion publisher_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) {
options.ExchangeOptions.NoWait = true
}

// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance
// WithPublisherOptionsExchangeDeclare will create the exchange if it doesn't exist
func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) {
options.ExchangeOptions.Declare = true
}
Expand Down
Loading