[draft] [fix] [broker] fix messages lost in scenario geo-replication if enabled deduplication #20128
base: master
Conversation
What kind of exceptions will happen to the producer? IMO, we should use an infinite publish timeout; if the exception is not a publish timeout, it should not be a retriable exception. We should stop the replication.
```
@@ -1756,4 +1763,93 @@ public void testReplicatorProducerNotExceed() throws Exception {

        Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
    }

    @Test
    public void testDiscontinuousMessages() throws Exception {
```
> What kind of exceptions will happen to the producer? IMO, we should use infinite publish timeout, if the exception is not publish timeout, it should not be a retriable exception. We should stop the replication.

My description of the scenario wasn't good; it is not an exception from sending messages. There are now two scenarios:
- the schema of the messages is loading, and the Replicator will call `rewind` in this scenario. See Code-1 below.
- something goes wrong when processing `replicateEntries`, and the Replicator will not call `rewind` in this scenario. See Code-2 below.

Code-1
Code-2

I opened a comment list to easily track the context (^_^).
Ok, if we can reach https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192, can we just close the replicator and try to restart the replication task?
The main purpose of this PR's description is to avoid processing [4,5] before [1,2,3] has been processed successfully. It should be a corner case with a very low incidence. If closing the broken replicator can solve the problem, it will be simpler to implement.
> Ok, if we can reach https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192, can we just close the replicator and try to restart the replication task?

Background: the Replicator currently has some trouble starting and closing:
- After closing the internal producer of the Replicator, the Replicator will not work anymore; there is no mechanism to get it running again. I will push another PR to fix it.
- The function `disconnect` may have a bug that makes the Replicator continue to work after the internal cursor is closed. When `startProducer` and `disconnect` run concurrently, the `disconnect` thread may not get the right producer. Maybe something else is wrong; I'm looking into it. I will push another PR to fix it.

So there is not yet a stable way to restart the Replicator (other than unloading the topic).

Summary

This PR now tries to solve three problems in the same way: make the replicator process messages only sequentially.
- scenario-1: race condition between `read entries` and `rewind`
- scenario-2: the replicate-messages task is aborted by a loading schema
- scenario-3: the replicate-messages task is aborted by an unknown error (the one we are talking about)

Since scenario-1 and scenario-2 exist, the solution "make the replicator process messages only sequentially" is needed anyway, so no additional handling is required for scenario-3.
Will the current solution make the replicator unable to re-replicate data (reset the cursor of the replicator) to the remote cluster unless the replicator is restarted?
And will the issue be resolved by unloading the topic if the replicator runs into an exception?
> Will the current solution make the replicator not able to re-replicate data(reset the cursor of the replicator) to the remote cluster unless restart the replicator?

- If someone manually moves the Replicator's cursor to a larger position (such as with `pulsar-admin topics resetCursor`), everything is OK. The check for "whether the message is continuous" is `first_message_received <= Max(mark_deleted_pos, last_sent_pos) + 1`; since `resetCursor` also moves the variable `markDeletePosition` to a larger value, the continuity check will return `true`.
- If someone manually moves the Replicator's cursor to a smaller position (such as with `rewind` or `pulsar-admin topics resetCursor`), everything is OK as well.

> And will the solution be resolved by unloading the topic if the replicator runs into an exception?

If something sets the variable `lastSent` to a wrong value, it can be solved by unloading the topic.
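The continuity rule quoted above can be sketched like this. This is a toy model, not the broker's actual code: positions are simplified to `long` values and `earliest` is modeled as `-1`; the method name `isContinuous` is illustrative.

```java
// Toy model of the continuity check:
//   first_message_received <= max(mark_deleted_pos, last_sent_pos) + 1
// Positions are simplified to longs; "earliest" is modeled as -1.
public class ContinuitySketch {
    public static final long EARLIEST = -1L;

    public static boolean isContinuous(long firstReceived, long markDeleted, long lastSent) {
        return firstReceived <= Math.max(markDeleted, lastSent) + 1;
    }

    public static void main(String[] args) {
        // Normal case: markDeleted=2, lastSent=5, next received is 6 -> continuous.
        System.out.println(isContinuous(6, 2, 5));          // true
        // resetCursor moved markDeleted forward to 9; receiving 10 is still continuous.
        System.out.println(isContinuous(10, 9, EARLIEST));  // true
        // A gap: markDeleted=2, lastSent=earliest, receiving 5 means 3~4 were skipped.
        System.out.println(isContinuous(5, 2, EARLIEST));   // false
    }
}
```

The second case shows why a manual `resetCursor` to a larger position keeps working: moving `markDeleted` forward moves the whole acceptance window forward with it.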
Ok, this way looks easier to understand and straightforward.
It's very similar to Pulsar connectors: a connector consumes messages from one topic and publishes them to another topic. If we want to introduce a principle for handling such issues, I don't think we want to add logic that checks whether messages have been skipped. This is why I think we should not introduce such logic here.
I see:
- for scenario-1 (race condition between `read entries` and `rewind`): a separate PR or PIP will solve this problem
- for scenario-3 (https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192): restart the Replicator to fix it
for scenario-2 (the replicate-messages task is aborted by a loading schema):

| time | receive messages | async load schema |
|---|---|---|
| 1 | receive messages 1~3 | |
| 2 | | async load schema if needed |
| 3 | receive messages 4~5 | |
| 4 | send messages 4~5 to remote cluster | |
| 5 | | load schema success, rewind cursor to position 1 |
| 6 | (Highlight) receive messages 1~3 again; they will fail to send due to the duplication check | |
This scenario is hard to fix, because there is no clear marker of which in-flight reads need to be discarded after step 2. I think I should learn how the connector handles messages.
This scenario is hard to fix. Because there is no clear marker of which in-flight reads need to be discarded after step-2. I think I should learn how the connector handle messages.
I see. We can continue to handle these messages after loading schema is finished.
@codelipenghui I think this suggestion is good. Thanks. I will try to solve the issues described above with multiple PRs.
This scenario is hard to fix. Because there is no clear marker of which in-flight reads need to be discarded after step-2. I think I should learn how the connector handle messages.
But we will not get an error if the message is duplicated, right? The broker skips the duplicated sends and returns (-1, -1) as the message ID. We are safe to publish messages 1 to 5 again.
```
 * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. But a new batch messages "[4:1 ~ 6:6]"
 * is received later, then these messages will be sent.
 */
protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {
```
```suggestion
protected boolean hasMessageSkipped(List<Entry> entries) {
```
The reply of this comment is same as #20128 (comment)
```
if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
    return true;
}
```
It's better to change the method name to `hasMessageSkipped`, because we allow duplicated messages.
Good suggestion. Already fixed
To make the logic of `replicateEntries` easier to read, I changed the method name to `checkNoMessageSkipped`.
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
```
@@ -183,6 +188,7 @@ protected boolean replicateEntries(List<Entry> entries) {
    msg.getMessageBuilder().clearTxnidLeastBits();
    // Increment pending messages for messages produced locally
    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
    lastSent = entry.getPosition();
```
`lastSent` should be updated after the producer sends successfully. We are using the async API to send messages. It looks like in-flight messages will also be treated as `lastSent`?
> It looks like the inflight messages will also be treated as lastSent?

Yes, it was done on purpose, because there is a scenario like this:
- start sending messages `1~3` to the remote cluster; the send is not finished yet (`lastSent` is 0 now).
- (Highlight) receive messages `4~5`. These messages will be discarded and trigger a rewind, which reduces the efficiency of replication.

If we update the variable `lastSent` when the send starts (as this PR does), it works like this:
- start sending messages `1~3` to the remote cluster; the send is not finished yet (`lastSent` is 3 now).
- receive messages `4~5` and send them (`lastSent` is 5 now).
- (Highlight) message `3` fails to send: call `rewind` (`markDeleted` is 2 now) and set the variable `lastSent` to `earliest`.
  - Because the timeout of the producer is `0`, the probability of this scenario occurring is very, very small.
- receive message `3` a second time: because `3 > max(2, earliest)`, the check `hasMessageSkipped` will pass.
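The walk-through above can be replayed as a small sketch. This is a toy model, not the replicator code: positions are `long` values, `earliest` is `-1`, and `passesCheck` stands in for the continuity condition `first_received <= max(markDeleted, lastSent) + 1`.

```java
// Toy replay of the "update lastSent when the send starts" scenario.
public class LastSentSketch {
    public static final long EARLIEST = -1L;

    public static boolean passesCheck(long firstReceived, long markDeleted, long lastSent) {
        return firstReceived <= Math.max(markDeleted, lastSent) + 1;
    }

    public static void main(String[] args) {
        long markDeleted = 0;
        long lastSent;
        lastSent = 3;            // start sending 1~3 (not yet acknowledged)
        lastSent = 5;            // receive 4~5 and start sending them too
        // message 3 fails: rewind. 1~2 were acked, so markDeleted=2, lastSent=earliest.
        markDeleted = 2;
        lastSent = EARLIEST;
        // message 3 is re-delivered: 3 <= max(2, earliest) + 1, so the check passes.
        System.out.println(passesCheck(3, markDeleted, lastSent)); // true
    }
}
```

The point of advancing `lastSent` optimistically is the middle step: while `1~3` are still in flight, the batch `4~5` is accepted instead of being discarded and triggering an extra rewind.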
```
@@ -113,6 +117,7 @@ protected boolean replicateEntries(List<Entry> entries) {

    // Increment pending messages for messages produced locally
    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
    lastSent = entry.getPosition();
```
Same as the comment in GeoReplicator.
add the context: the linked comment is https://github.com/apache/pulsar/pull/20128/files#r1171223753
The reply for this comment is same as #20128 (comment)
The pr had no activity for 30 days, mark with Stale label.
This PR will continue after waiting for PIP-269 to complete.
The pr had no activity for 30 days, mark with Stale label.
Motivation
Background of Deduplication: every message has an attribute `sequence-id`, and the Broker records the last `sequence-id` for each producer. If a producer sends a message with a smaller `sequence-id`, it will be rejected.

Background of Geo-Replication: the replicator works like this: read messages from the source cluster; send them to the target cluster; if something goes wrong, the Replicator rewinds these messages; loop to the next read.
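The deduplication rule can be sketched like this. It is a toy model, not the broker implementation: the class, the per-producer map, and the `accept` method are all illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side deduplication: remember the highest sequence-id
// seen per producer and reject anything not strictly greater than it.
public class DedupSketch {
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the message is accepted, false if rejected by dedup. */
    public boolean accept(String producerName, long sequenceId) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // smaller (or equal) sequence-id: rejected
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        // Sending 1,2,3,5,4 out of order: message 4 arrives after 5 and is discarded.
        for (long seq : new long[]{1, 2, 3, 5, 4}) {
            System.out.println(seq + " accepted=" + broker.accept("replicator-1", seq));
        }
    }
}
```

This is exactly why out-of-order sends lose messages in the scenarios below: once `5` has been accepted, `4` can never be accepted by that producer again.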
Background of ManagedCursor: ManagedCursor marks the next position to read from BK, which we call `readPosition`, and updates it after a read completes. For example:
- init `readPosition` with `0`
- after a read completes, move `readPosition` to `10`
(Highlight) Therefore, if messages are sent out of order, many messages will be discarded. For example: if we send `1,2,3,5,4`, the message `4` will be discarded by the duplication check.

There are three scenarios that make messages out of order:
scenario-1

| thread rewind | thread reading messages |
|---|---|
| | read entries `[3:0 ~ 4:0]` (`readPosition=4:0, markDeleted=3:0`) |
| rewind `readPosition` (--> `3:0`) | |
| | the in-flight read completes and updates `readPosition` (--> `5:0`) |
| | receive entries `[4:0 ~ 5:0]` |

The messages `3:0 ~ 4:0` will no longer be received until the next `rewind`.
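The interleaving above can be replayed sequentially. This is a toy model, not the ManagedCursor code: positions are flattened to `long` values and the "write" of the late read-completion is modeled as a plain assignment.

```java
// Toy replay of scenario-1: a rewind that races with an in-flight read is
// overwritten by the read's completion, so the rewound range is never re-read.
public class RewindRaceSketch {
    /** Returns the position the next read would start from after the interleaving. */
    public static long simulate() {
        long readPosition = 3;              // entries 3~4 are about to be read
        long completionTarget = 5;          // the in-flight read will set readPosition to 5
        readPosition = 3;                   // concurrent rewind writes 3 back (its effect...)
        readPosition = completionTarget;    // ...is lost when the late completion lands
        return readPosition;                // next read starts at 5; entries 3~4 are skipped
    }

    public static void main(String[] args) {
        System.out.println("next read starts at " + simulate());
    }
}
```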
scenario-2

| time | receive messages | async load schema |
|---|---|---|
| 1 | receive messages `1~3` | |
| 2 | | async load schema if needed |
| 3 | receive messages `4~5` | |
| 4 | send messages `4~5` to remote cluster | |
| 5 | | load schema success, rewind cursor to position `1` |
| 6 | (Highlight) receive messages `1~3` again; they will fail to send due to the duplication check | |

[1]: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L160-L179
scenario-3

- receive messages `1~3`, and something goes wrong when processing them[2], so they are not sent
- receive messages `4~5`
- send messages `4~5` to the remote cluster
- the messages `1~3` are lost

[2]: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L190-L192
Modifications
Make the replicator process messages only sequentially.
Why not just fix the issues that make messages out of order?
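The modification can be sketched as follows. This uses the method names discussed in the review (`checkNoMessageSkipped`, `rewind`) but is an illustrative toy, not the actual patch: positions are simplified to `long` values and `earliest` is `-1`.

```java
import java.util.List;

// Toy sketch of "process messages only sequentially": before replicating a
// batch, verify it starts where the previous one ended; if messages were
// skipped, discard the batch and rewind the cursor instead of sending it.
public class SequentialReplicatorSketch {
    public long markDeleted = -1; // last acknowledged position ("earliest" = -1)
    public long lastSent = -1;    // advanced when a send starts (see discussion above)

    public boolean checkNoMessageSkipped(List<Long> entries) {
        if (entries.isEmpty()) {
            return true;
        }
        // continuous iff firstReceived <= max(markDeleted, lastSent) + 1
        return entries.get(0) <= Math.max(markDeleted, lastSent) + 1;
    }

    /** Returns true if the batch was sent, false if it was discarded and rewound. */
    public boolean replicateEntries(List<Long> entries) {
        if (!checkNoMessageSkipped(entries)) {
            rewind();
            return false;
        }
        for (long pos : entries) {
            lastSent = pos; // optimistic update at send start
            // ... producer.sendAsync(...) would go here
        }
        return true;
    }

    public void rewind() {
        lastSent = -1; // re-read from markDeleted + 1
    }
}
```

A discontinuous batch (e.g. `5~6` arriving while `3~4` were never sent) is thus never handed to the producer, which is what prevents the duplication check on the remote cluster from silently dropping the gap.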
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: