Skip to content

Commit 5d6886b

Browse files
Fix data race in super stream producer (#397)
* Create test to catch a data race if the producer stopped quickly after start * Fix: prevent data race when publisher connection is closed shortly after start --------- Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 5fefe76 commit 5d6886b

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

pkg/stream/super_stream_producer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,21 +279,20 @@ func (s *SuperStreamProducer) ConnectPartition(partition string) error {
279279
event := <-_closedEvent
280280

281281
s.mutex.Lock()
282+
defer s.mutex.Unlock()
282283
for i := range s.activeProducers {
283284
if s.activeProducers[i].GetStreamName() == gpartion {
284285
s.activeProducers = append(s.activeProducers[:i], s.activeProducers[i+1:]...)
285286
break
286287
}
287288
}
288-
s.mutex.Unlock()
289+
289290
if s.chSuperStreamPartitionClose != nil {
290-
s.mutex.Lock()
291291
s.chSuperStreamPartitionClose <- PPartitionClose{
292292
Partition: gpartion,
293293
Event: event,
294294
Context: s,
295295
}
296-
s.mutex.Unlock()
297296
}
298297
logs.LogDebug("[SuperStreamProducer] chSuperStreamPartitionClose for partition: %s", gpartion)
299298
}(partition, closedEvent)
@@ -327,6 +326,9 @@ func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan Partition
327326
// Event will give the reason of the close
328327
// size is the size of the channel
329328
func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose {
329+
s.mutex.Lock()
330+
defer s.mutex.Unlock()
331+
330332
ch := make(chan PPartitionClose, size)
331333
s.chSuperStreamPartitionClose = ch
332334
return ch

pkg/stream/super_stream_producer_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,5 +532,25 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func()
532532
Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())
533533
Expect(env.Close()).NotTo(HaveOccurred())
534534
})
535+
It("should detect potential data races when sending concurrently", func() {
536+
env, err := NewEnvironment(nil)
537+
Expect(err).NotTo(HaveOccurred())
538+
var superStream = fmt.Sprintf("race-super-stream-%d", time.Now().Unix())
539+
Expect(env.DeclareSuperStream(superStream, NewPartitionsOptions(10))).NotTo(HaveOccurred())
535540

541+
superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions(
542+
NewHashRoutingStrategy(func(message message.StreamMessage) string {
543+
return message.GetApplicationProperties()["routingKey"].(string)
544+
}),
545+
))
546+
Expect(err).NotTo(HaveOccurred())
547+
548+
// example error handling from producer on the client side
549+
go func() {
550+
superProducer.NotifyPartitionClose(1)
551+
}()
552+
553+
Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())
554+
Expect(env.Close()).NotTo(HaveOccurred())
555+
})
536556
})

0 commit comments

Comments
 (0)