Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit 532afd8

Browse files
authored
Merge pull request #50 from siad007/routingKey
Add amqp routing key.
2 parents ef23479 + efb9c7f commit 532afd8

File tree

3 files changed

+77
-8
lines changed

3 files changed

+77
-8
lines changed

broker/amqp/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (b *Broker) Serve() (err error) {
6969
defer b.consume.Close()
7070

7171
for _, q := range b.queues {
72-
err := q.declare(b.publish, q.name, q.name, nil)
72+
err := q.declare(b.publish, q.name, q.key, nil)
7373
if err != nil {
7474
b.mu.Unlock()
7575
return err

broker/amqp/broker_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,70 @@ func TestBroker_StatNotRegistered(t *testing.T) {
256256
_, err = b.Stat(pipe)
257257
assert.Error(t, err)
258258
}
259+
260+
func TestBroker_Queue_RoutingKey(t *testing.T) {
261+
pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
262+
263+
assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
264+
}
265+
266+
func TestBroker_Register_With_RoutingKey(t *testing.T) {
267+
b := &Broker{}
268+
_, err := b.Init(cfg)
269+
if err != nil {
270+
t.Fatal(err)
271+
}
272+
273+
pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
274+
275+
assert.NoError(t, b.Register(&pipeWithKey))
276+
}
277+
278+
func TestBroker_Consume_With_RoutingKey(t *testing.T) {
279+
b := &Broker{}
280+
_, err := b.Init(cfg)
281+
if err != nil {
282+
t.Fatal(err)
283+
}
284+
285+
pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
286+
287+
err = b.Register(&pipeWithKey)
288+
if err != nil {
289+
t.Fatal(err)
290+
}
291+
292+
ready := make(chan interface{})
293+
b.Listen(func(event int, ctx interface{}) {
294+
if event == jobs.EventBrokerReady {
295+
close(ready)
296+
}
297+
})
298+
299+
exec := make(chan jobs.Handler, 1)
300+
assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
301+
302+
go func() { assert.NoError(t, b.Serve()) }()
303+
defer b.Stop()
304+
305+
<-ready
306+
307+
jid, perr := b.Push(&pipeWithKey, &jobs.Job{
308+
Job: "test",
309+
Payload: "body",
310+
Options: &jobs.Options{},
311+
})
312+
313+
assert.NotEqual(t, "", jid)
314+
assert.NoError(t, perr)
315+
316+
waitJob := make(chan interface{})
317+
exec <- func(id string, j *jobs.Job) error {
318+
assert.Equal(t, jid, id)
319+
assert.Equal(t, "body", j.Payload)
320+
close(waitJob)
321+
return nil
322+
}
323+
324+
<-waitJob
325+
}

broker/amqp/queue.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import (
1111
)
1212

1313
type queue struct {
14-
active int32
15-
pipe *jobs.Pipeline
16-
exchange, name string
17-
consumer string
14+
active int32
15+
pipe *jobs.Pipeline
16+
exchange, name, key string
17+
consumer string
1818

1919
// active consuming channel
2020
muc sync.Mutex
@@ -42,6 +42,7 @@ func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue
4242
return &queue{
4343
exchange: pipe.String("exchange", "amqp.direct"),
4444
name: pipe.String("queue", ""),
45+
key: pipe.String("routing-key", pipe.String("queue", "")),
4546
consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
4647
pipe: pipe,
4748
lsn: lsn,
@@ -169,11 +170,12 @@ func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay
169170
return err
170171
}
171172

172-
qName := q.name
173+
qKey := q.key
173174

174175
if delay != 0 {
175176
delayMs := int64(delay.Seconds() * 1000)
176-
qName = fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
177+
qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
178+
qKey = qName
177179

178180
err := q.declare(cp, qName, qName, amqp.Table{
179181
"x-dead-letter-exchange": q.exchange,
@@ -189,7 +191,7 @@ func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay
189191

190192
err = c.ch.Publish(
191193
q.exchange, // exchange
192-
qName, // routing key
194+
qKey, // routing key
193195
false, // mandatory
194196
false, // immediate
195197
amqp.Publishing{

0 commit comments

Comments
 (0)