@@ -2977,11 +2977,45 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
29772977 // fully recovered from disk.
29782978 isRecovering := true
29792979
2980- var failedSnapshots int
2980+ var (
2981+ snapMu sync.Mutex
2982+ snapshotting bool
2983+ fallbackSnapshot bool
2984+ failedSnapshots int
2985+ )
2986+
29812987 doSnapshot := func (force bool ) {
29822988 // Suppress during recovery.
2989+ if mset == nil || isRecovering || isRestore {
2990+ return
2991+ }
2992+ snapMu .Lock ()
2993+ defer snapMu .Unlock ()
29832994 // If snapshots have failed, and we're not forced to, we'll wait for the timer since it'll now be forced.
2984- if mset == nil || isRecovering || isRestore || (! force && failedSnapshots > 0 ) {
2995+ if ! force && failedSnapshots > 0 {
2996+ return
2997+ }
2998+ // Suppress if an async snapshot is already in progress.
2999+ if snapshotting {
3000+ return
3001+ }
3002+
3003+ // If we had a significant number of failed snapshots, start relaxing Raft-layer checks
3004+ // to force it through. We might have been catching up a peer for a long period, and this
3005+ // protects our log size from growing indefinitely.
3006+ forceSnapshot := failedSnapshots > 4
3007+ c , err := n .CreateSnapshotCheckpoint (forceSnapshot )
3008+ if err != nil {
3009+ if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
3010+ s .RateLimitWarnf ("Failed to install snapshot for '%s > %s' [%s]: %v" ,
3011+ mset .acc .Name , mset .name (), n .Group (), err )
3012+ // If this is the first failure, reduce the interval of the snapshot timer.
3013+ // This ensures we're not waiting too long for snapshotting to eventually become forced.
3014+ if failedSnapshots == 0 {
3015+ t .Reset (compactMinInterval )
3016+ }
3017+ failedSnapshots ++
3018+ }
29853019 return
29863020 }
29873021
@@ -2995,30 +3029,61 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
29953029 "group" : n .Group (),
29963030 "err" : err ,
29973031 })
3032+ c .Abort ()
29983033 mset .setWriteErr (err )
29993034 n .Stop ()
30003035 return
30013036 }
30023037
3003- // If we had a significant number of failed snapshots, start relaxing Raft-layer checks
3004- // to force it through. We might have been catching up a peer for a long period, and this
3005- // protects our log size from growing indefinitely.
3006- forceSnapshot := failedSnapshots > 4
3007- if err := n .InstallSnapshot (mset .stateSnapshot (), forceSnapshot ); err == nil {
3008- // If there was a failed snapshot before, we reduced the timer's interval.
3009- // Reset it back to the original interval now.
3010- if failedSnapshots > 0 {
3011- t .Reset (compactInterval + rci )
3038+ snap := mset .stateSnapshot ()
3039+
3040+ handleInstallResult := func (err error ) {
3041+ snapshotting = false
3042+ if err == nil {
3043+ // If there was a failed snapshot before, we reduced the timer's interval.
3044+ // Reset it back to the original interval now.
3045+ if failedSnapshots > 0 {
3046+ t .Reset (compactInterval + rci )
3047+ }
3048+ failedSnapshots = 0
3049+ fallbackSnapshot = false
3050+ } else {
3051+ c .Abort ()
3052+
3053+ if err == errNoSnapAvailable || err == errNodeClosed || err == errCatchupsRunning || err == errSnapAborted {
3054+ return
3055+ }
3056+
3057+ s .RateLimitWarnf ("Failed to install snapshot for '%s > %s' [%s]: %v, will fall back to blocking snapshot" ,
3058+ mset .acc .Name , mset .name (), n .Group (), err )
3059+ fallbackSnapshot = true
3060+ // If this is the first failure, reduce the interval of the snapshot timer.
3061+ // This ensures we're not waiting too long for snapshotting to eventually become forced.
3062+ if failedSnapshots == 0 {
3063+ t .Reset (compactMinInterval )
3064+ }
3065+ failedSnapshots ++
30123066 }
3013- failedSnapshots = 0
3014- } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
3015- s .RateLimitWarnf ("Failed to install snapshot for '%s > %s' [%s]: %v" , mset .acc .Name , mset .name (), n .Group (), err )
3016- // If this is the first failure, reduce the interval of the snapshot timer.
3017- // This ensures we're not waiting too long for snapshotting to eventually become forced.
3018- if failedSnapshots == 0 {
3019- t .Reset (compactMinInterval )
3067+ }
3068+
3069+ snapshotting = true
3070+ if fallbackSnapshot {
3071+ _ , err = c .InstallSnapshot (snap )
3072+ handleInstallResult (err )
3073+ } else {
3074+ started := s .startGoRoutine (func () {
3075+ defer s .grWG .Done ()
3076+
3077+ _ , err := c .InstallSnapshot (snap )
3078+
3079+ snapMu .Lock ()
3080+ defer snapMu .Unlock ()
3081+ handleInstallResult (err )
3082+ })
3083+ if ! started {
3084+ snapshotting = false
3085+ c .Abort ()
30203086 }
3021- failedSnapshots ++
30223087 }
30233088 }
30243089
@@ -3095,13 +3160,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
30953160 select {
30963161 case <- s .quitCh :
30973162 // Server shutting down, but we might receive this before qch, so try to snapshot.
3163+ snapMu .Lock ()
3164+ fallbackSnapshot = true
3165+ snapMu .Unlock ()
30983166 doSnapshot (false )
30993167 return
31003168 case <- mqch :
31013169 // Clean signal from shutdown routine so do best effort attempt to snapshot.
31023170 // Don't snapshot if not shutting down, monitor goroutine could be going away
31033171 // on a scale down or a remove for example.
31043172 if s .isShuttingDown () {
3173+ snapMu .Lock ()
3174+ fallbackSnapshot = true
3175+ snapMu .Unlock ()
31053176 doSnapshot (false )
31063177 }
31073178 return
@@ -3301,7 +3372,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
33013372
33023373 case <- t .C :
33033374 // Start forcing snapshots if they failed previously.
3375+ snapMu .Lock ()
33043376 forceIfFailed := failedSnapshots > 0
3377+ snapMu .Unlock ()
33053378 doSnapshot (forceIfFailed )
33063379
33073380 case <- uch :
0 commit comments