Skip to content

Commit 8e18cd3

Browse files
authored
Merge pull request #168 from wagslane/lw_rename
Rename mux->mu and update amqp
2 parents 7fca926 + 5546631 commit 8e18cd3

19 files changed

+457
-291
lines changed

consume.go

+15-17
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ type Consumer struct {
3333
reconnectErrCh <-chan error
3434
closeConnectionToManagerCh chan<- struct{}
3535
options ConsumerOptions
36-
handlerMux *sync.RWMutex
36+
handlerMu *sync.RWMutex
3737

38-
isClosedMux *sync.RWMutex
39-
isClosed bool
38+
isClosedMu *sync.RWMutex
39+
isClosed bool
4040
}
4141

4242
// Delivery captures the fields for a previously delivered message resident in
@@ -73,7 +73,7 @@ func NewConsumer(
7373
reconnectErrCh: reconnectErrCh,
7474
closeConnectionToManagerCh: closeCh,
7575
options: *options,
76-
isClosedMux: &sync.RWMutex{},
76+
isClosedMu: &sync.RWMutex{},
7777
isClosed: false,
7878
}
7979

@@ -92,10 +92,10 @@ func (consumer *Consumer) Run(handler Handler) error {
9292
}
9393

9494
handler = func(d Delivery) (action Action) {
95-
if !consumer.handlerMux.TryRLock() {
95+
if !consumer.handlerMu.TryRLock() {
9696
return NackRequeue
9797
}
98-
defer consumer.handlerMux.RUnlock()
98+
defer consumer.handlerMu.RUnlock()
9999
return handler(d)
100100
}
101101

@@ -134,8 +134,8 @@ func (consumer *Consumer) Close() {
134134
}
135135

136136
func (consumer *Consumer) cleanupResources() {
137-
consumer.isClosedMux.Lock()
138-
defer consumer.isClosedMux.Unlock()
137+
consumer.isClosedMu.Lock()
138+
defer consumer.isClosedMu.Unlock()
139139
consumer.isClosed = true
140140
// close the channel so that rabbitmq server knows that the
141141
// consumer has been stopped.
@@ -175,8 +175,8 @@ func (consumer *Consumer) startGoroutines(
175175
handler Handler,
176176
options ConsumerOptions,
177177
) error {
178-
consumer.isClosedMux.Lock()
179-
defer consumer.isClosedMux.Unlock()
178+
consumer.isClosedMu.Lock()
179+
defer consumer.isClosedMu.Unlock()
180180
err := consumer.chanManager.QosSafe(
181181
options.QOSPrefetch,
182182
0,
@@ -221,8 +221,8 @@ func (consumer *Consumer) startGoroutines(
221221
}
222222

223223
func (consumer *Consumer) getIsClosed() bool {
224-
consumer.isClosedMux.RLock()
225-
defer consumer.isClosedMux.RUnlock()
224+
consumer.isClosedMu.RLock()
225+
defer consumer.isClosedMu.RUnlock()
226226
return consumer.isClosed
227227
}
228228

@@ -259,15 +259,13 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
259259
}
260260

261261
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
262-
if ctx == nil {
263-
ctx = context.Background()
264-
} else if ctx.Err() != nil {
262+
if ctx.Err() != nil {
265263
return ctx.Err()
266264
}
267265
c := make(chan struct{})
268266
go func() {
269-
consumer.handlerMux.Lock()
270-
defer consumer.handlerMux.Unlock()
267+
consumer.handlerMu.Lock()
268+
defer consumer.handlerMu.Unlock()
271269
close(c)
272270
}()
273271
select {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq
22

33
go 1.20
44

5-
require github.com/rabbitmq/amqp091-go v1.9.0
5+
require github.com/rabbitmq/amqp091-go v1.10.0

go.sum

+3-48
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,3 @@
1-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
4-
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
5-
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
6-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
7-
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
8-
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
9-
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
10-
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
11-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
12-
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
13-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
14-
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
15-
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
16-
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
17-
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
18-
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
19-
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
20-
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
21-
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
22-
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
23-
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
24-
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
25-
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
26-
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
27-
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
28-
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
29-
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
30-
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
31-
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
32-
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
33-
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
34-
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
35-
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
36-
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
37-
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
38-
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
39-
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
40-
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
41-
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
42-
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
43-
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
44-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
45-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
46-
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
47-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
48-
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
1+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
2+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
3+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

internal/channelmanager/channel_manager.go

+24-24
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import (
1313

1414
// ChannelManager -
1515
type ChannelManager struct {
16-
logger logger.Logger
17-
channel *amqp.Channel
18-
connManager *connectionmanager.ConnectionManager
19-
channelMux *sync.RWMutex
20-
reconnectInterval time.Duration
21-
reconnectionCount uint
22-
reconnectionCountMux *sync.Mutex
23-
dispatcher *dispatcher.Dispatcher
16+
logger logger.Logger
17+
channel *amqp.Channel
18+
connManager *connectionmanager.ConnectionManager
19+
channelMu *sync.RWMutex
20+
reconnectInterval time.Duration
21+
reconnectionCount uint
22+
reconnectionCountMu *sync.Mutex
23+
dispatcher *dispatcher.Dispatcher
2424
}
2525

2626
// NewChannelManager creates a new connection manager
@@ -31,14 +31,14 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log
3131
}
3232

3333
chanManager := ChannelManager{
34-
logger: log,
35-
connManager: connManager,
36-
channel: ch,
37-
channelMux: &sync.RWMutex{},
38-
reconnectInterval: reconnectInterval,
39-
reconnectionCount: 0,
40-
reconnectionCountMux: &sync.Mutex{},
41-
dispatcher: dispatcher.NewDispatcher(),
34+
logger: log,
35+
connManager: connManager,
36+
channel: ch,
37+
channelMu: &sync.RWMutex{},
38+
reconnectInterval: reconnectInterval,
39+
reconnectionCount: 0,
40+
reconnectionCountMu: &sync.Mutex{},
41+
dispatcher: dispatcher.NewDispatcher(),
4242
}
4343
go chanManager.startNotifyCancelOrClosed()
4444
return &chanManager, nil
@@ -84,14 +84,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {
8484

8585
// GetReconnectionCount -
8686
func (chanManager *ChannelManager) GetReconnectionCount() uint {
87-
chanManager.reconnectionCountMux.Lock()
88-
defer chanManager.reconnectionCountMux.Unlock()
87+
chanManager.reconnectionCountMu.Lock()
88+
defer chanManager.reconnectionCountMu.Unlock()
8989
return chanManager.reconnectionCount
9090
}
9191

9292
func (chanManager *ChannelManager) incrementReconnectionCount() {
93-
chanManager.reconnectionCountMux.Lock()
94-
defer chanManager.reconnectionCountMux.Unlock()
93+
chanManager.reconnectionCountMu.Lock()
94+
defer chanManager.reconnectionCountMu.Unlock()
9595
chanManager.reconnectionCount++
9696
}
9797

@@ -113,8 +113,8 @@ func (chanManager *ChannelManager) reconnectLoop() {
113113

114114
// reconnect safely closes the current channel and obtains a new one
115115
func (chanManager *ChannelManager) reconnect() error {
116-
chanManager.channelMux.Lock()
117-
defer chanManager.channelMux.Unlock()
116+
chanManager.channelMu.Lock()
117+
defer chanManager.channelMu.Unlock()
118118
newChannel, err := getNewChannel(chanManager.connManager)
119119
if err != nil {
120120
return err
@@ -131,8 +131,8 @@ func (chanManager *ChannelManager) reconnect() error {
131131
// Close safely closes the current channel and connection
132132
func (chanManager *ChannelManager) Close() error {
133133
chanManager.logger.Infof("closing channel manager...")
134-
chanManager.channelMux.Lock()
135-
defer chanManager.channelMux.Unlock()
134+
chanManager.channelMu.Lock()
135+
defer chanManager.channelMu.Unlock()
136136

137137
err := chanManager.channel.Close()
138138
if err != nil {

internal/channelmanager/safe_wraps.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ func (chanManager *ChannelManager) ConsumeSafe(
1616
noWait bool,
1717
args amqp.Table,
1818
) (<-chan amqp.Delivery, error) {
19-
chanManager.channelMux.RLock()
20-
defer chanManager.channelMux.RUnlock()
19+
chanManager.channelMu.RLock()
20+
defer chanManager.channelMu.RUnlock()
2121

2222
return chanManager.channel.Consume(
2323
queue,
@@ -39,8 +39,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
3939
noWait bool,
4040
args amqp.Table,
4141
) (amqp.Queue, error) {
42-
chanManager.channelMux.RLock()
43-
defer chanManager.channelMux.RUnlock()
42+
chanManager.channelMu.RLock()
43+
defer chanManager.channelMu.RUnlock()
4444

4545
return chanManager.channel.QueueDeclarePassive(
4646
name,
@@ -56,8 +56,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe(
5656
func (chanManager *ChannelManager) QueueDeclareSafe(
5757
name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table,
5858
) (amqp.Queue, error) {
59-
chanManager.channelMux.RLock()
60-
defer chanManager.channelMux.RUnlock()
59+
chanManager.channelMu.RLock()
60+
defer chanManager.channelMu.RUnlock()
6161

6262
return chanManager.channel.QueueDeclare(
6363
name,
@@ -73,8 +73,8 @@ func (chanManager *ChannelManager) QueueDeclareSafe(
7373
func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
7474
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
7575
) error {
76-
chanManager.channelMux.RLock()
77-
defer chanManager.channelMux.RUnlock()
76+
chanManager.channelMu.RLock()
77+
defer chanManager.channelMu.RUnlock()
7878

7979
return chanManager.channel.ExchangeDeclarePassive(
8080
name,
@@ -91,8 +91,8 @@ func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe(
9191
func (chanManager *ChannelManager) ExchangeDeclareSafe(
9292
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
9393
) error {
94-
chanManager.channelMux.RLock()
95-
defer chanManager.channelMux.RUnlock()
94+
chanManager.channelMu.RLock()
95+
defer chanManager.channelMu.RUnlock()
9696

9797
return chanManager.channel.ExchangeDeclare(
9898
name,
@@ -109,8 +109,8 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe(
109109
func (chanManager *ChannelManager) QueueBindSafe(
110110
name string, key string, exchange string, noWait bool, args amqp.Table,
111111
) error {
112-
chanManager.channelMux.RLock()
113-
defer chanManager.channelMux.RUnlock()
112+
chanManager.channelMu.RLock()
113+
defer chanManager.channelMu.RUnlock()
114114

115115
return chanManager.channel.QueueBind(
116116
name,
@@ -125,8 +125,8 @@ func (chanManager *ChannelManager) QueueBindSafe(
125125
func (chanManager *ChannelManager) QosSafe(
126126
prefetchCount int, prefetchSize int, global bool,
127127
) error {
128-
chanManager.channelMux.RLock()
129-
defer chanManager.channelMux.RUnlock()
128+
chanManager.channelMu.RLock()
129+
defer chanManager.channelMu.RUnlock()
130130

131131
return chanManager.channel.Qos(
132132
prefetchCount,
@@ -141,8 +141,8 @@ PublishSafe safely wraps the (*amqp.Channel).Publish method.
141141
func (chanManager *ChannelManager) PublishSafe(
142142
exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
143143
) error {
144-
chanManager.channelMux.RLock()
145-
defer chanManager.channelMux.RUnlock()
144+
chanManager.channelMu.RLock()
145+
defer chanManager.channelMu.RUnlock()
146146

147147
return chanManager.channel.PublishWithContext(
148148
context.Background(),
@@ -158,8 +158,8 @@ func (chanManager *ChannelManager) PublishSafe(
158158
func (chanManager *ChannelManager) PublishWithContextSafe(
159159
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
160160
) error {
161-
chanManager.channelMux.RLock()
162-
defer chanManager.channelMux.RUnlock()
161+
chanManager.channelMu.RLock()
162+
defer chanManager.channelMu.RUnlock()
163163

164164
return chanManager.channel.PublishWithContext(
165165
ctx,
@@ -174,8 +174,8 @@ func (chanManager *ChannelManager) PublishWithContextSafe(
174174
func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
175175
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
176176
) (*amqp.DeferredConfirmation, error) {
177-
chanManager.channelMux.RLock()
178-
defer chanManager.channelMux.RUnlock()
177+
chanManager.channelMu.RLock()
178+
defer chanManager.channelMu.RUnlock()
179179

180180
return chanManager.channel.PublishWithDeferredConfirmWithContext(
181181
ctx,
@@ -191,8 +191,8 @@ func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
191191
func (chanManager *ChannelManager) NotifyReturnSafe(
192192
c chan amqp.Return,
193193
) chan amqp.Return {
194-
chanManager.channelMux.RLock()
195-
defer chanManager.channelMux.RUnlock()
194+
chanManager.channelMu.RLock()
195+
defer chanManager.channelMu.RUnlock()
196196

197197
return chanManager.channel.NotifyReturn(
198198
c,
@@ -203,8 +203,8 @@ func (chanManager *ChannelManager) NotifyReturnSafe(
203203
func (chanManager *ChannelManager) ConfirmSafe(
204204
noWait bool,
205205
) error {
206-
chanManager.channelMux.Lock()
207-
defer chanManager.channelMux.Unlock()
206+
chanManager.channelMu.Lock()
207+
defer chanManager.channelMu.Unlock()
208208

209209
return chanManager.channel.Confirm(
210210
noWait,
@@ -215,8 +215,8 @@ func (chanManager *ChannelManager) ConfirmSafe(
215215
func (chanManager *ChannelManager) NotifyPublishSafe(
216216
confirm chan amqp.Confirmation,
217217
) chan amqp.Confirmation {
218-
chanManager.channelMux.RLock()
219-
defer chanManager.channelMux.RUnlock()
218+
chanManager.channelMu.RLock()
219+
defer chanManager.channelMu.RUnlock()
220220

221221
return chanManager.channel.NotifyPublish(
222222
confirm,
@@ -227,8 +227,8 @@ func (chanManager *ChannelManager) NotifyPublishSafe(
227227
func (chanManager *ChannelManager) NotifyFlowSafe(
228228
c chan bool,
229229
) chan bool {
230-
chanManager.channelMux.RLock()
231-
defer chanManager.channelMux.RUnlock()
230+
chanManager.channelMu.RLock()
231+
defer chanManager.channelMu.RUnlock()
232232

233233
return chanManager.channel.NotifyFlow(
234234
c,

0 commit comments

Comments
 (0)