Skip to content

Commit 56888b5

Browse files
committed
restore: log restore retries to job messages table
Currently, when a restore job is stuck in a retry loop, no indication is given to the user that errors were encountered and the job is retrying (aside from within the logs). This commit teaches the restore retrier to log errors to the job messages table, throttled to once every 5 minutes to avoid flooding the table during a hot retry loop. Epic: CRDB-50823 Fixes: cockroachdb#149787, cockroachdb#148033 Release note (general change): Restore jobs now log errors on retry to the job messages table.
1 parent a685bfb commit 56888b5

File tree

5 files changed

+186
-31
lines changed

5 files changed

+186
-31
lines changed

pkg/backup/restore_job.go

Lines changed: 67 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import (
6363
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
6464
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
6565
"github.com/cockroachdb/cockroach/pkg/sql/stats"
66+
"github.com/cockroachdb/cockroach/pkg/util"
6667
bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
6768
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
6869
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -91,20 +92,31 @@ var (
9192
settings.WithVisibility(settings.Reserved),
9293
settings.PositiveDuration,
9394
)
95+
96+
restoreRetryLogRate = settings.RegisterDurationSetting(
97+
settings.ApplicationLevel,
98+
"restore.retry_log_rate",
99+
"maximum rate at which retryable restore errors are logged to the job messages table",
100+
5*time.Minute,
101+
settings.WithVisibility(settings.Reserved),
102+
settings.PositiveDuration,
103+
)
94104
)
95105

96-
// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
97-
// tables we process in a single txn when restoring their table statistics.
98-
const restoreStatsInsertBatchSize = 10
106+
const (
107+
// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
108+
// tables we process in a single txn when restoring their table statistics.
109+
restoreStatsInsertBatchSize = 10
99110

100-
// maxRestoreRetryFastFail is the maximum number of times we will retry without
101-
// seeing any progress before fast-failing the restore job.
102-
const maxRestoreRetryFastFail = 5
111+
// maxRestoreRetryFastFail is the maximum number of times we will retry before
112+
// exceeding the restoreRetryProgressThreshold.
113+
maxRestoreRetryFastFail = 5
103114

104-
// restoreRetryProgressThreshold is the fraction of the job that must
105-
// be _exceeded_ before we no longer fast fail the restore job after hitting the
106-
// maxRestoreRetryFastFail threshold.
107-
const restoreRetryProgressThreshold = 0
115+
// restoreRetryProgressThreshold is the fraction of the job that must
116+
// be _exceeded_ before we no longer fast fail the restore job after hitting the
117+
// maxRestoreRetryFastFail threshold.
118+
restoreRetryProgressThreshold = 0
119+
)
108120

