Skip to content

Commit 073a047

Browse files
authored
Fix producers reconnection deadlock (#394)
* Fix deadlock in producer cleanup logic - Added a new test to validate reconnections on partition close events and catching deadlock - Updated `maybeCleanProducers` to remove unnecessary mutex locks to pevent deadlock * Deleted unnecessary mutex locks from , use thre-safe method instead of dirrect access to struct field * Fix datarace: use sync.Map in Coordinator for producers and consumers
1 parent dd20421 commit 073a047

File tree

4 files changed

+149
-87
lines changed

4 files changed

+149
-87
lines changed

pkg/stream/client.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -471,37 +471,41 @@ func (c *Client) closeHartBeat() {
471471
}
472472

473473
func (c *Client) Close() error {
474-
475474
c.closeHartBeat()
476-
for _, p := range c.coordinator.Producers() {
477-
err := c.coordinator.RemoveProducerById(p.(*Producer).id, Event{
475+
c.coordinator.Producers().Range(func(_, p any) bool {
476+
producer := p.(*Producer)
477+
err := c.coordinator.RemoveProducerById(producer.id, Event{
478478
Command: CommandClose,
479-
StreamName: p.(*Producer).GetStreamName(),
480-
Name: p.(*Producer).GetName(),
479+
StreamName: producer.GetStreamName(),
480+
Name: producer.GetName(),
481481
Reason: SocketClosed,
482482
Err: nil,
483483
})
484484

485485
if err != nil {
486486
logs.LogWarn("error removing producer: %s", err)
487487
}
488-
}
489488

490-
for _, cs := range c.coordinator.GetConsumers() {
491-
if cs != nil {
492-
err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{
493-
Command: CommandClose,
494-
StreamName: cs.(*Consumer).GetStreamName(),
495-
Name: cs.(*Consumer).GetName(),
496-
Reason: SocketClosed,
497-
Err: nil,
498-
})
489+
return true
490+
})
499491

500-
if err != nil {
501-
logs.LogWarn("error removing consumer: %s", err)
502-
}
492+
c.coordinator.Consumers().Range(func(_, cs any) bool {
493+
consumer := cs.(*Consumer)
494+
err := c.coordinator.RemoveConsumerById(consumer.ID, Event{
495+
Command: CommandClose,
496+
StreamName: consumer.GetStreamName(),
497+
Name: consumer.GetName(),
498+
Reason: SocketClosed,
499+
Err: nil,
500+
})
501+
502+
if err != nil {
503+
logs.LogWarn("error removing consumer: %s", err)
503504
}
504-
}
505+
506+
return true
507+
})
508+
505509
if c.getSocket().isOpen() {
506510

507511
res := c.coordinator.NewResponse(CommandClose)

pkg/stream/coordinator.go

Lines changed: 52 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111

1212
type Coordinator struct {
1313
counter int
14-
producers map[interface{}]interface{}
15-
consumers map[interface{}]interface{}
14+
producers *sync.Map
15+
consumers *sync.Map
1616
responses map[interface{}]interface{}
1717
nextItemProducer uint8
1818
nextItemConsumer uint8
@@ -43,8 +43,8 @@ type Response struct {
4343

4444
func NewCoordinator() *Coordinator {
4545
return &Coordinator{mutex: &sync.Mutex{},
46-
producers: make(map[interface{}]interface{}),
47-
consumers: make(map[interface{}]interface{}),
46+
producers: &sync.Map{},
47+
consumers: &sync.Map{},
4848
responses: make(map[interface{}]interface{})}
4949
}
5050

@@ -77,7 +77,7 @@ func (coordinator *Coordinator) NewProducer(
7777
confirmMutex: &sync.Mutex{},
7878
onClose: cleanUp,
7979
}
80-
coordinator.producers[lastId] = producer
80+
coordinator.producers.Store(lastId, producer)
8181
return producer, err
8282
}
8383

@@ -89,11 +89,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
8989
return consumer.close(reason)
9090

9191
}
92-
func (coordinator *Coordinator) GetConsumers() map[interface{}]interface{} {
93-
coordinator.mutex.Lock()
94-
defer coordinator.mutex.Unlock()
92+
func (coordinator *Coordinator) Consumers() *sync.Map {
9593
return coordinator.consumers
96-
9794
}
9895

9996
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
@@ -117,7 +114,7 @@ func (coordinator *Coordinator) RemoveResponseById(id interface{}) error {
117114
}
118115

119116
func (coordinator *Coordinator) ProducersCount() int {
120-
return coordinator.count(coordinator.producers)
117+
return coordinator.countSyncMap(coordinator.producers)
121118
}
122119

123120
// response
@@ -198,28 +195,25 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
198195
onClose: cleanUp,
199196
}
200197

201-
coordinator.consumers[lastId] = item
198+
coordinator.consumers.Store(lastId, item)
199+
202200
return item
203201
}
204202

205203
func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error) {
206-
v, err := coordinator.getById(id, coordinator.consumers)
207-
if err != nil {
208-
return nil, err
204+
if consumer, exists := coordinator.consumers.Load(id); exists {
205+
return consumer.(*Consumer), nil
209206
}
210-
return v.(*Consumer), err
207+
208+
return nil, errors.New("item #{id} not found ")
211209
}
212210

213211
func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error) {
214-
coordinator.mutex.Lock()
215-
defer coordinator.mutex.Unlock()
216-
if coordinator.consumers[id] == nil {
217-
return nil, errors.New("item #{id} not found ")
212+
if consumer, exists := coordinator.consumers.LoadAndDelete(id); exists {
213+
return consumer.(*Consumer), nil
218214
}
219-
consumer := coordinator.consumers[id].(*Consumer)
220-
coordinator.consumers[id] = nil
221-
delete(coordinator.consumers, id)
222-
return consumer, nil
215+
216+
return nil, errors.New("item #{id} not found ")
223217
}
224218

225219
func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) {
@@ -231,31 +225,26 @@ func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) {
231225
}
232226

233227
func (coordinator *Coordinator) ConsumersCount() int {
234-
return coordinator.count(coordinator.consumers)
228+
return coordinator.countSyncMap(coordinator.consumers)
235229
}
236230

237231
func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error) {
238-
v, err := coordinator.getById(id, coordinator.producers)
239-
if err != nil {
240-
return nil, err
232+
if producer, exists := coordinator.producers.Load(id); exists {
233+
return producer.(*Producer), nil
241234
}
242-
return v.(*Producer), err
235+
236+
return nil, errors.New("item #{id} not found ")
243237
}
244238

245239
func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error) {
246-
coordinator.mutex.Lock()
247-
defer coordinator.mutex.Unlock()
248-
if coordinator.producers[id] == nil {
249-
return nil, errors.New("item #{id} not found ")
240+
if producer, exists := coordinator.producers.LoadAndDelete(id); exists {
241+
return producer.(*Producer), nil
250242
}
251-
producer := coordinator.producers[id].(*Producer)
252-
coordinator.producers[id] = nil
253-
delete(coordinator.producers, id)
254-
return producer, nil
243+
244+
return nil, errors.New("item #{id} not found ")
255245
}
256246

257247
// general functions
258-
259248
func (coordinator *Coordinator) getById(id interface{}, refmap map[interface{}]interface{}) (interface{}, error) {
260249
coordinator.mutex.Lock()
261250
defer coordinator.mutex.Unlock()
@@ -276,11 +265,16 @@ func (coordinator *Coordinator) removeById(id interface{}, refmap map[interface{
276265
return nil
277266
}
278267

279-
func (coordinator *Coordinator) count(refmap map[interface{}]interface{}) int {
280-
coordinator.mutex.Lock()
281-
defer coordinator.mutex.Unlock()
282-
return len(refmap)
268+
func (coordinator *Coordinator) countSyncMap(refmap *sync.Map) int {
269+
count := 0
270+
refmap.Range(func(_, _ interface{}) bool {
271+
count++
272+
return true
273+
})
274+
275+
return count
283276
}
277+
284278
func (coordinator *Coordinator) getNextProducerItem() (uint8, error) {
285279
if coordinator.nextItemProducer >= ^uint8(0) {
286280
return coordinator.reuseFreeId(coordinator.producers)
@@ -299,11 +293,11 @@ func (coordinator *Coordinator) getNextConsumerItem() (uint8, error) {
299293
return res, nil
300294
}
301295

302-
func (coordinator *Coordinator) reuseFreeId(refMap map[interface{}]interface{}) (byte, error) {
296+
func (coordinator *Coordinator) reuseFreeId(refMap *sync.Map) (byte, error) {
303297
maxValue := int(^uint8(0))
304298
var result byte
305299
for i := 0; i < maxValue; i++ {
306-
if refMap[byte(i)] == nil {
300+
if _, exists := refMap.Load(byte(i)); !exists {
307301
return byte(i), nil
308302
}
309303
result++
@@ -314,8 +308,20 @@ func (coordinator *Coordinator) reuseFreeId(refMap map[interface{}]interface{})
314308
return result, nil
315309
}
316310

317-
func (coordinator *Coordinator) Producers() map[interface{}]interface{} {
318-
coordinator.mutex.Lock()
319-
defer coordinator.mutex.Unlock()
311+
func (coordinator *Coordinator) Producers() *sync.Map {
320312
return coordinator.producers
321313
}
314+
315+
func (coordinator *Coordinator) Close() {
316+
coordinator.producers.Range(func(_, producer interface{}) bool {
317+
_ = producer.(*Producer).Close()
318+
319+
return true
320+
})
321+
322+
coordinator.consumers.Range(func(_, consumer interface{}) bool {
323+
_ = consumer.(*Consumer).Close()
324+
325+
return true
326+
})
327+
}

pkg/stream/environment.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -510,42 +510,43 @@ func (cc *environmentCoordinator) maybeCleanClients() {
510510
}
511511

512512
func (c *Client) maybeCleanProducers(streamName string) {
513-
c.mutex.Lock()
514-
for pidx, producer := range c.coordinator.Producers() {
515-
if producer.(*Producer).GetStreamName() == streamName {
513+
c.coordinator.Producers().Range(func(pidx, p any) bool {
514+
producer := p.(*Producer)
515+
if producer.GetStreamName() == streamName {
516516
err := c.coordinator.RemoveProducerById(pidx.(uint8), Event{
517517
Command: CommandMetadataUpdate,
518518
StreamName: streamName,
519-
Name: producer.(*Producer).GetName(),
519+
Name: producer.GetName(),
520520
Reason: MetaDataUpdate,
521521
Err: nil,
522522
})
523523
if err != nil {
524-
return
524+
return false
525525
}
526526
}
527-
}
528-
c.mutex.Unlock()
529527

528+
return true
529+
})
530530
}
531531

532532
func (c *Client) maybeCleanConsumers(streamName string) {
533-
c.mutex.Lock()
534-
for pidx, consumer := range c.coordinator.consumers {
535-
if consumer.(*Consumer).options.streamName == streamName {
533+
c.coordinator.Consumers().Range(func(pidx, cs any) bool {
534+
consumer := cs.(*Consumer)
535+
if consumer.options.streamName == streamName {
536536
err := c.coordinator.RemoveConsumerById(pidx.(uint8), Event{
537537
Command: CommandMetadataUpdate,
538538
StreamName: streamName,
539-
Name: consumer.(*Consumer).GetName(),
539+
Name: consumer.GetName(),
540540
Reason: MetaDataUpdate,
541541
Err: nil,
542542
})
543543
if err != nil {
544-
return
544+
return false
545545
}
546546
}
547-
}
548-
c.mutex.Unlock()
547+
548+
return true
549+
})
549550
}
550551

551552
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, options *ProducerOptions, rpcTimeout time.Duration, cleanUp func()) (*Producer, error) {
@@ -643,15 +644,9 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
643644
}
644645

645646
func (cc *environmentCoordinator) Close() error {
646-
647647
cc.clientsPerContext.Range(func(key, value any) bool {
648-
client := value.(*Client)
649-
for i := range client.coordinator.producers {
650-
_ = client.coordinator.producers[i].(*Producer).Close()
651-
}
652-
for i := range client.coordinator.consumers {
653-
_ = client.coordinator.consumers[i].(*Consumer).Close()
654-
}
648+
value.(*Client).coordinator.Close()
649+
655650
return true
656651
})
657652

pkg/stream/super_stream_producer_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,4 +476,61 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func()
476476
Expect(env.Close()).NotTo(HaveOccurred())
477477
})
478478

479+
It("should reconnect to the same partition after a close event", func() {
480+
const partitionsCount = 3
481+
env, err := NewEnvironment(nil)
482+
Expect(err).NotTo(HaveOccurred())
483+
484+
var superStream = fmt.Sprintf("reconnect-test-super-stream-%d", time.Now().Unix())
485+
Expect(env.DeclareSuperStream(superStream, NewPartitionsOptions(partitionsCount))).NotTo(HaveOccurred())
486+
487+
superProducer, err := newSuperStreamProducer(env, superStream, &SuperStreamProducerOptions{
488+
RoutingStrategy: NewHashRoutingStrategy(func(msg message.StreamMessage) string {
489+
return msg.GetApplicationProperties()["routingKey"].(string)
490+
}),
491+
})
492+
Expect(err).To(BeNil())
493+
Expect(superProducer).NotTo(BeNil())
494+
Expect(superProducer.init()).NotTo(HaveOccurred())
495+
producers := superProducer.getProducers()
496+
Expect(producers).To(HaveLen(partitionsCount))
497+
partitionToClose := producers[0].GetStreamName()
498+
499+
// Declare synchronization helpers and listeners
500+
partitionCloseEvent := make(chan bool)
501+
502+
// Listen for the partition close event and try to reconnect
503+
go func(ch <-chan PPartitionClose) {
504+
for event := range ch {
505+
err := event.Context.ConnectPartition(event.Partition)
506+
Expect(err).To(BeNil())
507+
508+
partitionCloseEvent <- true
509+
510+
break
511+
512+
}
513+
}(superProducer.NotifyPartitionClose(1))
514+
515+
// Imitates metadataUpdateFrameHandler - it can happen when stream members are changed.
516+
go func() {
517+
client, ok := env.producers.getCoordinators()["localhost:5552"].clientsPerContext.Load(1)
518+
Expect(ok).To(BeTrue())
519+
client.(*Client).maybeCleanProducers(partitionToClose)
520+
}()
521+
522+
// Wait for the partition close event
523+
Eventually(partitionCloseEvent).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Receive())
524+
525+
// Verify that the partition was successfully reconnected
526+
Expect(superProducer.getProducers()).To(HaveLen(partitionsCount))
527+
reconnectedProducer := superProducer.getProducer(partitionToClose)
528+
Expect(reconnectedProducer).NotTo(BeNil())
529+
530+
// Clean up
531+
Expect(superProducer.Close()).NotTo(HaveOccurred())
532+
Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())
533+
Expect(env.Close()).NotTo(HaveOccurred())
534+
})
535+
479536
})

0 commit comments

Comments
 (0)