Skip to content

Commit 7fca926

Browse files
authored
Merge pull request #163 from wazsone/feature/cluster
Add cluster support via Resolver
2 parents 456bec0 + 5802dd2 commit 7fca926

File tree

3 files changed

+93
-12
lines changed

3 files changed

+93
-12
lines changed

connection.go

+34-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package rabbitmq
22

33
import (
4+
"math/rand"
5+
46
amqp "github.com/rabbitmq/amqp091-go"
57
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
68
)
@@ -21,27 +23,53 @@ type Conn struct {
2123
// will be stored in the returned connection's Config field.
2224
type Config amqp.Config
2325

26+
type Resolver = connectionmanager.Resolver
27+
28+
type StaticResolver struct {
29+
urls []string
30+
shuffe bool
31+
}
32+
33+
func (r *StaticResolver) Resolve() ([]string, error) {
34+
// TODO: move to slices.Clone when supported Go versions > 1.21
35+
var urls []string
36+
urls = append(urls, r.urls...)
37+
38+
if r.shuffe {
39+
rand.Shuffle(len(urls), func(i, j int) {
40+
urls[i], urls[j] = urls[j], urls[i]
41+
})
42+
}
43+
return urls, nil
44+
}
45+
46+
func NewStaticResolver(urls []string, shuffle bool) *StaticResolver {
47+
return &StaticResolver{urls: urls}
48+
}
49+
2450
// NewConn creates a new connection manager
25-
func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) {
51+
func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) {
52+
return NewClusterConn(NewStaticResolver([]string{url}, false), opts...)
53+
}
54+
55+
func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) {
2656
defaultOptions := getDefaultConnectionOptions()
2757
options := &defaultOptions
28-
for _, optionFunc := range optionFuncs {
29-
optionFunc(options)
58+
for _, optFn := range opts {
59+
optFn(options)
3060
}
3161

32-
manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
62+
manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
3363
if err != nil {
3464
return nil, err
3565
}
36-
3766
reconnectErrCh, closeCh := manager.NotifyReconnect()
3867
conn := &Conn{
3968
connectionManager: manager,
4069
reconnectErrCh: reconnectErrCh,
4170
closeConnectionToManagerCh: closeCh,
4271
options: *options,
4372
}
44-
4573
go conn.handleRestarts()
4674
return conn, nil
4775
}

examples/cluster/main.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
rabbitmq "github.com/wagslane/go-rabbitmq"
7+
)
8+
9+
func main() {
10+
resolver := rabbitmq.NewStaticResolver(
11+
[]string{
12+
"amqp://guest:guest@host1",
13+
"amqp://guest:guest@host2",
14+
"amqp://guest:guest@host3",
15+
},
16+
false, /* shuffle */
17+
)
18+
19+
conn, err := rabbitmq.NewClusterConn(resolver)
20+
if err != nil {
21+
log.Fatal(err)
22+
}
23+
defer conn.Close()
24+
25+
}

internal/connectionmanager/connection_manager.go

+34-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package connectionmanager
22

33
import (
4+
"errors"
5+
"fmt"
46
"sync"
57
"time"
68

@@ -12,7 +14,7 @@ import (
1214
// ConnectionManager -
1315
type ConnectionManager struct {
1416
logger logger.Logger
15-
url string
17+
resolver Resolver
1618
connection *amqp.Connection
1719
amqpConfig amqp.Config
1820
connectionMux *sync.RWMutex
@@ -22,15 +24,40 @@ type ConnectionManager struct {
2224
dispatcher *dispatcher.Dispatcher
2325
}
2426

27+
type Resolver interface {
28+
Resolve() ([]string, error)
29+
}
30+
31+
// dial will attempt to connect to the a list of urls in the order they are
32+
// given.
33+
func dial(log logger.Logger, resolver Resolver, conf amqp.Config) (*amqp.Connection, error) {
34+
urls, err := resolver.Resolve()
35+
if err != nil {
36+
return nil, fmt.Errorf("error resolving amqp server urls: %w", err)
37+
}
38+
39+
var errs []error
40+
for _, url := range urls {
41+
conn, err := amqp.DialConfig(url, amqp.Config(conf))
42+
if err == nil {
43+
return conn, err
44+
}
45+
log.Warnf("failed to connect to amqp server %s: %v", url, err)
46+
errs = append(errs, err)
47+
}
48+
return nil, errors.Join(errs...)
49+
}
50+
2551
// NewConnectionManager creates a new connection manager
26-
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
27-
conn, err := amqp.DialConfig(url, amqp.Config(conf))
52+
func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
53+
conn, err := dial(log, resolver, amqp.Config(conf))
2854
if err != nil {
2955
return nil, err
3056
}
57+
3158
connManager := ConnectionManager{
3259
logger: log,
33-
url: url,
60+
resolver: resolver,
3461
connection: conn,
3562
amqpConfig: conf,
3663
connectionMux: &sync.RWMutex{},
@@ -125,7 +152,8 @@ func (connManager *ConnectionManager) reconnectLoop() {
125152
func (connManager *ConnectionManager) reconnect() error {
126153
connManager.connectionMux.Lock()
127154
defer connManager.connectionMux.Unlock()
128-
newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig))
155+
156+
conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig))
129157
if err != nil {
130158
return err
131159
}
@@ -134,6 +162,6 @@ func (connManager *ConnectionManager) reconnect() error {
134162
connManager.logger.Warnf("error closing connection while reconnecting: %v", err)
135163
}
136164

137-
connManager.connection = newConn
165+
connManager.connection = conn
138166
return nil
139167
}

0 commit comments

Comments
 (0)