Skip to content

Commit 1e37f2f

Browse files
authored
Add consumer state check when request commands (apache#772)
* Add consumer state check when request commands Signed-off-by: xiaolongran <xiaolongran@tencent.com> * fix a little Signed-off-by: xiaolongran <xiaolongran@tencent.com>
1 parent ee379ac commit 1e37f2f

1 file changed

Lines changed: 62 additions & 6 deletions

File tree

pulsar/consumer_partition.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package pulsar
1919

2020
import (
21+
"errors"
2122
"fmt"
2223
"math"
2324
"strings"
@@ -278,6 +279,10 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
278279
}
279280

280281
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
282+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
283+
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
284+
return trackingMessageID{}, errors.New("failed to redeliver closing or closed consumer")
285+
}
281286
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
282287
pc.eventsCh <- req
283288

@@ -292,6 +297,11 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
292297
}
293298

294299
func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
300+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
301+
pc.log.WithField("state", state).Error("Failed to getLastMessageID closing or closed consumer")
302+
return trackingMessageID{}, errors.New("failed to getLastMessageID closing or closed consumer")
303+
}
304+
295305
requestID := pc.client.rpcClient.NewRequestID()
296306
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
297307
RequestId: proto.Uint64(requestID),
@@ -308,6 +318,10 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
308318
}
309319

310320
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
321+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
322+
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
323+
return
324+
}
311325
if !msgID.Undefined() && msgID.ack() {
312326
pc.metrics.AcksCounter.Inc()
313327
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
@@ -331,6 +345,10 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
331345
}
332346

333347
func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
348+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
349+
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
350+
return
351+
}
334352
pc.eventsCh <- &redeliveryRequest{msgIds}
335353

336354
iMsgIds := make([]MessageID, len(msgIds))
@@ -341,6 +359,10 @@ func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
341359
}
342360

343361
func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
362+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
363+
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
364+
return
365+
}
344366
msgIds := req.msgIds
345367
pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
346368

@@ -352,11 +374,14 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
352374
}
353375
}
354376

355-
pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
377+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
356378
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
357379
ConsumerId: proto.Uint64(pc.consumerID),
358380
MessageIds: msgIDDataList,
359381
})
382+
if err != nil {
383+
pc.log.Error("Connection was closed when request redeliver cmd")
384+
}
360385
}
361386

362387
func (pc *partitionConsumer) getConsumerState() consumerState {
@@ -381,6 +406,10 @@ func (pc *partitionConsumer) Close() {
381406
}
382407

383408
func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
409+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
410+
pc.log.WithField("state", state).Error("Failed to seek by closing or closed consumer")
411+
return errors.New("failed to seek by closing or closed consumer")
412+
}
384413
req := &seekRequest{
385414
doneCh: make(chan struct{}),
386415
msgID: msgID,
@@ -407,7 +436,7 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error {
407436
func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
408437
state := pc.getConsumerState()
409438
if state == consumerClosing || state == consumerClosed {
410-
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
439+
pc.log.WithField("state", state).Error("failed seek by consumer is closing or has closed")
411440
return nil
412441
}
413442

@@ -434,6 +463,10 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
434463
}
435464

436465
func (pc *partitionConsumer) SeekByTime(time time.Time) error {
466+
if state := pc.getConsumerState(); state == consumerClosing || state == consumerClosed {
467+
pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
468+
return errors.New("failed seekByTime by consumer is closing or has closed")
469+
}
437470
req := &seekByTimeRequest{
438471
doneCh: make(chan struct{}),
439472
publishTime: time,
@@ -450,7 +483,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
450483

451484
state := pc.getConsumerState()
452485
if state == consumerClosing || state == consumerClosed {
453-
pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
486+
pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
454487
return
455488
}
456489

@@ -477,6 +510,10 @@ func (pc *partitionConsumer) clearMessageChannels() {
477510
}
478511

479512
func (pc *partitionConsumer) internalAck(req *ackRequest) {
513+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
514+
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
515+
return
516+
}
480517
msgID := req.msgID
481518

482519
messageIDs := make([]*pb.MessageIdData, 1)
@@ -491,7 +528,10 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
491528
AckType: pb.CommandAck_Individual.Enum(),
492529
}
493530

494-
pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
531+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
532+
if err != nil {
533+
pc.log.Error("Connection was closed when request ack cmd")
534+
}
495535
}
496536

497537
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -728,6 +768,10 @@ func (pc *partitionConsumer) ConnectionClosed() {
728768
// before the application is ready to consume them. After the consumer is ready,
729769
// the client needs to give permission to the broker to push messages.
730770
func (pc *partitionConsumer) internalFlow(permits uint32) error {
771+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
772+
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
773+
return errors.New("consumer closing or closed")
774+
}
731775
if permits == 0 {
732776
return fmt.Errorf("invalid number of permits requested: %d", permits)
733777
}
@@ -736,7 +780,11 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
736780
ConsumerId: proto.Uint64(pc.consumerID),
737781
MessagePermits: proto.Uint32(permits),
738782
}
739-
pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
783+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
784+
if err != nil {
785+
pc.log.Error("Connection was closed when request flow cmd")
786+
return err
787+
}
740788

741789
return nil
742790
}
@@ -1259,18 +1307,26 @@ func (pc *partitionConsumer) initializeCompressionProvider(
12591307

12601308
func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
12611309
validationError pb.CommandAck_ValidationError) {
1310+
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
1311+
pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
1312+
"by closing or closed consumer")
1313+
return
1314+
}
12621315
pc.log.WithFields(log.Fields{
12631316
"msgID": msgID,
12641317
"validationError": validationError,
12651318
}).Error("Discarding corrupted message")
12661319

1267-
pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
1320+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
12681321
pb.BaseCommand_ACK, &pb.CommandAck{
12691322
ConsumerId: proto.Uint64(pc.consumerID),
12701323
MessageId: []*pb.MessageIdData{msgID},
12711324
AckType: pb.CommandAck_Individual.Enum(),
12721325
ValidationError: validationError.Enum(),
12731326
})
1327+
if err != nil {
1328+
pc.log.Error("Connection was closed when request ack cmd")
1329+
}
12741330
}
12751331

12761332
// _setConn sets the internal connection field of this partition consumer atomically.

0 commit comments

Comments
 (0)