Skip to content

Commit 042d266

Browse files
committed
Add support for startFrom query parameter in SSEHandler
Add support for `startFrom` query parameter in HTTP handler. * Modify `src/route/handlers.go` to parse the `startFrom` query parameter in the `SSEHandler` function. * Add code to apply seek after subscribing based on the `startFrom` query parameter in `src/route/handlers.go`. * Create a new file `src/pulsarutil/start_pos.go` and add the `GetStartOption` function to handle the `startFrom` query parameter. --- For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/kafkaesque-io/pulsar-beam?shareId=XXXX-XXXX-XXXX-XXXX).
1 parent 1a2f110 commit 042d266

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

src/pulsarutil/start_pos.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package pulsarutil
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
"time"
8+
9+
"github.com/apache/pulsar-client-go/pulsar"
10+
)
11+
12+
func GetStartOption(startFrom string) (pulsar.SubscriptionInitialPosition, *pulsar.MessageID, *time.Time, error) {
13+
switch {
14+
case startFrom == "earliest":
15+
return pulsar.SubscriptionPositionEarliest, nil, nil, nil
16+
case strings.HasPrefix(startFrom, "messageId:"):
17+
parts := strings.Split(strings.TrimPrefix(startFrom, "messageId:"), ":")
18+
if len(parts) != 2 {
19+
return 0, nil, nil, fmt.Errorf("invalid messageId format, expected ledgerId:entryId")
20+
}
21+
ledgerId, err1 := strconv.ParseInt(parts[0], 10, 64)
22+
entryId, err2 := strconv.ParseInt(parts[1], 10, 64)
23+
if err1 != nil || err2 != nil {
24+
return 0, nil, nil, fmt.Errorf("invalid messageId numbers")
25+
}
26+
msgID := pulsar.NewMessageID(ledgerId, entryId, -1)
27+
return 0, &msgID, nil, nil
28+
case strings.HasPrefix(startFrom, "timestamp:"):
29+
millisStr := strings.TrimPrefix(startFrom, "timestamp:")
30+
millis, err := strconv.ParseInt(millisStr, 10, 64)
31+
if err != nil {
32+
return 0, nil, nil, fmt.Errorf("invalid timestamp")
33+
}
34+
t := time.UnixMilli(millis)
35+
return 0, nil, &t, nil
36+
default:
37+
return 0, nil, nil, fmt.Errorf("unsupported startFrom value")
38+
}
39+
}

src/route/handlers.go

+20
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/kafkaesque-io/pulsar-beam/src/model"
1919
"github.com/kafkaesque-io/pulsar-beam/src/pulsardriver"
2020
"github.com/kafkaesque-io/pulsar-beam/src/util"
21+
"github.com/kafkaesque-io/pulsar-beam/src/pulsarutil"
2122

2223
log "github.com/sirupsen/logrus"
2324
)
@@ -196,6 +197,9 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) {
196197
return
197198
}
198199

200+
// Parse the startFrom query parameter
201+
startFrom := r.URL.Query().Get("startFrom")
202+
199203
// Make sure that the writer supports flushing.
200204
flusher, ok := w.(http.Flusher)
201205
if !ok {
@@ -219,6 +223,22 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) {
219223
defer consumer.Unsubscribe()
220224
}
221225

226+
// Apply seek after subscribing based on the startFrom query parameter
227+
if startFrom != "" {
228+
pos, msgID, ts, err := pulsarutil.GetStartOption(startFrom)
229+
if err != nil {
230+
http.Error(w, err.Error(), http.StatusBadRequest)
231+
return
232+
}
233+
if msgID != nil {
234+
consumer.Seek(*msgID)
235+
} else if ts != nil {
236+
consumer.SeekByTime(*ts)
237+
} else {
238+
consumer.SeekByTime(time.Now())
239+
}
240+
}
241+
222242
consumChan := consumer.Chan()
223243
for {
224244
select {

0 commit comments

Comments
 (0)