Skip to content

Commit 4d07c51

Browse files
committed
Make stream snapshots asynchronous
The monitorStream function can block for a long time when creating and installing a snapshot of a stream's state. This can lead to increased tail latency. This commit extends the RaftNode interface with a new InstallSnapshotAsync method. This method performs snapshot writing and WAL compaction in a separate goroutine, making the process non-blocking. The existing InstallSnapshot method is now a synchronous wrapper around the new asynchronous implementation. Signed-off-by: Daniele Sciascia <[email protected]>
1 parent aa1b2fc commit 4d07c51

File tree

3 files changed

+189
-35
lines changed

3 files changed

+189
-35
lines changed

server/jetstream_cluster.go

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2554,9 +2554,20 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25542554
// fully recovered from disk.
25552555
isRecovering := true
25562556

2557-
doSnapshot := func() {
2558-
if mset == nil || isRecovering || isRestore {
2559-
return
2557+
snapState := struct {
2558+
// Only one snapshot is in progress at any given time
2559+
inProgress bool
2560+
// Keep track of the store state; if the snapshot is successful, update lastState
2561+
curState SimpleState
2562+
// This is where we want to receive async snapshot results
2563+
ch chan InstalledSnapshot
2564+
}{
2565+
ch: make(chan InstalledSnapshot, 1),
2566+
}
2567+
2568+
wantSnapshot := func() bool {
2569+
if mset == nil || isRecovering || isRestore || snapState.inProgress {
2570+
return false
25602571
}
25612572

25622573
// Before we actually calculate the detailed state and encode it, let's check the
@@ -2569,18 +2580,56 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25692580
// consumers on idle streams but better to be safe than sorry!
25702581
ne, nb := n.Size()
25712582
if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
2572-
return
2583+
return false
25732584
}
25742585

2575-
// Make sure all pending data is flushed before allowing snapshots.
2576-
mset.flushAllPending()
2577-
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
2578-
lastState = curState
2579-
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
2580-
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
2586+
snapState.curState = curState
2587+
return true
2588+
}
2589+
2590+
handleSnapshotErr := func(err error) {
2591+
switch err {
2592+
case nil:
2593+
lastState = snapState.curState
2594+
case errNoSnapAvailable, errNodeClosed, errCatchupsRunning:
2595+
// ignore the error
2596+
default:
2597+
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
2598+
mset.acc.Name, mset.name(), n.Group(), err)
25812599
}
2600+
25822601
}
25832602

2603+
doSnapshot := func() {
2604+
if wantSnapshot() {
2605+
// Make sure all pending data is flushed before allowing snapshots.
2606+
mset.flushAllPending()
2607+
err := n.InstallSnapshot(mset.stateSnapshot())
2608+
handleSnapshotErr(err)
2609+
}
2610+
}
2611+
2612+
doSnapshotAsync := func() {
2613+
if wantSnapshot() {
2614+
// Make sure all pending data is flushed before allowing snapshots.
2615+
mset.flushAllPending()
2616+
n.InstallSnapshotAsync(mset.stateSnapshot(), snapState.ch)
2617+
snapState.inProgress = true
2618+
}
2619+
}
2620+
2621+
asyncSnapshotDone := func(snap InstalledSnapshot) {
2622+
handleSnapshotErr(snap.Err)
2623+
snapState.inProgress = false
2624+
}
2625+
2626+
defer func() {
2627+
if snapState.inProgress {
2628+
s := <-snapState.ch
2629+
asyncSnapshotDone(s)
2630+
}
2631+
}()
2632+
25842633
// We will establish a restoreDoneCh no matter what. Will never be triggered unless
25852634
// we replace with the restore chan.
25862635
restoreDoneCh := make(<-chan error)
@@ -2652,6 +2701,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
26522701

