Merged

111 changes: 92 additions & 19 deletions — server/jetstream_cluster.go
```diff
@@ -2983,11 +2983,45 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// fully recovered from disk.
 	isRecovering := true
 
-	var failedSnapshots int
+	var (
+		snapMu           sync.Mutex
+		snapshotting     bool
+		fallbackSnapshot bool
+		failedSnapshots  int
+	)
 
 	doSnapshot := func(force bool) {
-		// If snapshots have failed, and we're not forced to, we'll wait for the timer since it'll now be forced.
-		if mset == nil || isRecovering || isRestore || (!force && failedSnapshots > 0) {
+		// Suppress during recovery.
+		if mset == nil || isRecovering || isRestore {
 			return
 		}
+		snapMu.Lock()
+		defer snapMu.Unlock()
+		// If snapshots have failed, and we're not forced to, we'll wait for the timer since it'll now be forced.
+		if !force && failedSnapshots > 0 {
+			return
+		}
+		// Suppress if an async snapshot is already in progress.
+		if snapshotting {
+			return
+		}
+
+		// If we had a significant number of failed snapshots, start relaxing Raft-layer checks
+		// to force it through. We might have been catching up a peer for a long period, and this
+		// protects our log size from growing indefinitely.
+		forceSnapshot := failedSnapshots > 4
+		c, err := n.CreateSnapshotCheckpoint(forceSnapshot)
```
Review comment (P2): Re-check catchup state before installing checkpoint

CreateSnapshotCheckpoint is invoked before flushAllPending() and stateSnapshot(), but the Raft catchup guard (len(n.progress) == 0) is only enforced at checkpoint creation time, not in checkpoint.InstallSnapshot. If a follower catchup starts in that window, this path will still install/compact a non-forced snapshot, which bypasses the previous safety behavior and can interrupt active catchups with avoidable retry/snapshot churn.


Reply (Contributor Author):
I think this can be fixed easily, but the fix would likely belong in a separate PR (it would be in the Raft code, and it would affect meta snapshots as well).
Having said that, I don't think this is a big issue. In unlucky situations InstallSnapshot may interrupt a catchup that started after the call to CreateSnapshotCheckpoint, but in that case not much work would have been wasted in the first place.

```diff
+		if err != nil {
+			if err != errNoSnapAvailable && err != errNodeClosed {
+				s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
+					mset.acc.Name, mset.name(), n.Group(), err)
+				// If this is the first failure, reduce the interval of the snapshot timer.
+				// This ensures we're not waiting too long for snapshotting to eventually become forced.
+				if failedSnapshots == 0 {
+					t.Reset(compactMinInterval)
+				}
+				failedSnapshots++
+			}
+			return
+		}
```

@@ -3001,30 +3035,61 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
```diff
 				"group": n.Group(),
 				"err":   err,
 			})
+			c.Abort()
 			mset.setWriteErr(err)
 			n.Stop()
 			return
 		}
 
-		// If we had a significant number of failed snapshots, start relaxing Raft-layer checks
-		// to force it through. We might have been catching up a peer for a long period, and this
-		// protects our log size from growing indefinitely.
-		forceSnapshot := failedSnapshots > 4
-		if err := n.InstallSnapshot(mset.stateSnapshot(), forceSnapshot); err == nil {
-			// If there was a failed snapshot before, we reduced the timer's interval.
-			// Reset it back to the original interval now.
-			if failedSnapshots > 0 {
-				t.Reset(compactInterval + rci)
+		snap := mset.stateSnapshot()
+
+		handleInstallResult := func(err error) {
+			snapshotting = false
+			if err == nil {
+				// If there was a failed snapshot before, we reduced the timer's interval.
+				// Reset it back to the original interval now.
+				if failedSnapshots > 0 {
+					t.Reset(compactInterval + rci)
+				}
+				failedSnapshots = 0
+				fallbackSnapshot = false
+			} else {
+				c.Abort()
+
+				if err == errNoSnapAvailable || err == errNodeClosed || err == errCatchupsRunning || err == errSnapAborted {
+					return
+				}
+
+				s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v, will fall back to blocking snapshot",
+					mset.acc.Name, mset.name(), n.Group(), err)
+				fallbackSnapshot = true
+				// If this is the first failure, reduce the interval of the snapshot timer.
+				// This ensures we're not waiting too long for snapshotting to eventually become forced.
+				if failedSnapshots == 0 {
+					t.Reset(compactMinInterval)
```
Review comment (P2): Guard ticker reset after monitor shutdown

handleInstallResult runs from the async snapshot goroutine, but monitorStream can exit on mqch/qch and run defer t.Stop() before that goroutine completes. If that late callback reaches this t.Reset(...) path (for example when the in-flight snapshot fails because the node/store is being stopped), it re-arms a ticker that no longer has a consumer, leaving an orphaned periodic timer after the monitor is gone. This can accumulate background wakeups during stream moves/scale-downs when snapshots are in flight.


Reply (Contributor Author):
I wonder if this is a real concern. Sure, if t is Reset after Stop, it may keep ticking unnecessarily. But it is also no longer referenced, so it will eventually be garbage collected. I'm not sure it is worth introducing complexity for a ticker that would, in the worst case, tick every 15 seconds.

```diff
+				}
+				failedSnapshots++
+			}
-			}
-			failedSnapshots = 0
-		} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
-			s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
-			// If this is the first failure, reduce the interval of the snapshot timer.
-			// This ensures we're not waiting too long for snapshotting to eventually become forced.
-			if failedSnapshots == 0 {
-				t.Reset(compactMinInterval)
-			}
-			failedSnapshots++
-		}
+		}
+
+		snapshotting = true
+		if fallbackSnapshot {
+			_, err = c.InstallSnapshot(snap)
+			handleInstallResult(err)
+		} else {
+			started := s.startGoRoutine(func() {
+				defer s.grWG.Done()
+
+				_, err := c.InstallSnapshot(snap)
+
+				snapMu.Lock()
+				defer snapMu.Unlock()
+				handleInstallResult(err)
+			})
+			if !started {
+				snapshotting = false
+				c.Abort()
+			}
+		}
 	}
```

@@ -3101,13 +3166,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
```diff
 		select {
 		case <-s.quitCh:
 			// Server shutting down, but we might receive this before qch, so try to snapshot.
+			snapMu.Lock()
+			fallbackSnapshot = true
+			snapMu.Unlock()
 			doSnapshot(false)
 			return
 		case <-mqch:
 			// Clean signal from shutdown routine so do best effort attempt to snapshot.
 			// Don't snapshot if not shutting down, monitor goroutine could be going away
 			// on a scale down or a remove for example.
 			if s.isShuttingDown() {
+				snapMu.Lock()
+				fallbackSnapshot = true
+				snapMu.Unlock()
 				doSnapshot(false)
 			}
 			return
```
@@ -3307,7 +3378,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

```diff
 		case <-t.C:
 			// Start forcing snapshots if they failed previously.
+			snapMu.Lock()
 			forceIfFailed := failedSnapshots > 0
+			snapMu.Unlock()
 			doSnapshot(forceIfFailed)
 
 		case <-uch:
```