Skip to content

[fix][broker]fix memory leak, messages lost, incorrect replication state if using multiple versions schema #24178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -79,6 +82,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -88,9 +92,12 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -1324,4 +1331,95 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception {
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
admin1.topics().delete(topicName, false);
}

@DataProvider
public Object[][] enableDeduplication() {
return new Object[][] {
{false},
{true},
};
}

@Test(dataProvider = "enableDeduplication")
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
final String subscriptionName = "s1";
// 1.Create topic.
admin1.topics().createNonPartitionedTopic(topicName);
Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
waitReplicatorStarted(topicName);
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
if (enableDeduplication) {
admin2.topicPolicies().setDeduplicationStatus(topicName, true);
}
// 2. Publish messages with multiple schemas.
producer1.newMessage(Schema.STRING).value("msg1").send();
producer1.newMessage(Schema.BOOL).value(false).send();
producer1.newMessage(Schema.STRING).value("msg3").send();
// 3. several unloading, which causes replicator internal producer reconnects.
for (int i = 0; i < 3; i++) {
Thread.sleep(2000);
admin2.topics().unload(topicName);
waitReplicatorStarted(topicName);
}
// Verify: no individual acks.
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertTrue(
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true) > 0);
PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2");
assertEquals(cursor.getTotalNonContiguousDeletedMessagesRange(), 0);
assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) < 0);
});
// 4. Adjust schema compatibility and unload topic on the remote side, which will solve the replication stuck
// issue.
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
admin2.topics().unload(topicName);
admin1.topics().unload(topicName);
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2");
assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) >= 0);
});
// Verify: no out-of-order; schemas are as expected.
Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
.subscriptionName(subscriptionName).subscribe();
Collection<String> msgReceived;
if (enableDeduplication) {
msgReceived = new ArrayList<>();
} else {
msgReceived = new LinkedHashSet<>();
}
while (true) {
Message<GenericRecord> message = consumer2.receive(2, TimeUnit.SECONDS);
if (message == null) {
break;
}
SchemaType schemaType = message.getValue().getSchemaType();
assertTrue(schemaType.equals(SchemaType.STRING) || schemaType.equals(SchemaType.BOOLEAN));
msgReceived.add(message.getValue().getNativeObject().toString());
log.info("received msg: {}", message.getValue().getNativeObject().toString());
}
assertEquals(msgReceived, Arrays.asList("msg1", "false", "msg3"));
List<SchemaInfo> schemaInfoList = admin2.schemas().getAllSchemas(topicName);
assertEquals(schemaInfoList.size(), 2);
assertEquals(schemaInfoList.get(0).getType(), SchemaType.STRING);
assertEquals(schemaInfoList.get(1).getType(), SchemaType.BOOLEAN);

// cleanup.
consumer2.close();
producer1.close();
admin2.topics().deleteSubscription(topicName, subscriptionName);
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.FORWARD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
Expand All @@ -65,6 +66,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
protected final String defaultTenant = "public";
protected final String replicatedNamespace = defaultTenant + "/default";
protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
protected final String sourceClusterAlwaysSchemaCompatibleNamespace = defaultTenant + "/always-compatible";

protected final String cluster1 = "r1";

Expand Down Expand Up @@ -157,6 +159,10 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
admin1.namespaces().createNamespace(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
admin1.namespaces().createNamespace(
sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster1, cluster2));
admin1.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
admin1.namespaces().createNamespace(nonReplicatedNamespace);

if (!usingGlobalZK) {
Expand All @@ -177,6 +183,9 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
admin2.namespaces().createNamespace(replicatedNamespace);
admin2.namespaces().createNamespace(sourceClusterAlwaysSchemaCompatibleNamespace);
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.FORWARD);
admin2.namespaces().createNamespace(nonReplicatedNamespace);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,10 @@ public void testRemoveCluster() throws Exception {
admin2.topics().delete(topic);
admin2.namespaces().deleteNamespace(ns1);
}

@Override
@Test(dataProvider = "enableDeduplication", enabled = false)
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
super.testIncompatibleMultiVersionSchema(enableDeduplication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,10 @@ public void testRemoveCluster() throws Exception {
admin2.topics().delete(topic);
admin2.namespaces().deleteNamespace(ns1);
}

@Override
@Test(dataProvider = "enableDeduplication", enabled = false)
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
super.testIncompatibleMultiVersionSchema(enableDeduplication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
Expand All @@ -42,25 +43,33 @@
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.TestNGInstanceOrder;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
Expand All @@ -76,6 +85,8 @@
public class SimpleSchemaTest extends ProducerConsumerBase {

private static final String NAMESPACE = "my-property/my-ns";
private static final String NAMESPACE_ALWAYS_COMPATIBLE = "my-property/always-compatible";
private static final String NAMESPACE_NEVER_COMPATIBLE = "my-property/never-compatible";

@DataProvider(name = "batchingModes")
public static Object[][] batchingModes() {
Expand Down Expand Up @@ -124,6 +135,12 @@ protected void setup() throws Exception {
this.isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
admin.namespaces().createNamespace(NAMESPACE_ALWAYS_COMPATIBLE);
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_ALWAYS_COMPATIBLE,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE);
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE,
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
}

@AfterClass(alwaysRun = true)
Expand Down Expand Up @@ -340,6 +357,78 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
}
}

@Test
public void testProducerConnectStateWhenRegisteringSchema() throws Exception {
final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_ALWAYS_COMPATIBLE + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subscription, MessageId.earliest);

// Create a pulsar client with a delayed response of "getOrCreateSchemaResponse"
CompletableFuture<Void> responseSignal = new CompletableFuture<>();
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
protected void handleGetOrCreateSchemaResponse(
CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
responseSignal.join();
super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse);
}
});
Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create();
producer.newMessage(Schema.STRING).value("msg").sendAsync();

PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
assertEquals(persistentTopic.getProducers().size(), 1);
assertTrue(producer.isConnected());

// cleanup.
responseSignal.complete(null);
producer.close();
client.close();
Awaitility.await().untilAsserted(() -> {
assertEquals(persistentTopic.getProducers().size(), 0);
});
admin.topics().delete(topic);
}

@Test
public void testNoMemoryLeakIfSchemaIncompatible() throws Exception {
final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_NEVER_COMPATIBLE + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subscription, MessageId.earliest);

// Create a pulsar client with a delayed response of "getOrCreateSchemaResponse"
CompletableFuture<Void> responseSignal = new CompletableFuture<>();
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
protected void handleGetOrCreateSchemaResponse(
CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
responseSignal.join();
super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse);
}
});
Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create();
producer.newMessage(Schema.STRING).value("msg").sendAsync();

PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
assertEquals(persistentTopic.getProducers().size(), 1);
assertTrue(producer.isConnected());

// cleanup.
responseSignal.complete(null);
producer.close();
client.close();
Awaitility.await().untilAsserted(() -> {
assertEquals(persistentTopic.getProducers().size(), 0);
});
admin.topics().delete(topic);
}

@Test
public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
String topic = NAMESPACE + "/schema-test";
Expand Down
Loading
Loading