Skip to content

Commit e99fa51

Browse files
committed
update naming mux -> mu
1 parent 786f782 commit e99fa51

File tree

9 files changed

+151
-151
lines changed

9 files changed

+151
-151
lines changed

consume.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ type Consumer struct {
3333
reconnectErrCh <-chan error
3434
closeConnectionToManagerCh chan<- struct{}
3535
options ConsumerOptions
36-
handlerMux *sync.RWMutex
36+
handlerMu *sync.RWMutex
3737

38-
isClosedMux *sync.RWMutex
39-
isClosed bool
38+
isClosedMu *sync.RWMutex
39+
isClosed bool
4040
}
4141

4242
// Delivery captures the fields for a previously delivered message resident in
@@ -73,7 +73,7 @@ func NewConsumer(
7373
reconnectErrCh: reconnectErrCh,
7474
closeConnectionToManagerCh: closeCh,
7575
options: *options,
76-
isClosedMux: &sync.RWMutex{},
76+
isClosedMu: &sync.RWMutex{},
7777
isClosed: false,
7878
}
7979

@@ -92,10 +92,10 @@ func (consumer *Consumer) Run(handler Handler) error {
9292
}
9393

9494
handler = func(d Delivery) (action Action) {
95-
if !consumer.handlerMux.TryRLock() {
95+
if !consumer.handlerMu.TryRLock() {
9696
return NackRequeue
9797
}
98-
defer consumer.handlerMux.RUnlock()
98+
defer consumer.handlerMu.RUnlock()
9999
return handler(d)
100100
}
101101

@@ -134,8 +134,8 @@ func (consumer *Consumer) Close() {
134134
}
135135

136136
func (consumer *Consumer) cleanupResources() {
137-
consumer.isClosedMux.Lock()
138-
defer consumer.isClosedMux.Unlock()
137+
consumer.isClosedMu.Lock()
138+
defer consumer.isClosedMu.Unlock()
139139
consumer.isClosed = true
140140
// close the channel so that rabbitmq server knows that the
141141
// consumer has been stopped.
@@ -175,8 +175,8 @@ func (consumer *Consumer) startGoroutines(
175175
handler Handler,
176176
options ConsumerOptions,
177177
) error {
178-
consumer.isClosedMux.Lock()
179-
defer consumer.isClosedMux.Unlock()
178+
consumer.isClosedMu.Lock()
179+
defer consumer.isClosedMu.Unlock()
180180
err := consumer.chanManager.QosSafe(
181181
options.QOSPrefetch,
182182
0,
@@ -221,8 +221,8 @@ func (consumer *Consumer) startGoroutines(
221221
}
222222

223223
func (consumer *Consumer) getIsClosed() bool {
224-
consumer.isClosedMux.RLock()
225-
defer consumer.isClosedMux.RUnlock()
224+
consumer.isClosedMu.RLock()
225+
defer consumer.isClosedMu.RUnlock()
226226
return consumer.isClosed
227227
}
228228

@@ -266,8 +266,8 @@ func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
266266
}
267267
c := make(chan struct{})
268268
go func() {
269-
consumer.handlerMux.Lock()
270-
defer consumer.handlerMux.Unlock()
269+
consumer.handlerMu.Lock()
270+
defer consumer.handlerMu.Unlock()
271271
close(c)
272272
}()
273273
select {

internal/channelmanager/channel_manager.go

+24-24
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import (
1313

1414
// ChannelManager -
1515
type ChannelManager struct {
16-
logger logger.Logger
17-
channel *amqp.Channel
18-
connManager *connectionmanager.ConnectionManager
19-
channelMux *sync.RWMutex
20-
reconnectInterval time.Duration
21-
reconnectionCount uint
22-
reconnectionCountMux *sync.Mutex
23-
dispatcher *dispatcher.Dispatcher
16+
logger logger.Logger
17+
channel *amqp.Channel
18+
connManager *connectionmanager.ConnectionManager
19+
channelMu *sync.RWMutex
20+
reconnectInterval time.Duration
21+
reconnectionCount uint
22+
reconnectionCountMu *sync.Mutex
23+
dispatcher *dispatcher.Dispatcher
2424
}
2525

2626
// NewChannelManager creates a new connection manager
@@ -31,14 +31,14 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log
3131
}
3232

3333
chanManager := ChannelManager{
34-
logger: log,
35-
connManager: connManager,
36-
channel: ch,
37-
channelMux: &sync.RWMutex{},
38-
reconnectInterval: reconnectInterval,
39-
reconnectionCount: 0,
40-
reconnectionCountMux: &sync.Mutex{},
41-
dispatcher: dispatcher.NewDispatcher(),
34+
logger: log,
35+
connManager: connManager,
36+
channel: ch,
37+
channelMu: &sync.RWMutex{},
38+
reconnectInterval: reconnectInterval,
39+
reconnectionCount: 0,
40+
reconnectionCountMu: &sync.Mutex{},
41+
dispatcher: dispatcher.NewDispatcher(),
4242
}
4343
go chanManager.startNotifyCancelOrClosed()
4444
return &chanManager, nil
@@ -84,14 +84,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {
8484

8585
// GetReconnectionCount -
8686
func (chanManager *ChannelManager) GetReconnectionCount() uint {
87-
chanManager.reconnectionCountMux.Lock()
88-
defer chanManager.reconnectionCountMux.Unlock()
87+
chanManager.reconnectionCountMu.Lock()
88+
defer chanManager.reconnectionCountMu.Unlock()
8989
return chanManager.reconnectionCount
9090
}
9191

9292
func (chanManager *ChannelManager) incrementReconnectionCount() {
93-
chanManager.reconnectionCountMux.Lock()
94-
defer chanManager.reconnectionCountMux.Unlock()
93+
chanManager.reconnectionCountMu.Lock()
94+
defer chanManager.reconnectionCountMu.Unlock()
9595
chanManager.reconnectionCount++
9696
}
9797

@@ -113,8 +113,8 @@ func (chanManager *ChannelManager) reconnectLoop() {
113113

114114
// reconnect safely closes the current channel and obtains a new one
115115
func (chanManager *ChannelManager) reconnect() error {
116-
chanManager.channelMux.Lock()
117-
defer chanManager.channelMux.Unlock()
116+
chanManager.channelMu.Lock()
117+
defer chanManager.channelMu.Unlock()
118118
newChannel, err := getNewChannel(chanManager.connManager)
119119
if err != nil {
120120
return err
@@ -131,8 +131,8 @@ func (chanManager *ChannelManager) reconnect() error {
131131
// Close safely closes the current channel and connection
132132
func (chanManager *ChannelManager) Close() error {
133133
chanManager.logger.Infof("closing channel manager...")
134-
chanManager.channelMux.Lock()
135-
defer chanManager.channelMux.Unlock()
134+
chanManager.channelMu.Lock()
135+
defer chanManager.channelMu.Unlock()
136136

137137
err := chanManager.channel.Close()
138138
if err != nil {

internal/channelmanager/safe_wraps.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ func (chanManager *ChannelManager) ConsumeSafe(
1616
noWait bool,
1717
args amqp.Table,
1818
) (<-chan amqp.Delivery, error) {
19-
chanManager.channelMux.RLock()
20-
defer chanManager.channelMux.RUnlock()
19+
chanManager.channelMu.RLock()
20+
defer chanManager.channelMu.RUnlock()
2121

2222
return chanManager.channel.Consume(
2323
queue,
@@ -39,8 +39,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
3939
noWait bool,
4040
args amqp.Table,
4141
) (amqp.Queue, error) {
42-
chanManager.channelMux.RLock()
43-
defer chanManager.channelMux.RUnlock()
42+
chanManager.channelMu.RLock()
43+
defer chanManager.channelMu.RUnlock()
4444

4545
return chanManager.channel.QueueDeclarePassive(
4646
name,
@@ -56,8 +56,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
5656
func (chanManager *ChannelManager) QueueDeclareSafe(
5757
name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table,
5858
) (amqp.Queue, error) {
59-
chanManager.channelMux.RLock()
60-
defer chanManager.channelMux.RUnlock()
59+
chanManager.channelMu.RLock()
60+
defer chanManager.channelMu.RUnlock()
6161

6262
return chanManager.channel.QueueDeclare(
6363
name,
@@ -73,8 +73,8 @@ func (chanManager *ChannelManager) QueueDeclareSafe(
7373
func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
7474
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
7575
) error {
76-
chanManager.channelMux.RLock()
77-
defer chanManager.channelMux.RUnlock()
76+
chanManager.channelMu.RLock()
77+
defer chanManager.channelMu.RUnlock()
7878

7979
return chanManager.channel.ExchangeDeclarePassive(
8080
name,
@@ -91,8 +91,8 @@ func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
9191
func (chanManager *ChannelManager) ExchangeDeclareSafe(
9292
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
9393
) error {
94-
chanManager.channelMux.RLock()
95-
defer chanManager.channelMux.RUnlock()
94+
chanManager.channelMu.RLock()
95+
defer chanManager.channelMu.RUnlock()
9696

9797
return chanManager.channel.ExchangeDeclare(
9898
name,
@@ -109,8 +109,8 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe(
109109
func (chanManager *ChannelManager) QueueBindSafe(
110110
name string, key string, exchange string, noWait bool, args amqp.Table,
111111
) error {
112-
chanManager.channelMux.RLock()
113-
defer chanManager.channelMux.RUnlock()
112+
chanManager.channelMu.RLock()
113+
defer chanManager.channelMu.RUnlock()
114114

115115
return chanManager.channel.QueueBind(
116116
name,
@@ -125,8 +125,8 @@ func (chanManager *ChannelManager) QueueBindSafe(
125125
func (chanManager *ChannelManager) QosSafe(
126126
prefetchCount int, prefetchSize int, global bool,
127127
) error {
128-
chanManager.channelMux.RLock()
129-
defer chanManager.channelMux.RUnlock()
128+
chanManager.channelMu.RLock()
129+
defer chanManager.channelMu.RUnlock()
130130

131131
return chanManager.channel.Qos(
132132
prefetchCount,
@@ -141,8 +141,8 @@ PublishSafe safely wraps the (*amqp.Channel).Publish method.
141141
func (chanManager *ChannelManager) PublishSafe(
142142
exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
143143
) error {
144-
chanManager.channelMux.RLock()
145-
defer chanManager.channelMux.RUnlock()
144+
chanManager.channelMu.RLock()
145+
defer chanManager.channelMu.RUnlock()
146146

147147
return chanManager.channel.PublishWithContext(
148148
context.Background(),
@@ -158,8 +158,8 @@ func (chanManager *ChannelManager) PublishSafe(
158158
func (chanManager *ChannelManager) PublishWithContextSafe(
159159
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
160160
) error {
161-
chanManager.channelMux.RLock()
162-
defer chanManager.channelMux.RUnlock()
161+
chanManager.channelMu.RLock()
162+
defer chanManager.channelMu.RUnlock()
163163

164164
return chanManager.channel.PublishWithContext(
165165
ctx,
@@ -174,8 +174,8 @@ func (chanManager *ChannelManager) PublishWithContextSafe(
174174
func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
175175
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
176176
) (*amqp.DeferredConfirmation, error) {
177-
chanManager.channelMux.RLock()
178-
defer chanManager.channelMux.RUnlock()
177+
chanManager.channelMu.RLock()
178+
defer chanManager.channelMu.RUnlock()
179179

180180
return chanManager.channel.PublishWithDeferredConfirmWithContext(
181181
ctx,
@@ -191,8 +191,8 @@ func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
191191
func (chanManager *ChannelManager) NotifyReturnSafe(
192192
c chan amqp.Return,
193193
) chan amqp.Return {
194-
chanManager.channelMux.RLock()
195-
defer chanManager.channelMux.RUnlock()
194+
chanManager.channelMu.RLock()
195+
defer chanManager.channelMu.RUnlock()
196196

197197
return chanManager.channel.NotifyReturn(
198198
c,
@@ -203,8 +203,8 @@ func (chanManager *ChannelManager) NotifyReturnSafe(
203203
func (chanManager *ChannelManager) ConfirmSafe(
204204
noWait bool,
205205
) error {
206-
chanManager.channelMux.Lock()
207-
defer chanManager.channelMux.Unlock()
206+
chanManager.channelMu.Lock()
207+
defer chanManager.channelMu.Unlock()
208208

209209
return chanManager.channel.Confirm(
210210
noWait,
@@ -215,8 +215,8 @@ func (chanManager *ChannelManager) ConfirmSafe(
215215
func (chanManager *ChannelManager) NotifyPublishSafe(
216216
confirm chan amqp.Confirmation,
217217
) chan amqp.Confirmation {
218-
chanManager.channelMux.RLock()
219-
defer chanManager.channelMux.RUnlock()
218+
chanManager.channelMu.RLock()
219+
defer chanManager.channelMu.RUnlock()
220220

221221
return chanManager.channel.NotifyPublish(
222222
confirm,
@@ -227,8 +227,8 @@ func (chanManager *ChannelManager) NotifyPublishSafe(
227227
func (chanManager *ChannelManager) NotifyFlowSafe(
228228
c chan bool,
229229
) chan bool {
230-
chanManager.channelMux.RLock()
231-
defer chanManager.channelMux.RUnlock()
230+
chanManager.channelMu.RLock()
231+
defer chanManager.channelMu.RUnlock()
232232

233233
return chanManager.channel.NotifyFlow(
234234
c,

0 commit comments

Comments
 (0)