From 7fa9324865224e6c0efdf99655cca75dd6d401e4 Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Wed, 9 Oct 2024 20:53:03 +0800 Subject: [PATCH 1/2] [improve][pip] PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation --- pip/pip-386.md | 118 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 pip/pip-386.md diff --git a/pip/pip-386.md b/pip/pip-386.md new file mode 100644 index 0000000000000..c886096e29f07 --- /dev/null +++ b/pip/pip-386.md @@ -0,0 +1,118 @@ +# PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation + +# Motivation + +This PIP is intended to fix issue https://github.com/apache/pulsar/issues/23239. + +In the previous implementation of the method startMessageIdInclusive (https://github.com/apache/pulsar/pull/4331), +we added startMessageIdInclusive() to ReaderBuilder to support including the current position when resetting. + +However, the condition `if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0)` in PersistentTopic#getNonDurableSubscription was simply removed. +With a non-durable subscription, this causes the entryId to be decreased by 1 for non-batch messages as well, +resulting in a wrong msgBackLog after topic unload for non-durable subscriptions. + +# Goals + +Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, and fix the incorrect msgBackLog of non-durable subscriptions after topic unload. + +# High Level Design + +# Detailed Design + +## Design & Implementation Details + +- Add the field **resetIncludeHead** to CommandSubscribe: it is true when ConsumerBuilder#startMessageIdInclusive or ReaderBuilder#startMessageIdInclusive is used, otherwise it is false. 
+- PersistentTopic#getNonDurableSubscription adds the condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)`, so entryId - 1 is applied **when the message is a batch message or resetIncludeHead is true.** + + +```java + if (ledgerId >= 0 && entryId >= 0 + && msgId instanceof BatchMessageIdImpl + && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) { + // When the start message is relative to a batch, we need to take one step back on the previous + // message, + // because the "batch" might not have been consumed in its entirety. + // The client will then be able to discard the first messages if needed. + entryId = msgId.getEntryId() - 1; + } +``` + + +### Binary protocol + +Add `reset_include_head` field to the `CommandSubscribe`. + +```protobuf +PulsarApi.proto + +message CommandSubscribe { + enum SubType { + Exclusive = 0; + Shared = 1; + Failover = 2; + Key_Shared = 3; + } + required string topic = 1; + required string subscription = 2; + required SubType subType = 3; + + required uint64 consumer_id = 4; + required uint64 request_id = 5; + optional string consumer_name = 6; + optional int32 priority_level = 7; + + // Signal wether the subscription should be backed by a + // durable cursor or not + optional bool durable = 8 [default = true]; + + // If specified, the subscription will position the cursor + // markd-delete position on the particular message id and + // will send messages from that point + optional MessageIdData start_message_id = 9; + + /// Add optional metadata key=value to this consumer + repeated KeyValue metadata = 10; + + optional bool read_compacted = 11; + + optional Schema schema = 12; + enum InitialPosition { + Latest = 0; + Earliest = 1; + } + // Signal whether the subscription will initialize on latest + // or not -- earliest + optional InitialPosition initialPosition = 13 [default = Latest]; + + // Mark the subscription as "replicated". 
Pulsar will make sure + // to periodically sync the state of replicated subscriptions + // across different clusters (when using geo-replication). + optional bool replicate_subscription_state = 14; + + // If true, the subscribe operation will cause a topic to be + // created if it does not exist already (and if topic auto-creation + // is allowed by broker. + // If false, the subscribe operation will fail if the topic + // does not exist. + optional bool force_topic_creation = 15 [default = true]; + + // If specified, the subscription will reset cursor's position back + // to specified seconds and will send messages from that point + optional uint64 start_message_rollback_duration_sec = 16 [default = 0]; + + optional KeySharedMeta keySharedMeta = 17; + + repeated KeyValue subscription_properties = 18; + + // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch + optional uint64 consumer_epoch = 19; + + optional bool reset_include_head = 20 [default = false]; +} +``` + + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From f604dedaf503342e32ce40b7a8965847644e15a2 Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Thu, 17 Oct 2024 12:39:19 +0800 Subject: [PATCH 2/2] update --- pip/pip-386.md | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/pip/pip-386.md b/pip/pip-386.md index c886096e29f07..1a7dad32db540 100644 --- a/pip/pip-386.md +++ b/pip/pip-386.md @@ -25,19 +25,6 @@ Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, a - PersistentTopic#getNonDurableSubscription adds the condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)`, so entryId - 1 is applied **when the message is a batch message or resetIncludeHead is true.** -```java - if (ledgerId >= 0 && entryId >= 0 - && msgId instanceof BatchMessageIdImpl - && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) { - // When the start message is relative to a 
batch, we need to take one step back on the previous - // message, - // because the "batch" might not have been consumed in its entirety. - // The client will then be able to discard the first messages if needed. - entryId = msgId.getEntryId() - 1; - } -``` - - ### Binary protocol Add `reset_include_head` field to the `CommandSubscribe`. @@ -114,5 +101,5 @@ message CommandSubscribe { # Links -* Mailing List discussion thread: -* Mailing List voting thread: +* Mailing List discussion thread: https://lists.apache.org/thread/tvk9nl47gqtbx2nrdzknm4v8sm67ywp9 +* Mailing List voting thread: https://lists.apache.org/thread/x5xknyyr0pyrhwoslbxbbcj0o1xyyyp6