Skip to content

[improve][pip] PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation #23427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions pip/pip-386.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation

# Motivation

This pip is intended to fix issue https://github.com/apache/pulsar/issues/23239.

In the previous implementation of the method startMessageIdInclusive (https://github.com/apache/pulsar/pull/4331),
we added startMessageIdInclusive() to support include current position of reset on ReaderBuilder.

However, the condition `if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0)` in PersistentTopic#getNonDurableSubscription was directly removed.
When we use the NonDurableSubscription, this caused the entryId to decrease by 1 for non-batch messages,
resulting in wrong msgBackLog after topic unload for non-durable subscription.

# Goals

Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, and fix the NonDurable Subscription msgBackLog incorrect after topic unload

# High Level Design

# Detailed Design

## Design & Implementation Details

- CommandSubscribe add the field **resetIncludeHead**, when use the ConsumerBuilder#startMessageIdInclusive or ReaderBuilder#startMessageIdInclusive this param is true, otherwise it is false.
- PersistTopic#getNonDurableSubscription add the judge condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)`, entryId - 1 will execute **when msg is batch or the resetIncludeHead is true.**


### Binary protocol

Add `reset_include_head` field to the `CommandSubscribe`.

```protobuf
PulsarApi.proto

message CommandSubscribe {
enum SubType {
Exclusive = 0;
Shared = 1;
Failover = 2;
Key_Shared = 3;
}
required string topic = 1;
required string subscription = 2;
required SubType subType = 3;

required uint64 consumer_id = 4;
required uint64 request_id = 5;
optional string consumer_name = 6;
optional int32 priority_level = 7;

// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];

// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
optional MessageIdData start_message_id = 9;

/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;

optional bool read_compacted = 11;

optional Schema schema = 12;
enum InitialPosition {
Latest = 0;
Earliest = 1;
}
// Signal whether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];

// Mark the subscription as "replicated". Pulsar will make sure
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;

// If true, the subscribe operation will cause a topic to be
// created if it does not exist already (and if topic auto-creation
// is allowed by broker.
// If false, the subscribe operation will fail if the topic
// does not exist.
optional bool force_topic_creation = 15 [default = true];

// If specified, the subscription will reset cursor's position back
// to specified seconds and will send messages from that point
optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

optional KeySharedMeta keySharedMeta = 17;

repeated KeyValue subscription_properties = 18;

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

optional bool reset_include_head = 20 [default = false];
}
```


# Links

* Mailing List discussion thread: https://lists.apache.org/thread/tvk9nl47gqtbx2nrdzknm4v8sm67ywp9
* Mailing List voting thread: https://lists.apache.org/thread/x5xknyyr0pyrhwoslbxbbcj0o1xyyyp6