Skip to content

Commit 75a62c0

Browse files
authored
fix data race of Consumer.closeHandler (#205)
* fix data race of Consumer.closeHandler * use defer
1 parent 3f4e822 commit 75a62c0

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

pkg/stream/consumer.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,14 @@ func (consumer *Consumer) setCurrentOffset(offset int64) {
6666

6767
func (consumer *Consumer) GetOffset() int64 {
6868
consumer.mutex.Lock()
69-
res := consumer.currentOffset
70-
consumer.mutex.Unlock()
71-
return res
69+
defer consumer.mutex.Unlock()
70+
return consumer.currentOffset
7271
}
7372

7473
func (consumer *Consumer) GetLastStoredOffset() int64 {
7574
consumer.mutex.Lock()
76-
res := consumer.lastStoredOffset
77-
consumer.mutex.Unlock()
78-
return res
75+
defer consumer.mutex.Unlock()
76+
return consumer.lastStoredOffset
7977
}
8078

8179
func (consumer *Consumer) updateLastStoredOffset() bool {
@@ -88,6 +86,12 @@ func (consumer *Consumer) updateLastStoredOffset() bool {
8886
return false
8987
}
9088

89+
func (consumer *Consumer) GetCloseHandler() chan Event {
90+
consumer.mutex.Lock()
91+
defer consumer.mutex.Unlock()
92+
return consumer.closeHandler
93+
}
94+
9195
func (consumer *Consumer) NotifyClose() ChannelClose {
9296
consumer.mutex.Lock()
9397
defer consumer.mutex.Unlock()

pkg/stream/coordinator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
8282
reason.StreamName = consumer.GetStreamName()
8383
reason.Name = consumer.GetName()
8484

85-
if consumer.closeHandler != nil {
86-
consumer.closeHandler <- reason
87-
close(consumer.closeHandler)
85+
if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
86+
closeHandler <- reason
87+
close(closeHandler)
8888
}
8989

9090
return coordinator.removeById(id, coordinator.consumers)

0 commit comments

Comments
 (0)