Fix wrong result of reader.hasNext/Next after seeking by id or time #1340
If any user depends on the existing seek behavior (inclusive by default), they might treat it as a "message lost" issue.
This behavior change is beyond the scope of this PR. I also think it needs a discussion on the mailing list. We also need to document the change and tell existing users to set startMessageInclusive to true to keep the old behavior. But it's still a breaking change, because if startMessageInclusive is true, the behavior of subscribing also changes.
Therefore, my opinion is to keep the existing seek behavior unchanged for now. The "fix" could be delayed to the next release.
@BewareMyPower I changed the title. I think my intention was to clarify the issue with I believe email discussions are pointless because this is a behavior fix. If we need to inform users, we should clearly explain it in the release notes for the next release.
Started a discussion thread: https://lists.apache.org/thread/knxsq637pyhltthn77859mpxb5wcxglj
I'm not sure what you mean by "message lost".
func TestReaderSeek2(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topicName := newTopicName()
	ctx := context.Background()
	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topicName,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// Publish ten messages and remember their IDs.
	ids := make([]MessageID, 0)
	for i := 0; i < 10; i++ {
		id, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-%d", i)),
		})
		assert.Nil(t, err)
		ids = append(ids, id)
	}

	reader, err := client.CreateReader(ReaderOptions{
		Topic:          topicName,
		StartMessageID: EarliestMessageID(),
	})
	assert.Nil(t, err)
	defer reader.Close()

	// Seek to ids[3] so that the reader can continue reading from "msg-3".
	assert.Nil(t, reader.Seek(ids[3]))
	msg, err := reader.Next(ctx)
	assert.Nil(t, err)
	t.Log(string(msg.Payload()))
}
Users seek to
if id.BatchIndex != nil {
	msgID.batchIdx = *id.BatchIndex
	if msgID.batchIdx == -1 {
		msgID.batchIdx = 0
Why reset this value to 0? A batch index of -1 means the message is not batched, while 0 means it is.
Sounds like a concern. Could you provide a test case to show this change might not work?
In the Go client, it's not like this. You can run the following code to check. You need to change the message ID's String method first:
func (id *messageID) String() string {
	return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.batchIdx, id.partitionIdx)
}

func TestMessageID_BatchIdx(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: true,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:             topic,
		SubscriptionName:  "my-sub",
		Type:              Exclusive,
		ReceiverQueueSize: 10,
	})
	assert.Nil(t, err)
	defer consumer.Close()

	// send 10 messages with no batch
	var lastMsgID MessageID
	for i := 0; i < 10; i++ {
		lastMsgID, err = producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		t.Logf("Sended message msgId: %#v \n", lastMsgID.String())
		assert.NoError(t, err)
		assert.NotNil(t, lastMsgID)
	}

	// Receive 10 message with no batch
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		t.Logf("Received message msgId: %#v \n", msg.ID().String())
		consumer.Ack(msg)
	}
}
output:
consumer_test.go:572: Sended message msgId: "3141:0:0:0"
consumer_test.go:572: Sended message msgId: "3141:1:0:0"
consumer_test.go:572: Sended message msgId: "3141:2:0:0"
consumer_test.go:572: Sended message msgId: "3141:3:0:0"
consumer_test.go:572: Sended message msgId: "3141:4:0:0"
consumer_test.go:572: Sended message msgId: "3141:5:0:0"
consumer_test.go:572: Sended message msgId: "3141:6:0:0"
consumer_test.go:572: Sended message msgId: "3141:7:0:0"
consumer_test.go:572: Sended message msgId: "3141:8:0:0"
consumer_test.go:572: Sended message msgId: "3141:9:0:0"
consumer_test.go:581: Received message msgId: "3141:0:0:0"
consumer_test.go:581: Received message msgId: "3141:1:0:0"
consumer_test.go:581: Received message msgId: "3141:2:0:0"
consumer_test.go:581: Received message msgId: "3141:3:0:0"
consumer_test.go:581: Received message msgId: "3141:4:0:0"
consumer_test.go:581: Received message msgId: "3141:5:0:0"
consumer_test.go:581: Received message msgId: "3141:6:0:0"
consumer_test.go:581: Received message msgId: "3141:7:0:0"
consumer_test.go:581: Received message msgId: "3141:8:0:0"
consumer_test.go:581: Received message msgId: "3141:9:0:0"
In Java, a different implementation class is used, so there is no batchIndex field when batching is disabled.
If we want to keep the clients consistent, we can change it later.
I see, because the batchIndex type is int32, not a pointer.
I think you should remove this change. When the batch index is -1, do not change the value to 0.
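To make the suggestion concrete, here is a minimal sketch of a conversion that copies the batch index verbatim, so a value of -1 survives. The types wireMessageID and localMessageID and the function toLocal are hypothetical stand-ins invented for illustration, not the actual pulsar-client-go types; the only assumption taken from the thread is that the wire-side index is a pointer while the client-side field is a plain int32.

```go
package main

import "fmt"

// wireMessageID is a hypothetical stand-in for a decoded wire-level message ID,
// where an absent batch index is represented by a nil pointer.
type wireMessageID struct {
	LedgerID   uint64
	EntryID    uint64
	BatchIndex *int32
}

// localMessageID is a hypothetical stand-in for the client's internal ID,
// where the batch index is a plain int32 and therefore defaults to 0.
type localMessageID struct {
	ledgerID uint64
	entryID  uint64
	batchIdx int32
}

// toLocal copies the batch index as-is when it is present, including -1.
// When the pointer is nil, the int32 field keeps its zero value.
func toLocal(id wireMessageID) localMessageID {
	out := localMessageID{ledgerID: id.LedgerID, entryID: id.EntryID}
	if id.BatchIndex != nil {
		out.batchIdx = *id.BatchIndex
	}
	return out
}

func main() {
	minusOne := int32(-1)
	fmt.Println(toLocal(wireMessageID{LedgerID: 3141, EntryID: 0, BatchIndex: &minusOne}).batchIdx)
	fmt.Println(toLocal(wireMessageID{LedgerID: 3141, EntryID: 1}).batchIdx)
}
```

Under these assumptions, the 0 seen in the log output above would come from the zero value of the int32 field rather than from an explicit reset.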
Please address the batch index issue.
There are three attributes to handle seek/filter/subscribe for Reader.
- startMessageID: The start message ID, which can be specified by the user when creating the reader. It can be set to a specific position, or to earliest or latest. When creating a consumer, this is set in the request, and the broker uses it to set the mark-delete position.
- lastDequeuedMsg: The most recent message dispatched from the consumer's internal queue to the user. If it's nil, it means no messages have been delivered to the user. If there is a value, it indicates the user has consumed up to this point.
- lastMessageInBroker: The ID of the last message in the topic on the broker.
Question-1: How to determine hasNext?
Refer to this code
In simple terms, to minimize interaction with the broker:
- If lastDequeuedMsg has a value, it clearly indicates consumption up to this position. Compare lastDequeuedMsg with lastMessageInBroker.
- Otherwise, compare startMessageID with lastMessageInBroker.
If the final comparison result is true, there really are messages. If it is false, that doesn't necessarily mean there are no messages, because lastMessageInBroker is a cache and more messages may already have been written to the topic. Therefore, lastMessageInBroker is refreshed from the broker and the comparison is evaluated again.
Question-2: What happens when the consumer reconnects?
When the consumer reconnects, it clears all messages in the receive queue and updates startMessageId with the first message in the queue. This indicates that the user has consumed up to this position.
!!! The above logic works well without a seek operation, but once the user calls a seek operation, the situation becomes special.
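The hasNext decision described under Question-1 can be sketched roughly as follows. This is an illustration under stated assumptions, not the client's actual code: msgID, readerState, startInclusive, and fetchLast are hypothetical names, message IDs are reduced to (ledger, entry), and the empty-topic case is ignored.

```go
package main

import "fmt"

// msgID is a simplified, hypothetical message ID (ledger and entry only).
type msgID struct {
	ledger, entry int64
}

// less reports whether a is strictly before b.
func (a msgID) less(b msgID) bool {
	if a.ledger != b.ledger {
		return a.ledger < b.ledger
	}
	return a.entry < b.entry
}

// readerState holds the three attributes described above.
type readerState struct {
	startMessageID      msgID
	startInclusive      bool         // assumption: whether startMessageID itself is readable
	lastDequeuedMsg     *msgID       // nil means nothing was delivered to the user yet
	lastMessageInBroker msgID        // cached value, may be stale
	fetchLast           func() msgID // hypothetical round trip to the broker
}

// hasMoreCached compares against the cached lastMessageInBroker only.
func (r *readerState) hasMoreCached() bool {
	if r.lastDequeuedMsg != nil {
		return r.lastDequeuedMsg.less(r.lastMessageInBroker)
	}
	if r.startInclusive {
		return !r.lastMessageInBroker.less(r.startMessageID)
	}
	return r.startMessageID.less(r.lastMessageInBroker)
}

// hasNext trusts a positive cached answer and refreshes the cache
// from the broker before accepting a negative one.
func (r *readerState) hasNext() bool {
	if r.hasMoreCached() {
		return true
	}
	r.lastMessageInBroker = r.fetchLast()
	return r.hasMoreCached()
}

func main() {
	last := msgID{1, 2}
	r := &readerState{
		startMessageID:      msgID{1, 0},
		startInclusive:      true,
		lastDequeuedMsg:     &last,
		lastMessageInBroker: msgID{1, 2}, // stale cache
		fetchLast:           func() msgID { return msgID{1, 4} },
	}
	// The cache says nothing is left; the refresh finds newer entries.
	fmt.Println(r.hasNext())
}
```

The broker is contacted only when the cached comparison is negative, which is the "minimize interaction" point made above.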
Question-3: How does the seek operation work?
For the broker:
When the client sends a seek request to the broker, the broker will do the following:
For the client:
- Set lastDequeuedMsg to null because after seeking, this value will be invalid. (Tip: Currently missing, this PR fixes it.)
- Set startMessageId to the first position of the queue upon reconnection. (Tip: Currently missing, this PR fixes it.)
- Get lastMessageId and set startMessageId to it. (Tip: Currently missing, a separate PR will fix this later as it currently doesn't support directly passing latest id.)
Modifications
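As a rough illustration of the client-side state handling around seek, the sketch below resets the stale state when a seek by ID is issued. Everything here is hypothetical (readerState, seekByID, the queue field), and treating the seek target as the new startMessageID is an assumption made for the example rather than something the description states; the seek-by-time and latest-ID cases are left out.

```go
package main

import "fmt"

// msgID is a simplified, hypothetical message ID.
type msgID struct {
	ledger, entry int64
}

// readerState is a hypothetical stand-in for the client-side reader state.
type readerState struct {
	startMessageID  msgID
	lastDequeuedMsg *msgID  // nil means nothing delivered since the last (re)positioning
	queue           []msgID // prefetched messages waiting for the user
}

// seekByID drops state that is no longer valid after repositioning.
// Assumption: the seek target becomes the new start position.
func (r *readerState) seekByID(target msgID) {
	r.lastDequeuedMsg = nil // consumption progress before the seek no longer applies
	r.queue = r.queue[:0]   // prefetched messages belong to the old position
	r.startMessageID = target
}

func main() {
	last := msgID{1, 9}
	r := &readerState{
		startMessageID:  msgID{1, 0},
		lastDequeuedMsg: &last,
		queue:           []msgID{{1, 10}, {1, 11}},
	}
	r.seekByID(msgID{1, 3})
	fmt.Println(r.lastDequeuedMsg == nil, len(r.queue), r.startMessageID)
}
```

Without the reset of lastDequeuedMsg, a hasNext check after seeking backwards would still compare the old consumption position against the broker's last message, which is the kind of wrong result the title of this PR refers to.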
You can run the tests TestReaderWithSeekByID and TestReaderWithSeekByTime to understand the user-side issues fixed by this PR. Simply put, after seeking, calling hasNext and Next again should meet expectations.
Verifying this change
Does this pull request potentially affect one of the following parts:
Documentation