Skip to content

Commit 7eb3bcd

Browse files
authored
Properly handle cases where fetch records returns error (#64)
1 parent f1b82b9 commit 7eb3bcd

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

pkg/dataplane/http/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1360,7 +1360,7 @@ func decodeCapnpAttributes(keyValues node_common_capnp.VnObjectItemsGetMappedKey
13601360
func (c *context) getItemsParseJSONResponse(response *v3io.Response, getItemsInput *v3io.GetItemsInput) (*v3io.GetItemsOutput, error) {
13611361

13621362
getItemsResponse := struct {
1363-
Items []map[string]map[string]interface{}
1363+
Items []map[string]map[string]interface{}
13641364
NextMarker string
13651365
LastItemIncluded string
13661366
}{}

pkg/dataplane/streamconsumergroup/claim.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"path"
66
"strconv"
7+
"strings"
78
"time"
89

910
"github.com/v3io/v3io-go/pkg/dataplane"
@@ -104,12 +105,33 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
104105
for {
105106
select {
106107
case <-time.After(fetchInterval):
107-
c.currentShardLocation, err = c.fetchRecordBatch(c.currentShardLocation)
108+
location, err := c.fetchRecordBatch(c.currentShardLocation)
108109
if err != nil {
109-
c.logger.WarnWith("Failed fetching record batch", "err", errors.GetErrorStackString(err, 10))
110+
c.logger.WarnWith("Failed fetching record batch",
111+
"shardId", c.shardID,
112+
"err", errors.GetErrorStackString(err, 10))
113+
114+
// if the error is that the location we asked for is bad, try to seek earliest
115+
if c.checkIllegalLocationErr(err) {
116+
location, err := c.recoverLocationAfterIllegalLocationErr()
117+
118+
if err != nil {
119+
c.logger.WarnWith("Failed to recover after illegal location",
120+
"shardId", c.shardID,
121+
"err", errors.GetErrorStackString(err, 10))
122+
123+
continue
124+
}
125+
126+
// set next location for next time
127+
c.currentShardLocation = location
128+
}
129+
110130
continue
111131
}
112132

133+
c.currentShardLocation = location
134+
113135
case <-stopChannel:
114136
close(c.recordBatchChan)
115137
c.logger.Debug("Stopping fetch")
@@ -202,3 +224,47 @@ func (c *claim) getCurrentShardLocation(shardID int) (string, error) {
202224

203225
return currentShardLocation, nil
204226
}
227+
228+
func (c *claim) checkIllegalLocationErr(err error) bool {
229+
230+
// sanity
231+
if err == nil {
232+
return false
233+
}
234+
235+
// try to get the cause
236+
causeErr := errors.Cause(err)
237+
if causeErr == nil {
238+
239+
// if we can't determind the cause, don't assume location (sanity)
240+
return false
241+
}
242+
243+
return strings.Contains(causeErr.Error(), "IllegalLocationException")
244+
}
245+
246+
func (c *claim) recoverLocationAfterIllegalLocationErr() (string, error) {
247+
c.logger.InfoWith("Location requested is invalid. Trying to seek to earliest",
248+
"shardId", c.shardID,
249+
"location", c.currentShardLocation)
250+
251+
streamPath, err := c.member.streamConsumerGroup.getShardPath(c.shardID)
252+
if err != nil {
253+
return "", errors.Wrap(err, "Failed to get shard path")
254+
}
255+
256+
location, err := c.member.streamConsumerGroup.getShardLocationWithSeek(&v3io.SeekShardInput{
257+
Path: streamPath,
258+
Type: v3io.SeekShardInputTypeEarliest,
259+
})
260+
261+
if err != nil {
262+
return "", errors.Wrap(err, "Failed to seek to earliest")
263+
}
264+
265+
c.logger.InfoWith("Got new location after seeking to earliest",
266+
"shardId", c.shardID,
267+
"location", location)
268+
269+
return location, nil
270+
}

0 commit comments

Comments
 (0)