Skip to content

Commit 218cee7

Browse files
committed
fix concurrent map access
1 parent 4c2d99d commit 218cee7

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

lepus.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ type Channel struct {
3939
retc chan amqp.Return
4040
closc chan *amqp.Error
4141

42-
sm map[string]*info
43-
smu *sync.Mutex
42+
sm sync.Map
4443

4544
timeout time.Duration
4645
}
@@ -58,16 +57,15 @@ func SyncChannel(ch *amqp.Channel, err error) (*Channel, error) {
5857
c := &Channel{
5958
Channel: ch,
6059
midPrefix: "lepus-" + strconv.Itoa(int(time.Now().Unix())),
61-
sm: make(map[string]*info),
62-
smu: &sync.Mutex{},
6360
timeout: 2 * time.Second,
6461
}
6562

6663
c.pubc = ch.NotifyPublish(make(chan amqp.Confirmation))
6764
go func() {
6865
for pub := range c.pubc {
6966
smkey := "DeliveryTag-" + strconv.Itoa(int(pub.DeliveryTag))
70-
if inf, ok := c.sm[smkey]; ok {
67+
if iinf, ok := c.sm.Load(smkey); ok {
68+
inf := iinf.(*info)
7169
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StatePublished))
7270
if swapped {
7371
inf.mu.Unlock()
@@ -80,7 +78,8 @@ func SyncChannel(ch *amqp.Channel, err error) (*Channel, error) {
8078
go func() {
8179
for ret := range c.retc {
8280
smkey := ret.MessageId + ret.CorrelationId
83-
if inf, ok := c.sm[smkey]; ok {
81+
if iinf, ok := c.sm.Load(smkey); ok {
82+
inf := iinf.(*info)
8483
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateReturned))
8584
if swapped {
8685
inf.mu.Unlock()
@@ -92,13 +91,15 @@ func SyncChannel(ch *amqp.Channel, err error) (*Channel, error) {
9291
c.closc = ch.NotifyClose(make(chan *amqp.Error))
9392
go func() {
9493
err := <-c.closc
95-
for _, inf := range c.sm {
94+
c.sm.Range(func(key, value interface{}) bool {
95+
inf := value.(*info)
9696
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateClosed))
9797
if swapped {
9898
inf.err = err
9999
inf.mu.Unlock()
100100
}
101-
}
101+
return true
102+
})
102103
}()
103104

104105
return c, nil
@@ -133,16 +134,12 @@ func (c *Channel) PublishAndWait(exchange, key string, mandatory, immediate bool
133134
mkey := msg.MessageId + msg.CorrelationId
134135
tkey := "DeliveryTag-" + strconv.Itoa(int(mid))
135136

136-
c.smu.Lock()
137-
c.sm[mkey] = inf
138-
c.sm[tkey] = inf
139-
c.smu.Unlock()
137+
c.sm.Store(mkey, inf)
138+
c.sm.Store(tkey, inf)
140139

141140
defer func() {
142-
c.smu.Lock()
143-
delete(c.sm, mkey)
144-
delete(c.sm, tkey)
145-
c.smu.Unlock()
141+
c.sm.Delete(mkey)
142+
c.sm.Delete(tkey)
146143
}()
147144

148145
err := c.Publish(exchange, key, mandatory, immediate, msg)

0 commit comments

Comments
 (0)