Skip to content

Commit aeee45d

Browse files
committed
Add support for any kind of declarations when creating publisher / consumer
Note: Breaking Changes - Removes ConsumerOptions.QueueOptions is removed - Removes ConsumerOptions.ExchangeOptions is removed - Adds ConsumerOptions.QueueName as the queue to listen to - Adds slices Queues, Exchanges and Bindings to ConsumerOptions - Adds slices Queues, Exchanges and Bindings to PublisherOptions - Expands Binding to allow for exchange bindings
1 parent 937bab2 commit aeee45d

File tree

6 files changed

+299
-99
lines changed

6 files changed

+299
-99
lines changed

Diff for: consume.go

+9-11
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,19 @@ func (consumer *Consumer) startGoroutines(
140140
if err != nil {
141141
return fmt.Errorf("declare qos failed: %w", err)
142142
}
143-
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
144-
if err != nil {
145-
return fmt.Errorf("declare exchange failed: %w", err)
146-
}
147-
err = declareQueue(consumer.chanManager, options.QueueOptions)
148-
if err != nil {
149-
return fmt.Errorf("declare queue failed: %w", err)
150-
}
151-
err = declareBindings(consumer.chanManager, options)
143+
144+
err = declareAll(consumer.chanManager, declareOptions{
145+
Queues: options.Queues,
146+
Exchanges: options.Exchanges,
147+
Bindings: options.Bindings,
148+
})
149+
152150
if err != nil {
153-
return fmt.Errorf("declare bindings failed: %w", err)
151+
return err
154152
}
155153

156154
msgs, err := consumer.chanManager.ConsumeSafe(
157-
options.QueueOptions.Name,
155+
options.QueueName,
158156
options.RabbitConsumerOptions.Name,
159157
options.RabbitConsumerOptions.AutoAck,
160158
options.RabbitConsumerOptions.Exclusive,

Diff for: consumer_options.go

+125-53
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,30 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
1616
NoLocal: false,
1717
Args: Table{},
1818
},
19-
QueueOptions: QueueOptions{
20-
Name: queueName,
21-
Durable: false,
22-
AutoDelete: false,
23-
Exclusive: false,
24-
NoWait: false,
25-
Passive: false,
26-
Args: Table{},
27-
Declare: true,
19+
Queues: []QueueOptions{
20+
{
21+
Name: queueName,
22+
Durable: false,
23+
AutoDelete: false,
24+
Exclusive: false,
25+
NoWait: false,
26+
Passive: false,
27+
Args: Table{},
28+
Declare: true,
29+
},
2830
},
29-
ExchangeOptions: ExchangeOptions{
30-
Name: "",
31-
Kind: amqp.ExchangeDirect,
32-
Durable: false,
33-
AutoDelete: false,
34-
Internal: false,
35-
NoWait: false,
36-
Passive: false,
37-
Args: Table{},
38-
Declare: false,
31+
Exchanges: []ExchangeOptions{
32+
{
33+
Name: "",
34+
Kind: amqp.ExchangeDirect,
35+
Durable: false,
36+
AutoDelete: false,
37+
Internal: false,
38+
NoWait: false,
39+
Passive: false,
40+
Args: Table{},
41+
Declare: false,
42+
},
3943
},
4044
Bindings: []Binding{},
4145
Concurrency: 1,
@@ -59,8 +63,9 @@ func getDefaultBindingOptions() BindingOptions {
5963
// If there are Bindings, the queue will be bound to them
6064
type ConsumerOptions struct {
6165
RabbitConsumerOptions RabbitConsumerOptions
62-
QueueOptions QueueOptions
63-
ExchangeOptions ExchangeOptions
66+
QueueName string
67+
Queues []QueueOptions
68+
Exchanges []ExchangeOptions
6469
Bindings []Binding
6570
Concurrency int
6671
Logger logger.Logger
@@ -93,105 +98,126 @@ type QueueOptions struct {
9398
Declare bool
9499
}
95100

96-
// Binding describes the bhinding of a queue to a routing key on an exchange
97-
type Binding struct {
98-
RoutingKey string
99-
BindingOptions
100-
}
101-
102-
// BindingOptions describes the options a binding can have
103-
type BindingOptions struct {
104-
NoWait bool
105-
Args Table
106-
Declare bool
107-
}
108-
109101
// WithConsumerOptionsQueueDurable ensures the queue is a durable queue
110102
func WithConsumerOptionsQueueDurable(options *ConsumerOptions) {
111-
options.QueueOptions.Durable = true
103+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
104+
queueOptions.Durable = true
105+
})
112106
}
113107

114108
// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue
115109
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) {
116-
options.QueueOptions.AutoDelete = true
110+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
111+
queueOptions.AutoDelete = true
112+
})
117113
}
118114

119115
// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue
120116
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) {
121-
options.QueueOptions.Exclusive = true
117+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
118+
queueOptions.Exclusive = true
119+
})
122120
}
123121

