1
1
package connectionmanager
2
2
3
3
import (
4
+ "errors"
5
+ "fmt"
4
6
"sync"
5
7
"time"
6
8
@@ -9,10 +11,14 @@ import (
9
11
"github.com/wagslane/go-rabbitmq/internal/logger"
10
12
)
11
13
14
+ type Resolver interface {
15
+ Resolve () ([]string , error )
16
+ }
17
+
12
18
// ConnectionManager -
13
19
type ConnectionManager struct {
14
20
logger logger.Logger
15
- url string
21
+ resolver Resolver
16
22
connection * amqp.Connection
17
23
amqpConfig amqp.Config
18
24
connectionMux * sync.RWMutex
@@ -22,15 +28,36 @@ type ConnectionManager struct {
22
28
dispatcher * dispatcher.Dispatcher
23
29
}
24
30
31
+ // dial will attempt to connect to the a list of urls in the order they are
32
+ // given.
33
+ func dial (log logger.Logger , resolver Resolver , conf amqp.Config ) (* amqp.Connection , error ) {
34
+ urls , err := resolver .Resolve ()
35
+ if err != nil {
36
+ return nil , fmt .Errorf ("error resolving amqp server urls: %w" , err )
37
+ }
38
+
39
+ var errs []error
40
+ for _ , url := range urls {
41
+ conn , err := amqp .DialConfig (url , amqp .Config (conf ))
42
+ if err == nil {
43
+ return conn , err
44
+ }
45
+ log .Warnf ("failed to connect to amqp server %s: %v" , url , err )
46
+ errs = append (errs , err )
47
+ }
48
+ return nil , errors .Join (errs ... )
49
+ }
50
+
25
51
// NewConnectionManager creates a new connection manager
26
- func NewConnectionManager (url string , conf amqp.Config , log logger.Logger , reconnectInterval time.Duration ) (* ConnectionManager , error ) {
27
- conn , err := amqp . DialConfig ( url , amqp .Config (conf ))
52
+ func NewConnectionManager (resolver Resolver , conf amqp.Config , log logger.Logger , reconnectInterval time.Duration ) (* ConnectionManager , error ) {
53
+ conn , err := dial ( log , resolver , amqp .Config (conf ))
28
54
if err != nil {
29
55
return nil , err
30
56
}
57
+
31
58
connManager := ConnectionManager {
32
59
logger : log ,
33
- url : url ,
60
+ resolver : resolver ,
34
61
connection : conn ,
35
62
amqpConfig : conf ,
36
63
connectionMux : & sync.RWMutex {},
@@ -125,7 +152,8 @@ func (connManager *ConnectionManager) reconnectLoop() {
125
152
func (connManager * ConnectionManager ) reconnect () error {
126
153
connManager .connectionMux .Lock ()
127
154
defer connManager .connectionMux .Unlock ()
128
- newConn , err := amqp .DialConfig (connManager .url , amqp .Config (connManager .amqpConfig ))
155
+
156
+ conn , err := dial (connManager .logger , connManager .resolver , amqp .Config (connManager .amqpConfig ))
129
157
if err != nil {
130
158
return err
131
159
}
@@ -134,6 +162,6 @@ func (connManager *ConnectionManager) reconnect() error {
134
162
connManager .logger .Warnf ("error closing connection while reconnecting: %v" , err )
135
163
}
136
164
137
- connManager .connection = newConn
165
+ connManager .connection = conn
138
166
return nil
139
167
}
0 commit comments