26532702
for {
26542703
select {
2704+
case s := <-snapState.ch:
2705+
// async snapshot is done, handle the result
2706+
asyncSnapshotDone(s)
26552707
case <-s.quitCh:
26562708
// Server shutting down, but we might receive this before qch, so try to snapshot.
26572709
doSnapshot()
@@ -2761,7 +2813,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
27612813
// Check about snapshotting
27622814
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
27632815
if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
2764-
doSnapshot()
2816+
doSnapshotAsync()
27652817
}
27662818

27672819
case isLeader = <-lch:
@@ -2856,7 +2908,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
28562908
stopDirectMonitoring()
28572909

28582910
case <-t.C:
2859-
doSnapshot()
2911+
doSnapshotAsync()
28602912

28612913
case <-uch:
28622914
// keep stream assignment current

server/raft.go

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type RaftNode interface {
4141
ProposeMulti(entries []*Entry) error
4242
ForwardProposal(entry []byte) error
4343
InstallSnapshot(snap []byte) error
44+
InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
4445
SendSnapshot(snap []byte) error
4546
NeedSnapshot() bool
4647
Applied(index uint64) (entries uint64, bytes uint64)
@@ -229,6 +230,7 @@ type raft struct {
229230
observer bool // The node is observing, i.e. not able to become leader
230231
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
231232
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
233+
snapshotting bool // Snapshot is in progress
232234
}
233235

234236
type proposedEntry struct {
@@ -306,6 +308,7 @@ var (
306308
errNodeClosed = errors.New("raft: node is closed")
307309
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
308310
errNoSnapAvailable = errors.New("raft: no snapshot available")
311+
errSnapInProgress = errors.New("raft: snapshot is already in progress")
309312
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
310313
errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
311314
errTooManyPrefs = errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1242,49 +1245,110 @@ func (n *raft) SendSnapshot(data []byte) error {
12421245
return nil
12431246
}
12441247

1245-
// Used to install a snapshot for the given term and applied index. This will release
1246-
// all of the log entries up to and including index. This should not be called with
1247-
// entries that have been applied to the FSM but have not been applied to the raft state.
1248-
func (n *raft) InstallSnapshot(data []byte) error {
1249-
if n.State() == Closed {
1250-
return errNodeClosed
1251-
}
1248+
type InstalledSnapshot struct {
1249+
Term uint64
1250+
Index uint64
1251+
Path string
1252+
Err error
1253+
}
12521254

1253-
n.Lock()
1254-
defer n.Unlock()
1255+
func (n *raft) installSnapshotAsyncLocked(data []byte, ch chan<- InstalledSnapshot) {
1256+
if n.snapshotting {
1257+
ch <- InstalledSnapshot{Err: errSnapInProgress}
1258+
return
1259+
}
12551260

12561261
// If a write error has occurred already then stop here.
1257-
if werr := n.werr; werr != nil {
1258-
return werr
1262+
if n.werr != nil {
1263+
ch <- InstalledSnapshot{Err: n.werr}
1264+
return
12591265
}
12601266

1261-
// Check that a catchup isn't already taking place. If it is then we won't
1262-
// allow installing snapshots until it is done.
1267+
// Check that a catchup isn't already taking place. If it is then we
1268+
// won't allow installing snapshots until it is done.
12631269
if len(n.progress) > 0 || n.paused {
1264-
return errCatchupsRunning
1270+
ch <- InstalledSnapshot{Err: errCatchupsRunning}
1271+
return
12651272
}
12661273

12671274
if n.applied == 0 {
12681275
n.debug("Not snapshotting as there are no applied entries")
1269-
return errNoSnapAvailable
1276+
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
1277+
return
12701278
}
12711279

1272-
var term uint64
1273-
if ae, _ := n.loadEntry(n.applied); ae != nil {
1274-
term = ae.term
1275-
} else {
1280+
ae, _ := n.loadEntry(n.applied)
1281+
if ae == nil {
12761282
n.debug("Not snapshotting as entry %d is not available", n.applied)
1277-
return errNoSnapAvailable
1283+
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
1284+
return
12781285
}
12791286

1280-
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
1287+
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), ae.term, n.applied)
12811288

1282-
return n.installSnapshot(&snapshot{
1283-
lastTerm: term,
1289+
encoded := n.encodeSnapshot(&snapshot{
1290+
lastTerm: ae.term,
12841291
lastIndex: n.applied,
12851292
peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
12861293
data: data,
12871294
})
1295+
1296+
snapDir := filepath.Join(n.sd, snapshotsDir)
1297+
snapFile := filepath.Join(snapDir, fmt.Sprintf(snapFileT, ae.term, n.applied))
1298+
snap := InstalledSnapshot{Term: ae.term, Index: n.applied, Path: snapFile}
1299+
1300+
n.snapshotting = true
1301+
1302+
go func() {
1303+
snap.Err = writeFileWithSync(snap.Path, encoded, defaultFilePerms)
1304+
n.Lock()
1305+
if n.State() == Closed {
1306+
snap.Err = errNodeClosed
1307+
}
1308+
if snap.Err == nil {
1309+
// Delete our previous snapshot file if it exists.
1310+
if n.snapfile != _EMPTY_ && n.snapfile != snap.Path {
1311+
os.Remove(n.snapfile)
1312+
}
1313+
// Remember our latest snapshot file.
1314+
n.snapfile = snap.Path
1315+
_, snap.Err = n.wal.Compact(snap.Index + 1)
1316+
if snap.Err != nil {
1317+
n.setWriteErrLocked(snap.Err)
1318+
} else {
1319+
var state StreamState
1320+
n.wal.FastState(&state)
1321+
n.papplied = snap.Index
1322+
n.bytes = state.Bytes
1323+
}
1324+
}
1325+
n.snapshotting = false
1326+
n.Unlock()
1327+
ch <- snap
1328+
}()
1329+
}
1330+
1331+
// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
1332+
// snapshot to disk and compacts the WAL in a separate goroutine. The caller
1333+
// is notified of the result on the provided channel.
1334+
func (n *raft) InstallSnapshotAsync(data []byte, ch chan<- InstalledSnapshot) {
1335+
if n.State() == Closed {
1336+
ch <- InstalledSnapshot{Err: errNodeClosed}
1337+
return
1338+
}
1339+
n.Lock()
1340+
defer n.Unlock()
1341+
n.installSnapshotAsyncLocked(data, ch)
1342+
}
1343+
1344+
// InstallSnapshot installs a snapshot for the current applied index. This is a
1345+
// synchronous call that will block until the snapshot is installed, and will
1346+
// release all of the log entries up to the applied index.
1347+
func (n *raft) InstallSnapshot(data []byte) error {
1348+
ch := make(chan InstalledSnapshot, 1)
1349+
n.InstallSnapshotAsync(data, ch)
1350+
snap := <-ch
1351+
return snap.Err
12881352
}
12891353

12901354
// Install the snapshot.

server/raft_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4110,3 +4110,41 @@ func TestNRGChainOfBlocksStopAndCatchUp(t *testing.T) {
41104110
}
41114111
}
41124112
}
4113+
4114+
func TestNRGAsyncSnapshotInProgress(t *testing.T) {
4115+
n, cleanup := initSingleMemRaftNode(t)
4116+
defer cleanup()
4117+
4118+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
4119+
entries := []*Entry{newEntry(EntryNormal, esm)}
4120+
nats0 := "S1Nunr6R" // "nats-0"
4121+
4122+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
4123+
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
4124+
4125+
n.processAppendEntry(aeMsg1, n.aesub)
4126+
n.processAppendEntry(aeHeartbeat, n.aesub)
4127+
n.Applied(1)
4128+
4129+
ch1 := make(chan InstalledSnapshot, 1)
4130+
ch2 := make(chan InstalledSnapshot, 1)
4131+
4132+
n.Lock()
4133+
n.installSnapshotAsyncLocked(nil, ch1)
4134+
n.installSnapshotAsyncLocked(nil, ch2)
4135+
4136+
select {
4137+
case s := <-ch2:
4138+
require_Error(t, s.Err, errSnapInProgress)
4139+
case <-time.After(5 * time.Second):
4140+
t.Fatalf("Unexpected time out while waiting for snapshot result")
4141+
}
4142+
n.Unlock()
4143+
4144+
select {
4145+
case s := <-ch1:
4146+
require_NoError(t, s.Err)
4147+
case <-time.After(5 * time.Second):
4148+
t.Fatalf("Unexpected time out while waiting for snapshot result")
4149+
}
4150+
}

0 commit comments

Comments
 (0)