Skip to content

Commit f9b3c0f

Browse files
authored
Add seek logic for reader (#356)
Signed-off-by: xiaolong.ran <[email protected]> ### Motivation Follow #222 and add the seek logic for reader ### Modifications - Add `seek by msgID` interface - Add `seek by time` interface - Add test case ### Verifying this change - [x] Make sure that the change passes the CI checks.
1 parent a7e7239 commit f9b3c0f

File tree

5 files changed

+123
-10
lines changed

5 files changed

+123
-10
lines changed

pulsar/consumer_partition.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
207207
if msgID.entryID != noMessageEntry {
208208
pc.startMessageID = msgID
209209

210-
err = pc.requestSeek(msgID)
210+
err = pc.requestSeek(msgID.messageID)
211211
if err != nil {
212212
return nil, err
213213
}
@@ -276,7 +276,7 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
276276
req.msgID, req.err = pc.requestGetLastMessageID()
277277
}
278278

279-
func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
279+
func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
280280
requestID := pc.client.rpcClient.NewRequestID()
281281
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
282282
RequestId: proto.Uint64(requestID),
@@ -286,7 +286,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
286286
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
287287
if err != nil {
288288
pc.log.WithError(err).Error("Failed to get last message id")
289-
return messageID{}, err
289+
return trackingMessageID{}, err
290290
}
291291
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
292292
return convertToMessageID(id), nil
@@ -365,7 +365,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
365365

366366
func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
367367
defer close(seek.doneCh)
368-
seek.err = pc.requestSeek(seek.msgID)
368+
seek.err = pc.requestSeek(seek.msgID.messageID)
369369
}
370370

371371
func (pc *partitionConsumer) requestSeek(msgID messageID) error {

pulsar/internal/connection.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -665,8 +665,8 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
665665
}
666666

667667
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
668-
c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
669668
consumerID := closeConsumer.GetConsumerId()
669+
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
670670

671671
c.Lock()
672672
defer c.Unlock()

pulsar/reader.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package pulsar
1919

20-
import "context"
20+
import (
21+
"context"
22+
"time"
23+
)
2124

2225
// ReaderMessage package Reader and Message as a struct to use
2326
type ReaderMessage struct {
@@ -88,4 +91,21 @@ type Reader interface {
8891

8992
// Close the reader and stop the broker to push more messages
9093
Close()
94+
95+
// Reset the subscription associated with this reader to a specific message id.
96+
// The message id can either be a specific message or represent the first or last messages in the topic.
97+
//
98+
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
99+
// seek() on the individual partitions.
100+
Seek(MessageID) error
101+
102+
// Reset the subscription associated with this reader to a specific message publish time.
103+
//
104+
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
105+
// the individual partitions.
106+
//
107+
// @param timestamp
108+
// the message publish time where to reposition the subscription
109+
//
110+
SeekByTime(time time.Time) error
91111
}

pulsar/reader_impl.go

+38
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package pulsar
2020
import (
2121
"context"
2222
"fmt"
23+
"sync"
2324
"time"
2425

2526
"github.com/prometheus/client_golang/prometheus"
@@ -45,6 +46,7 @@ var (
4546
)
4647

4748
type reader struct {
49+
sync.Mutex
4850
pc *partitionConsumer
4951
messageCh chan ConsumerMessage
5052
lastMessageInBroker trackingMessageID
@@ -187,3 +189,39 @@ func (r *reader) Close() {
187189
r.pc.Close()
188190
readersClosed.Inc()
189191
}
192+
193+
func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
194+
mid, ok := toTrackingMessageID(msgID)
195+
if !ok {
196+
r.log.Warnf("invalid message id type %T", msgID)
197+
return trackingMessageID{}, false
198+
}
199+
200+
partition := int(mid.partitionIdx)
201+
// did we receive a valid partition index?
202+
if partition < 0 {
203+
r.log.Warnf("invalid partition index %d expected", partition)
204+
return trackingMessageID{}, false
205+
}
206+
207+
return mid, true
208+
}
209+
210+
func (r *reader) Seek(msgID MessageID) error {
211+
r.Lock()
212+
defer r.Unlock()
213+
214+
mid, ok := r.messageID(msgID)
215+
if !ok {
216+
return nil
217+
}
218+
219+
return r.pc.Seek(mid)
220+
}
221+
222+
func (r *reader) SeekByTime(time time.Time) error {
223+
r.Lock()
224+
defer r.Unlock()
225+
226+
return r.pc.SeekByTime(time)
227+
}

pulsar/reader_test.go

+59-4
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,65 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
447447
}
448448
}
449449

450+
func TestReaderSeek(t *testing.T) {
451+
client, err := NewClient(ClientOptions{
452+
URL: lookupURL,
453+
})
454+
assert.Nil(t, err)
455+
defer client.Close()
456+
457+
topicName := newTopicName()
458+
ctx := context.Background()
459+
460+
producer, err := client.CreateProducer(ProducerOptions{
461+
Topic: topicName,
462+
})
463+
assert.Nil(t, err)
464+
defer producer.Close()
465+
466+
reader, err := client.CreateReader(ReaderOptions{
467+
Topic: topicName,
468+
StartMessageID: EarliestMessageID(),
469+
})
470+
assert.Nil(t, err)
471+
defer reader.Close()
472+
473+
const N = 10
474+
var seekID MessageID
475+
for i := 0; i < N; i++ {
476+
id, err := producer.Send(ctx, &ProducerMessage{
477+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
478+
})
479+
assert.Nil(t, err)
480+
481+
if i == 4 {
482+
seekID = id
483+
}
484+
}
485+
err = producer.Flush()
486+
assert.NoError(t, err)
487+
488+
for i := 0; i < N; i++ {
489+
msg, err := reader.Next(ctx)
490+
assert.Nil(t, err)
491+
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
492+
}
493+
494+
err = reader.Seek(seekID)
495+
assert.Nil(t, err)
496+
497+
readerOfSeek, err := client.CreateReader(ReaderOptions{
498+
Topic: topicName,
499+
StartMessageID: seekID,
500+
StartMessageIDInclusive: true,
501+
})
502+
assert.Nil(t, err)
503+
504+
msg, err := readerOfSeek.Next(ctx)
505+
assert.Nil(t, err)
506+
assert.Equal(t, "hello-4", string(msg.Payload()))
507+
}
508+
450509
func TestReaderLatestInclusiveHasNext(t *testing.T) {
451510
client, err := NewClient(ClientOptions{
452511
URL: lookupURL,
@@ -498,14 +557,10 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
498557
assert.Nil(t, err)
499558
defer reader.Close()
500559

501-
var msgID MessageID
502560
if reader.HasNext() {
503561
msg, err := reader.Next(context.Background())
504562
assert.NoError(t, err)
505563

506564
assert.Equal(t, []byte("hello-9"), msg.Payload())
507-
msgID = msg.ID()
508565
}
509-
510-
assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
511566
}

0 commit comments

Comments
 (0)