Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit 12ffca7

Browse files
authored
Merge pull request #51 from siad007/exchangeType
Added amqp exchange-type option.
2 parents 532afd8 + a715718 commit 12ffca7

File tree

2 files changed

+137
-5
lines changed

2 files changed

+137
-5
lines changed

broker/amqp/broker_test.go

+94
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@ var (
2121
}
2222
)
2323

24+
var (
25+
fanoutPipe = &jobs.Pipeline{
26+
"broker": "amqp",
27+
"name": "fanout",
28+
"queue": "fanout-queue",
29+
"exchange": "fanout-exchange",
30+
"exchange-type": "fanout",
31+
"prefetch": 1,
32+
}
33+
34+
fanoutCfg = &Config{
35+
Addr: "amqp://guest:guest@localhost:5672/",
36+
}
37+
)
38+
2439
func TestBroker_Init(t *testing.T) {
2540
b := &Broker{}
2641
ok, err := b.Init(cfg)
@@ -323,3 +338,82 @@ func TestBroker_Consume_With_RoutingKey(t *testing.T) {
323338

324339
<-waitJob
325340
}
341+
342+
func TestBroker_Queue_ExchangeType(t *testing.T) {
343+
pipeWithKey := pipe.With("exchange-type", "direct")
344+
345+
assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
346+
}
347+
348+
func TestBroker_Register_With_ExchangeType(t *testing.T) {
349+
b := &Broker{}
350+
_, err := b.Init(cfg)
351+
if err != nil {
352+
t.Fatal(err)
353+
}
354+
355+
pipeWithKey := pipe.With("exchange-type", "fanout")
356+
357+
assert.NoError(t, b.Register(&pipeWithKey))
358+
}
359+
360+
func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
361+
b := &Broker{}
362+
_, err := b.Init(cfg)
363+
if err != nil {
364+
t.Fatal(err)
365+
}
366+
367+
pipeWithKey := pipe.With("exchange-type", "xxx")
368+
369+
assert.Error(t, b.Register(&pipeWithKey))
370+
}
371+
372+
func TestBroker_Consume_With_ExchangeType(t *testing.T) {
373+
b := &Broker{}
374+
_, err := b.Init(fanoutCfg)
375+
if err != nil {
376+
t.Fatal(err)
377+
}
378+
379+
pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
380+
381+
err = b.Register(&pipeWithKey)
382+
if err != nil {
383+
t.Fatal(err)
384+
}
385+
386+
ready := make(chan interface{})
387+
b.Listen(func(event int, ctx interface{}) {
388+
if event == jobs.EventBrokerReady {
389+
close(ready)
390+
}
391+
})
392+
393+
exec := make(chan jobs.Handler, 1)
394+
assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
395+
396+
go func() { assert.NoError(t, b.Serve()) }()
397+
defer b.Stop()
398+
399+
<-ready
400+
401+
jid, perr := b.Push(&pipeWithKey, &jobs.Job{
402+
Job: "test",
403+
Payload: "body",
404+
Options: &jobs.Options{},
405+
})
406+
407+
assert.NotEqual(t, "", jid)
408+
assert.NoError(t, perr)
409+
410+
waitJob := make(chan interface{})
411+
exec <- func(id string, j *jobs.Job) error {
412+
assert.Equal(t, jid, id)
413+
assert.Equal(t, "body", j.Payload)
414+
close(waitJob)
415+
return nil
416+
}
417+
418+
<-waitJob
419+
}

broker/amqp/queue.go

+43-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package amqp
22

33
import (
4+
"errors"
45
"fmt"
56
"github.com/spiral/jobs/v2"
67
"github.com/streadway/amqp"
@@ -10,11 +11,40 @@ import (
1011
"time"
1112
)
1213

14+
type ExchangeType string
15+
16+
const (
17+
Direct ExchangeType = "direct"
18+
Fanout ExchangeType = "fanout"
19+
Topic ExchangeType = "topic"
20+
Headers ExchangeType = "headers"
21+
)
22+
23+
func (et ExchangeType) IsValid() error {
24+
switch et {
25+
case Direct, Fanout, Topic, Headers:
26+
return nil
27+
}
28+
return errors.New("unknown exchange-type")
29+
}
30+
31+
func (et ExchangeType) String() string {
32+
switch et {
33+
case Direct, Fanout, Topic, Headers:
34+
return string(et)
35+
default:
36+
return "direct"
37+
}
38+
}
39+
40+
1341
type queue struct {
14-
active int32
15-
pipe *jobs.Pipeline
16-
exchange, name, key string
17-
consumer string
42+
active int32
43+
pipe *jobs.Pipeline
44+
exchange string
45+
exchangeType ExchangeType
46+
name, key string
47+
consumer string
1848

1949
// active consuming channel
2050
muc sync.Mutex
@@ -39,8 +69,16 @@ func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue
3969
return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
4070
}
4171

72+
exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
73+
74+
err := exchangeType.IsValid()
75+
if err != nil {
76+
return nil, fmt.Errorf(err.Error())
77+
}
78+
4279
return &queue{
4380
exchange: pipe.String("exchange", "amqp.direct"),
81+
exchangeType: exchangeType,
4482
name: pipe.String("queue", ""),
4583
key: pipe.String("routing-key", pipe.String("queue", "")),
4684
consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
@@ -221,7 +259,7 @@ func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table)
221259
return err
222260
}
223261

224-
err = c.ch.ExchangeDeclare(q.exchange, "direct", true, false, false, false, nil)
262+
err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil)
225263
if err != nil {
226264
return cp.closeChan(c, err)
227265
}

0 commit comments

Comments
 (0)