@@ -16,7 +16,6 @@ import (
1616 "errors"
1717 "fmt"
1818 "math"
19- "sort"
2019 "strconv"
2120 "strings"
2221 "sync"
@@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() {
126125 entry .Flags |= channels .Deleted
127126}
128127
128+ func (entry * LogEntry ) IsUnusedRange () bool {
129+ return entry .DocID == "" && entry .EndSequence > 0
130+ }
131+
129132type LogEntries []* LogEntry
130133
131134// A priority-queue of LogEntries, kept ordered by increasing sequence #.
@@ -633,71 +636,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
633636 // isn't possible under normal processing - unused sequence ranges will normally be moved
634637 // from pending to skipped in their entirety, as it's the processing of the pending sequence
635638 // *after* the range that triggers the range to be skipped. A partial range in skipped means
636- // an duplicate entry with a sequence within the bounds of the range was previously present
639+ // a duplicate entry with a sequence within the bounds of the range was previously present
637640 // in pending.
638- base .WarnfCtx (ctx , "unused sequence range of #%d to %d contains duplicate sequences" , fromSequence , toSequence )
641+ base .WarnfCtx (ctx , "unused sequence range of #%d to %d contains duplicate sequences, will be ignored " , fromSequence , toSequence )
639642 }
640643 return allChangedChannels
641644}
642645
643- // _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if
644- // those entries are in the range and handle it, so we don't push duplicate sequences to pending
646+ // _pushRangeToPending will push a sequence range to pendingLogs
645647func (c * changeCache ) _pushRangeToPending (ctx context.Context , startSeq , endSeq uint64 , timeReceived time.Time ) {
646- if c .pendingLogs .Len () == 0 {
647- // push whole range & return early to avoid duplicate checks
648- entry := & LogEntry {
649- TimeReceived : timeReceived ,
650- Sequence : startSeq ,
651- EndSequence : endSeq ,
652- }
653- heap .Push (& c .pendingLogs , entry )
654- return
655- }
656648
657- // check for duplicate sequences between range and pending logs
658- // loop till we have processed unused sequence range (or until we
659- // have range of sequences that aren't present in pending list)
660- for startSeq <= endSeq {
661- i , found := sort .Find (c .pendingLogs .Len (), func (i int ) int {
662- value := c .pendingLogs [i ]
663- if value .Sequence > endSeq {
664- // range is less than current pending entry
665- return - 1
666- }
667- if startSeq <= value .Sequence && endSeq >= value .Sequence {
668- // found pending entry that has duplicate entry between itself and unused range
669- return 0
670- }
671- // range is larger then current element
672- return 1
673- })
674- if found {
675- // grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1
676- pendingEntry := c .pendingLogs [i ]
677- base .DebugfCtx (ctx , base .KeyCache , "Ignoring duplicate of #%d (unusedSequence)" , pendingEntry .Sequence )
678- entry := & LogEntry {
679- TimeReceived : timeReceived ,
680- Sequence : startSeq ,
681- EndSequence : pendingEntry .Sequence - 1 ,
682- }
683- heap .Push (& c .pendingLogs , entry )
684- // update start seq on range
685- startSeq = pendingEntry .Sequence + 1
686- } else {
687- // if range not found in pending then break from loop early
688- break
689- }
649+ entry := & LogEntry {
650+ TimeReceived : timeReceived ,
651+ Sequence : startSeq ,
652+ EndSequence : endSeq ,
690653 }
654+ heap .Push (& c .pendingLogs , entry )
691655
692- // push what's left of seq range
693- if startSeq <= endSeq {
694- entry := & LogEntry {
695- TimeReceived : timeReceived ,
696- Sequence : startSeq ,
697- EndSequence : endSeq ,
698- }
699- heap .Push (& c .pendingLogs , entry )
700- }
701656}
702657
703658// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
@@ -885,16 +840,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
885840 isNext = oldestPending .Sequence == c .nextSequence
886841
887842 if isNext {
888- heap . Pop ( & c . pendingLogs )
843+ oldestPending = c . _popPendingLog ( ctx )
889844 changedChannels = changedChannels .UpdateWithSlice (c ._addToCache (ctx , oldestPending ))
845+ } else if oldestPending .Sequence < c .nextSequence {
846+ // oldest pending is lower than next sequence, should be ignored
847+ base .InfofCtx (ctx , base .KeyCache , "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached" , base .UD (oldestPending .DocID ), oldestPending .Sequence , oldestPending .EndSequence , c .nextSequence )
848+ oldestPending = c ._popPendingLog (ctx )
849+
850+ // If the oldestPending was a range that extended past nextSequence, update nextSequence
851+ if oldestPending .IsUnusedRange () && oldestPending .EndSequence >= c .nextSequence {
852+ c .nextSequence = oldestPending .EndSequence + 1
853+ }
890854 } else if len (c .pendingLogs ) > c .options .CachePendingSeqMaxNum || time .Since (c .pendingLogs [0 ].TimeReceived ) >= c .options .CachePendingSeqMaxWait {
891855 // Skip all sequences up to the oldest Pending
892856 c .PushSkipped (ctx , c .nextSequence , oldestPending .Sequence - 1 )
893- // disallow c.nextSequence decreasing
894- if c .nextSequence < oldestPending .Sequence {
895- c .nextSequence = oldestPending .Sequence
896- }
857+ c .nextSequence = oldestPending .Sequence
897858 } else {
859+ // nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
898860 break
899861 }
900862 }
@@ -905,6 +867,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
905867 return changedChannels
906868}
907869
870+ // _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap. When the popped entry is an unused range,
871+ // performs a defensive check for duplicates with the next entry in pending. If unused range overlaps with next entry,
872+ // reduces the unused range to stop at the next pending entry.
873+ func (c * changeCache ) _popPendingLog (ctx context.Context ) * LogEntry {
874+ poppedEntry := heap .Pop (& c .pendingLogs ).(* LogEntry )
875+ // If it's not a range, no additional handling needed
876+ if ! poppedEntry .IsUnusedRange () {
877+ return poppedEntry
878+ }
879+ // If there are no more pending logs, no additional handling needed
880+ if len (c .pendingLogs ) == 0 {
881+ return poppedEntry
882+ }
883+
884+ nextPendingEntry := c .pendingLogs [0 ]
885+ // If popped entry range does not overlap with next pending entry, no additional handling needed
886+ // e.g. popped [15-20], nextPendingEntry is [25]
887+ if poppedEntry .EndSequence < nextPendingEntry .Sequence {
888+ return poppedEntry
889+ }
890+
891+ // If nextPendingEntry's sequence duplicates the start of the unused range, ignored popped entry and return next entry instead
892+ // e.g. popped [15-20], nextPendingEntry is [15]
893+ if poppedEntry .Sequence == nextPendingEntry .Sequence {
894+ base .InfofCtx (ctx , base .KeyCache , "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored" , poppedEntry .Sequence , poppedEntry .EndSequence , nextPendingEntry .DocID , nextPendingEntry .Sequence )
895+ return c ._popPendingLog (ctx )
896+ }
897+
898+ // Otherwise, reduce the popped unused range to end before the next pending sequence
899+ // e.g. popped [15-20], nextPendingEntry is [18]
900+ base .InfofCtx (ctx , base .KeyCache , "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated" , poppedEntry .Sequence , poppedEntry .EndSequence , nextPendingEntry .DocID , nextPendingEntry .Sequence )
901+ poppedEntry .EndSequence = nextPendingEntry .Sequence - 1
902+ return poppedEntry
903+ }
904+
908905func (c * changeCache ) GetStableSequence (docID string ) SequenceID {
909906 // Stable sequence is independent of docID in changeCache
910907 return SequenceID {Seq : c .LastSequence ()}
0 commit comments