diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
index 33e797fcb219f..539abe7eed450 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -24,7 +24,7 @@
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.impl.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherLockTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherLockTest.java
new file mode 100644
index 0000000000000..9a1e12b20da62
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherLockTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
+import java.util.Collections;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class DispatcherLockTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Verifies that the Broker does not leave an orphan consumer under the Dispatcher in the scenario below:
+     * 1. Register "consumer-1"
+     *     - "consumer-1" will be maintained by the Subscription.
+     *     - "consumer-1" will be maintained by the Dispatcher.
+     * 2. The connection of "consumer-1" has something wrong. We call this connection "connection-1".
+     * 3. Try to register "consumer-2"
+     *     - "consumer-2" will be maintained by the Subscription. At this time, there are two consumers under this
+     *       subscription.
+     *     - This triggers a connection check task for connection-1; we call this task "CheckConnectionLiveness".
+     *       This task will be executed in another thread, which means it will release the lock
+     *       `Synchronized(dispatcher)`.
+     *     - "consumer-2" is not maintained by the Dispatcher yet.
+     * 4. "CheckConnectionLiveness" will kick out "consumer-1" after 5 seconds, then "consumer-2" will be maintained
+     *    by the Dispatcher.
+     * (Highlight) Race condition: if the connection of "consumer-2" goes into a wrong state before step 4,
+     *   "consumer-2" is maintained by the Subscription but not by the Dispatcher. Could the scenario below happen?
+     *     1. "connection-2" is closed.
+     *     2. "consumer-2" is removed from the Subscription.
+     *     3. The Broker tries to remove "consumer-2" from the Dispatcher, but there are no consumers under this
+     *        Dispatcher, so nothing is removed.
+     *     4. "CheckConnectionLiveness" finishes and puts "consumer-2" into the Dispatcher.
+     *     5. At this moment, the consumer state of the Subscription and of the Dispatcher is inconsistent: there
+     *        is an orphan consumer under the Dispatcher.
+     * The test provokes this race and asserts that the Dispatcher ends up without any consumer.
+     */
+    @Test
+    public void testNoOrphanConsumerIfLostDispatcherLock() throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscription, MessageId.earliest);
+        final List<Range> ranges = Collections.singletonList(new Range(0, 65535));
+        final KeySharedPolicySticky sharedPolicySticky = KeySharedPolicy.stickyHashRange().ranges(ranges);
+        final String consumerName1 = "c1";
+        final String consumerName2 = "c2";
+
+        // Create a client with injected logic: it never answers the command Ping.
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        PulsarClient skipHealthCheckClient = InjectedClientCnxClientBuilder.create(clientBuilder,
+                (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
+                    @Override
+                    public void handlePing(CommandPing ping) {
+                        // Intentionally do not respond to anything.
+                    }
+                });
+        PulsarClientImpl normalClient = null;
+        try {
+            normalClient = (PulsarClientImpl) newPulsarClient(lookupUrl.toString(), 0);
+
+            // 1. Register "consumer-1".
+            skipHealthCheckClient.newConsumer().topic(tpName).subscriptionName(subscription)
+                    .consumerName(consumerName1).keySharedPolicy(sharedPolicySticky)
+                    .subscriptionType(SubscriptionType.Key_Shared).subscribe();
+            // Wait until all commands of consumer c1 have been handled, so that the Broker does not consider the
+            // connection active merely because it has just received something from it.
+            Thread.sleep(1000);
+
+            // 3. Try to register "consumer-2".
+            normalClient.newConsumer().topic(tpName).subscriptionName(subscription)
+                    .consumerName(consumerName2).keySharedPolicy(sharedPolicySticky)
+                    .subscriptionType(SubscriptionType.Key_Shared).subscribeAsync();
+            // Wait until "consumer-2" is maintained by the Subscription and the task "CheckConnectionLiveness"
+            // is in progress.
+            Thread.sleep(1000);
+
+            // Make a race condition: close "connection-2".
+            normalClient.close();
+
+            // Verify no orphan consumers were left under the Dispatcher.
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            List<?> consumers = persistentTopic.getSubscription(subscription).getDispatcher().getConsumers();
+            Awaitility.await().untilAsserted(() -> {
+                log.info("consumer size: {}", consumers.size());
+                assertEquals(consumers.size(), 0);
+            });
+        } finally {
+            // Close both clients even when the test fails, so that they are not leaked.
+            if (normalClient != null && !normalClient.isClosed()) {
+                normalClient.close();
+            }
+            skipHealthCheckClient.close();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/InjectedClientCnxClientBuilder.java
similarity index 86%
rename from pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/client/impl/InjectedClientCnxClientBuilder.java
index d29dd4f7061b8..d45c7e33852f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/InjectedClientCnxClientBuilder.java
@@ -16,18 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
*/ -package org.apache.pulsar.client.api; +package org.apache.pulsar.client.impl; import io.netty.channel.EventLoopGroup; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.PulsarTestClient; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.util.netty.EventLoopUtil; +@Slf4j public class InjectedClientCnxClientBuilder { public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder, @@ -42,7 +48,7 @@ public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder, ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup, () -> clientCnxFactory.generate(conf, eventLoopGroup)); - return new PulsarClientImpl(conf, eventLoopGroup, pool); + return new PulsarTestClient(conf, eventLoopGroup, pool, new AtomicReference<>()); } public interface ClientCnxFactory { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java index 8126ba1bba928..e2f877a9c3b44 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java @@ -86,7 +86,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar clientCnxSupplierReference); } - private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, + PulsarTestClient(ClientConfigurationData conf, 
EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
                              AtomicReference> clientCnxSupplierReference)
             throws PulsarClientException {
         super(conf, eventLoopGroup, cnxPool);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
index e9071f171a29e..035a3261eddcc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.util;
 
+import org.apache.pulsar.common.naming.TopicName;
+
 public class RetryMessageUtil {
 
     public static final String SYSTEM_PROPERTY_RECONSUMETIMES = "RECONSUMETIMES";
@@ -31,4 +33,48 @@ public class RetryMessageUtil {
     public static final int MAX_RECONSUMETIMES = 16;
     public static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";
     public static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
+
+    /**
+     * Checks whether a topic name carries the default retry-topic suffix {@link #RETRY_GROUP_TOPIC_SUFFIX}.
+     *
+     * @param topicName the topic name as a string; it is parsed with {@link TopicName#get(String)}
+     * @return {@code true} if the local name of the parsed topic ends with the retry suffix
+     */
+    public static boolean isDefaultRetryTopic(String topicName) {
+        final TopicName parsed = TopicName.get(topicName);
+        return isDefaultRetryTopic(parsed);
+    }
+
+    /**
+     * Checks whether a topic name carries the default retry-topic suffix {@link #RETRY_GROUP_TOPIC_SUFFIX}.
+     *
+     * @param topicName the parsed topic name whose local name is inspected
+     * @return {@code true} if the local name ends with the retry suffix
+     */
+    public static boolean isDefaultRetryTopic(TopicName topicName) {
+        final String localName = topicName.getLocalName();
+        return localName.endsWith(RETRY_GROUP_TOPIC_SUFFIX);
+    }
+
+    /**
+     * Checks whether a topic name carries the default dead-letter suffix {@link #DLQ_GROUP_TOPIC_SUFFIX}.
+     *
+     * @param topicName the topic name as a string; it is parsed with {@link TopicName#get(String)}
+     * @return {@code true} if the local name of the parsed topic ends with the DLQ suffix
+     */
+    public static boolean isDefaultDLQ(String topicName) {
+        final TopicName parsed = TopicName.get(topicName);
+        return isDefaultDLQ(parsed);
+    }
+
+    /**
+     * Checks whether a topic name carries the default dead-letter suffix {@link #DLQ_GROUP_TOPIC_SUFFIX}.
+     *
+     * @param topicName the parsed topic name whose local name is inspected
+     * @return {@code true} if the local name ends with the DLQ suffix
+     */
+    public static boolean isDefaultDLQ(TopicName topicName) {
+        final String localName = topicName.getLocalName();
+        return localName.endsWith(DLQ_GROUP_TOPIC_SUFFIX);
+    }
 }
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 51cd61afd6362..43337a4b6b08d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -82,7 +82,7 @@ public void
channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override - protected final void handlePing(CommandPing ping) { + protected void handlePing(CommandPing ping) { // Immediately reply success to ping requests if (log.isDebugEnabled()) { log.debug("[{}] Replying back to ping message", ctx.channel());