Skip to content

Commit 90649e4

Browse files
authored
Merge pull request #145 from ttab/main
Return an error instead of crashing when we cannot retry consumer
2 parents 937bab2 + 2a1dc36 commit 90649e4

File tree

4 files changed

+89
-59
lines changed

4 files changed

+89
-59
lines changed

Diff for: README.md

+9-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ defer conn.Close()
4444

4545
consumer, err := rabbitmq.NewConsumer(
4646
conn,
47-
func(d rabbitmq.Delivery) rabbitmq.Action {
48-
log.Printf("consumed: %v", string(d.Body))
49-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
50-
return rabbitmq.Ack
51-
},
5247
"my_queue",
5348
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
5449
rabbitmq.WithConsumerOptionsExchangeName("events"),
@@ -58,6 +53,15 @@ if err != nil {
5853
log.Fatal(err)
5954
}
6055
defer consumer.Close()
56+
57+
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
58+
log.Printf("consumed: %v", string(d.Body))
59+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
60+
return rabbitmq.Ack
61+
})
62+
if err != nil {
63+
log.Fatal(err)
64+
}
6165
```
6266

6367
## 🚀 Quick Start Publisher

Diff for: consume.go

+20-21
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@ type Delivery struct {
4444
amqp.Delivery
4545
}
4646

47-
// NewConsumer returns a new Consumer connected to the given rabbitmq server
48-
// it also starts consuming on the given connection with automatic reconnection handling
49-
// Do not reuse the returned consumer for anything other than to close it
47+
// NewConsumer returns a new Consumer connected to the given rabbitmq server.
5048
func NewConsumer(
5149
conn *Conn,
52-
handler Handler,
5350
queue string,
5451
optionFuncs ...func(*ConsumerOptions),
5552
) (*Consumer, error) {
@@ -78,30 +75,32 @@ func NewConsumer(
7875
isClosed: false,
7976
}
8077

81-
err = consumer.startGoroutines(
78+
return consumer, nil
79+
}
80+
81+
// Run starts consuming with automatic reconnection handling. Do not reuse the
82+
// consumer for anything other than to close it.
83+
func (consumer *Consumer) Run(handler Handler) error {
84+
err := consumer.startGoroutines(
8285
handler,
83-
*options,
86+
consumer.options,
8487
)
8588
if err != nil {
86-
return nil, err
89+
return err
8790
}
8891

89-
go func() {
90-
for err := range consumer.reconnectErrCh {
91-
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
92-
err = consumer.startGoroutines(
93-
handler,
94-
*options,
95-
)
96-
if err != nil {
97-
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
98-
consumer.options.Logger.Fatalf("consumer closing, unable to recover")
99-
return
100-
}
92+
for err := range consumer.reconnectErrCh {
93+
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
94+
err = consumer.startGoroutines(
95+
handler,
96+
consumer.options,
97+
)
98+
if err != nil {
99+
return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err)
101100
}
102-
}()
101+
}
103102

104-
return consumer, nil
103+
return nil
105104
}
106105

107106
// Close cleans up resources and closes the consumer.

Diff for: examples/consumer/main.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ func main() {
2222

2323
consumer, err := rabbitmq.NewConsumer(
2424
conn,
25-
func(d rabbitmq.Delivery) rabbitmq.Action {
26-
log.Printf("consumed: %v", string(d.Body))
27-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
28-
return rabbitmq.Ack
29-
},
3025
"my_queue",
3126
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
3227
rabbitmq.WithConsumerOptionsExchangeName("events"),
@@ -35,22 +30,29 @@ func main() {
3530
if err != nil {
3631
log.Fatal(err)
3732
}
38-
defer consumer.Close()
3933

40-
// block main thread - wait for shutdown signal
4134
sigs := make(chan os.Signal, 1)
42-
done := make(chan bool, 1)
4335

4436
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
4537

4638
go func() {
39+
fmt.Println("awaiting signal")
4740
sig := <-sigs
41+
4842
fmt.Println()
4943
fmt.Println(sig)
50-
done <- true
44+
fmt.Println("stopping consumer")
45+
46+
consumer.Close()
5147
}()
5248

53-
fmt.Println("awaiting signal")
54-
<-done
55-
fmt.Println("stopping consumer")
49+
// block main thread - wait for shutdown signal
50+
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
51+
log.Printf("consumed: %v", string(d.Body))
52+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
53+
return rabbitmq.Ack
54+
})
55+
if err != nil {
56+
log.Fatal(err)
57+
}
5658
}

Diff for: examples/multiconsumer/main.go

+46-21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"log"
66
"os"
77
"os/signal"
8+
"sync"
89
"syscall"
910

1011
rabbitmq "github.com/wagslane/go-rabbitmq"
@@ -22,11 +23,6 @@ func main() {
2223

2324
consumer, err := rabbitmq.NewConsumer(
2425
conn,
25-
func(d rabbitmq.Delivery) rabbitmq.Action {
26-
log.Printf("consumed: %v", string(d.Body))
27-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
28-
return rabbitmq.Ack
29-
},
3026
"my_queue",
3127
rabbitmq.WithConsumerOptionsConcurrency(2),
3228
rabbitmq.WithConsumerOptionsConsumerName("consumer_1"),
@@ -37,15 +33,9 @@ func main() {
3733
if err != nil {
3834
log.Fatal(err)
3935
}
40-
defer consumer.Close()
4136

4237
consumer2, err := rabbitmq.NewConsumer(
4338
conn,
44-
func(d rabbitmq.Delivery) rabbitmq.Action {
45-
log.Printf("consumed 2: %v", string(d.Body))
46-
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
47-
return rabbitmq.Ack
48-
},
4939
"my_queue",
5040
rabbitmq.WithConsumerOptionsConcurrency(2),
5141
rabbitmq.WithConsumerOptionsConsumerName("consumer_2"),
@@ -55,22 +45,57 @@ func main() {
5545
if err != nil {
5646
log.Fatal(err)
5747
}
58-
defer consumer2.Close()
5948

60-
// block main thread - wait for shutdown signal
6149
sigs := make(chan os.Signal, 1)
62-
done := make(chan bool, 1)
50+
errs := make(chan error, 1)
6351

6452
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
6553

6654
go func() {
67-
sig := <-sigs
68-
fmt.Println()
69-
fmt.Println(sig)
70-
done <- true
55+
fmt.Println("awaiting signal")
56+
select {
57+
case sig := <-sigs:
58+
fmt.Println()
59+
fmt.Println(sig)
60+
case err := <-errs:
61+
log.Print(err)
62+
}
63+
64+
fmt.Println("stopping consumers")
65+
66+
consumer.Close()
67+
consumer2.Close()
68+
}()
69+
70+
var wg sync.WaitGroup
71+
72+
wg.Add(2)
73+
74+
go func() {
75+
defer wg.Done()
76+
77+
err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
78+
log.Printf("consumed: %v", string(d.Body))
79+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
80+
return rabbitmq.Ack
81+
})
82+
if err != nil {
83+
errs <- err
84+
}
85+
}()
86+
87+
go func() {
88+
defer wg.Done()
89+
90+
err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
91+
log.Printf("consumed: %v", string(d.Body))
92+
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
93+
return rabbitmq.Ack
94+
})
95+
if err != nil {
96+
errs <- err
97+
}
7198
}()
7299

73-
fmt.Println("awaiting signal")
74-
<-done
75-
fmt.Println("stopping consumer")
100+
wg.Wait()
76101
}

0 commit comments

Comments
 (0)