Skip to content

Commit 5ba7cd0

Browse files
authored
CBG-4221 [3.1.11 backport] Don't increment high sequence cached for unused sequences on pendingLogs (#7106)
CBG-4221 [3.1.11 backport] Don't increment high sequence cached for unused sequences on pendingLogs If unused sequences ranges are pushed to pendingLogs, they shouldn't increment the channel cache's highSequenceCached until they are processed.
1 parent b6127c1 commit 5ba7cd0

3 files changed

Lines changed: 10 additions & 10 deletions

File tree

db/change_cache.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
575575
} else {
576576
changedChannels.Add(unusedSeq)
577577
}
578-
c.channelCache.AddUnusedSequence(change)
579578
if c.notifyChange != nil && len(changedChannels) > 0 {
580579
c.notifyChange(ctx, changedChannels)
581580
}
@@ -598,7 +597,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
598597
}
599598
changedChannels := c.processEntry(ctx, change)
600599
allChangedChannels = allChangedChannels.Update(changedChannels)
601-
c.channelCache.AddUnusedSequence(change)
602600
if c.notifyChange != nil {
603601
c.notifyChange(ctx, allChangedChannels)
604602
}
@@ -608,9 +606,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
608606
// push unused range to either pending or skipped lists based on current state of the change cache
609607
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)
610608

611-
// update high seq cached
612-
c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence})
613-
614609
if c.notifyChange != nil {
615610
c.notifyChange(ctx, allChangedChannels)
616611
}
@@ -803,8 +798,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann
803798
}
804799
delete(c.receivedSeqs, change.Sequence)
805800

806-
// If unused sequence or principal, we're done after updating sequence
801+
// If unused sequence, notify the cache and return
807802
if change.DocID == "" {
803+
c.channelCache.AddUnusedSequence(change)
808804
return nil
809805
}
810806

db/change_cache_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2538,7 +2538,7 @@ func TestReleasedSequenceRangeHandlingEverythingPending(t *testing.T) {
25382538
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
25392539
assert.Equal(c, uint64(2), testChangeCache.nextSequence)
25402540
dbContext.UpdateCalculatedStats(ctx)
2541-
assert.Equal(c, int64(25), dbContext.DbStats.CacheStats.HighSeqCached.Value())
2541+
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.HighSeqCached.Value())
25422542
}, time.Second*10, time.Millisecond*100)
25432543
}
25442544

@@ -2644,7 +2644,7 @@ func TestReleasedSequenceRangeHandlingEverythingPendingLowPendingCapacity(t *tes
26442644
defer testChangeCache.Stop(ctx)
26452645
require.NoError(t, err)
26462646

2647-
// process unused sequence range
2647+
// process unused sequence range, will be sent to pending. Triggers seq 1 being sent to skipped
26482648
testChangeCache.releaseUnusedSequenceRange(ctx, 2, 25, time.Now())
26492649

26502650
require.EventuallyWithT(t, func(c *assert.CollectT) {
@@ -2755,7 +2755,7 @@ func TestReleasedSequenceRangeHandlingSingleSequence(t *testing.T) {
27552755
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
27562756
assert.Equal(c, uint64(1), testChangeCache.nextSequence)
27572757
dbContext.UpdateCalculatedStats(ctx)
2758-
assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.HighSeqCached.Value())
2758+
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.HighSeqCached.Value())
27592759
}, time.Second*10, time.Millisecond*100)
27602760

27612761
// process change that should overload pending and push sequence 1 to skipped

db/channel_cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {
193193

194194
// Add unused Sequence notifies the cache of an unused sequence update. Updates the cache's high sequence
195195
func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
196-
c.updateHighCacheSequence(change.Sequence)
196+
if change.EndSequence > 0 {
197+
c.updateHighCacheSequence(change.EndSequence)
198+
} else {
199+
c.updateHighCacheSequence(change.Sequence)
200+
}
197201
}
198202

199203
// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence

0 commit comments

Comments
 (0)