Skip to content

[Bug] Chunk message is incomplete when resending to a different cluster #24153

Open
@nodece

Description

@nodece

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

  • Pulsar client: 4.0.2
  • Java: 17.0.x
  • Broker: 4.0.3

Minimal reproduce step

@Test
public void chunkMessage() throws Exception {
    String r1="pulsar://localhost:6650";
    String r2="pulsar://localhost:6651";
    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl(r1)
            .build();
    String topic = "test" + System.nanoTime();
    // Send a message to the r1 cluster.
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .enableBatching(false)
            .enableChunking(true)
            .chunkMaxMessageSize(10)
            .topic(topic).create();
    // When sending the chunk message, update the service URL to r2.
    // Assuming the chunk message [0,1,2,3,4,5], when the url to r2, and then the r1 holds [0,1,2], r2 holds [3,4,5].
    new Thread(() -> {
        try {
            System.out.println("sleep...");
            Thread.sleep(500);
            System.out.println("update...");
            // Update the service URL to r2.
            pulsarClient.updateServiceUrl(r2);
        } catch (PulsarClientException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();
    try {
        System.out.println("send...");
        MessageId send = producer.send(new byte[4000000]);
        System.out.println(send);
        System.out.println("send ok...");
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Try to receive the message from r2.
    @Cleanup
    PulsarClient pulsarClient2 = PulsarClient.builder()
            .serviceUrl(r2)
            .build();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient2.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
    Message<byte[]> receive = consumer.receive(30, TimeUnit.SECONDS);
    assertNotNull(receive); // Got null.
}

What did you expect to see?

I can receive a chunk message from the second cluster.

What did you see instead?

I cannot receive a chunk message from the second cluster.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions