Skip to content

Commit bddd55e

Browse files
poorbarcode authored and nodece committed
[fix][client] Fix building broken batched message when publishing (apache#24061)
(cherry picked from commit c75018a)
1 parent 4f66f94 commit bddd55e

File tree

6 files changed

+169
-6
lines changed

6 files changed

+169
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.protocol;
20+
21+
import static org.mockito.Mockito.doAnswer;
22+
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
25+
import static org.mockito.Mockito.spy;
26+
import java.util.Arrays;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.pulsar.broker.BrokerTestUtil;
33+
import org.apache.pulsar.client.api.Message;
34+
import org.apache.pulsar.client.api.MessageId;
35+
import org.apache.pulsar.client.api.Producer;
36+
import org.apache.pulsar.client.api.ProducerConsumerBase;
37+
38+
import org.apache.pulsar.client.api.Schema;
39+
import org.apache.pulsar.client.impl.ConsumerImpl;
40+
import org.apache.pulsar.common.api.proto.BaseCommand;
41+
import org.awaitility.Awaitility;
42+
import org.testng.annotations.AfterClass;
43+
import org.testng.annotations.BeforeClass;
44+
import org.testng.annotations.DataProvider;
45+
import org.testng.annotations.Test;
46+
47+
@Slf4j
48+
@Test
49+
public class ProducerBatchSendTest extends ProducerConsumerBase {
50+
51+
@BeforeClass(alwaysRun = true)
52+
@Override
53+
protected void setup() throws Exception {
54+
super.internalSetup();
55+
super.producerBaseSetup();
56+
}
57+
58+
@AfterClass(alwaysRun = true)
59+
@Override
60+
protected void cleanup() throws Exception {
61+
super.internalCleanup();
62+
}
63+
64+
@DataProvider
65+
public Object[][] flushSend() {
66+
return new Object[][] {
67+
{Collections.emptyList()},
68+
{Arrays.asList(1)},
69+
{Arrays.asList(2)},
70+
{Arrays.asList(3)},
71+
{Arrays.asList(1, 2)},
72+
{Arrays.asList(2, 3)},
73+
{Arrays.asList(1, 2, 3)},
74+
};
75+
}
76+
77+
@Test(timeOut = 30_000, dataProvider = "flushSend")
78+
public void testNoEnoughMemSend(List<Integer> flushSend) throws Exception {
79+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
80+
final String subscription = "s1";
81+
admin.topics().createNonPartitionedTopic(topic);
82+
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
83+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(true)
84+
.batchingMaxMessages(Integer.MAX_VALUE).batchingMaxPublishDelay(1, TimeUnit.HOURS).create();
85+
86+
/**
87+
* The method {@link org.apache.pulsar.client.impl.BatchMessageContainerImpl#createOpSendMsg} may fail due to
88+
* various errors, such as a memory allocation failure when calling
89+
* {@link Commands#serializeCommandSendWithSize}. We mock an error here.
90+
*/
91+
AtomicBoolean failure = new AtomicBoolean(true);
92+
BaseCommand threadLocalBaseCommand = Commands.LOCAL_BASE_COMMAND.get();
93+
BaseCommand spyBaseCommand = spy(threadLocalBaseCommand);
94+
doAnswer(invocation -> {
95+
if (failure.get()) {
96+
throw new RuntimeException("mocked exception");
97+
} else {
98+
return invocation.callRealMethod();
99+
}
100+
}).when(spyBaseCommand).setSend();
101+
Commands.LOCAL_BASE_COMMAND.set(spyBaseCommand);
102+
103+
// Failed sending 3 times.
104+
producer.sendAsync("1");
105+
if (flushSend.contains(1)) {
106+
producer.flushAsync();
107+
}
108+
producer.sendAsync("2");
109+
if (flushSend.contains(2)) {
110+
producer.flushAsync();
111+
}
112+
producer.sendAsync("3");
113+
if (flushSend.contains(3)) {
114+
producer.flushAsync();
115+
}
116+
// Publishing is finished eventually.
117+
failure.set(false);
118+
producer.flush();
119+
Awaitility.await().untilAsserted(() -> {
120+
assertTrue(admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog() > 0);
121+
});
122+
123+
// Verify: all messages can be consumed.
124+
ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING).topic(topic)
125+
.subscriptionName(subscription).subscribe();
126+
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
127+
assertNotNull(msg1);
128+
assertEquals(msg1.getValue(), "1");
129+
Message<String> msg2 = consumer.receive(2, TimeUnit.SECONDS);
130+
assertNotNull(msg2);
131+
assertEquals(msg2.getValue(), "2");
132+
Message<String> msg3 = consumer.receive(2, TimeUnit.SECONDS);
133+
assertNotNull(msg3);
134+
assertEquals(msg3.getValue(), "3");
135+
136+
// cleanup.
137+
consumer.close();
138+
producer.close();
139+
admin.topics().delete(topic, false);
140+
}
141+
}

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java

