Zero queue consumer doesn't seem to work properly #1330

Open
@massakam

Description

Expected behavior

omitted

Actual behavior

The zero queue consumer has been supported since v0.13.0, but it shows several buggy behaviors:

  • Immediately after creating a consumer, I got the following error log:
ERRO[0000] unable to send initial permits to broker      consumerID=1 error="invalid number of permits requested: 0" name=dufbb subscription=sub1 topic="persistent://pulsar/test/t1"
  • If I registered a MessageChannel when creating the consumer, its availablePermits stayed at 0. As a result, it received no messages even when messages were published to the topic.
  • A consumer that received messages using Receive() rather than MessageChannel worked. However, once the connected topic was unloaded or the broker was restarted, availablePermits became 0 and no further messages could be received.
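
To make the symptoms above easier to reason about, here is a minimal, self-contained toy model of permit-based flow control. It is only a sketch under stated assumptions, not the pulsar-client-go implementation: the `broker` type and the `flow`, `dispatch`, `reconnect`, and `receiveZeroQueue` names are all hypothetical, and the only detail taken from this report is the error text in the log line. The model assumes that a flow request of 0 permits is rejected, that permits do not survive a reconnect, and that a zero queue consumer would need to request one permit per receive.

```go
package main

import (
	"errors"
	"fmt"
)

// broker is a hypothetical stand-in for the broker side of flow control.
// It tracks how many messages it may still push to a single consumer.
type broker struct {
	permits uint32
	backlog []string
}

// flow mimics a flow request. Zero permits are rejected with the same
// message that appears in the error log of this report.
func (b *broker) flow(permits uint32) error {
	if permits == 0 {
		return errors.New("invalid number of permits requested: 0")
	}
	b.permits += permits
	return nil
}

// dispatch hands out one message if both a permit and a message exist.
func (b *broker) dispatch() (string, bool) {
	if b.permits == 0 || len(b.backlog) == 0 {
		return "", false
	}
	b.permits--
	msg := b.backlog[0]
	b.backlog = b.backlog[1:]
	return msg, true
}

// reconnect models a topic unload or broker restart (assumption: permits
// granted on the old connection are lost).
func (b *broker) reconnect() { b.permits = 0 }

// receiveZeroQueue is a hypothetical receive step for a zero queue
// consumer: request exactly one permit, then take exactly one message.
func receiveZeroQueue(b *broker) (string, error) {
	if err := b.flow(1); err != nil {
		return "", err
	}
	msg, ok := b.dispatch()
	if !ok {
		return "", errors.New("no message available")
	}
	return msg, nil
}

func main() {
	b := &broker{backlog: []string{"m1", "m2"}}

	// Sending the receiver queue size (0) as the initial permits fails.
	fmt.Println(b.flow(0))

	// Without any permits, the broker dispatches nothing.
	_, ok := b.dispatch()
	fmt.Println(ok)

	// Requesting one permit per receive delivers one message at a time.
	msg, err := receiveZeroQueue(b)
	fmt.Println(msg, err)

	// Because the permit is requested per receive, a reconnect that
	// resets the permits does not stall the consumer in this model.
	b.reconnect()
	msg, err = receiveZeroQueue(b)
	fmt.Println(msg, err)
}
```

In this model, a consumer that derives its initial permits from a receiver queue size of 0 never gets any messages, which matches the MessageChannel symptom. A consumer that does not re-request a permit after a reconnect would stall in the same way as described in the last bullet.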

Steps to reproduce

I ran the following code:

consumer_with_message_channel.go

package main

import (
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    channel := make(chan pulsar.ConsumerMessage)

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                   "persistent://pulsar/test/t1",
        SubscriptionName:        "sub1",
        ReceiverQueueSize:       0,
        EnableZeroQueueConsumer: true,
        MessageChannel:          channel,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for out := range channel {
        msg := out.Message
        fmt.Printf("Received: %s (%s)\n", string(msg.Payload()), msg.ID().String())
        consumer.Ack(msg)
    }
}

consumer_without_message_channel.go

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                   "persistent://pulsar/test/t1",
        SubscriptionName:        "sub1",
        ReceiverQueueSize:       0,
        EnableZeroQueueConsumer: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for i := 0; i < 100; i++ {
        msg, err := consumer.Receive(context.Background())
        if err == nil {
            fmt.Printf("Received: %s (%s)\n", string(msg.Payload()), msg.ID().String())
            consumer.Ack(msg)
        } else {
            log.Fatal(err)
        }
    }
}

System configuration

Pulsar version: v0.14.0
