Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for any kind of declarations when reconnecting #152

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func NewConsumer(
optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
defaultOptions := getDefaultConsumerOptions(queue)
options := &defaultOptions
options := defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
optionFunc(&options)
}

if conn.connectionManager == nil {
Expand All @@ -73,14 +73,14 @@ func NewConsumer(
chanManager: chanManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
options: options,
isClosedMux: &sync.RWMutex{},
isClosed: false,
}

err = consumer.startGoroutines(
handler,
*options,
options,
)
if err != nil {
return nil, err
Expand All @@ -91,7 +91,7 @@ func NewConsumer(
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,
options,
)
if err != nil {
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
Expand Down Expand Up @@ -140,21 +140,19 @@ func (consumer *Consumer) startGoroutines(
if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.chanManager, options)

err = declareAll(consumer.chanManager, declareOptions{
Queues: options.Queues,
Exchanges: options.Exchanges,
Bindings: options.Bindings,
})

if err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
return err
}

msgs, err := consumer.chanManager.ConsumeSafe(
options.QueueOptions.Name,
options.QueueName,
options.RabbitConsumerOptions.Name,
options.RabbitConsumerOptions.AutoAck,
options.RabbitConsumerOptions.Exclusive,
Expand Down
178 changes: 125 additions & 53 deletions consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,30 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
NoLocal: false,
Args: Table{},
},
QueueOptions: QueueOptions{
Name: queueName,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
Queues: []QueueOptions{
{
Name: queueName,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
Exchanges: []ExchangeOptions{
{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
},
},
Bindings: []Binding{},
Concurrency: 1,
Expand All @@ -59,8 +63,9 @@ func getDefaultBindingOptions() BindingOptions {
// If there are Bindings, the queue will be bound to them
type ConsumerOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
QueueName string
Queues []QueueOptions
Exchanges []ExchangeOptions
Bindings []Binding
Concurrency int
Logger logger.Logger
Expand Down Expand Up @@ -93,105 +98,126 @@ type QueueOptions struct {
Declare bool
}

// Binding describes the bhinding of a queue to a routing key on an exchange
type Binding struct {
RoutingKey string
BindingOptions
}

// BindingOptions describes the options a binding can have
type BindingOptions struct {
NoWait bool
Args Table
Declare bool
}

// WithConsumerOptionsQueueDurable ensures the queue is a durable queue
func WithConsumerOptionsQueueDurable(options *ConsumerOptions) {
options.QueueOptions.Durable = true
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.Durable = true
})
}

// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) {
options.QueueOptions.AutoDelete = true
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.AutoDelete = true
})
}

// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) {
options.QueueOptions.Exclusive = true
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.Exclusive = true
})
}

// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) {
options.QueueOptions.NoWait = true
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.NoWait = true
})
}

// WithConsumerOptionsQueuePassive ensures the queue is a passive queue
func WithConsumerOptionsQueuePassive(options *ConsumerOptions) {
options.QueueOptions.Passive = true
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.Passive = true
})
}

// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's
// existance upon startup
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) {
options.QueueOptions.Declare = false
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.Declare = false
})
}

// WithConsumerOptionsQueueArgs adds optional args to the queue
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.QueueOptions.Args = args
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
queueOptions.Args = args
})
}
}

// WithConsumerOptionsExchangeName sets the exchange name
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {

return func(options *ConsumerOptions) {
options.ExchangeOptions.Name = name
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Name = name
})
}
}

// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {

return func(options *ConsumerOptions) {
options.ExchangeOptions.Kind = kind
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Kind = kind
})
}
}

// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
options.ExchangeOptions.Durable = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Durable = true
})
}

// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
options.ExchangeOptions.AutoDelete = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.AutoDelete = true
})
}

// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
options.ExchangeOptions.Internal = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Internal = true
})
}

// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
options.ExchangeOptions.NoWait = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.NoWait = true
})
}

// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
options.ExchangeOptions.Declare = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Declare = true
})
}

// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
options.ExchangeOptions.Passive = true
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Passive = true
})
}

// WithConsumerOptionsExchangeArgs adds optional args to the exchange
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Args = args
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
exchangeOptions.Args = args
})
}
}

Expand Down Expand Up @@ -287,9 +313,55 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
// multiple nodes in the cluster will have the messages distributed amongst them
// for higher reliability
func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
if options.QueueOptions.Args == nil {
options.QueueOptions.Args = Table{}
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
if queueOptions.Args == nil {
queueOptions.Args = Table{}
}

queueOptions.Args["x-queue-type"] = "quorum"
})
}

// WithSimpleQueueOptions used for backwards compatibility
// Will set options on the first queue and ensure that queue exists
func WithSimpleQueueOptions(options *ConsumerOptions, handler func(queueOptions *QueueOptions)) {
if len(options.Queues) == 0 {
options.Queues = append(options.Queues, QueueOptions{})
}

handler(&options.Queues[0])
}

// WithSimpleExchangeOptions used for backwards compatibility
// Will set options on the first exchange and ensure that exchange exists
func WithSimpleExchangeOptions(options *ConsumerOptions, handler func(exchangeOptions *ExchangeOptions)) {
if len(options.Exchanges) == 0 {
options.Exchanges = append(options.Exchanges, ExchangeOptions{})
}

options.QueueOptions.Args["x-queue-type"] = "quorum"
handler(&options.Exchanges[0])
}

func WithConsumerQueue(queue QueueOptions) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Queues = []QueueOptions{queue}
}
}

func WithConsumerQueues(queues []QueueOptions) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Queues = queues
}
}

func WithConsumerBindings(bindings []Binding) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = bindings
}
}

func WithConsumerExchanges(exchanges []ExchangeOptions) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Exchanges = exchanges
}
}
Loading
Loading