
[draft] [fix] [broker] Fix messages lost in the geo-replication scenario when deduplication is enabled #20128


Open · wants to merge 6 commits into base: master
@@ -58,6 +58,10 @@ protected boolean replicateEntries(List<Entry> entries) {
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

try {
if (!checkNoMessageSkipped(entries)) {
return false;
}

// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
@@ -173,6 +177,7 @@ protected boolean replicateEntries(List<Entry> entries) {
replicatorId, e);
}
log.info("[{}] Resume the data replication after the schema fetching done", replicatorId);
lastSent = PositionImpl.EARLIEST;
cursor.rewind();
fetchSchemaInProgress = false;
readMoreEntries();
@@ -183,6 +188,7 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.getMessageBuilder().clearTxnidLeastBits();
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
lastSent = entry.getPosition();
Contributor:

The lastSent should be updated after the producer sends it successfully.
We are using the async API to send messages.
It looks like the inflight messages will also be treated as lastSent?

Contributor Author (@poorbarcode, Apr 19, 2023):

> It looks like the inflight messages will also be treated as lastSent?

Yes, it was done on purpose.


Because there is a scenario like this:

  • start sending messages 1~3 to the remote cluster, and it is not finished yet (lastSent is 0 now).
  • (Highlight) receive messages 4~5. These messages will be discarded and trigger a rewind, which reduces the efficiency of the replication task.

If we update the variable lastSent when the sending starts (as this PR does), it works like this:

  • start sending messages 1~3 to the remote cluster, and it is not finished yet (lastSent is 3 now).
  • receive messages 4~5 and send them (lastSent is 5 now).
  • (Highlight) message 3 fails to send; rewind is called (markDeleted is 2 now) and the variable lastSent is set to earliest.
    • Because the timeout of the producer is 0, the probability of this scenario occurring is very, very small.
  • receive message 3 a second time. Because 3 is the next position after max(2, earliest), the check hasMessageSkipped passes; a condensed sketch of this check follows below.
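For clarity, here is a condensed, hypothetical sketch of the continuity check described above. Positions are reduced to plain entry ids and the names only mirror the PR's variables; this is not the actual broker code.

```java
public class ContinuityCheckSketch {
    public static void main(String[] args) {
        long markDeleted = 2;   // messages 1~2 are already acknowledged on the cursor
        long lastSent = -1;     // reset to "earliest" after the failed send of message 3
        long lastProcessed = Math.max(markDeleted, lastSent);

        long firstReceived = 3; // message 3 is redelivered after the rewind
        long expectedFirst = lastProcessed + 1;

        // No message was skipped as long as the first received entry is not beyond
        // the next expected position; otherwise the batch would be discarded and
        // the cursor rewound.
        boolean noMessageSkipped = expectedFirst >= firstReceived;
        System.out.println(noMessageSkipped); // prints "true", so replication continues
    }
}
```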

producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
atLeastOneMessageSentForReplication = true;
}
@@ -42,7 +42,9 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
@@ -70,6 +72,7 @@ public abstract class PersistentReplicator extends AbstractReplicator

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected final ManagedLedgerImpl managedLedger;

protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private final Object dispatchRateLimiterLock = new Object();
@@ -108,6 +111,8 @@ public abstract class PersistentReplicator extends AbstractReplicator

protected volatile boolean fetchSchemaInProgress = false;

protected volatile Position lastSent = PositionImpl.EARLIEST;

public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor,
String remoteCluster, String remoteTopic,
BrokerService brokerService, PulsarClientImpl replicationClient)
@@ -116,6 +121,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
brokerService, replicationClient);
this.topic = localTopic;
this.cursor = cursor;
this.managedLedger = (ManagedLedgerImpl) localTopic.getManagedLedger();
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
@@ -135,6 +141,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
@Override
protected void readEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
lastSent = PositionImpl.EARLIEST;
cursor.rewind();

cursor.cancelPendingReadRequest();
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
}
}

/**
* Check whether some messages were skipped; if so, the received entries are discarded and rewind is called.
* Why can messages be skipped? For example: if something goes wrong while the Replicator is processing
* messages "[1:1 ~ 3:3]", the Replicator discards the unprocessed messages. But if a new batch of messages
* "[4:1 ~ 6:6]" is received later, those messages would still be sent.
*/
protected boolean checkNoMessageSkipped(List<Entry> entries) {
if (CollectionUtils.isEmpty(entries)){
return true;
}

PositionImpl markDeletedPos = (PositionImpl) cursor.getMarkDeletedPosition();
PositionImpl lastSentPos = (PositionImpl) lastSent;
PositionImpl lastProcessedPos = markDeletedPos.compareTo(lastSentPos) > 0 ? markDeletedPos : lastSentPos;

PositionImpl firstMessageReceived = (PositionImpl) entries.get(0).getPosition();
PositionImpl expectedFirstMessage = managedLedger.getNextValidPosition(lastProcessedPos);
if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
return true;
}
Comment on lines +295 to +297
Contributor:

It's better to change the method name to hasMessageSkipped because we allow duplicated messages.

Contributor Author:

Good suggestion. Already fixed.

Contributor Author:

To make the logic of replicateEntries easier to read, I changed the method name to checkNoMessageSkipped.


log.warn("[{}] Detected skipped messages. first received: {}, expected first: {}",
replicatorId, firstMessageReceived, expectedFirstMessage);
entries.forEach(Entry::release);
cursor.cancelPendingReadRequest();
this.lastSent = PositionImpl.EARLIEST;
cursor.rewind();
return false;
}

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
if (log.isDebugEnabled()) {
@@ -332,6 +369,7 @@ public void sendComplete(Exception exception) {
if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
// cursor should be rewound since it was incremented when readMoreEntries
replicator.lastSent = PositionImpl.EARLIEST;
replicator.cursor.rewind();
} else {
if (log.isDebugEnabled()) {
@@ -58,6 +58,10 @@ protected boolean replicateEntries(List<Entry> entries) {
boolean atLeastOneMessageSentForReplication = false;

try {
if (!checkNoMessageSkipped(entries)) {
return false;
}

// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
@@ -113,6 +117,7 @@ protected boolean replicateEntries(List<Entry> entries) {

// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
lastSent = entry.getPosition();
Contributor:

Same as the comment in GeoReplicator.

Contributor Author:

Adding context: the linked comment is https://github.com/apache/pulsar/pull/20128/files#r1171223753

Contributor Author (@poorbarcode, Apr 19, 2023):

The reply to this comment is the same as #20128 (comment).

producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
atLeastOneMessageSentForReplication = true;
}
@@ -34,15 +34,18 @@
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +54,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -67,6 +72,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -90,6 +96,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
@@ -1756,4 +1763,93 @@ public void testReplicatorProducerNotExceed() throws Exception {

Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
}

@Test
public void testDiscontinuousMessages() throws Exception {
Contributor Author (@poorbarcode, Apr 19, 2023):

Hi @codelipenghui

> What kind of exceptions will happen to the producer? IMO, we should use infinite publish timeout, if the exception is not publish timeout, it should not be a retriable exception. We should stop the replication.

My description of the scenario wasn't good; it is not an exception from sending messages. Now there are two scenarios:

  • the schema of the messages is loading, and the Replicator will call rewind in this scenario. See Code-1 below
  • something goes wrong when processing replicateEntries, and the Replicator will not call rewind in this scenario. See Code-2 below

Code-1

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L160-L179

Code-2

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192

I opened a comment thread to make it easy to track the context (^_^).

Contributor:

Ok, if we can reach https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192, can we just close the replicator and try to restart the replication task?

The main purpose of the PR description is to avoid processing [4,5] before [1,2,3] is processed successfully. It should be a corner case with a very low incidence. If closing the replicator can help with the problem, it will be simpler to implement.

Contributor Author (@poorbarcode, Apr 19, 2023):

@codelipenghui

> Ok, if we can reach https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192, can we just close the replicator and try to restart the replication task?

Background: the Replicator currently has some trouble with starting and closing:

  • After closing the internal producer of the Replicator, the Replicator will not work anymore; there is no mechanism to get it running again
    • I will push another PR to fix it.
  • The function disconnect may have a bug that lets the Replicator continue to work after the internal cursor is closed
    • When startProducer and disconnect run concurrently, the disconnect thread may not get the right producer.
    • Maybe something else is wrong. I'm looking into it.
    • I will push another PR to fix it.

So there is not yet a stable way to restart the Replicator (other than unloading the topic).


Summary

Now this PR tries to solve three problems in the same way: make the replicator process messages only sequentially

  • scenario-1: race condition between reading entries and rewind
  • scenario-2: the replication task is aborted by schema loading
  • scenario-3: the replication task is aborted by an unknown error (the one we are discussing)

Since scenario-1 and scenario-2 exist, the solution "make the replicator process messages only sequentially" is needed anyway, so no additional processing is required for scenario-3.

Contributor:

Will the current solution make the replicator unable to re-replicate data (i.e. after resetting the cursor of the replicator) to the remote cluster unless the replicator is restarted?

And will the problem be resolved by unloading the topic if the replicator runs into an exception?

Contributor Author (@poorbarcode, Apr 19, 2023):

@codelipenghui

> Will the current solution make the replicator unable to re-replicate data (i.e. after resetting the cursor of the replicator) to the remote cluster unless the replicator is restarted?

  1. If someone manually moves the Replicator's cursor to a larger position (such as with pulsar-admin topics resetCursor), everything is ok. The check for "whether the messages are continuous or not" is first_message_received <= max(mark_deleted_pos, last_sent_pos) + 1; since resetCursor also moves the variable markDeletePosition to a larger value, the continuity check will return true.
  2. If someone manually moves the Replicator's cursor to a smaller position (such as with rewind or pulsar-admin topics resetCursor), everything is ok.

> And will the problem be resolved by unloading the topic if the replicator runs into an exception?

If something sets the variable lastSent to a wrong value, it can be solved by unloading the topic (see the sketch below).
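For reference, a minimal sketch of the "unload the topic" recovery path via the admin client; the service URL and topic name are placeholders, not taken from this PR.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class UnloadTopicSketch {
    public static void main(String[] args) throws Exception {
        // Unloading the topic closes and re-creates the replicator; when it starts
        // reading again, lastSent is reset back to EARLIEST.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            admin.topics().unload("persistent://pulsar/ns1/my-topic");
        }
    }
}
```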

Contributor:

Ok, this approach looks easier to understand and more straightforward.
It's very similar to Pulsar connectors, like a connector that consumes messages from one topic and publishes them to another topic. I mean, if we want to introduce a principle for handling such issues, I don't think we want to add logic that checks whether messages have been skipped. This is why I think we should not introduce such logic here.

Contributor Author:

@codelipenghui

I see:

For scenario-2 (the replication task is aborted by schema loading):

| time | receive messages | async load schema |
| --- | --- | --- |
| 1 | receive messages 1~3 | |
| 2 | | async load schema if needed |
| 3 | receive messages 4~5 | |
| 4 | send messages 4~5 to the remote cluster | |
| 5 | | load schema success, rewind cursor to position 1 |
| 6 | (Highlight) receive messages 1~3 again; the sends will fail the deduplication check | |

This scenario is hard to fix, because there is no clear marker of which in-flight reads need to be discarded after step 2. I think I should learn how the connector handles messages.

Contributor Author:

> This scenario is hard to fix, because there is no clear marker of which in-flight reads need to be discarded after step 2. I think I should learn how the connector handles messages.

I see. We can continue to handle these messages after the schema loading is finished.

Contributor Author:

@codelipenghui I think this suggestion is good. Thanks. I will try to solve the issues described above with multiple PRs.

Contributor:

> This scenario is hard to fix, because there is no clear marker of which in-flight reads need to be discarded after step 2. I think I should learn how the connector handles messages.

But we will not get an error if the message is duplicated, right? The broker skips the duplicated sends and returns (-1, -1) as the message ID. We are safe to publish messages 1 to 5 again.
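A hypothetical client-side sketch of the deduplication behaviour described in this comment: with deduplication enabled, re-sending an already-persisted sequence id is acknowledged without an error. The URLs, topic, and producer name are placeholders.

```java
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class DedupResendSketch {
    public static void main(String[] args) throws Exception {
        // Assumes deduplication is already enabled on the topic or namespace.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic("persistent://pulsar/ns1/my-topic")
                     .producerName("dedup-sketch")
                     .create()) {
            MessageId first = producer.newMessage().sequenceId(1).value("m1").send();
            // Re-sending the same sequence id is not treated as an error: the broker
            // skips persisting it and acknowledges the send (the comment above notes
            // the returned message id is (-1, -1) in that case).
            MessageId resent = producer.newMessage().sequenceId(1).value("m1").send();
            System.out.println(first + " / " + resent);
        }
    }
}
```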

final Field fieldReadPosition = ManagedCursorImpl.class.getDeclaredField("readPosition");
fieldReadPosition.setAccessible(true);
final Field fieldPendingMessages = PersistentReplicator.class.getDeclaredField("pendingMessages");
fieldPendingMessages.setAccessible(true);
final String namespace = "pulsar/ns1";
final TopicName tName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"));
final String tpName = tName.toString();
final String ignoredSubName = "sub_ignore";
final String replicatorSubName = "pulsar.repl.r2";

// create topics.
admin1.topics().createNonPartitionedTopic(tpName);
admin1.topics().createSubscription(tpName, ignoredSubName, MessageId.earliest);
admin1.topicPolicies().setDeduplicationStatus(tpName, true);
try {
admin2.topics().createNonPartitionedTopic(tpName);
} catch (Exception ex){/* Maybe ConflictException: because replicator will create this topic too.*/}
admin2.topics().createSubscription(tpName, ignoredSubName, MessageId.earliest);
admin2.schemas().createSchema(tpName, Schema.STRING.getSchemaInfo());
admin2.topicPolicies().setDeduplicationStatus(tpName, true);

// init clients.
final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
final Producer producer1 = client1.newProducer().topic(tpName).enableBatching(false).create();
final Consumer consumer2 = client2.newConsumer().topic(tpName).subscriptionName(ignoredSubName).subscribe();
final PersistentTopic tp =
(PersistentTopic) pulsar1.getBrokerService().getTopic(tName.toString(), false).join().get();
final ManagedLedgerImpl ml = (ManagedLedgerImpl) tp.getManagedLedger();
final ManagedCursorImpl replicatorCursor = (ManagedCursorImpl) ml.openCursor(replicatorSubName,
CommandSubscribe.InitialPosition.Earliest);
tp.checkReplication().join();
final GeoPersistentReplicator replicator = (GeoPersistentReplicator) tp.getReplicators().values().get(0);

// pause replication.
fieldPendingMessages.set(replicator, Integer.MAX_VALUE);
// sleep some seconds to skip the first reading.
Thread.sleep(2000);

// Set a wrong read position to mock discontinuous read.
BiConsumer<MessageId, Throwable> actionToMakeReadPositionError = (messageId, ex) -> {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
PositionImpl wrongReadPosition =
PositionImpl.get(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId());
Position originalPos = replicatorCursor.getReadPosition();
try {
fieldReadPosition.set(replicatorCursor, wrongReadPosition);
} catch (IllegalAccessException e) {
}
log.info("original read-pos: {}, set wrong pos: {}", originalPos,
replicatorCursor.getReadPosition());
};

// Send messages.
final int sentCount = 200;
for (int i = 0; i < sentCount; i++){
CompletableFuture<MessageId> sendFuture =
producer1.sendAsync(Integer.valueOf(i).toString().getBytes(Charset.defaultCharset()));
if (i % 10 == 0 && i > 10) {
sendFuture.whenComplete(actionToMakeReadPositionError);
}
}

// Start replication.
fieldPendingMessages.set(replicator, 0);
replicator.readEntriesComplete(Collections.emptyList(), null);

// Receive replicated messages.
final AtomicInteger receivedCounter = new AtomicInteger();
final ArrayBlockingQueue<Integer> receivedMessages = new ArrayBlockingQueue<>(sentCount);
// 60s is set only to avoid flaky test, which is slow to run locally.
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
Message<byte[]> msg = consumer2.receive(2, TimeUnit.SECONDS);
if (msg != null) {
String receivedMsg = new String(msg.getValue(), Charset.defaultCharset());
log.info("received: {}", receivedMsg);
receivedMessages.add(Integer.valueOf(receivedMsg));
receivedCounter.incrementAndGet();
}
assertEquals(receivedCounter.get(), sentCount);
});

// Verify the order in which messages are received.
ArrayList<Integer> sortedReceivedMessages = new ArrayList<>(receivedMessages);
Collections.sort(sortedReceivedMessages);
assertEquals(receivedMessages, sortedReceivedMessages);
}
}