109121
var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
110122
settings.ApplicationLevel,
@@ -198,10 +210,12 @@ func restoreWithRetry(
198210
// We want to retry a restore if there are transient failures (i.e. worker nodes
199211
// dying), so if we receive a retryable error, re-plan and retry the restore.
200212
retryOpts, progThreshold := getRetryOptionsAndProgressThreshold(execCtx)
213+
logRate := restoreRetryLogRate.Get(&execCtx.ExecCfg().Settings.SV)
214+
logThrottler := util.Every(logRate)
201215
var (
202-
res roachpb.RowCount
203-
err error
204-
currPersistedSpans, prevPersistedSpans jobspb.RestoreFrontierEntries
216+
res roachpb.RowCount
217+
err error
218+
prevPersistedSpans jobspb.RestoreFrontierEntries
205219
)
206220
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
207221
res, err = restore(
@@ -237,16 +251,19 @@ func restoreWithRetry(
237251

238252
log.Warningf(ctx, "encountered retryable error: %+v", err)
239253

240-
// Check if retry counter should be reset if progress was made.
241-
currPersistedSpans = resumer.job.
242-
Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
243-
if !currPersistedSpans.Equal(prevPersistedSpans) {
244-
// If the previous persisted spans are different than the current, it
245-
// implies that further progress has been persisted.
246-
r.Reset()
247-
log.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
254+
if logThrottler.ShouldProcess(timeutil.Now()) {
255+
// We throttle the logging of errors to the jobs messages table to avoid
256+
// flooding the table during the hot loop of a retry.
257+
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
258+
return resumer.job.Messages().Record(
259+
ctx, txn, "error", fmt.Sprintf("restore encountered error: %v", err),
260+
)
261+
}); err != nil {
262+
log.Warningf(ctx, "failed to record job error message: %v", err)
263+
}
248264
}
249-
prevPersistedSpans = currPersistedSpans
265+
266+
prevPersistedSpans = maybeResetRetry(ctx, resumer, &r, prevPersistedSpans)
250267

251268
// Fail fast if no progress has been made after a certain number of retries.
252269
if r.CurrentAttempt() >= maxRestoreRetryFastFail &&
@@ -268,6 +285,7 @@ func restoreWithRetry(
268285
if err != nil {
269286
return res, jobs.MarkPauseRequestError(errors.Wrap(err, "exhausted retries"))
270287
}
288+
271289
return res, nil
272290
}
273291

@@ -281,8 +299,9 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
281299
// event that some progress has been made.
282300
maxDuration := restoreRetryMaxDuration.Get(&execCtx.ExecCfg().Settings.SV)
283301
retryOpts := retry.Options{
284-
MaxBackoff: 5 * time.Minute,
285-
MaxDuration: maxDuration,
302+
InitialBackoff: 50 * time.Millisecond,
303+
MaxBackoff: 5 * time.Minute,
304+
MaxDuration: maxDuration,
286305
}
287306
var progThreshold float32 = restoreRetryProgressThreshold
288307
if knobs := execCtx.ExecCfg().BackupRestoreTestingKnobs; knobs != nil {
@@ -297,6 +316,26 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
297316
return retryOpts, progThreshold
298317
}
299318

319+
// maybeResetRetry checks on the progress of the restore job and resets the
320+
// retry loop if progress has been made. It returns the latest progress.
321+
func maybeResetRetry(
322+
ctx context.Context,
323+
resumer *restoreResumer,
324+
rt *retry.Retry,
325+
prevProgress jobspb.RestoreFrontierEntries,
326+
) jobspb.RestoreFrontierEntries {
327+
// Check if retry counter should be reset if progress was made.
328+
var currProgress jobspb.RestoreFrontierEntries = resumer.job.
329+
Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
330+
if !currProgress.Equal(prevProgress) {
331+
// If the previous persisted spans are different than the current, it
332+
// implies that further progress has been persisted.
333+
rt.Reset()
334+
log.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
335+
}
336+
return currProgress
337+
}
338+
300339
type storeByLocalityKV map[string]cloudpb.ExternalStorage
301340

302341
func makeBackupLocalityMap(
@@ -384,10 +423,13 @@ func restore(
384423
restoreCheckpoint := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
385424
requiredSpans := dataToRestore.getSpans()
386425
progressTracker, err := makeProgressTracker(
426+
job,
427+
execCtx.ExecCfg(),
387428
requiredSpans,
388429
restoreCheckpoint,
389430
restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV),
390-
endTime)
431+
endTime,
432+
)
391433
if err != nil {
392434
return emptyRowCount, err
393435
}

pkg/backup/restore_progress.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
12+
"github.com/cockroachdb/cockroach/pkg/jobs"
1213
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
1415
"github.com/cockroachdb/cockroach/pkg/settings"
16+
"github.com/cockroachdb/cockroach/pkg/sql"
1517
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1618
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1719
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -36,6 +38,9 @@ var restoreCheckpointMaxBytes = settings.RegisterByteSizeSetting(
3638
var completedSpanTime = hlc.MaxTimestamp
3739

3840
type progressTracker struct {
41+
job *jobs.Job
42+
execCfg *sql.ExecutorConfig
43+
3944
// nextRequiredSpanKey maps a required span endkey to the subsequent requiredSpan's startKey.
4045
nextRequiredSpanKey map[string]roachpb.Key
4146

@@ -56,12 +61,13 @@ type progressTracker struct {
5661
}
5762

5863
func makeProgressTracker(
64+
job *jobs.Job,
65+
execCfg *sql.ExecutorConfig,
5966
requiredSpans roachpb.Spans,
6067
persistedSpans []jobspb.RestoreProgress_FrontierEntry,
6168
maxBytes int64,
6269
endTime hlc.Timestamp,
6370
) (*progressTracker, error) {
64-
6571
var (
6672
checkpointFrontier spanUtils.Frontier
6773
err error
@@ -76,13 +82,17 @@ func makeProgressTracker(
7682
nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
7783
}
7884

79-
pt := &progressTracker{}
85+
pt := &progressTracker{
86+
job: job,
87+
execCfg: execCfg,
88+
nextRequiredSpanKey: nextRequiredSpanKey,
89+
maxBytes: maxBytes,
90+
endTime: endTime,
91+
}
8092
pt.mu.checkpointFrontier = checkpointFrontier
81-
pt.nextRequiredSpanKey = nextRequiredSpanKey
82-
pt.maxBytes = maxBytes
83-
pt.endTime = endTime
8493
return pt, nil
8594
}
95+
8696
func (pt *progressTracker) close() {
8797
pt.mu.Lock()
8898
defer pt.mu.Unlock()

pkg/backup/restore_progress_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ func TestProgressTracker(t *testing.T) {
8282
},
8383
} {
8484
restoreTime := hlc.Timestamp{}
85-
pt, err := makeProgressTracker(requiredSpans, persistedSpans, 0, restoreTime)
85+
pt, err := makeProgressTracker(
86+
nil /* job */, &execCfg, requiredSpans, persistedSpans, 0, restoreTime,
87+
)
8688
require.NoError(t, err, "step %d", i)
8789

8890
done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo))

pkg/backup/restore_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/backup/backuptestutils"
1515
"github.com/cockroachdb/cockroach/pkg/base"
16+
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
1617
"github.com/cockroachdb/cockroach/pkg/jobs"
1718
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1819
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -206,3 +207,91 @@ func TestRestoreRetryFastFails(t *testing.T) {
206207
require.Equal(t, expFastFailAttempts, attempts)
207208
})
208209
}
210+
211+
func TestRestoreJobMessages(t *testing.T) {
212+
defer leaktest.AfterTest(t)()
213+
defer log.Scope(t).Close(t)
214+
215+
mu := struct {
216+
syncutil.Mutex
217+
retryCount int
218+
}{}
219+
// allowSuccess is a channel that will be closed when we want to allow the
220+
// restore job to succeed.
221+
allowSuccess := make(chan struct{})
222+
223+
testKnobs := &sql.BackupRestoreTestingKnobs{
224+
RestoreDistSQLRetryPolicy: &retry.Options{
225+
InitialBackoff: time.Microsecond,
226+
Multiplier: 1,
227+
MaxBackoff: time.Microsecond,
228+
// We will be allowing the restore job to succeed after a few job messages
229+
// are logged, so we just need MaxDuration to be long enough that it won't
230+
// be hit.
231+
MaxDuration: 5 * time.Minute,
232+
},
233+
RunBeforeRestoreFlow: func() error {
234+
mu.Lock()
235+
defer mu.Unlock()
236+
237+
if mu.retryCount < maxRestoreRetryFastFail {
238+
// Have not consumed all retries before a fast fail.
239+
mu.retryCount++
240+
return syscall.ECONNRESET
241+
}
242+
243+
return nil
244+
},
245+
RunAfterRestoreFlow: func() error {
246+
mu.Lock()
247+
defer mu.Unlock()
248+
249+
select {
250+
case <-allowSuccess:
251+
return nil
252+
default:
253+
mu.retryCount++
254+
return syscall.ECONNRESET
255+
}
256+
},
257+
}
258+
var params base.TestClusterArgs
259+
params.ServerArgs.Knobs.BackupRestore = testKnobs
260+
261+
const numAccounts = 10
262+
_, sqlDB, tmpDir, cleanupFn := backuptestutils.StartBackupRestoreTestCluster(
263+
t, singleNode, backuptestutils.WithParams(params), backuptestutils.WithBank(numAccounts),
264+
)
265+
defer cleanupFn()
266+
defer nodelocal.ReplaceNodeLocalForTesting(tmpDir)()
267+
268+
sqlDB.Exec(t, "SET CLUSTER SETTING restore.retry_log_rate = '100ms'")
269+
sqlDB.Exec(t, "BACKUP DATABASE data INTO 'nodelocal://1/backup'")
270+
271+
var restoreJobID jobspb.JobID
272+
sqlDB.QueryRow(
273+
t, `RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/backup'
274+
WITH detached, new_db_name = 'restored_data_'`,
275+
).Scan(&restoreJobID)
276+
277+
// Allow the restore job to fail a few times to log some error messages.
278+
time.AfterFunc(2*time.Second, func() {
279+
close(allowSuccess)
280+
})
281+
jobutils.WaitForJobToHaveStatus(t, sqlDB, restoreJobID, jobs.StateSucceeded)
282+
283+
var numErrMessages int
284+
sqlDB.QueryRow(
285+
t, `SELECT count(*) FROM system.job_message WHERE job_id = $1 AND kind = $2`,
286+
restoreJobID, "error",
287+
).Scan(&numErrMessages)
288+
require.Greater(t, numErrMessages, 1)
289+
// Depending on if the test is run under stress or not, we may log more
290+
// messages, so we just check that we log fewer than 50 messages. If there
291+
// were no throttling, there would be significantly more messages logged
292+
// (100+ without stress), so this is sufficient.
293+
require.Less(t, numErrMessages, 50)
294+
295+
finalStatusMsg := jobutils.GetJobStatusMessage(t, sqlDB, restoreJobID)
296+
require.Empty(t, finalStatusMsg, "status message should be cleared on job completion")
297+
}

pkg/testutils/jobutils/jobs_verification.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ func WaitForJobToHaveStatus(
100100
}
101101
}
102102

103+
// GetJobStatusMessage retrieves the current status message for a job.
104+
func GetJobStatusMessage(
105+
t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID,
106+
) jobs.StatusMessage {
107+
t.Helper()
108+
statuses := db.QueryStr(t, "SELECT status FROM system.job_status WHERE job_id = $1", jobID)
109+
if len(statuses) == 0 {
110+
return ""
111+
}
112+
return jobs.StatusMessage(statuses[0][0])
113+
}
114+
103115
// BulkOpResponseFilter creates a blocking response filter for the responses
104116
// related to bulk IO/backup/restore/import: Export, Import and AddSSTable. See
105117
// discussion on RunJob for where this might be useful.

0 commit comments

Comments
 (0)