+5
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,9 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
8989
* @return the timestamp in nanoseconds or 0L if the batch container is empty
9090
*/
9191
long getFirstAddedTimestamp();
92+
93+
/**
94+
* Clear the container's payload if building the {@link OpSendMsg} failed.
95+
*/
96+
void resetPayloadAfterFailedPublishing();
9297
}

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,11 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
158158
} catch (Throwable th) {
159159
// Serializing a batch message can corrupt the indexes of the message and the batch message. Reset them so that
160160
// the next iteration doesn't send a corrupt message to the broker.
161-
for (int j = 0; j <= i; j++) {
162-
MessageImpl<?> previousMsg = messages.get(j);
163-
previousMsg.getDataBuffer().resetReaderIndex();
164-
}
165161
batchedMessageMetadataAndPayload.writerIndex(batchWriteIndex);
166162
batchedMessageMetadataAndPayload.readerIndex(batchReadIndex);
167163
throw new RuntimeException(th);
164+
} finally {
165+
msg.getDataBuffer().resetReaderIndex();
168166
}
169167
}
170168

@@ -325,6 +323,14 @@ public OpSendMsg createOpSendMsg() throws IOException {
325323
return op;
326324
}
327325

326+
@Override
327+
public void resetPayloadAfterFailedPublishing() {
328+
if (batchedMessageMetadataAndPayload != null) {
329+
batchedMessageMetadataAndPayload.readerIndex(0);
330+
batchedMessageMetadataAndPayload.writerIndex(0);
331+
}
332+
}
333+
328334
protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
329335
int delta = updatedSizeBytes - batchAllocatedSizeBytes;
330336
batchAllocatedSizeBytes = updatedSizeBytes;

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java

+7
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
127127
}
128128
}
129129

130+
@Override
131+
public void resetPayloadAfterFailedPublishing() {
132+
for (BatchMessageContainerImpl batch : batches.values()) {
133+
batch.resetPayloadAfterFailedPublishing();
134+
}
135+
}
136+
130137
@Override
131138
public boolean hasSameSchema(MessageImpl<?> msg) {
132139
String key = getKey(msg);

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -2250,7 +2250,10 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
22502250
processOpSendMsg(opSendMsg);
22512251
}
22522252
} catch (Throwable t) {
2253-
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
2253+
// An incomplete payload may have been built, so we should reset it.
2254+
batchMessageContainer.resetPayloadAfterFailedPublishing();
2255+
log.warn("[{}] [{}] Failed to create batch message for sending. Batch payloads have been reset and"
2256+
+ " messages will be retried in subsequent batches.", topic, producerName, t);
22542257
} finally {
22552258
if (shouldScheduleNextBatchFlush) {
22562259
maybeScheduleBatchFlushTask();

Diff for: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public class Commands {
127127
public static final short magicBrokerEntryMetadata = 0x0e02;
128128
private static final int checksumSize = 4;
129129

130-
private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() {
130+
@VisibleForTesting
131+
static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() {
131132
@Override
132133
protected BaseCommand initialValue() throws Exception {
133134
return new BaseCommand();

0 commit comments

Comments
 (0)