@@ -45,6 +45,10 @@ const (
4545)
4646
4747const (
48+ // MaxBackupLTXFileN is the maximum number of LTX files that can be compacted
49+ // together in a single batch when sending data to the backup service.
50+ MaxBackupLTXFileN = 256
51+
4852 MetricsMonitorInterval = 1 * time .Second
4953)
5054
@@ -1120,9 +1124,9 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11201124
11211125 // Check local replication position.
11221126 // If we haven't written anything yet then there is nothing to send.
1123- pos := db .Pos ()
1124- if pos .IsZero () {
1125- return pos , nil
1127+ localPos := db .Pos ()
1128+ if localPos .IsZero () {
1129+ return localPos , nil
11261130 }
11271131
11281132 // If the database doesn't exist remotely, perform a full snapshot.
@@ -1132,11 +1136,11 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11321136
11331137 // If the position from the backup server is ahead of the primary then we
11341138 // need to perform a recovery so that we snapshot from the backup server.
1135- if remotePos .TXID > pos .TXID {
1139+ if remotePos .TXID > localPos .TXID {
11361140 slog .Warn ("restoring from backup" ,
11371141 slog .String ("name" , name ),
11381142 slog .Group ("pos" ,
1139- slog .String ("local" , pos .String ()),
1143+ slog .String ("local" , localPos .String ()),
11401144 slog .String ("remote" , remotePos .String ()),
11411145 ),
11421146 slog .String ("reason" , "remote-ahead" ),
@@ -1148,12 +1152,12 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11481152 // If the TXID matches the backup server, we need to ensure the checksum
11491153 // does as well. If it doesn't, we need to grab a snapshot from the backup
11501154 // server. If it does, then we can exit as we're already in sync.
1151- if remotePos .TXID == pos .TXID {
1152- if remotePos .PostApplyChecksum != pos .PostApplyChecksum {
1155+ if remotePos .TXID == localPos .TXID {
1156+ if remotePos .PostApplyChecksum != localPos .PostApplyChecksum {
11531157 slog .Warn ("restoring from backup" ,
11541158 slog .String ("name" , name ),
11551159 slog .Group ("pos" ,
1156- slog .String ("local" , pos .String ()),
1160+ slog .String ("local" , localPos .String ()),
11571161 slog .String ("remote" , remotePos .String ()),
11581162 ),
11591163 slog .String ("reason" , "chksum-mismatch" ),
@@ -1162,10 +1166,10 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11621166 }
11631167
11641168 slog .Debug ("database in sync with backup, skipping" , slog .String ("name" , name ))
1165- return pos , nil // already in sync
1169+ return localPos , nil // already in sync
11661170 }
11671171
1168- assert (remotePos .TXID < pos .TXID , "remote/local position must be ordered" )
1172+ assert (remotePos .TXID < localPos .TXID , "remote/local position must be ordered" )
11691173
11701174 // OPTIMIZE: Check that remote postApplyChecksum equals next TXID's preApplyChecksum
11711175
@@ -1176,13 +1180,14 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11761180 _ = r .(io.Closer ).Close ()
11771181 }
11781182 }()
1179- for txID := remotePos .TXID + 1 ; txID <= pos .TXID ; txID ++ {
1183+
1184+ for txID , n := remotePos .TXID + 1 , 0 ; txID <= localPos .TXID && n < MaxBackupLTXFileN ; txID , n = txID + 1 , n + 1 {
11801185 f , err := db .OpenLTXFile (txID )
11811186 if os .IsNotExist (err ) {
11821187 slog .Warn ("restoring from backup" ,
11831188 slog .String ("name" , name ),
11841189 slog .Group ("pos" ,
1185- slog .String ("local" , pos .String ()),
1190+ slog .String ("local" , localPos .String ()),
11861191 slog .String ("remote" , remotePos .String ()),
11871192 ),
11881193 slog .String ("txid" , txID .String ()),
@@ -1197,10 +1202,17 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11971202
11981203 // Compact LTX files through a pipe so we can pass it to the backup client.
11991204 pr , pw := io .Pipe ()
1205+ var pos ltx.Pos
12001206 go func () {
12011207 compactor := ltx .NewCompactor (pw , rdrs )
12021208 compactor .HeaderFlags = s .ltxHeaderFlags ()
1203- _ = pw .CloseWithError (compactor .Compact (ctx ))
1209+ if err := compactor .Compact (ctx ); err != nil {
1210+ _ = pw .CloseWithError (err )
1211+ return
1212+ }
1213+
1214+ pos = ltx .NewPos (compactor .Header ().MaxTXID , compactor .Trailer ().PostApplyChecksum )
1215+ _ = pw .Close ()
12041216 }()
12051217
12061218 var pmErr * ltx.PosMismatchError
@@ -1209,7 +1221,7 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
12091221 slog .Warn ("restoring from backup" ,
12101222 slog .String ("name" , name ),
12111223 slog .Group ("pos" ,
1212- slog .String ("local" , pos .String ()),
1224+ slog .String ("local" , localPos .String ()),
12131225 slog .String ("remote" , pmErr .Pos .String ()),
12141226 ),
12151227 slog .String ("reason" , "out-of-sync" ),
0 commit comments