Fix wrong result of reader.hasNext/Next after seeking by id or time #1340

Open
wants to merge 5 commits into
base: master

Conversation

shibd
Member

@shibd shibd commented Feb 26, 2025

There are three attributes to handle seek/filter/subscribe for Reader.

  1. startMessageID: The start message ID, which the user can specify when creating the reader. It can be a specific position, earliest, or latest. It is sent in the subscribe request when the consumer is created, and the broker uses it to set the mark-delete position.
  2. lastDequeuedMsg: The most recent message dispatched from the consumer's internal queue to the user. If it's nil, it means no messages have been delivered to the user. If there is a value, it indicates the user has consumed up to this point.
  3. lastMessageInBroker: The ID of the last message in the topic on the broker.

Question-1: How to determine hasNext?

Refer to this code

In simple terms, to minimize interaction with the broker:

  1. If lastDequeuedMsg has a value, it clearly indicates consumption up to this position. Compare this lastDequeuedMsg with lastMessageInBroker.
  2. If there is no value, compare startMessageID with lastMessageInBroker.

If the final comparison is true, there really are messages to read. If it is false, that doesn't necessarily mean there are no messages: lastMessageInBroker is a cached value, and more messages may already have been written to the topic. In that case, lastMessageInBroker is fetched from the broker again and the comparison is re-evaluated.
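
A minimal sketch of this decision, assuming simplified types. The names `msgID`, `reader`, and `fetchLastMessageID` are hypothetical stand-ins and not the client's real internals, and the start position is treated as exclusive here:

```go
package main

import "fmt"

// msgID is a simplified, hypothetical message ID (ledger and entry only).
type msgID struct {
	ledger, entry int64
}

// less reports whether a comes strictly before b.
func (a msgID) less(b msgID) bool {
	if a.ledger != b.ledger {
		return a.ledger < b.ledger
	}
	return a.entry < b.entry
}

// reader holds the three attributes described above.
type reader struct {
	startMessageID      msgID
	lastDequeuedMsg     *msgID // nil: nothing has been delivered to the user yet
	lastMessageInBroker msgID  // cached value, may be stale
	// fetchLastMessageID stands in for the round trip to the broker.
	fetchLastMessageID func() msgID
}

// position is the point the user has consumed up to.
func (r *reader) position() msgID {
	if r.lastDequeuedMsg != nil {
		return *r.lastDequeuedMsg
	}
	return r.startMessageID
}

// hasNext checks the cache first and only asks the broker when the cached
// comparison says there is nothing left.
func (r *reader) hasNext() bool {
	if r.position().less(r.lastMessageInBroker) {
		return true
	}
	r.lastMessageInBroker = r.fetchLastMessageID()
	return r.position().less(r.lastMessageInBroker)
}

func main() {
	brokerLast := msgID{1, 5}
	r := &reader{
		startMessageID:      msgID{1, 5},
		lastMessageInBroker: msgID{1, 5},
		fetchLastMessageID:  func() msgID { return brokerLast },
	}
	fmt.Println(r.hasNext()) // false: nothing after 1:5, even after a refresh

	brokerLast = msgID{1, 6} // a new message is written; the cache is now stale
	fmt.Println(r.hasNext()) // true: the refresh picks up 1:6
}
```

The cache keeps the common "there is more data" case free of broker calls; only a negative answer costs a round trip.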

Question-2: What happens when the consumer reconnects?

When the consumer reconnects, it clears all messages in the receive queue and updates startMessageId to the first message in the queue. This indicates that the user has consumed up to this position.
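
A rough sketch of that reconnect step, using a hypothetical `consumer` struct with integer IDs rather than the client's real types:

```go
package main

import "fmt"

// consumer is a hypothetical, stripped-down model of the client state.
type consumer struct {
	startMessageID int64   // position sent to the broker on (re)subscribe
	queue          []int64 // IDs received from the broker but not yet delivered
}

// onReconnect drops the prefetched messages and moves startMessageID to the
// first of them, so the new subscription resumes from that point.
func (c *consumer) onReconnect() {
	if len(c.queue) > 0 {
		c.startMessageID = c.queue[0]
	}
	c.queue = nil
}

func main() {
	c := &consumer{startMessageID: 0, queue: []int64{7, 8, 9}}
	c.onReconnect()
	fmt.Println(c.startMessageID, len(c.queue)) // 7 0
}
```

This also shows why a queue left over from before a seek is a problem: on reconnect it would pull startMessageID back to a pre-seek position.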

!!! The above logic works well without a seek operation, but once the user calls a seek operation, the situation becomes special.

Question-3: How does the seek operation work?

For the broker:
When the client sends a seek request to the broker, the broker will do the following:

  1. Rewind the cursor to the seek position (ID or time).
  2. Send a close request to the consumer.
  3. Send a response that the seek operation is complete.

For the client:

  1. Send a seek request and wait for the broker to respond that the seek is complete.
  2. Set lastDequeuedMsg to null because after seeking, this value will be invalid. (Tip: Currently missing, this PR fixes it.)
  3. Clear the queue to avoid resetting startMessageId to the first position of the queue upon reconnection. (Tip: Currently missing, this PR fixes it.)
  4. If seekByID:
    • If the passed ID is a specific position or earliest, set startMessageId to seek Id. (Tip: Currently missing, this PR fixes it.)
    • If the passed ID is latest, obtain the topic lastMessageId and set startMessageId to it. (Tip: Currently missing, a separate PR will fix this later as it currently doesn’t support directly passing latest id.)
  5. If seekByTime, the client doesn't know which message ID corresponds to the timestamp at seek time, so it needs to call getLastMessageId and set startMessageId to the returned mark-delete position. (Tip: Currently missing, this PR fixes it.)
  6. Seek complete, then return to the user.
  7. BTW: Strictly speaking, this only completes the seek request itself. Full completion requires the consumer to successfully reconnect to the broker, but that happens in the background and the user doesn't need to be concerned about it. We also don't need to wait for the reconnect before reporting seek completion to the user: by the time the broker responds that the seek is complete, it has already reset the cursor, and once the client receives that response it clears the queue, so no earlier messages will come in. In the extremely rare case where data from the old connection's buffer re-enters the queue after it has been cleared, those messages are filtered out by comparing their message IDs against startMessageId.
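
The client-side state changes for seek by a specific ID (steps 2 to 4) and the stale-message filter from step 7 can be sketched roughly as follows. `readerState`, `seekByID`, and `enqueue` are hypothetical names, and IDs are plain integers for brevity:

```go
package main

import "fmt"

// readerState is a hypothetical model of the fields touched by seek.
type readerState struct {
	startMessageID  int64
	lastDequeuedMsg *int64 // nil: nothing delivered since the last seek
	queue           []int64
}

// seekByID applies the client-side updates after the broker has confirmed
// the seek: forget what was dequeued, drop prefetched messages, and move
// the start position to the seek target.
func (r *readerState) seekByID(id int64) {
	r.lastDequeuedMsg = nil
	r.queue = nil
	r.startMessageID = id
}

// enqueue accepts an incoming message unless it falls before the start
// position. Whether the start position itself is accepted depends on
// inclusive.
func (r *readerState) enqueue(id int64, inclusive bool) bool {
	if id < r.startMessageID || (id == r.startMessageID && !inclusive) {
		return false
	}
	r.queue = append(r.queue, id)
	return true
}

func main() {
	last := int64(9)
	r := &readerState{startMessageID: 0, lastDequeuedMsg: &last, queue: []int64{10, 11}}

	r.seekByID(20)
	fmt.Println(r.lastDequeuedMsg == nil, len(r.queue)) // true 0

	fmt.Println(r.enqueue(12, false)) // false: stale message from the old connection
	fmt.Println(r.enqueue(21, false)) // true
	fmt.Println(r.queue)              // [21]
}
```

In this sketch the filter only catches stale messages that sit before the new start position, which is the forward-seek case shown in `main`.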

Modifications

You can run the tests TestReaderWithSeekByID and TestReaderWithSeekByTime to understand the user-side issues fixed by this PR. Simply put, calling hasNext and Next after a seek should now return the expected results.

Verifying this change

  1. Add TestReaderWithSeekByID and TestReaderWithSeekByTime to cover this change.
  2. I did not change any other tests. The other tests still pass, which means this PR does not introduce a breaking change.

Does this pull request potentially affect one of the following parts:

  • No

Documentation

  • No

@shibd shibd self-assigned this Feb 26, 2025
@shibd shibd added this to the v0.15.0 milestone Feb 27, 2025
Contributor

@BewareMyPower BewareMyPower left a comment

If any user depends on the existing seek behavior (inclusive by default), he/she might treat it as a "message lost" issue.

This behavior change is beyond the scope of this PR. I also think it needs a discussion on the mailing list. We also need to document the change and tell existing users to set startMessageInclusive to true to keep the old behavior. But it's still a breaking change, because if startMessageInclusive is true, the behavior of subscribing also changes.

Therefore, my opinion is to keep the existing seek behavior unchanged for now. The "fix" could be delayed to the next release.

@shibd shibd changed the title Fix wrong result of hasNext after seeking by id or time Fix wrong result of hasNext/Receive after seeking by id or time Mar 6, 2025
@shibd
Member Author

shibd commented Mar 6, 2025

This behavior change is beyond the scope of this PR.

@BewareMyPower I changed the title. I think my intention was to clarify the issue with hasNext or receive after seek.

I believe email discussions are pointless because this is a behavior fix. If we need to inform users, we should clearly explain it in the release notes for the next release.

@shibd
Member Author

shibd commented Mar 6, 2025

@shibd shibd changed the title Fix wrong result of hasNext/Receive after seeking by id or time Fix wrong result of reader.hasNext/Next after seeking by id or time Mar 6, 2025
@geniusjoe
Contributor

geniusjoe commented Mar 7, 2025

If any user depends on the existing seek behavior (inclusive by default), he/she might treat it as a "message lost" issue.

This behavior change is beyond the scope of this PR. I also think it needs a discussion on the mailing list. We also need to document the change and tell existing users to set startMessageInclusive to true to keep the old behavior. But it's still a breaking change, because if startMessageInclusive is true, the behavior of subscribing also changes.

Therefore, my opinion is to keep the existing seek behavior unchanged for now. The "fix" could be delayed to the next release.

I'm not sure what you mean by "message lost".
Did the previous behavior continue consuming messages as if seek() had never happened, or did it first consume the messages in the client cache and then jump to the position defined by seek()? Would you mind explaining more specifically which messages are lost compared with the current fix?

@BewareMyPower
Contributor

@geniusjoe

func TestReaderSeek2(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
        assert.Nil(t, err)
        defer client.Close()

        topicName := newTopicName()
        ctx := context.Background()
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topicName,
        })
        assert.Nil(t, err)
        defer producer.Close()

        // Publish "msg-0" .. "msg-9" and remember each message ID.
        ids := make([]MessageID, 0, 10)
        for i := 0; i < 10; i++ {
                id, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-%d", i)),
                })
                assert.Nil(t, err)
                ids = append(ids, id)
        }
        reader, err := client.CreateReader(ReaderOptions{
                Topic:          topicName,
                StartMessageID: EarliestMessageID(),
        })
        assert.Nil(t, err)
        defer reader.Close()

        // Seek to ids[3] so that the reader can continue reading from "msg-3"
        err = reader.Seek(ids[3])
        assert.Nil(t, err)
        msg, err := reader.Next(ctx)
        assert.Nil(t, err)
        // Log the payload of the first message read after the seek.
        t.Log(string(msg.Payload()))
}

Users seek to ids[3] (the position of "msg-3") and expect to consume from "msg-3". However, after this change, "msg-3" is unexpectedly skipped and the test code above prints "msg-4".
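
To make the two behaviors concrete without a broker, here is a small simulation. `firstAfterSeek` is a hypothetical helper over an in-memory slice, not part of the client API:

```go
package main

import "fmt"

// firstAfterSeek returns the first message a reader would deliver after
// seeking to index seekIdx in msgs. ok is false if nothing is left to read.
func firstAfterSeek(msgs []string, seekIdx int, inclusive bool) (msg string, ok bool) {
	next := seekIdx
	if !inclusive {
		next++ // exclusive seek: resume after the seek position
	}
	if next < 0 || next >= len(msgs) {
		return "", false
	}
	return msgs[next], true
}

func main() {
	msgs := make([]string, 10)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("msg-%d", i)
	}
	in, _ := firstAfterSeek(msgs, 3, true)
	ex, _ := firstAfterSeek(msgs, 3, false)
	fmt.Println(in, ex) // msg-3 msg-4
}
```

A user who relied on the inclusive result would see the exclusive one as a skipped message.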

if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
if msgID.batchIdx == -1 {
msgID.batchIdx = 0
Member

Why reset this value to 0? A batch index of -1 means the message is not part of a batch, while 0 is a valid index within a batch.

Contributor

Sounds like a concern. Could you provide a test case to show this change might not work?

Member Author

In the Go client, it doesn't work like this. You can run the following code to test it.

You need to change the message ID String method first so that it prints the batch index:

func (id *messageID) String() string {
	return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.batchIdx, id.partitionIdx)
}
func TestMessageID_BatchIdx(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: true,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:             topic,
		SubscriptionName:  "my-sub",
		Type:              Exclusive,
		ReceiverQueueSize: 10,
	})
	assert.Nil(t, err)
	defer consumer.Close()

	// send 10 messages with no batch
	var lastMsgID MessageID
	for i := 0; i < 10; i++ {
		lastMsgID, err = producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		t.Logf("Sended message msgId: %#v \n", lastMsgID.String())
		assert.NoError(t, err)
		assert.NotNil(t, lastMsgID)
	}

	// Receive 10 message with no batch
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		t.Logf("Received message msgId: %#v \n", msg.ID().String())
		consumer.Ack(msg)
	}
}

output:
    consumer_test.go:572: Sended message msgId: "3141:0:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:1:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:2:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:3:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:4:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:5:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:6:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:7:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:8:0:0" 
    consumer_test.go:572: Sended message msgId: "3141:9:0:0" 
    consumer_test.go:581: Received message msgId: "3141:0:0:0" 
    consumer_test.go:581: Received message msgId: "3141:1:0:0" 
    consumer_test.go:581: Received message msgId: "3141:2:0:0" 
    consumer_test.go:581: Received message msgId: "3141:3:0:0" 
    consumer_test.go:581: Received message msgId: "3141:4:0:0" 
    consumer_test.go:581: Received message msgId: "3141:5:0:0" 
    consumer_test.go:581: Received message msgId: "3141:6:0:0" 
    consumer_test.go:581: Received message msgId: "3141:7:0:0" 
    consumer_test.go:581: Received message msgId: "3141:8:0:0" 
    consumer_test.go:581: Received message msgId: "3141:9:0:0" 

In Java, a non-batched message ID uses a different implementation class, so there is no batchIndex field when batching is disabled.

If we want to keep it consistent, we can change it later.
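
As an illustration of why the normalization in the diff can matter, here is a sketch with a simplified, hypothetical `messageID` struct. It assumes, as the output above suggests, that the Go client reports batchIdx 0 for non-batched messages, so an ID that arrives with -1 would not compare equal to the same position unless it is normalized:

```go
package main

import "fmt"

// messageID is a simplified stand-in for the client's message ID.
type messageID struct {
	ledgerID, entryID int64
	batchIdx          int32
}

// normalize maps the "no batch" marker -1 to 0, mirroring the diff above.
func normalize(id messageID) messageID {
	if id.batchIdx == -1 {
		id.batchIdx = 0
	}
	return id
}

func main() {
	fromRequest := messageID{3141, 3, -1} // hypothetical ID carrying -1
	fromSend := messageID{3141, 3, 0}     // as printed in the output above

	fmt.Println(fromRequest == fromSend)            // false
	fmt.Println(normalize(fromRequest) == fromSend) // true
}
```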

@shibd shibd requested a review from nodece March 9, 2025 03:10
4 participants