This pip is intended to fix issue #23239.
In the previous implementation of the method startMessageIdInclusive (#4331), we added startMessageIdInclusive() to support include current position of reset on ReaderBuilder.
However, the condition if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0)
in PersistentTopic#getNonDurableSubscription was directly removed.
When we use the NonDurableSubscription, this caused the entryId to decrease by 1 for non-batch messages,
resulting in wrong msgBackLog after topic unload for non-durable subscription.
Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, and fix the NonDurable Subscription msgBackLog incorrect after topic unload
- CommandSubscribe add the field resetIncludeHead, when use the ConsumerBuilder#startMessageIdInclusive or ReaderBuilder#startMessageIdInclusive this param is true, otherwise it is false.
- PersistTopic#getNonDurableSubscription add the judge condition
(msgId.getBatchIndex() >= 0 || resetIncludeHead)
, entryId - 1 will execute when msg is batch or the resetIncludeHead is true.
Add reset_include_head
field to the CommandSubscribe
.
PulsarApi.proto
message CommandSubscribe {
enum SubType {
Exclusive = 0;
Shared = 1;
Failover = 2;
Key_Shared = 3;
}
required string topic = 1;
required string subscription = 2;
required SubType subType = 3;
required uint64 consumer_id = 4;
required uint64 request_id = 5;
optional string consumer_name = 6;
optional int32 priority_level = 7;
// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];
// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
optional MessageIdData start_message_id = 9;
/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;
optional bool read_compacted = 11;
optional Schema schema = 12;
enum InitialPosition {
Latest = 0;
Earliest = 1;
}
// Signal whether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];
// Mark the subscription as "replicated". Pulsar will make sure
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;
// If true, the subscribe operation will cause a topic to be
// created if it does not exist already (and if topic auto-creation
// is allowed by broker.
// If false, the subscribe operation will fail if the topic
// does not exist.
optional bool force_topic_creation = 15 [default = true];
// If specified, the subscription will reset cursor's position back
// to specified seconds and will send messages from that point
optional uint64 start_message_rollback_duration_sec = 16 [default = 0];
optional KeySharedMeta keySharedMeta = 17;
repeated KeyValue subscription_properties = 18;
// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;
optional bool reset_include_head = 20 [default = false];
}
- Mailing List discussion thread: https://lists.apache.org/thread/tvk9nl47gqtbx2nrdzknm4v8sm67ywp9
- Mailing List voting thread: https://lists.apache.org/thread/x5xknyyr0pyrhwoslbxbbcj0o1xyyyp6