From c06e61347126527e81041a06dc6052b0bd34885d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 7 Mar 2025 01:58:07 +0800 Subject: [PATCH 1/3] [fix][client] Fix building broken batched message when publishing --- .../protocol/ProducerBatchSendTest.java | 118 ++++++++++++++++++ .../impl/BatchMessageContainerBase.java | 5 + .../impl/BatchMessageContainerImpl.java | 14 ++- .../impl/BatchMessageKeyBasedContainer.java | 7 ++ .../pulsar/client/impl/ProducerImpl.java | 4 +- .../pulsar/common/protocol/Commands.java | 3 +- 6 files changed, 145 insertions(+), 6 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java new file mode 100644 index 0000000000000..cc931eb1bce24 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import static org.mockito.Mockito.doAnswer; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test +public class ProducerBatchSendTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30_000) + public void testNoEnoughMemSend() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(true) + .batchingMaxMessages(Integer.MAX_VALUE).batchingMaxPublishDelay(1, TimeUnit.HOURS).create(); + + /** + * The method {@link org.apache.pulsar.client.impl.BatchMessageContainerImpl#createOpSendMsg} may fail due to + * many errors, such like allocate more memory failed when calling + * {@link Commands#serializeCommandSendWithSize}. We mock an error here. + */ + AtomicBoolean failure = new AtomicBoolean(true); + BaseCommand threadLocalBaseCommand = Commands.LOCAL_BASE_COMMAND.get(); + BaseCommand spyBaseCommand = spy(threadLocalBaseCommand); + doAnswer(invocation -> { + if (failure.get()) { + throw new RuntimeException("mocked exception"); + } else { + return invocation.callRealMethod(); + } + }).when(spyBaseCommand).setSend(); + Commands.LOCAL_BASE_COMMAND.set(spyBaseCommand); + + // Failed sending 3 times. + producer.sendAsync("1"); + producer.flushAsync(); + producer.sendAsync("2"); + producer.flushAsync(); + producer.sendAsync("3"); + producer.flushAsync(); + // Publishing is finished eventually. + failure.set(false); + producer.flush(); + Awaitility.await().untilAsserted(() -> { + assertTrue(admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog() > 0); + }); + + // Verify: all messages can be consumed. + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subscription).subscribe(); + Message msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + assertEquals(msg1.getValue(), "1"); + Message msg2 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), "2"); + Message msg3 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg3); + assertEquals(msg3.getValue(), "3"); + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topic, false); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java index ddbe1bc255779..ee9e8262275fa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java @@ -89,4 +89,9 @@ public interface BatchMessageContainerBase extends BatchMessageContainer { * @return the timestamp in nanoseconds or 0L if the batch container is empty */ long getFirstAddedTimestamp(); + + /** + * Clear the container's payload if build {@link OpSendMsg} failed. + */ + void resetPayloadAfterFailedPublishing(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 9e0eeafc47841..489a3752332de 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -162,13 +162,11 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean clientOperation) } catch (Throwable th) { // serializing batch message can corrupt the index of message and batch-message. Reset the index so, // next iteration doesn't send corrupt message to broker. - for (int j = 0; j <= i; j++) { - MessageImpl previousMsg = messages.get(j); - previousMsg.getDataBuffer().resetReaderIndex(); - } batchedMessageMetadataAndPayload.writerIndex(batchWriteIndex); batchedMessageMetadataAndPayload.readerIndex(batchReadIndex); throw new RuntimeException(th); + } finally { + msg.getDataBuffer().resetReaderIndex(); } } @@ -343,6 +341,14 @@ public OpSendMsg createOpSendMsg() throws IOException { return op; } + @Override + public void resetPayloadAfterFailedPublishing() { + if (batchedMessageMetadataAndPayload != null) { + batchedMessageMetadataAndPayload.readerIndex(0); + batchedMessageMetadataAndPayload.writerIndex(0); + } + } + protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) { int delta = updatedSizeBytes - batchAllocatedSizeBytes; batchAllocatedSizeBytes = updatedSizeBytes; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index 1592d3cae6cb5..f6e3f5a683edc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -127,6 +127,13 @@ public List createOpSendMsgs() throws IOException { } } + @Override + public void resetPayloadAfterFailedPublishing() { + for (BatchMessageContainerImpl batch : batches.values()) { + batch.resetPayloadAfterFailedPublishing(); + } + } + @Override public boolean hasSameSchema(MessageImpl msg) { String key = getKey(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 0b1f8edf1072c..a6faeb18346b0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2353,7 +2353,9 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) { processOpSendMsg(opSendMsg); } } catch (Throwable t) { - log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t); + batchMessageContainer.resetPayloadAfterFailedPublishing(); + log.warn("[{}] [{}] error while create opSendMsg by batch message container, and reset payloads", + topic, producerName, t); } finally { if (shouldScheduleNextBatchFlush) { maybeScheduleBatchFlushTask(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 95053e5e7e0a7..2cb4f9a40e376 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -130,7 +130,8 @@ public class Commands { public static final short magicBrokerEntryMetadata = 0x0e02; private static final int checksumSize = 4; - private static final FastThreadLocal LOCAL_BASE_COMMAND = new FastThreadLocal() { + @VisibleForTesting + static final FastThreadLocal LOCAL_BASE_COMMAND = new FastThreadLocal() { @Override protected BaseCommand initialValue() throws Exception { return new BaseCommand(); From add2c58dcb776a58b323af83499bf01bdae1728d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 7 Mar 2025 11:32:41 +0800 Subject: [PATCH 2/3] address comment --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a6faeb18346b0..7e02f5b552cf3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2353,9 +2353,10 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) { processOpSendMsg(opSendMsg); } } catch (Throwable t) { + // Since there is a uncompleted payload was built, we should reset it. batchMessageContainer.resetPayloadAfterFailedPublishing(); - log.warn("[{}] [{}] error while create opSendMsg by batch message container, and reset payloads", - topic, producerName, t); + log.warn("[{}] [{}] Failed to create batch message for sending. Batch payloads have been reset and" + + " messages will be retried in subsequent batches.", topic, producerName, t); } finally { if (shouldScheduleNextBatchFlush) { maybeScheduleBatchFlushTask(); From fdfac6a6bd157503b0feb36a8635b6b9466c5239 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 10 Mar 2025 11:03:10 +0800 Subject: [PATCH 3/3] address comment --- .../protocol/ProducerBatchSendTest.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java index cc931eb1bce24..552afbf085c00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java @@ -23,6 +23,9 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.mockito.Mockito.spy; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -38,6 +41,7 @@ import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -57,8 +61,21 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test(timeOut = 30_000) - public void testNoEnoughMemSend() throws Exception { + @DataProvider + public Object[][] flushSend() { + return new Object[][] { + {Collections.emptyList()}, + {Arrays.asList(1)}, + {Arrays.asList(2)}, + {Arrays.asList(3)}, + {Arrays.asList(1, 2)}, + {Arrays.asList(2, 3)}, + {Arrays.asList(1, 2, 3)}, + }; + } + + @Test(timeOut = 30_000, dataProvider = "flushSend") + public void testNoEnoughMemSend(List flushSend) throws Exception { final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); final String subscription = "s1"; admin.topics().createNonPartitionedTopic(topic); @@ -85,11 +102,17 @@ public void testNoEnoughMemSend() throws Exception { // Failed sending 3 times. producer.sendAsync("1"); - producer.flushAsync(); + if (flushSend.contains(1)) { + producer.flushAsync(); + } producer.sendAsync("2"); - producer.flushAsync(); + if (flushSend.contains(2)) { + producer.flushAsync(); + } producer.sendAsync("3"); - producer.flushAsync(); + if (flushSend.contains(3)) { + producer.flushAsync(); + } // Publishing is finished eventually. failure.set(false); producer.flush();