[Issue 387] fix goroutine leak for closing consumers. #808
base: master
Is there any chance that c.messageCh can be nil? Just wondering if we need a nil check around closing it since trying to close a nil channel can cause a panic.
Otherwise LGTM
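To illustrate the concern above: in Go, calling close on a nil channel panics at run time, so a guard is only needed when the channel might not have been initialized. The following is a minimal sketch; safeClose and closePanics are hypothetical helpers written for this illustration and are not part of pulsar-client-go.

```go
package main

import "fmt"

// safeClose closes ch unless it is nil and reports whether a close happened.
// Hypothetical helper for illustration only.
func safeClose(ch chan int) bool {
	if ch == nil {
		return false
	}
	close(ch)
	return true
}

// closePanics reports whether calling close(ch) panics.
func closePanics(ch chan int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(closePanics(nilCh))        // closing a nil channel panics
	fmt.Println(safeClose(nilCh))          // the guard skips the close
	fmt.Println(safeClose(make(chan int))) // an initialized channel closes normally
}
```

If the channel is always created together with the consumer, as the reply below says, the guard is unnecessary.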
Thanks for the reminder.
pulsar-client-go/pulsar/consumer_impl.go Line 101 in 6a8e7f3
Initialization is done when the consumer is created.
Looks like one of the tests is failing because we're trying to close an already closed channel. I guess in some cases the channel gets closed and in others it doesn't?
@pgier The This is the reason why
pulsar/consumer_impl.go (outdated)
@@ -563,6 +566,11 @@ func (c *consumer) Close() {
}
wg.Wait()
close(c.closeCh)
closed := closeChanSet[c.messageCh]
if !closed {
closeChanSet is a global variable, and the elements in the set are never deleted. Would it be more reasonable to add an isClosed (bool) attribute to the consumer struct? It could be checked at lines 569 and 570 as
if !c.isClosed {
close(c.messageCh)
}
I just do not understand why there is a need to use a global map to track the messageCh channel, which then will never be GCed. Isn't that a leak?
That's what I thought before, but in the case of multiTopicConsumer it causes the chan to be closed repeatedly.
defer consumer.Close()

This is because although the consumers are multiple instances, they use the same messageCh, and it can only be closed once.
So I'd like to record the closed flag for messageCh via a global variable.
With regard to the closeChanSet leak, we can set it to nil in the client.Close() method.
pulsar-client-go/pulsar/client_impl.go
Line 212 in 6a8e7f3
func (c *client) Close() {
Is there a better way?
Why not use sync.Once? Let closeMsgChOnce be an attribute of the consumer.
c.closeMsgChOnce.Do(func() {
close(c.messageCh)
})
multiTopicConsumer uses newInternalConsumer to create its consumers. You can add a closeMsgChOnce parameter to newInternalConsumer, so that the consumers also share the sync.Once.
pulsar-client-go/pulsar/consumer_impl.go
Lines 208 to 209 in 6a8e7f3
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
@Gleiphir2769 Good suggestion, thx.
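For readers following along, here is a rough sketch of the sync.Once idea suggested above. It assumes the Once is passed as a pointer so that all consumers sharing a channel also share the same Once (a sync.Once must not be copied after first use). fakeConsumer and newFakeConsumer are hypothetical names for this illustration; the actual change in the PR may be wired differently.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConsumer is a hypothetical stand-in for the real consumer type.
type fakeConsumer struct {
	messageCh      chan string
	closeMsgChOnce *sync.Once // shared by all consumers that share messageCh
}

// newFakeConsumer mirrors the idea of passing the shared Once in at creation.
func newFakeConsumer(messageCh chan string, once *sync.Once) *fakeConsumer {
	return &fakeConsumer{messageCh: messageCh, closeMsgChOnce: once}
}

// Close closes the shared channel at most once across all consumers.
func (c *fakeConsumer) Close() {
	c.closeMsgChOnce.Do(func() {
		close(c.messageCh)
	})
}

// closeConcurrently closes n consumers that share one channel from separate
// goroutines and reports whether the channel is still open afterwards.
func closeConcurrently(n int) (stillOpen bool) {
	shared := make(chan string)
	once := &sync.Once{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		c := newFakeConsumer(shared, once)
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
	}
	wg.Wait()
	_, ok := <-shared // a receive on a closed channel returns ok == false
	return ok
}

func main() {
	fmt.Println(closeConcurrently(8)) // prints false: closed exactly once, no panic
}
```

Compared with a global map, nothing outlives the consumers: the Once is garbage collected together with the channel it protects.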
Could you remove the redundant code comments? Otherwise, LGTM.
close(c.messageCh)
closeChanSet[c.messageCh] = true
}
})
Please remove these redundant code comments.
PTAL
LGTM
# Conflicts: # pulsar/consumer_regex.go
Fixes #387
Motivation
Fix goroutine leak for closing consumers.
Modifications
Close the c.messageCh at the same time as closing the consumer.
Before fix: