
Commit 16a5caa

Make stream snapshots asynchronous
The monitorStream function can block for a long time when creating and installing a snapshot of a stream's state, which can lead to increased tail latency. This commit extends the RaftNode interface with a new InstallSnapshotAsync method, which performs snapshot writing and WAL compaction in a separate goroutine, making the process non-blocking. The existing InstallSnapshot method is now a synchronous wrapper around the new asynchronous implementation.

Signed-off-by: Daniele Sciascia <[email protected]>
1 parent e2661b5 commit 16a5caa
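
For context, here is a minimal caller-side sketch of the pattern monitorStream adopts in this commit: start at most one asynchronous snapshot at a time, collect its result in the select loop, and drain any in-flight snapshot before returning. It is illustrative only; InstalledSnapshot and the snapshotter interface are simplified stand-ins (redeclared so the example is self-contained) for the types added to server/raft.go below, and runSnapshots is a hypothetical driver, not code from the commit.

package example

import "log"

// InstalledSnapshot mirrors the result type this commit adds to server/raft.go;
// it is redeclared here only so the sketch stands on its own.
type InstalledSnapshot struct {
    Term  uint64
    Index uint64
    Path  string
    Err   error
}

// snapshotter is a narrowed, hypothetical view of the extended RaftNode
// interface, containing just the new asynchronous call.
type snapshotter interface {
    InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
}

// runSnapshots sketches the caller-side pattern used by monitorStream.
func runSnapshots(n snapshotter, state []byte, quit <-chan struct{}) {
    // Buffered with capacity 1 so the raft side can always deliver its
    // single result without blocking.
    ch := make(chan InstalledSnapshot, 1)

    n.InstallSnapshotAsync(state, ch)
    inProgress := true

    for {
        select {
        case snap := <-ch:
            inProgress = false
            if snap.Err != nil {
                log.Printf("snapshot failed: %v", snap.Err)
            }
        case <-quit:
            if inProgress {
                // Mirror monitorStream's deferred drain of an
                // outstanding snapshot before exiting.
                <-ch
            }
            return
        }
    }
}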

3 files changed: +189 -34 lines changed

server/jetstream_cluster.go

Lines changed: 63 additions & 11 deletions
@@ -2519,9 +2519,21 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
     // fully recovered from disk.
     isRecovering := true

-    doSnapshot := func() {
+    snapState := struct {
+        inProgress bool
+        curState   SimpleState
+        ch         chan InstalledSnapshot
+    }{
+        ch: make(chan InstalledSnapshot, 1),
+    }
+
+    wantSnapshot := func() bool {
         if mset == nil || isRecovering || isRestore {
-            return
+            return false
+        }
+
+        if snapState.inProgress {
+            return false
         }

         // Before we actually calculate the detailed state and encode it, let's check the
@@ -2534,18 +2546,56 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
         // consumers on idle streams but better to be safe than sorry!
         ne, nb := n.Size()
         if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
-            return
+            return false
+        }
+
+        snapState.curState = curState
+        return true
+    }
+
+    handleSnapshotErr := func(err error) {
+        switch err {
+        case nil:
+            lastState = snapState.curState
+        case errNoSnapAvailable, errNodeClosed, errCatchupsRunning:
+            // ignore the error
+        default:
+            s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
+                mset.acc.Name, mset.name(), n.Group(), err)
+        }
+
+    }
+
+    doSnapshot := func() {
+        if wantSnapshot() {
+            // Make sure all pending data is flushed before allowing snapshots.
+            mset.flushAllPending()
+            err := n.InstallSnapshot(mset.stateSnapshot())
+            handleSnapshotErr(err)
         }
+    }

-        // Make sure all pending data is flushed before allowing snapshots.
-        mset.flushAllPending()
-        if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
-            lastState = curState
-        } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
-            s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
+    doSnapshotAsync := func() {
+        if wantSnapshot() {
+            // Make sure all pending data is flushed before allowing snapshots.
+            mset.flushAllPending()
+            n.InstallSnapshotAsync(mset.stateSnapshot(), snapState.ch)
+            snapState.inProgress = true
         }
     }

+    snapshotDone := func(snap InstalledSnapshot) {
+        handleSnapshotErr(snap.Err)
+        snapState.inProgress = false
+    }
+
+    defer func() {
+        if snapState.inProgress {
+            s := <-snapState.ch
+            snapshotDone(s)
+        }
+    }()
+
     // We will establish a restoreDoneCh no matter what. Will never be triggered unless
     // we replace with the restore chan.
     restoreDoneCh := make(<-chan error)
@@ -2617,6 +2667,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

     for {
         select {
+        case s := <-snapState.ch:
+            snapshotDone(s)
         case <-s.quitCh:
             // Server shutting down, but we might receive this before qch, so try to snapshot.
             doSnapshot()
@@ -2726,7 +2778,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
             // Check about snapshotting
             // If we have at least min entries to compact, go ahead and try to snapshot/compact.
             if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
-                doSnapshot()
+                doSnapshotAsync()
             }

         case isLeader = <-lch:
@@ -2822,7 +2874,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
             stopDirectMonitoring()

         case <-t.C:
-            doSnapshot()
+            doSnapshotAsync()

         case <-uch:
             // keep stream assignment current

server/raft.go

Lines changed: 87 additions & 23 deletions
@@ -42,6 +42,7 @@ type RaftNode interface {
     ProposeMulti(entries []*Entry) error
     ForwardProposal(entry []byte) error
     InstallSnapshot(snap []byte) error
+    InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
     SendSnapshot(snap []byte) error
     NeedSnapshot() bool
     Applied(index uint64) (entries uint64, bytes uint64)
@@ -230,6 +231,7 @@ type raft struct {
     observer     bool // The node is observing, i.e. not able to become leader
     initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
     scaleUp      bool // The node is part of a scale up, puts us in observer mode until the log contains data.
+    snapshotting bool // Snapshot is in progress
 }

 type proposedEntry struct {
@@ -307,6 +309,7 @@ var (
     errNodeClosed      = errors.New("raft: node is closed")
     errBadSnapName     = errors.New("raft: snapshot name could not be parsed")
     errNoSnapAvailable = errors.New("raft: no snapshot available")
+    errSnapInProgress  = errors.New("raft: snapshot is already in progress")
     errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
     errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
     errTooManyPrefs    = errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1242,49 +1245,110 @@ func (n *raft) SendSnapshot(data []byte) error {
     return nil
 }

-// Used to install a snapshot for the given term and applied index. This will release
-// all of the log entries up to and including index. This should not be called with
-// entries that have been applied to the FSM but have not been applied to the raft state.
-func (n *raft) InstallSnapshot(data []byte) error {
-    if n.State() == Closed {
-        return errNodeClosed
-    }
+type InstalledSnapshot struct {
+    Term  uint64
+    Index uint64
+    Path  string
+    Err   error
+}

-    n.Lock()
-    defer n.Unlock()
+func (n *raft) installSnapshotAsyncLocked(data []byte, ch chan<- InstalledSnapshot) {
+    if n.snapshotting {
+        ch <- InstalledSnapshot{Err: errSnapInProgress}
+        return
+    }

     // If a write error has occurred already then stop here.
-    if werr := n.werr; werr != nil {
-        return werr
+    if n.werr != nil {
+        ch <- InstalledSnapshot{Err: n.werr}
+        return
     }

-    // Check that a catchup isn't already taking place. If it is then we won't
-    // allow installing snapshots until it is done.
+    // Check that a catchup isn't already taking place. If it is then we
+    // won't allow installing snapshots until it is done.
     if len(n.progress) > 0 || n.paused {
-        return errCatchupsRunning
+        ch <- InstalledSnapshot{Err: errCatchupsRunning}
+        return
     }

     if n.applied == 0 {
         n.debug("Not snapshotting as there are no applied entries")
-        return errNoSnapAvailable
+        ch <- InstalledSnapshot{Err: errNoSnapAvailable}
+        return
     }

-    var term uint64
-    if ae, _ := n.loadEntry(n.applied); ae != nil {
-        term = ae.term
-    } else {
+    ae, _ := n.loadEntry(n.applied)
+    if ae == nil {
         n.debug("Not snapshotting as entry %d is not available", n.applied)
-        return errNoSnapAvailable
+        ch <- InstalledSnapshot{Err: errNoSnapAvailable}
+        return
     }

-    n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
+    n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), ae.term, n.applied)

-    return n.installSnapshot(&snapshot{
-        lastTerm:  term,
+    encoded := n.encodeSnapshot(&snapshot{
+        lastTerm:  ae.term,
         lastIndex: n.applied,
         peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
         data:      data,
     })
+
+    snapDir := filepath.Join(n.sd, snapshotsDir)
+    snapFile := filepath.Join(snapDir, fmt.Sprintf(snapFileT, ae.term, n.applied))
+    snap := InstalledSnapshot{Term: ae.term, Index: n.applied, Path: snapFile}
+
+    n.snapshotting = true
+
+    go func() {
+        snap.Err = writeFileWithSync(snap.Path, encoded, defaultFilePerms)
+        n.Lock()
+        if n.State() == Closed {
+            snap.Err = errNodeClosed
+        }
+        if snap.Err == nil {
+            // Delete our previous snapshot file if it exists.
+            if n.snapfile != _EMPTY_ && n.snapfile != snap.Path {
+                os.Remove(n.snapfile)
+            }
+            // Remember our latest snapshot file.
+            n.snapfile = snap.Path
+            _, snap.Err = n.wal.Compact(snap.Index + 1)
+            if snap.Err != nil {
+                n.setWriteErrLocked(snap.Err)
+            } else {
+                var state StreamState
+                n.wal.FastState(&state)
+                n.papplied = snap.Index
+                n.bytes = state.Bytes
+            }
+        }
+        n.snapshotting = false
+        n.Unlock()
+        ch <- snap
+    }()
+}
+
+// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
+// snapshot to disk and compacts the WAL in a separate goroutine. The caller
+// is notified of the result on the provided channel.
+func (n *raft) InstallSnapshotAsync(data []byte, ch chan<- InstalledSnapshot) {
+    if n.State() == Closed {
+        ch <- InstalledSnapshot{Err: errNodeClosed}
+        return
+    }
+    n.Lock()
+    defer n.Unlock()
+    n.installSnapshotAsyncLocked(data, ch)
+}
+
+// InstallSnapshot installs a snapshot for the current applied index. This is a
+// synchronous call that will block until the snapshot is installed, and will
+// release all of the log entries up to the applied index.
+func (n *raft) InstallSnapshot(data []byte) error {
+    ch := make(chan InstalledSnapshot, 1)
+    n.InstallSnapshotAsync(data, ch)
+    snap := <-ch
+    return snap.Err
 }

 // Install the snapshot.
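
A design note on the raft.go changes above: installSnapshotAsyncLocked reports early failures (snapshot already running, prior write error, catchups in progress, nothing applied) by sending on the caller's channel while still holding the raft lock, and the background goroutine later sends exactly one final result. Every caller added in this commit passes a channel with capacity 1, so none of these sends can block. A hypothetical helper that captures that convention (assumed to live in the server package so RaftNode and InstalledSnapshot are in scope; not part of the commit) might look like this:

// Illustrative sketch, not from the commit: wrap InstallSnapshotAsync so
// callers always get a properly buffered result channel.
func installSnapshotNonBlocking(n RaftNode, snap []byte) <-chan InstalledSnapshot {
    // Capacity 1 matches the callers in this commit and guarantees the single
    // result (or early error) can be delivered without blocking the raft side.
    ch := make(chan InstalledSnapshot, 1)
    n.InstallSnapshotAsync(snap, ch)
    return ch
}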

server/raft_test.go

Lines changed: 39 additions & 0 deletions
@@ -4010,3 +4010,42 @@ func TestNRGChainOfBlocksStopAndCatchUp(t *testing.T) {
         }
     }
 }
+
+func TestNRGAsyncSnapshotInProgress(t *testing.T) {
+    n, cleanup := initSingleMemRaftNode(t)
+    defer cleanup()
+
+    esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
+    entries := []*Entry{newEntry(EntryNormal, esm)}
+    nats0 := "S1Nunr6R" // "nats-0"
+
+    aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
+    aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
+
+    n.processAppendEntry(aeMsg1, n.aesub)
+    n.processAppendEntry(aeHeartbeat, n.aesub)
+    n.Applied(1)
+
+    // Channels to receive the results
+    ch1 := make(chan InstalledSnapshot, 1)
+    ch2 := make(chan InstalledSnapshot, 1)
+
+    n.Lock()
+    n.installSnapshotAsyncLocked(nil, ch1)
+    n.installSnapshotAsyncLocked(nil, ch2)
+
+    select {
+    case s := <-ch2:
+        require_Error(t, s.Err, errSnapInProgress)
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Unexpected time out while waiting for snapshot result")
+    }
+    n.Unlock()
+
+    select {
+    case s := <-ch1:
+        require_NoError(t, s.Err)
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Unexpected time out while waiting for snapshot result")
+    }
+}