124122
// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue
125123
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) {
126-
options.QueueOptions.NoWait = true
124+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
125+
queueOptions.NoWait = true
126+
})
127127
}
128128

129129
// WithConsumerOptionsQueuePassive ensures the queue is a passive queue
130130
func WithConsumerOptionsQueuePassive(options *ConsumerOptions) {
131-
options.QueueOptions.Passive = true
131+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
132+
queueOptions.Passive = true
133+
})
132134
}
133135

134136
// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's
135137
// existance upon startup
136138
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) {
137-
options.QueueOptions.Declare = false
139+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
140+
queueOptions.Declare = false
141+
})
138142
}
139143

140144
// WithConsumerOptionsQueueArgs adds optional args to the queue
141145
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
142146
return func(options *ConsumerOptions) {
143-
options.QueueOptions.Args = args
147+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
148+
queueOptions.Args = args
149+
})
144150
}
145151
}
146152

147153
// WithConsumerOptionsExchangeName sets the exchange name
148154
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
155+
149156
return func(options *ConsumerOptions) {
150-
options.ExchangeOptions.Name = name
157+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
158+
exchangeOptions.Name = name
159+
})
151160
}
152161
}
153162

154163
// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
155164
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
165+
156166
return func(options *ConsumerOptions) {
157-
options.ExchangeOptions.Kind = kind
167+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
168+
exchangeOptions.Kind = kind
169+
})
158170
}
159171
}
160172

161173
// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
162174
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
163-
options.ExchangeOptions.Durable = true
175+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
176+
exchangeOptions.Durable = true
177+
})
164178
}
165179

166180
// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
167181
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
168-
options.ExchangeOptions.AutoDelete = true
182+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
183+
exchangeOptions.AutoDelete = true
184+
})
169185
}
170186

171187
// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
172188
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
173-
options.ExchangeOptions.Internal = true
189+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
190+
exchangeOptions.Internal = true
191+
})
174192
}
175193

176194
// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
177195
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
178-
options.ExchangeOptions.NoWait = true
196+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
197+
exchangeOptions.NoWait = true
198+
})
179199
}
180200

181201
// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
182202
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
183-
options.ExchangeOptions.Declare = true
203+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
204+
exchangeOptions.Declare = true
205+
})
184206
}
185207

186208
// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
187209
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
188-
options.ExchangeOptions.Passive = true
210+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
211+
exchangeOptions.Passive = true
212+
})
189213
}
190214

191215
// WithConsumerOptionsExchangeArgs adds optional args to the exchange
192216
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
193217
return func(options *ConsumerOptions) {
194-
options.ExchangeOptions.Args = args
218+
WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) {
219+
exchangeOptions.Args = args
220+
})
195221
}
196222
}
197223

@@ -287,9 +313,55 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
287313
// multiple nodes in the cluster will have the messages distributed amongst them
288314
// for higher reliability
289315
func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
290-
if options.QueueOptions.Args == nil {
291-
options.QueueOptions.Args = Table{}
316+
WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) {
317+
if queueOptions.Args == nil {
318+
queueOptions.Args = Table{}
319+
}
320+
321+
queueOptions.Args["x-queue-type"] = "quorum"
322+
})
323+
}
324+
325+
// WithSimpleQueueOptions used for backwards compatibility
326+
// Will set options on the first queue and ensure that queue exists
327+
func WithSimpleQueueOptions(options *ConsumerOptions, handler func(queueOptions *QueueOptions)) {
328+
if len(options.Queues) == 0 {
329+
options.Queues = append(options.Queues, QueueOptions{})
330+
}
331+
332+
handler(&options.Queues[0])
333+
}
334+
335+
// WithSimpleExchangeOptions used for backwards compatibility
336+
// Will set options on the first exchange and ensure that exchange exists
337+
func WithSimpleExchangeOptions(options *ConsumerOptions, handler func(exchangeOptions *ExchangeOptions)) {
338+
if len(options.Exchanges) == 0 {
339+
options.Exchanges = append(options.Exchanges, ExchangeOptions{})
292340
}
293341

294-
options.QueueOptions.Args["x-queue-type"] = "quorum"
342+
handler(&options.Exchanges[0])
343+
}
344+
345+
func WithConsumerQueue(queue QueueOptions) func(options *ConsumerOptions) {
346+
return func(options *ConsumerOptions) {
347+
options.Queues = []QueueOptions{queue}
348+
}
349+
}
350+
351+
func WithConsumerQueues(queues []QueueOptions) func(options *ConsumerOptions) {
352+
return func(options *ConsumerOptions) {
353+
options.Queues = queues
354+
}
355+
}
356+
357+
func WithConsumerBindings(bindings []Binding) func(options *ConsumerOptions) {
358+
return func(options *ConsumerOptions) {
359+
options.Bindings = bindings
360+
}
361+
}
362+
363+
func WithConsumerExchanges(exchanges []ExchangeOptions) func(options *ConsumerOptions) {
364+
return func(options *ConsumerOptions) {
365+
options.Exchanges = exchanges
366+
}
295367
}

0 commit comments

Comments
 (0)