Skip to content

Commit 25f3075

Browse files
authored
Fix unsubscribe blocked when consumer is closing or has closed (#457)
### Motivation For the present consumer, `Close()` and `Unsubscribe()` handled by the same eventloop goroutine. The eventloop exited after `Close()`, then unsubscribe event wouldn't be selected and handled anymore, lead to block. example: ```go func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) if err != nil { log.Fatal(err) } defer client.Close() consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "topic-1", SubscriptionName: "my-sub", }) if err != nil { log.Fatal(err) } defer consumer.Unsubscribe() // unintentional defer consumer.Close() } ``` `Unsubscribe()` blocked: ![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png) ### Modifications Check consumer state before send unsubscribe event, if consumer is closing or has closed, just logging it ### Verifying this change - [x] Make sure that the change passes the CI checks.
1 parent 0e76dc2 commit 25f3075

File tree

1 file changed

+25
-6
lines changed

1 file changed

+25
-6
lines changed

pulsar/consumer_partition.go

+25-6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,21 @@ const (
4747
consumerClosed
4848
)
4949

50+
func (s consumerState) String() string {
51+
switch s {
52+
case consumerInit:
53+
return "Initializing"
54+
case consumerReady:
55+
return "Ready"
56+
case consumerClosing:
57+
return "Closing"
58+
case consumerClosed:
59+
return "Closed"
60+
default:
61+
return "Unknown"
62+
}
63+
}
64+
5065
type subscriptionMode int
5166

5267
const (
@@ -195,6 +210,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
195210
}
196211

197212
func (pc *partitionConsumer) Unsubscribe() error {
213+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
214+
pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
215+
return nil
216+
}
217+
198218
req := &unsubscribeRequest{doneCh: make(chan struct{})}
199219
pc.eventsCh <- req
200220

@@ -206,9 +226,8 @@ func (pc *partitionConsumer) Unsubscribe() error {
206226
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
207227
defer close(unsub.doneCh)
208228

209-
state := pc.getConsumerState()
210-
if state == consumerClosed || state == consumerClosing {
211-
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
229+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
230+
pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
212231
return
213232
}
214233

@@ -354,7 +373,7 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
354373
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
355374
state := pc.getConsumerState()
356375
if state == consumerClosing || state == consumerClosed {
357-
pc.log.Error("Consumer was already closed")
376+
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
358377
return nil
359378
}
360379

@@ -398,7 +417,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
398417

399418
state := pc.getConsumerState()
400419
if state == consumerClosing || state == consumerClosed {
401-
pc.log.Error("Consumer was already closed")
420+
pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
402421
return
403422
}
404423

@@ -798,7 +817,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
798817
}
799818

800819
if state == consumerClosed || state == consumerClosing {
801-
pc.log.Error("The consumer is closing or has been closed")
820+
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
802821
if pc.nackTracker != nil {
803822
pc.nackTracker.Close()
804823
}

0 commit comments

Comments
 (0)