Skip to content

Commit 325b049

Browse files
committed
2 parents 6f99816 + 0febae3 commit 325b049

File tree

4 files changed

+89
-59
lines changed

4 files changed

+89
-59
lines changed

README.md

+9-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ defer conn.Close()
4444

4545
consumer, err := rabbitmq.NewConsumer(
4646
conn,
47-
func(d rabbitmq.Delivery) rabbitmq.Action {
48-
log.Printf("consumed: %v", string(d.Body))
49-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
50-
return rabbitmq.Ack
51-
},
5247
"my_queue",
5348
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
5449
rabbitmq.WithConsumerOptionsExchangeName("events"),
@@ -58,6 +53,15 @@ if err != nil {
5853
log.Fatal(err)
5954
}
6055
defer consumer.Close()
56+
57+
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
58+
log.Printf("consumed: %v", string(d.Body))
59+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
60+
return rabbitmq.Ack
61+
})
62+
if err != nil {
63+
log.Fatal(err)
64+
}
6165
```
6266

6367
## 🚀 Quick Start Publisher

consume.go

+20-21
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@ type Delivery struct {
4444
amqp.Delivery
4545
}
4646

47-
// NewConsumer returns a new Consumer connected to the given rabbitmq server
48-
// it also starts consuming on the given connection with automatic reconnection handling
49-
// Do not reuse the returned consumer for anything other than to close it
47+
// NewConsumer returns a new Consumer connected to the given rabbitmq server.
5048
func NewConsumer(
5149
conn *Conn,
52-
handler Handler,
5350
queue string,
5451
optionFuncs ...func(*ConsumerOptions),
5552
) (*Consumer, error) {
@@ -78,30 +75,32 @@ func NewConsumer(
7875
isClosed: false,
7976
}
8077

81-
err = consumer.startGoroutines(
78+
return consumer, nil
79+
}
80+
81+
// Run starts consuming with automatic reconnection handling. Do not reuse the
82+
// consumer for anything other than to close it.
83+
func (consumer *Consumer) Run(handler Handler) error {
84+
err := consumer.startGoroutines(
8285
handler,
83-
*options,
86+
consumer.options,
8487
)
8588
if err != nil {
86-
return nil, err
89+
return err
8790
}
8891

89-
go func() {
90-
for err := range consumer.reconnectErrCh {
91-
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
92-
err = consumer.startGoroutines(
93-
handler,
94-
*options,
95-
)
96-
if err != nil {
97-
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
98-
consumer.options.Logger.Fatalf("consumer closing, unable to recover")
99-
return
100-
}
92+
for err := range consumer.reconnectErrCh {
93+
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
94+
err = consumer.startGoroutines(
95+
handler,
96+
consumer.options,
97+
)
98+
if err != nil {
99+
return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err)
101100
}
102-
}()
101+
}
103102

104-
return consumer, nil
103+
return nil
105104
}
106105

107106
// Close cleans up resources and closes the consumer.

examples/consumer/main.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ func main() {
2222

2323
consumer, err := rabbitmq.NewConsumer(
2424
conn,
25-
func(d rabbitmq.Delivery) rabbitmq.Action {
26-
log.Printf("consumed: %v", string(d.Body))
27-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
28-
return rabbitmq.Ack
29-
},
3025
"my_queue",
3126
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
3227
rabbitmq.WithConsumerOptionsExchangeName("events"),
@@ -35,22 +30,29 @@ func main() {
3530
if err != nil {
3631
log.Fatal(err)
3732
}
38-
defer consumer.Close()
3933

40-
// block main thread - wait for shutdown signal
4134
sigs := make(chan os.Signal, 1)
42-
done := make(chan bool, 1)
4335

4436
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
4537

4638
go func() {
39+
fmt.Println("awaiting signal")
4740
sig := <-sigs
41+
4842
fmt.Println()
4943
fmt.Println(sig)
50-
done <- true
44+
fmt.Println("stopping consumer")
45+
46+
consumer.Close()
5147
}()
5248

53-
fmt.Println("awaiting signal")
54-
<-done
55-
fmt.Println("stopping consumer")
49+
// block main thread - wait for shutdown signal
50+
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
51+
log.Printf("consumed: %v", string(d.Body))
52+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
53+
return rabbitmq.Ack
54+
})
55+
if err != nil {
56+
log.Fatal(err)
57+
}
5658
}

examples/multiconsumer/main.go

+46-21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"log"
66
"os"
77
"os/signal"
8+
"sync"
89
"syscall"
910

1011
rabbitmq "github.com/wagslane/go-rabbitmq"
@@ -22,11 +23,6 @@ func main() {
2223

2324
consumer, err := rabbitmq.NewConsumer(
2425
conn,
25-
func(d rabbitmq.Delivery) rabbitmq.Action {
26-
log.Printf("consumed: %v", string(d.Body))
27-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
28-
return rabbitmq.Ack
29-
},
3026
"my_queue",
3127
rabbitmq.WithConsumerOptionsConcurrency(2),
3228
rabbitmq.WithConsumerOptionsConsumerName("consumer_1"),
@@ -38,15 +34,9 @@ func main() {
3834
if err != nil {
3935
log.Fatal(err)
4036
}
41-
defer consumer.Close()
4237

4338
consumer2, err := rabbitmq.NewConsumer(
4439
conn,
45-
func(d rabbitmq.Delivery) rabbitmq.Action {
46-
log.Printf("consumed 2: %v", string(d.Body))
47-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
48-
return rabbitmq.Ack
49-
},
5040
"my_queue",
5141
rabbitmq.WithConsumerOptionsConcurrency(2),
5242
rabbitmq.WithConsumerOptionsConsumerName("consumer_2"),
@@ -56,22 +46,57 @@ func main() {
5646
if err != nil {
5747
log.Fatal(err)
5848
}
59-
defer consumer2.Close()
6049

61-
// block main thread - wait for shutdown signal
6250
sigs := make(chan os.Signal, 1)
63-
done := make(chan bool, 1)
51+
errs := make(chan error, 1)
6452

6553
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
6654

6755
go func() {
68-
sig := <-sigs
69-
fmt.Println()
70-
fmt.Println(sig)
71-
done <- true
56+
fmt.Println("awaiting signal")
57+
select {
58+
case sig := <-sigs:
59+
fmt.Println()
60+
fmt.Println(sig)
61+
case err := <-errs:
62+
log.Print(err)
63+
}
64+
65+
fmt.Println("stopping consumers")
66+
67+
consumer.Close()
68+
consumer2.Close()
69+
}()
70+
71+
var wg sync.WaitGroup
72+
73+
wg.Add(2)
74+
75+
go func() {
76+
defer wg.Done()
77+
78+
err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
79+
log.Printf("consumed: %v", string(d.Body))
80+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
81+
return rabbitmq.Ack
82+
})
83+
if err != nil {
84+
errs <- err
85+
}
86+
}()
87+
88+
go func() {
89+
defer wg.Done()
90+
91+
err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
92+
log.Printf("consumed: %v", string(d.Body))
93+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
94+
return rabbitmq.Ack
95+
})
96+
if err != nil {
97+
errs <- err
98+
}
7299
}()
73100

74-
fmt.Println("awaiting signal")
75-
<-done
76-
fmt.Println("stopping consumer")
101+
wg.Wait()
77102
}

0 commit comments

Comments
 (0)