@@ -16,7 +16,6 @@ import (
1616 "errors"
1717 "fmt"
1818 "math"
19- "sort"
2019 "strconv"
2120 "strings"
2221 "sync"
@@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() {
126125 entry .Flags |= channels .Deleted
127126}
128127
128+ func (entry * LogEntry ) IsUnusedRange () bool {
129+ return entry .DocID == "" && entry .EndSequence > 0
130+ }
131+
129132type LogEntries []* LogEntry
130133
131134// A priority-queue of LogEntries, kept ordered by increasing sequence #.
@@ -634,71 +637,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
634637 // isn't possible under normal processing - unused sequence ranges will normally be moved
635638 // from pending to skipped in their entirety, as it's the processing of the pending sequence
636639 // *after* the range that triggers the range to be skipped. A partial range in skipped means
637- // an duplicate entry with a sequence within the bounds of the range was previously present
640+ // a duplicate entry with a sequence within the bounds of the range was previously present
638641 // in pending.
639- base .WarnfCtx (ctx , "unused sequence range of #%d to %d contains duplicate sequences" , fromSequence , toSequence )
642+ base .WarnfCtx (ctx , "unused sequence range of #%d to %d contains duplicate sequences, will be ignored " , fromSequence , toSequence )
640643 }
641644 return allChangedChannels
642645}
643646
644- // _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if
645- // those entries are in the range and handle it, so we don't push duplicate sequences to pending
647+ // _pushRangeToPending will push a sequence range to pendingLogs
646648func (c * changeCache ) _pushRangeToPending (ctx context.Context , startSeq , endSeq uint64 , timeReceived time.Time ) {
647- if c .pendingLogs .Len () == 0 {
648- // push whole range & return early to avoid duplicate checks
649- entry := & LogEntry {
650- TimeReceived : timeReceived ,
651- Sequence : startSeq ,
652- EndSequence : endSeq ,
653- }
654- heap .Push (& c .pendingLogs , entry )
655- return
656- }
657649
658- // check for duplicate sequences between range and pending logs
659- // loop till we have processed unused sequence range (or until we
660- // have range of sequences that aren't present in pending list)
661- for startSeq <= endSeq {
662- i , found := sort .Find (c .pendingLogs .Len (), func (i int ) int {
663- value := c .pendingLogs [i ]
664- if value .Sequence > endSeq {
665- // range is less than current pending entry
666- return - 1
667- }
668- if startSeq <= value .Sequence && endSeq >= value .Sequence {
669- // found pending entry that has duplicate entry between itself and unused range
670- return 0
671- }
672- // range is larger then current element
673- return 1
674- })
675- if found {
676- // grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1
677- pendingEntry := c .pendingLogs [i ]
678- base .DebugfCtx (ctx , base .KeyCache , "Ignoring duplicate of #%d (unusedSequence)" , pendingEntry .Sequence )
679- entry := & LogEntry {
680- TimeReceived : timeReceived ,
681- Sequence : startSeq ,
682- EndSequence : pendingEntry .Sequence - 1 ,
683- }
684- heap .Push (& c .pendingLogs , entry )
685- // update start seq on range
686- startSeq = pendingEntry .Sequence + 1
687- } else {
688- // if range not found in pending then break from loop early
689- break
690- }
650+ entry := & LogEntry {
651+ TimeReceived : timeReceived ,
652+ Sequence : startSeq ,
653+ EndSequence : endSeq ,
691654 }
655+ heap .Push (& c .pendingLogs , entry )
692656
693- // push what's left of seq range
694- if startSeq <= endSeq {
695- entry := & LogEntry {
696- TimeReceived : timeReceived ,
697- Sequence : startSeq ,
698- EndSequence : endSeq ,
699- }
700- heap .Push (& c .pendingLogs , entry )
701- }
702657}
703658
704659// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
@@ -886,16 +841,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
886841 isNext = oldestPending .Sequence == c .nextSequence
887842
888843 if isNext {
889- heap . Pop ( & c . pendingLogs )
844+ oldestPending = c . _popPendingLog ( ctx )
890845 changedChannels = changedChannels .UpdateWithSlice (c ._addToCache (ctx , oldestPending ))
846+ } else if oldestPending .Sequence < c .nextSequence {
847+ // oldest pending is lower than next sequence, should be ignored
848+ base .InfofCtx (ctx , base .KeyCache , "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached" , base .UD (oldestPending .DocID ), oldestPending .Sequence , oldestPending .EndSequence , c .nextSequence )
849+ oldestPending = c ._popPendingLog (ctx )
850+
851+ // If the oldestPending was a range that extended past nextSequence, update nextSequence
852+ if oldestPending .IsUnusedRange () && oldestPending .EndSequence >= c .nextSequence {
853+ c .nextSequence = oldestPending .EndSequence + 1
854+ }
891855 } else if len (c .pendingLogs ) > c .options .CachePendingSeqMaxNum || time .Since (c .pendingLogs [0 ].TimeReceived ) >= c .options .CachePendingSeqMaxWait {
892856 // Skip all sequences up to the oldest Pending
893857 c .PushSkipped (ctx , c .nextSequence , oldestPending .Sequence - 1 )
894- // disallow c.nextSequence decreasing
895- if c .nextSequence < oldestPending .Sequence {
896- c .nextSequence = oldestPending .Sequence
897- }
858+ c .nextSequence = oldestPending .Sequence
898859 } else {
860+ // nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
899861 break
900862 }
901863 }
@@ -906,6 +868,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
906868 return changedChannels
907869}
908870
871+ // _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap. When the popped entry is an unused range,
872+ // performs a defensive check for duplicates with the next entry in pending. If unused range overlaps with next entry,
873+ // reduces the unused range to stop at the next pending entry.
874+ func (c * changeCache ) _popPendingLog (ctx context.Context ) * LogEntry {
875+ poppedEntry := heap .Pop (& c .pendingLogs ).(* LogEntry )
876+ // If it's not a range, no additional handling needed
877+ if ! poppedEntry .IsUnusedRange () {
878+ return poppedEntry
879+ }
880+ // If there are no more pending logs, no additional handling needed
881+ if len (c .pendingLogs ) == 0 {
882+ return poppedEntry
883+ }
884+
885+ nextPendingEntry := c .pendingLogs [0 ]
886+ // If popped entry range does not overlap with next pending entry, no additional handling needed
887+ // e.g. popped [15-20], nextPendingEntry is [25]
888+ if poppedEntry .EndSequence < nextPendingEntry .Sequence {
889+ return poppedEntry
890+ }
891+
892+ // If nextPendingEntry's sequence duplicates the start of the unused range, ignored popped entry and return next entry instead
893+ // e.g. popped [15-20], nextPendingEntry is [15]
894+ if poppedEntry .Sequence == nextPendingEntry .Sequence {
895+ base .InfofCtx (ctx , base .KeyCache , "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored" , poppedEntry .Sequence , poppedEntry .EndSequence , nextPendingEntry .DocID , nextPendingEntry .Sequence )
896+ return c ._popPendingLog (ctx )
897+ }
898+
899+ // Otherwise, reduce the popped unused range to end before the next pending sequence
900+ // e.g. popped [15-20], nextPendingEntry is [18]
901+ base .InfofCtx (ctx , base .KeyCache , "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated" , poppedEntry .Sequence , poppedEntry .EndSequence , nextPendingEntry .DocID , nextPendingEntry .Sequence )
902+ poppedEntry .EndSequence = nextPendingEntry .Sequence - 1
903+ return poppedEntry
904+ }
905+
909906func (c * changeCache ) GetStableSequence (docID string ) SequenceID {
910907 // Stable sequence is independent of docID in changeCache
911908 return SequenceID {Seq : c .LastSequence ()}
0 commit comments