From eec28109d10f26c0da4f8bf2fbd8a35c89aad0bd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 11:51:41 +0800 Subject: [PATCH 01/15] [fix][broker]fix memory leak, messages lost, incorrect replication state if using multiple versions schema --- .../broker/service/OneWayReplicatorTest.java | 98 +++++++++++ .../service/OneWayReplicatorTestBase.java | 9 + .../pulsar/client/api/SimpleSchemaTest.java | 89 ++++++++++ .../client/impl/ProducerMemoryLeakTest.java | 58 ++++++- .../impl/GeoReplicationProducerImpl.java | 9 + .../pulsar/client/impl/HandlerState.java | 4 + .../pulsar/client/impl/ProducerImpl.java | 162 +++++++++++++++--- ...PulsarClientImplementationBindingImpl.java | 2 +- .../client/impl/schema/SchemaInfoImpl.java | 2 +- .../client/impl/schema/SchemaUtils.java | 13 +- 10 files changed, 415 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index d3356a6069553..ca26b16cb1c3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -38,10 +38,13 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -79,6 +82,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import 
org.apache.pulsar.client.impl.PulsarClientImpl; @@ -88,9 +92,12 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -1324,4 +1331,95 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception { admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); admin1.topics().delete(topicName, false); } + + @DataProvider + public Object[][] enableDeduplication() { + return new Object[][] { + {false}, + {true}, + }; + } + + @Test(dataProvider = "enableDeduplication") + public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + + sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_"); + final String subscriptionName = "s1"; + // 1.Create topic. + admin1.topics().createNonPartitionedTopic(topicName); + Producer producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + waitReplicatorStarted(topicName); + admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + if (enableDeduplication) { + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + } + // 2. Publish messages with multiple schemas. 
+ producer1.newMessage(Schema.STRING).value("msg1").send(); + producer1.newMessage(Schema.BOOL).value(false).send(); + producer1.newMessage(Schema.STRING).value("msg3").send(); + // 3. several unloading, which causes replicator internal producer reconnects. +// for (int i = 0; i < 3; i++) { + Thread.sleep(2000); + admin2.topics().unload(topicName); + waitReplicatorStarted(topicName); +// } + // Verify: no individual acks. + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertTrue( + persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true) > 0); + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2"); + assertEquals(cursor.getTotalNonContiguousDeletedMessagesRange(), 0); + //assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) < 0); + }); + // 4. Adjust schema compatibility and unload topic on the remote side, which will solve the replication stuck + // issue. + admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin2.topics().unload(topicName); + admin1.topics().unload(topicName); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2"); + assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) >= 0); + }); + // Verify: no out-of-order; schemas are as expected. 
+ Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName(subscriptionName).subscribe(); + Collection msgReceived; + if (enableDeduplication) { + msgReceived = new ArrayList<>(); + } else { + msgReceived = new LinkedHashSet<>(); + } + while (true) { + Message message = consumer2.receive(2, TimeUnit.SECONDS); + if (message == null) { + break; + } +// SchemaType schemaType = message.getValue().getSchemaType(); +// assertTrue(schemaType.equals(SchemaType.STRING) || schemaType.equals(SchemaType.BOOLEAN)); + msgReceived.add(message.getValue().getNativeObject().toString()); + log.info("received msg: {}", message.getValue().getNativeObject().toString()); + } + assertEquals(msgReceived, Arrays.asList("msg1", "false", "msg3")); + List schemaInfoList = admin2.schemas().getAllSchemas(topicName); + assertEquals(schemaInfoList.size(), 2); + assertEquals(schemaInfoList.get(0).getType(), SchemaType.STRING); + assertEquals(schemaInfoList.get(1).getType(), SchemaType.BOOLEAN); + + // cleanup. 
+ consumer2.close(); + producer1.close(); + admin2.topics().deleteSubscription(topicName, subscriptionName); + admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace, + SchemaCompatibilityStrategy.FORWARD); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index fdb01fc867a5e..534da8bbc491b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; @@ -65,6 +66,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected final String defaultTenant = "public"; protected final String replicatedNamespace = defaultTenant + "/default"; protected final String nonReplicatedNamespace = defaultTenant + "/ns1"; + protected final String sourceClusterAlwaysSchemaCompatibleNamespace = defaultTenant + "/always-compatible"; protected final String cluster1 = "r1"; @@ -157,6 +159,10 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(cluster1, cluster2))); admin1.namespaces().createNamespace(replicatedNamespace, Sets.newHashSet(cluster1, cluster2)); + admin1.namespaces().createNamespace( + sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster1, 
cluster2)); + admin1.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); admin1.namespaces().createNamespace(nonReplicatedNamespace); if (!usingGlobalZK) { @@ -177,6 +183,9 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(cluster1, cluster2))); admin2.namespaces().createNamespace(replicatedNamespace); + admin2.namespaces().createNamespace(sourceClusterAlwaysSchemaCompatibleNamespace); + admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace, + SchemaCompatibilityStrategy.FORWARD); admin2.namespaces().createNamespace(nonReplicatedNamespace); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index e006b72fad279..904754d1c729d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.AllArgsConstructor; @@ -42,25 +43,33 @@ import org.apache.avro.Schema.Parser; import org.apache.avro.reflect.ReflectData; import org.apache.pulsar.TestNGInstanceOrder; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException; import 
org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.BinaryProtoLookupService; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.HttpLookupService; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.reader.AvroReader; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -76,6 +85,8 @@ public class SimpleSchemaTest extends ProducerConsumerBase { private static final String NAMESPACE = "my-property/my-ns"; + private static final String NAMESPACE_ALWAYS_COMPATIBLE = "my-property/always-compatible"; + private static final String NAMESPACE_NEVER_COMPATIBLE = "my-property/never-compatible"; @DataProvider(name = "batchingModes") public static Object[][] batchingModes() { @@ -124,6 +135,12 @@ protected void setup() throws Exception { this.isTcpLookup = true; super.internalSetup(); super.producerBaseSetup(); + 
admin.namespaces().createNamespace(NAMESPACE_ALWAYS_COMPATIBLE); + admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_ALWAYS_COMPATIBLE, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE); + admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE, + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); } @AfterClass(alwaysRun = true) @@ -340,6 +357,78 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex } } + @Test + public void testProducerConnectStateWhenRegisteringSchema() throws Exception { + final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_ALWAYS_COMPATIBLE + "/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + + // Create a pulsar client with a delayed response of "getOrCreateSchemaResponse" + CompletableFuture responseSignal = new CompletableFuture<>(); + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> + new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + protected void handleGetOrCreateSchemaResponse( + CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) { + responseSignal.join(); + super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse); + } + }); + Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create(); + producer.newMessage(Schema.STRING).value("msg").sendAsync(); + + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get(); + assertEquals(persistentTopic.getProducers().size(), 1); + assertTrue(producer.isConnected()); + + // cleanup. 
+ responseSignal.complete(null); + producer.close(); + client.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getProducers().size(), 0); + }); + admin.topics().delete(topic); + } + + @Test + public void testNoMemoryLeakIfSchemaIncompatible() throws Exception { + final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_NEVER_COMPATIBLE + "/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + + // Create a pulsar client with a delayed response of "getOrCreateSchemaResponse" + CompletableFuture responseSignal = new CompletableFuture<>(); + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> + new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + protected void handleGetOrCreateSchemaResponse( + CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) { + responseSignal.join(); + super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse); + } + }); + Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create(); + producer.newMessage(Schema.STRING).value("msg").sendAsync(); + + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get(); + assertEquals(persistentTopic.getProducers().size(), 1); + assertTrue(producer.isConnected()); + + // cleanup. 
+ responseSignal.complete(null); + producer.close(); + client.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getProducers().size(), 0); + }); + admin.topics().delete(topic); + } + @Test public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception { String topic = NAMESPACE + "/schema-test"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java index e366b232a639d..02a7c5bdb365f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -39,9 +40,11 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -52,11 +55,16 @@ @Test(groups = "broker-api") public class ProducerMemoryLeakTest extends ProducerConsumerBase { + private static final String NAMESPACE_NEVER_COMPATIBLE = "public/schema-never-compatible"; + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); + admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE); + 
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE, + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); } @AfterClass(alwaysRun = true) @@ -268,6 +276,50 @@ public void testSendAfterClosedProducer() throws Exception { admin.topics().delete(topicName); } + @Test + public void testBrokenSchema() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + NAMESPACE_NEVER_COMPATIBLE + + "/tp"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl producer = + (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder msgBuilder1 = newMessage(producer, Schema.STRING); + msgBuilder1.value("msg-1").send(); + MsgPayloadTouchableMessageBuilder msgBuilder2 = newMessage(producer, Schema.BOOL); + try { + msgBuilder2.value(false).send(); + fail("expected schema broken error"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.IncompatibleSchemaException); + } + MsgPayloadTouchableMessageBuilder msgBuilder3 = newMessage(producer, Schema.STRING); + msgBuilder3.value("msg-3").send(); + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + ProducerImpl.OpSendMsgQueue pendingMessages = + WhiteboxImpl.getInternalState(producer, "pendingMessages"); + Queue pendingMessagesInternal = + WhiteboxImpl.getInternalState(pendingMessages, "delegate"); + assertEquals(pendingMessagesInternal.size(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + assertEquals(msgBuilder3.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder1.release(); + msgBuilder2.release(); + msgBuilder3.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + assertEquals(msgBuilder3.payload.refCnt(), 0); + producer.close(); + admin.topics().delete(topicName); + } + @DataProvider public Object[][] failedInterceptAt() { return new Object[][]{ @@ -338,7 +390,11 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId } private MsgPayloadTouchableMessageBuilder newMessage(ProducerImpl producer){ - return new MsgPayloadTouchableMessageBuilder(producer, producer.schema); + return newMessage(producer, producer.schema); + } + + private MsgPayloadTouchableMessageBuilder newMessage(ProducerImpl producer, Schema schema){ + return new MsgPayloadTouchableMessageBuilder(producer, schema); } private static class MsgPayloadTouchableMessageBuilder extends TypedMessageBuilderImpl { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java index 3201dde748f34..9c56b72866be2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -108,6 +108,15 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI if (pendingLId != null && pendingEId != null && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId && pendingEId.longValue() <= lastPersistedSourceEntryId))) { + if (MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) { + log.error("[{}] [{}] Replication due to incompatible schem and the replicator will be stuck by producer" + + " queue size limitation." 
+ + " Latest published entry {}:{}, Entry who has broken schema: {}:{}," + + " latest persisted source entry: {}:{}, pending queue size: {}.", + topic, producerName, sourceLId, sourceEId, pendingLId, pendingEId, + lastPersistedSourceLedgerId, lastPersistedSourceEntryId, pendingMessages.messagesCount()); + return; + } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received an msg send receipt[pending send is repeated due to repl cursor rewind]:" + " source entry {}:{}, pending send: {}:{}, latest persisted: {}:{}", diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java index 25c133391cb65..e9f14a06768c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java @@ -68,6 +68,10 @@ protected boolean changeToReadyState() { || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Ready)); } + protected boolean compareAndSetState(State expect, State update) { + return STATE_UPDATER.compareAndSet(this, expect, update); + } + protected boolean changeToRegisteringSchemaState() { return STATE_UPDATER.compareAndSet(this, State.Ready, State.RegisteringSchema); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f968bea2019ce..b201ba9819d38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -74,6 +74,7 @@ import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.CryptoException; +import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; import 
org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; @@ -188,6 +189,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final Counter producersOpenedCounter; private final Counter producersClosedCounter; + private final boolean needStuckIfSchemaIncompatible; public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema, @@ -198,6 +200,10 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration this.userProvidedProducerName = StringUtils.isNotBlank(producerName); this.partitionIndex = partitionIndex; this.pendingMessages = createPendingMessagesQueue(); + // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise, + // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication + // on the remote side. + this.needStuckIfSchemaIncompatible = conf.isReplProducer(); if (conf.getMaxPendingMessages() > 0) { this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)); } else { @@ -514,7 +520,8 @@ public ByteBuf applyCompression(ByteBuf payload) { *
<ol>
 *   <li>Release 1: When the message is written out by
 *       {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.</li>
- *   <li>Release 2: In the method {@link SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
+ *   <li>Release 2: In the method {@link ByteBufPair#release()}, which was called by
+ *       {@link SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
 * </ol>
* * @@ -876,7 +883,7 @@ private boolean rePopulateMessageSchema(MessageImpl msg) { return true; } - private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) { + private void tryRegisterSchema(ClientCnx cnx, final MessageImpl msg, SendCallback callback, long expectedCnxEpoch) { if (!changeToRegisteringSchemaState()) { return; } @@ -889,8 +896,15 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call Throwable t = FutureUtil.unwrapCompletionException(ex); log.warn("[{}] [{}] GetOrCreateSchema error", topic, producerName, t); if (t instanceof PulsarClientException.IncompatibleSchemaException) { - msg.setSchemaState(MessageImpl.SchemaState.Broken); - callback.sendComplete(t, null); + // Only the first time of failed schema registration will trigger a "recoverProcessOpSendMsgFrom". + if (!Broken.equals(msg.getSchemaState())) { + cnx.ctx().channel().eventLoop().execute(() -> { + synchronized (ProducerImpl.this) { + recoverProcessOpSendMsgFrom(cnx, msg, true, expectedCnxEpoch); + } + }); + } + return null; } } else { log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName); @@ -908,7 +922,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call } cnx.ctx().channel().eventLoop().execute(() -> { synchronized (ProducerImpl.this) { - recoverProcessOpSendMsgFrom(cnx, msg, expectedCnxEpoch); + recoverProcessOpSendMsgFrom(cnx, msg, false, expectedCnxEpoch); } }); return null; @@ -1231,7 +1245,10 @@ public boolean isConnected() { * verify that the returned cnx is not null before using reference. */ protected ClientCnx getCnxIfReady() { - if (getState() == State.Ready) { + State state = getState(); + // When a producer is publishing with multiple schema, it may be switched to a "RegisteringSchema" state, even + // if the connection is established. 
+ if (State.Ready.equals(state) || State.RegisteringSchema.equals(state)) { return connectionHandler.cnx(); } else { return null; @@ -2069,7 +2086,7 @@ private void resendMessages(ClientCnx cnx, long expectedEpoch) { } log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend); - recoverProcessOpSendMsgFrom(cnx, null, expectedEpoch); + recoverProcessOpSendMsgFrom(cnx, null, false, expectedEpoch); } }); } @@ -2367,7 +2384,12 @@ protected synchronized void processOpSendMsg(OpSendMsg op) { } pendingMessages.add(op); updateLastSeqPushed(op); - + if (State.RegisteringSchema.equals(getState())) { + // Since there is a in-progress schema registration, do not continuously publish to avoid break publish + // ordering. After the schema registration finished, it will trigger a "recoverProcessOpSendMsgFrom" to + // publish all messages in "pendingMessages". + return; + } final ClientCnx cnx = getCnxIfReady(); if (cnx != null) { if (op.msg != null && op.msg.getSchemaState() == None) { @@ -2399,8 +2421,25 @@ protected void updateLastSeqPushed(OpSendMsg op) { } } - // Must acquire a lock on ProducerImpl.this before calling method. - private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) { + /** + * There are following events that will call this method. + * 1. Republish messages in {@link #pendingMessages} after a reconnect. + * 1-1. Using multiple version producer, and there is a message has new schema that should be registered. + * 1-2. No message should register new schema. + * 2. If using multiple version producer, the new schema was registered successfully. + * 2-1. There is another message needs to register new schema,which is in {@link #pendingMessages}. + * 2-2. {@link #pendingMessages} has no other messages that need to register new schema. + * 3. If using multiple version producer, the new schema was failed to registered. + * 3-1. If the new schema is incompatible. + * 3-1-1. 
If {@link #needStuckIfSchemaIncompatible} is true, block all following publishing to avoid + * out-of-order issues. + * 3-1-2. Otherwise, discard the failed message and continue publishing the following messages. + * 3-2. The new schema registration failed due to another error; retry registering. + * Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on + * {@link ProducerImpl} before calling this method. + */ + private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAttemptedRegisteredSchema, + boolean failedIncompatibleSchema, long expectedEpoch) { if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) { // In this case, the cnx passed to this method is no longer the active connection. This method will get // called again once the new connection registers the producer with the broker. @@ -2412,28 +2451,84 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e } final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion(); Iterator msgIterator = pendingMessages.iterator(); - OpSendMsg pendingRegisteringOp = null; + MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema; + OpSendMsg loopEndDueToSchemaRegisterNeeded = null; while (msgIterator.hasNext()) { OpSendMsg op = msgIterator.next(); - if (from != null) { - if (op.msg == from) { - from = null; + if (loopStartAt != null) { + if (op.msg == loopStartAt) { + loopStartAt = null; } else { continue; } } if (op.msg != null) { - if (op.msg.getSchemaState() == None) { - if (!rePopulateMessageSchema(op.msg)) { - pendingRegisteringOp = op; + if (Broken.equals(op.msg.getSchemaState())) { + // "Event 1-1" happens after "Event 3-1-1". + // Maybe user has changed the schema compatibility strategy, will retry to register the new schema.
+ if (needStuckIfSchemaIncompatible) { + loopEndDueToSchemaRegisterNeeded = op; break; + } else { + // This scenario will never happen because the message will be removed from the queue as soon + // as it was set to "schemaState -> Broken". + SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator() + : op.msg.getSchemaInfo(); + log.error("[{}] [{}] A message attempts to register new schema, but failed. It should be" + + " removed from the pending queue but not, which is not expected. {}", + topic, producerName, String.valueOf(msgSchemaInfo)); + releaseSemaphoreForSendOp(op); + msgIterator.remove(); + op.recycle(); + continue; } - } else if (op.msg.getSchemaState() == Broken) { - op.recycle(); + } else if (op.msg == latestMsgAttemptedRegisteredSchema && failedIncompatibleSchema + && op.msg.getSchemaState() == None) { + op.msg.setSchemaState(Broken); + SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator() + : op.msg.getSchemaInfo(); + // Event 3-1-1. + // New schema is incompatible, if users need to guarantee the publishing ordering, we should let + // the producer be stuck until user changed the compatibility policy and unload the target topic. + if (needStuckIfSchemaIncompatible) { + log.error("[{}] [{}] Producer was stuck due to incompatible schem, please adjust your" + + " schema compatibility strategy and unload the topic on the target cluster. {}", + topic, producerName, String.valueOf(msgSchemaInfo)); + loopEndDueToSchemaRegisterNeeded = op; + break; + } + // Event 3-1-2. + // Give user a failed callback and remove the message from "pendingMessages". + String failedMsg = format("[%s] [%s] incompatible schema %s", topic, producerName, + String.valueOf(msgSchemaInfo)); + log.error(failedMsg); + // The messages' release rely on "op.cmd"'s release, we need to initialize "op.cmd" and + // release it to release "msg.payload". 
+ if (op.cmd == null) { + op.rePopulate.run(); + } + ReferenceCountUtil.safeRelease(op.cmd); + try { + // Need to protect ourselves from any exception being thrown in the future handler from the + // application + op.sendComplete(new IncompatibleSchemaException(failedMsg)); + } catch (Throwable t) { + log.warn("Got exception while completing the failed publishing: {}", failedMsg, t); + } + releaseSemaphoreForSendOp(op); msgIterator.remove(); + op.recycle(); continue; + } else if (op.msg.getSchemaState() == None) { + // Event 1-1. + // There is a message needs to register new schema when flushing pending messages after reconnected. + if (!rePopulateMessageSchema(op.msg)) { + loopEndDueToSchemaRegisterNeeded = op; + break; + } } } + // "Event 1-2" or "Event 2-2" or "Event 3-1-2". if (op.cmd == null) { checkState(op.rePopulate != null); op.rePopulate.run(); @@ -2454,6 +2549,32 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte); } cnx.ctx().flush(); + + // "Event 1-1" or "Event 3-1-1" or "Event 3-2". + if (loopEndDueToSchemaRegisterNeeded != null) { + if (compareAndSetState(State.Connecting, State.Ready)) { + // "Event 1-1" happens after "Event 3-1-1". + // After a topic unload, ask the producer retry to register schema, which avoids restart client + // after users changed the compatibility strategy to make the schema is compatible. + tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback, + expectedEpoch); + } else if (!needStuckIfSchemaIncompatible && compareAndSetState(State.RegisteringSchema, State.Ready)) { + // "Event 2-1" or "Event 3-2". + // "pendingMessages" has more messages to register new schema. + // This operation will not be conflict with another schema registration because both operations are + // attempt to acquire the same lock "ProducerImpl.this". 
+ tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback, + expectedEpoch); + } + // Nothing to do if the event is "Event 3-1-1", just keep stuck. + return; + } else if (latestMsgAttemptedRegisteredSchema != null) { + // Event 2-2 or "Event 3-1-2". + // Switch state to "Ready" after a successful schema registration. + compareAndSetState(State.RegisteringSchema, State.Ready); + } + // "Event 1-2". + // Change state to "Ready" after reconnected. if (!changeToReadyState()) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side @@ -2465,9 +2586,6 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e if (isBatchMessagingEnabled() && !batchMessageContainer.isEmpty()) { maybeScheduleBatchFlushTask(); } - if (pendingRegisteringOp != null) { - tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback, expectedEpoch); - } } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 346eb20ef4cc5..b16edac8c3fc6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -327,7 +327,7 @@ public KeyValue decodeKeyValueSchemaInfo(SchemaInfo sche * @return the jsonified schema info */ public String jsonifySchemaInfo(SchemaInfo schemaInfo) { - return SchemaUtils.jsonifySchemaInfo(schemaInfo); + return SchemaUtils.jsonifySchemaInfo(schemaInfo, false); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java index bac41e0bf6287..e1b697ddd612d 100644 --- 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java @@ -117,6 +117,6 @@ public SchemaHash getSchemaHash() { @Override public String toString() { - return SchemaUtils.jsonifySchemaInfo(this); + return SchemaUtils.jsonifySchemaInfo(this, false); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java index 881ad424669d2..e00b1a19d8337 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java @@ -200,12 +200,13 @@ public static String getStringSchemaVersion(byte[] schemaVersionBytes) { * @param schemaInfo the schema info * @return the jsonified schema info */ - public static String jsonifySchemaInfo(SchemaInfo schemaInfo) { + public static String jsonifySchemaInfo(SchemaInfo schemaInfo, boolean prettyPrinting) { GsonBuilder gsonBuilder = new GsonBuilder() - .setPrettyPrinting() .registerTypeHierarchyAdapter(byte[].class, new ByteArrayToStringAdapter(schemaInfo)) .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER); - + if (prettyPrinting) { + gsonBuilder.setPrettyPrinting(); + } return gsonBuilder.create().toJson(schemaInfo); } @@ -285,8 +286,8 @@ public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContex KeyValue schemaInfoKeyValue = DefaultImplementation.getDefaultImplementation().decodeKeyValueSchemaInfo(schemaInfo); JsonObject obj = new JsonObject(); - String keyJson = jsonifySchemaInfo(schemaInfoKeyValue.getKey()); - String valueJson = jsonifySchemaInfo(schemaInfoKeyValue.getValue()); + String keyJson = jsonifySchemaInfo(schemaInfoKeyValue.getKey(), false); + String valueJson = jsonifySchemaInfo(schemaInfoKeyValue.getValue(), false); obj.add("key", 
toJsonElement(keyJson)); obj.add("value", toJsonElement(valueJson)); return obj; @@ -312,7 +313,7 @@ public JsonElement serialize(SchemaInfo schemaInfo, Type type, JsonSerializationContext jsonSerializationContext) { // schema will not a json, so use toJsonElement - return toJsonElement(jsonifySchemaInfo(schemaInfo)); + return toJsonElement(jsonifySchemaInfo(schemaInfo, false)); } } From 2ce74993b10d5cf20d1a38208fa808679ad23ab5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 12:32:36 +0800 Subject: [PATCH 02/15] tests --- .../service/OneWayReplicatorUsingGlobalPartitionedTest.java | 6 ++++++ .../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 2a2a1befd1681..2bfbce816db2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -204,4 +204,10 @@ public void testRemoveCluster() throws Exception { admin2.topics().delete(topic); admin2.namespaces().deleteNamespace(ns1); } + + @Override + @Test(dataProvider = "enableDeduplication", enabled = false) + public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception { + super.testIncompatibleMultiVersionSchema(enableDeduplication); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index a530baab9bea0..7914ba5aebbf9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -223,4 +223,10 @@ public void testRemoveCluster() throws Exception { admin2.topics().delete(topic); admin2.namespaces().deleteNamespace(ns1); } + + @Override + @Test(dataProvider = "enableDeduplication", enabled = false) + public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception { + super.testIncompatibleMultiVersionSchema(enableDeduplication); + } } From 7834341c9a8b7ca19c3ca65a7b99845e4d44c574 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 12:42:58 +0800 Subject: [PATCH 03/15] tests --- .../pulsar/broker/service/OneWayReplicatorTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index ca26b16cb1c3a..99763bdfb5919 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1359,11 +1359,11 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro producer1.newMessage(Schema.BOOL).value(false).send(); producer1.newMessage(Schema.STRING).value("msg3").send(); // 3. several unloading, which causes replicator internal producer reconnects. -// for (int i = 0; i < 3; i++) { + for (int i = 0; i < 3; i++) { Thread.sleep(2000); admin2.topics().unload(topicName); waitReplicatorStarted(topicName); -// } + } // Verify: no individual acks. 
Awaitility.await().untilAsserted(() -> { PersistentTopic persistentTopic2 = @@ -1375,7 +1375,7 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2"); assertEquals(cursor.getTotalNonContiguousDeletedMessagesRange(), 0); - //assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) < 0); + assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) < 0); }); // 4. Adjust schema compatibility and unload topic on the remote side, which will solve the replication stuck // issue. @@ -1404,8 +1404,8 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro if (message == null) { break; } -// SchemaType schemaType = message.getValue().getSchemaType(); -// assertTrue(schemaType.equals(SchemaType.STRING) || schemaType.equals(SchemaType.BOOLEAN)); + SchemaType schemaType = message.getValue().getSchemaType(); + assertTrue(schemaType.equals(SchemaType.STRING) || schemaType.equals(SchemaType.BOOLEAN)); msgReceived.add(message.getValue().getNativeObject().toString()); log.info("received msg: {}", message.getValue().getNativeObject().toString()); } From 64e20cc528fab717c1e1d06aa7f2b8376bcdf276 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 17:36:52 +0800 Subject: [PATCH 04/15] code style --- .../java/org/apache/pulsar/client/api/SimpleSchemaTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 904754d1c729d..66d399fc5f62a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -69,7 +69,7 @@ 
import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; From 05233ff8494813856d064fc2d3887b1d44e0409c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 18:06:59 +0800 Subject: [PATCH 05/15] fix tests --- .../apache/pulsar/client/impl/schema/SchemaInfoTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java index acda79486caa0..ddcacf03d1a1f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java @@ -286,7 +286,7 @@ public static Object[][] schemas() { @Test(dataProvider = "schemas") public void testSchemaInfoToString(SchemaInfo si, String jsonifiedStr) { - assertEquals(si.toString(), jsonifiedStr); + assertEquals(si.toString(), removeBlank(jsonifiedStr)); } public static class SchemaInfoBuilderTest { @@ -335,7 +335,11 @@ public void testNullPropertyValue() { .build(); // null key will be skipped by Gson when serializing JSON to String - assertEquals(si.toString(), INT32_SCHEMA_INFO); + assertEquals(si.toString(), removeBlank(INT32_SCHEMA_INFO)); } } + + private static String removeBlank(String src) { + return src.replaceAll(" ", "").replaceAll("\n", "").replaceAll("\t", ""); + } } From 217b1c11154facce509fa50a4c811a8bd58e5345 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 14 Apr 2025 23:07:02 +0800 Subject: [PATCH 06/15] fix test --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b201ba9819d38..b7e2b03842fab 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2558,7 +2558,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt // after users changed the compatibility strategy to make the schema is compatible. tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback, expectedEpoch); - } else if (!needStuckIfSchemaIncompatible && compareAndSetState(State.RegisteringSchema, State.Ready)) { + } else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) { // "Event 2-1" or "Event 3-2". // "pendingMessages" has more messages to register new schema. 
// This operation will not be conflict with another schema registration because both operations are From 85f29f8dd8e321b39826ac1a423bd747e234cc06 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 15 Apr 2025 14:38:27 +0800 Subject: [PATCH 07/15] address comment --- .../pulsar/client/impl/GeoReplicationProducerImpl.java | 4 ++-- .../org/apache/pulsar/client/impl/ProducerImpl.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java index 9c56b72866be2..464d4ef6e40a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -109,8 +109,8 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId && pendingEId.longValue() <= lastPersistedSourceEntryId))) { if (MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) { - log.error("[{}] [{}] Replication due to incompatible schem and the replicator will be stuck by producer" - + " queue size limitation." + log.error("[{}] [{}] Replication is stuck because there is a schema is incompatible with the remote" + + " cluster, please modify the schema compatibility for the remote cluster." 
+ " Latest published entry {}:{}, Entry who has broken schema: {}:{}," + " latest persisted source entry: {}:{}, pending queue size: {}.", topic, producerName, sourceLId, sourceEId, pendingLId, pendingEId, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b7e2b03842fab..d418f146ed5b7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -189,7 +189,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final Counter producersOpenedCounter; private final Counter producersClosedCounter; - private final boolean needStuckIfSchemaIncompatible; + private final boolean needStuckForOrderingIfSchemaRegFailed; public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema, @@ -203,7 +203,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise, // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication // on the remote side. - this.needStuckIfSchemaIncompatible = conf.isReplProducer(); + this.needStuckForOrderingIfSchemaRegFailed = conf.isReplProducer(); if (conf.getMaxPendingMessages() > 0) { this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)); } else { @@ -2431,7 +2431,7 @@ protected void updateLastSeqPushed(OpSendMsg op) { * 2-2. {@link #pendingMessages} has no other messages that need to register new schema. * 3. If using multiple version producer, the new schema was failed to registered. * 3-1. If the new schema is incompatible. - * 3-1-1. 
If {@link #needStuckIfSchemaIncompatible} is true stuck all following publishing to avoid + * 3-1-1. If {@link #needStuckForOrderingIfSchemaRegFailed} is true stuck all following publishing to avoid * out-of-order issue. * 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages. * 3-2. The new schema registration failed due to other error, retry registering. @@ -2466,7 +2466,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt if (Broken.equals(op.msg.getSchemaState())) { // "Event 1-1" happens after "Event 3-1-1". // Maybe user has changed the schema compatibility strategy, will retry to register the new schema. - if (needStuckIfSchemaIncompatible) { + if (needStuckForOrderingIfSchemaRegFailed) { loopEndDueToSchemaRegisterNeeded = op; break; } else { @@ -2490,7 +2490,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt // Event 3-1-1. // New schema is incompatible, if users need to guarantee the publishing ordering, we should let // the producer be stuck until user changed the compatibility policy and unload the target topic. - if (needStuckIfSchemaIncompatible) { + if (needStuckForOrderingIfSchemaRegFailed) { log.error("[{}] [{}] Producer was stuck due to incompatible schem, please adjust your" + " schema compatibility strategy and unload the topic on the target cluster. 
{}", topic, producerName, String.valueOf(msgSchemaInfo)); From 48614fa2e9b6b2735c6f886113f7f9b5c98aad63 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 15 Apr 2025 20:53:53 +0800 Subject: [PATCH 08/15] address comments --- .../org/apache/pulsar/client/impl/ProducerImpl.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index d418f146ed5b7..49db5bdfb7851 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -189,7 +189,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final Counter producersOpenedCounter; private final Counter producersClosedCounter; - private final boolean needStuckForOrderingIfSchemaRegFailed; + private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure; public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema, @@ -203,7 +203,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise, // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication // on the remote side. - this.needStuckForOrderingIfSchemaRegFailed = conf.isReplProducer(); + this.pauseSendingToPreservePublishOrderOnSchemaRegFailure = conf.isReplProducer(); if (conf.getMaxPendingMessages() > 0) { this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)); } else { @@ -2431,8 +2431,8 @@ protected void updateLastSeqPushed(OpSendMsg op) { * 2-2. 
{@link #pendingMessages} has no other messages that need to register new schema. * 3. If using multiple version producer, the new schema was failed to registered. * 3-1. If the new schema is incompatible. - * 3-1-1. If {@link #needStuckForOrderingIfSchemaRegFailed} is true stuck all following publishing to avoid - * out-of-order issue. + * 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true stuck all following + * publishing to avoid out-of-order issue. * 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages. * 3-2. The new schema registration failed due to other error, retry registering. * Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on @@ -2466,7 +2466,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt if (Broken.equals(op.msg.getSchemaState())) { // "Event 1-1" happens after "Event 3-1-1". // Maybe user has changed the schema compatibility strategy, will retry to register the new schema. - if (needStuckForOrderingIfSchemaRegFailed) { + if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) { loopEndDueToSchemaRegisterNeeded = op; break; } else { @@ -2490,7 +2490,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt // Event 3-1-1. // New schema is incompatible, if users need to guarantee the publishing ordering, we should let // the producer be stuck until user changed the compatibility policy and unload the target topic. - if (needStuckForOrderingIfSchemaRegFailed) { + if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) { log.error("[{}] [{}] Producer was stuck due to incompatible schem, please adjust your" + " schema compatibility strategy and unload the topic on the target cluster. 
{}", topic, producerName, String.valueOf(msgSchemaInfo)); From a7d6316af5033c8b85264b720640dd0943244b7b Mon Sep 17 00:00:00 2001 From: fengyubiao <9947090@qq.com> Date: Tue, 15 Apr 2025 22:29:57 +0800 Subject: [PATCH 09/15] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java Co-authored-by: Lari Hotari --- .../org/apache/pulsar/client/impl/ProducerImpl.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 49db5bdfb7851..91d5c18f2d020 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2488,11 +2488,13 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator() : op.msg.getSchemaInfo(); // Event 3-1-1. - // New schema is incompatible, if users need to guarantee the publishing ordering, we should let - // the producer be stuck until user changed the compatibility policy and unload the target topic. + // When a schema is incompatible, we need to pause the producer to preserve message order. + // Otherwise, subsequent messages with compatible schemas would be delivered while this message + // remains stuck, causing out-of-order delivery or potential message loss with deduplication. if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) { - log.error("[{}] [{}] Producer was stuck due to incompatible schem, please adjust your" - + " schema compatibility strategy and unload the topic on the target cluster. {}", + log.error("[{}] [{}] Publishing paused: message schema incompatible with target cluster." 
+ + " To resume publishing: 1) Adjust schema compatibility strategy on target cluster" + + " 2) Unload topic on target cluster. Schema details: {}", topic, producerName, String.valueOf(msgSchemaInfo)); loopEndDueToSchemaRegisterNeeded = op; break; From f61a52acb47045866258376966cdea7f51215404 Mon Sep 17 00:00:00 2001 From: fengyubiao <9947090@qq.com> Date: Tue, 15 Apr 2025 22:30:06 +0800 Subject: [PATCH 10/15] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java Co-authored-by: Lari Hotari --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 91d5c18f2d020..7db2fd2a443f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2431,7 +2431,7 @@ protected void updateLastSeqPushed(OpSendMsg op) { * 2-2. {@link #pendingMessages} has no other messages that need to register new schema. * 3. If using multiple version producer, the new schema was failed to registered. * 3-1. If the new schema is incompatible. - * 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true stuck all following + * 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following * publishing to avoid out-of-order issue. * 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages. * 3-2. The new schema registration failed due to other error, retry registering. 
From d085a820e924e5797423d3404e94f7c19eb978a0 Mon Sep 17 00:00:00 2001 From: fengyubiao <9947090@qq.com> Date: Tue, 15 Apr 2025 22:30:17 +0800 Subject: [PATCH 11/15] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java Co-authored-by: Lari Hotari --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 7db2fd2a443f2..ba13804b6ffa3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -200,7 +200,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration this.userProvidedProducerName = StringUtils.isNotBlank(producerName); this.partitionIndex = partitionIndex; this.pendingMessages = createPendingMessagesQueue(); - // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise, + // Replication needs to be paused when a message can not be replicated due to failed schema registration. Otherwise, // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication // on the remote side. 
this.pauseSendingToPreservePublishOrderOnSchemaRegFailure = conf.isReplProducer(); From af4a689cba124d114baa191beed604c3d9eea591 Mon Sep 17 00:00:00 2001 From: fengyubiao <9947090@qq.com> Date: Tue, 15 Apr 2025 22:30:27 +0800 Subject: [PATCH 12/15] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java Co-authored-by: Lari Hotari --- .../apache/pulsar/client/impl/GeoReplicationProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java index 464d4ef6e40a2..3262545cbfcb0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -109,7 +109,7 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId && pendingEId.longValue() <= lastPersistedSourceEntryId))) { if (MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) { - log.error("[{}] [{}] Replication is stuck because there is a schema is incompatible with the remote" + log.error("[{}] [{}] Replication is paused because the schema is incompatible with the remote" + " cluster, please modify the schema compatibility for the remote cluster." 
+ " Latest published entry {}:{}, Entry who has broken schema: {}:{}," + " latest persisted source entry: {}:{}, pending queue size: {}.", From 362f6b82ff39a291c39a37fb68b1c0996179d5ca Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 15 Apr 2025 22:34:18 +0800 Subject: [PATCH 13/15] checkstyle --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index ba13804b6ffa3..b7d598de562a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -200,9 +200,9 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration this.userProvidedProducerName = StringUtils.isNotBlank(producerName); this.partitionIndex = partitionIndex; this.pendingMessages = createPendingMessagesQueue(); - // Replication needs to be paused when a message can not be replicated due to failed schema registration. Otherwise, - // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication - // on the remote side. + // Replication needs to be paused when a message can not be replicated due to failed schema registration. + // Otherwise, it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled + // deduplication on the remote side. 
this.pauseSendingToPreservePublishOrderOnSchemaRegFailure = conf.isReplProducer(); if (conf.getMaxPendingMessages() > 0) { this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)); From 30a488c324cd113817e26acc4646b73549086792 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 15 Apr 2025 22:40:03 +0800 Subject: [PATCH 14/15] checkstyle --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b7d598de562a1..824037e20e828 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2445,8 +2445,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt // called again once the new connection registers the producer with the broker. log.info("[{}][{}] Producer epoch mismatch or the current connection is null. 
Skip re-sending the " + " {} pending messages since they will deliver using another connection.", topic, - producerName, - pendingMessages.messagesCount()); + producerName, pendingMessages.messagesCount()); return; } final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion(); From 8c515822d4e0ce8a31f03ebaa4a1e513f57eeece Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 17 Apr 2025 11:41:20 +0800 Subject: [PATCH 15/15] checkstyle --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 824037e20e828..55d0ab92a715a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2489,7 +2489,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt // Event 3-1-1. // When a schema is incompatible, we need to pause the producer to preserve message order. // Otherwise, subsequent messages with compatible schemas would be delivered while this message - // remains stuck, causing out-of-order delivery or potential message loss with deduplication. + // remains stuck, causing out-of-order delivery or potential message loss with deduplication. if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) { log.error("[{}] [{}] Publishing paused: message schema incompatible with target cluster." + " To resume publishing: 1) Adjust schema compatibility strategy on target cluster"