Skip to content

Commit 230fc20

Browse files
committed
IG-19366: if the shard does not exist yet, turn seek-latest to seek-earliest
1 parent 4d7cfc9 commit 230fc20

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

pkg/dataplane/streamconsumergroup/claim.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,22 +222,30 @@ func (c *claim) fetchRecordBatch(location string) (string, error) {
222222

223223
func (c *claim) getCurrentShardLocation(shardID int) (string, error) {
224224

225+
initialLocation := c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.InitialLocation
226+
225227
// get the location from persistency
226-
currentShardLocation, err := c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID)
228+
currentShardLocation, err := c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID, initialLocation)
227229
if err != nil && errors.RootCause(err) != ErrShardNotFound {
228230
return "", errors.Wrap(err, "Failed to get shard location from persistency")
229231
}
230232

231233
// if shard wasn't found, try again periodically
232234
if errors.RootCause(err) == ErrShardNotFound {
235+
236+
// if the shard does not exist yet, turn seek-latest to seek-earliest (IG-19366)
237+
if initialLocation == v3io.SeekShardInputTypeLatest {
238+
initialLocation = v3io.SeekShardInputTypeEarliest
239+
}
240+
233241
for {
234242
select {
235243

236244
// TODO: from configuration
237245
case <-time.After(c.member.streamConsumerGroup.config.SequenceNumber.ShardWaitInterval):
238246

239247
// get the location from persistency
240-
currentShardLocation, err = c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID)
248+
currentShardLocation, err = c.member.streamConsumerGroup.getShardLocationFromPersistency(shardID, initialLocation)
241249
if err != nil {
242250
if errors.RootCause(err) == ErrShardNotFound {
243251

pkg/dataplane/streamconsumergroup/streamconsumergroup.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (scg *streamConsumerGroup) getStateFilePath() string {
208208
return path.Join(scg.streamPath, fmt.Sprintf("%s-state.json", scg.name))
209209
}
210210

211-
func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int) (string, error) {
211+
func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int, initialLocation v3io.SeekShardInputType) (string, error) {
212212
scg.logger.DebugWith("Getting shard sequenceNumber from persistency", "shardID", shardID)
213213

214214
seekShardInput := v3io.SeekShardInput{}
@@ -223,7 +223,7 @@ func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int) (st
223223
return "", errors.Wrap(err, "Failed to get shard sequenceNumber from item attributes")
224224
}
225225

226-
seekShardInput.Type = scg.config.Claim.RecordBatchFetch.InitialLocation
226+
seekShardInput.Type = initialLocation
227227
} else {
228228

229229
// use sequence number

0 commit comments

Comments
 (0)