diff --git a/pkg/scyllaclient/client_scylla.go b/pkg/scyllaclient/client_scylla.go index 02ea0e0486..7f40b1bbe4 100644 --- a/pkg/scyllaclient/client_scylla.go +++ b/pkg/scyllaclient/client_scylla.go @@ -15,6 +15,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/cespare/xxhash/v2" @@ -634,40 +635,135 @@ const ( IncrementalModeDisabled IncrementalMode = "disabled" ) -// TabletRepair schedules Scylla repair tablet table task and returns its ID. -// All tablets will be repaired with just a single task. It repairs all hosts -// by default, but it's possible to filter them by DC or host ID. -// Master is optional, as we can query any node for table repair task status. -func (c *Client) TabletRepair(ctx context.Context, keyspace, table, master string, dcs, hostIDs []string, incrementalMode IncrementalMode) (string, error) { - const allTablets = "all" - dontAwaitCompletion := "false" +// TabletRepairParams specifies optional parameters of tablet repair task. +type TabletRepairParams struct { + DCs []string + HostIDs []string + Incremental IncrementalMode + // LongPollingSeconds specifies intervals in which Callback is + // called with current task progress. Zero results in calling + // Callback only after task is finished. + LongPollingSeconds int64 + // Callback allowing to track task progress while it is running. + // When detailed progress is not available, Callback might + // be called with current and total set to 0. + Callback func(status ScyllaTaskState, current, total int64) +} + +// TabletRepair schedules tablet repair task and waits for it to finish. +// See TabletRepairParams for optional parameters. +func (c *Client) TabletRepair(ctx context.Context, ks, tab string, params TabletRepairParams) error { + const repairEntireTable = "all" ctx = withShouldRetryHandler(ctx, tabletRepairShouldRetryHandler) - if master != "" { - ctx = forceHost(ctx, master) - } p := operations.StorageServiceTabletsRepairPostParams{ - Context: ctx, - Ks: keyspace, - Table: table, - Tokens: allTablets, - AwaitCompletion: &dontAwaitCompletion, - } - if len(dcs) > 0 { - merged := strings.Join(dcs, ",") + Context: ctx, + Ks: ks, + Table: tab, + Tokens: repairEntireTable, + } + if len(params.DCs) > 0 { + merged := strings.Join(params.DCs, ",") p.SetDcsFilter(&merged) } - if len(hostIDs) > 0 { - merged := strings.Join(hostIDs, ",") + if len(params.HostIDs) > 0 { + merged := strings.Join(params.HostIDs, ",") p.SetHostsFilter(&merged) } - if incrementalMode != "" { - p.SetIncrementalMode(&incrementalMode) + if params.Incremental != "" { + p.SetIncrementalMode(¶ms.Incremental) } + + // Start by scheduling async tablet repair task + // and getting its ID. + p.AwaitCompletion = pointer.StringPtr("false") resp, err := c.scyllaOps.StorageServiceTabletsRepairPost(&p) if err != nil { - return "", err + return errors.Wrap(err, "schedule async tablet repair") + } + taskID := resp.GetPayload().TabletTaskID + + // Spawn goroutine responsible for calling callback + // with current task progress. It's not needed when + // either long polling or callback are not set. + // It's also not needed to record errors from such + // goroutine, as the final error will come from + // the synchronous wait below. Goroutine exists + // on error, task finish, or context cancellation. + progressCtx, progressCtxCancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + if params.LongPollingSeconds > 0 && params.Callback != nil { + wg.Go(func() { + for { + taskStatus, err := c.ScyllaWaitTask(progressCtx, "", taskID, params.LongPollingSeconds) + if err != nil { + return + } + // It's safe to cast progress to int64, + // as it is representing tablet count. + params.Callback( + ScyllaTaskState(taskStatus.State), + int64(taskStatus.ProgressCompleted), + int64(taskStatus.ProgressTotal), + ) + if ScyllaTaskState(taskStatus.State) == ScyllaTaskStateDone || + ScyllaTaskState(taskStatus.State) == ScyllaTaskStateFailed { + return + } + } + }) } - return resp.GetPayload().TabletTaskID, nil + + // Synchronously wait for task to finish. + // Information about tablet repair task progress is not kept on + // scylla side after task if finished. That's why we need synchronous + // waiting alongside long polling, so that we avoid scenario where + // tablet repair task finished in between long polling calls and + // the next one returns HTTP 404 error claiming that tablet repair + // task with given ID does not exist. Status returned from such call + // is also the final status on which callback is called. + taskStatus, err := c.ScyllaWaitTask(ctx, "", taskID, 0) + // Make sure that progress goroutine returns before proceeding + progressCtxCancel() + wg.Wait() + + if err != nil { + // Even with synchronous waiting, it's still possible for the same + // problem to occur, if tablet repair task finishes before we start + // waiting synchronously for it. In such case, we can retry tablet + // repair by synchronously waiting on the call for scheduling tablet + // repair task. We prefer not to do it, as in this case, we don't + // get tablet task ID and can't track its progress while it's running, + // but it's better than failing the whole repair. Also, this problem + // usually occurs for empty/small tables, so this retry shouldn't take + // much time. + if httpStatus, _ := StatusCodeAndMessageOf(err); httpStatus != http.StatusNotFound { + return errors.Wrap(err, "sync wait for tablet repair task") + } + p.AwaitCompletion = pointer.StringPtr("true") + _, err = c.scyllaOps.StorageServiceTabletsRepairPost(&p) + if err != nil { + return errors.Wrap(err, "schedule sync tablet repair") + } + // We don't have progress information because of the synchronous waiting + // on tablet repair schedule, so we can just report status done. + if params.Callback != nil { + params.Callback(ScyllaTaskStateDone, 0, 0) + } + return nil + } + + // Final callback on progress obtained from synchronous waiting + if params.Callback != nil { + params.Callback( + ScyllaTaskState(taskStatus.State), + int64(taskStatus.ProgressCompleted), + int64(taskStatus.ProgressTotal), + ) + } + if ScyllaTaskState(taskStatus.State) != ScyllaTaskStateDone { + return errors.Errorf("tablet repair task finished with state %q", ScyllaTaskStateFailed) + } + return nil } // Regex of schedule colocated table repair error. Taken from: diff --git a/pkg/service/repair/tablet/service_integration_test.go b/pkg/service/repair/tablet/service_integration_test.go index da4efa8739..be423c935c 100644 --- a/pkg/service/repair/tablet/service_integration_test.go +++ b/pkg/service/repair/tablet/service_integration_test.go @@ -355,7 +355,7 @@ func TestTabletRepairIntegration(t *testing.T) { h.Hrt.SetInterceptor(nil) // Start lots of scylla tablet repair tasks for ft := range allTables { - _, _ = h.Client.TabletRepair(testCtx, ft.ks, ft.tab, "", nil, nil, scyllaclient.IncrementalModeDisabled) + _ = h.Client.TabletRepair(testCtx, ft.ks, ft.tab, scyllaclient.TabletRepairParams{}) } // Make sure that they are visible on all nodes for _, host := range ManagedClusterHosts() { diff --git a/pkg/service/repair/tablet/worker.go b/pkg/service/repair/tablet/worker.go index 6e425f4fc7..93c2afcb26 100644 --- a/pkg/service/repair/tablet/worker.go +++ b/pkg/service/repair/tablet/worker.go @@ -111,7 +111,7 @@ func (w *worker) repairTable(ctx context.Context, client *scyllaclient.Client, k }(start) // Use scylla side default incremental mode (#4683) - id, err := client.TabletRepair(ctx, ks, tab, "", nil, nil, "") + err = client.TabletRepair(ctx, ks, tab, scyllaclient.TabletRepairParams{}) if err != nil { if _, _, ok := scyllaclient.IsColocatedTableErr(err); ok { // Since we always repair all tablet tables, @@ -119,25 +119,9 @@ func (w *worker) repairTable(ctx context.Context, client *scyllaclient.Client, k w.logger.Info(ctx, "Skipping repair of colocated tablet table", "keyspace", ks, "table", tab, "error", err) return nil } - return errors.Wrapf(err, "schedule tablet repair") - } - - w.logger.Info(ctx, "Scheduled tablet repair", "keyspace", ks, "table", tab, "task ID", id) - w.upsertTableProgress(ctx, pr) - - status, err := client.ScyllaWaitTask(ctx, "", id, 0) - if err != nil { - w.abortRepairTask(context.Background(), id) - return errors.Wrap(err, "get tablet repair task status") - } - switch scyllaclient.ScyllaTaskState(status.State) { - case scyllaclient.ScyllaTaskStateDone: - return nil - case scyllaclient.ScyllaTaskStateFailed: - return errors.Errorf("tablet repair task finished with status %q", scyllaclient.ScyllaTaskStateFailed) - default: - return errors.Errorf("unexpected tablet repair task status %q", status.State) + return errors.Wrapf(err, "tablet repair") } + return nil } func (w *worker) abortRepairTask(ctx context.Context, id string) { diff --git a/pkg/service/repair/worker.go b/pkg/service/repair/worker.go index 5a8d561038..05d60b5a7e 100644 --- a/pkg/service/repair/worker.go +++ b/pkg/service/repair/worker.go @@ -66,7 +66,7 @@ func (w *worker) runRepair(ctx context.Context, j job) (out error) { var ranges []scyllaclient.TokenRange switch j.jobType { case tabletJobType: - return w.fullTabletTableRepair(ctx, j.keyspace, j.table, j.master.String()) + return w.fullTabletTableRepair(ctx, j.keyspace, j.table) case smallTableJobType: ranges = nil case mergeRangesJobType: @@ -170,17 +170,25 @@ func (w *worker) isTableDeleted(ctx context.Context, j job) bool { return !exists } -func (w *worker) fullTabletTableRepair(ctx context.Context, keyspace, table, host string) error { +func (w *worker) fullTabletTableRepair(ctx context.Context, keyspace, table string) error { hostFilter, err := w.hostFilter(ctx) if err != nil { return errors.Wrap(err, "create host filter") } - var incrementalMode string + w.logger.Info(ctx, "Repairing entire tablet table", + "keyspace", keyspace, + "table", table, + ) + + p := scyllaclient.TabletRepairParams{ + DCs: w.target.DC, + HostIDs: hostFilter, + } if w.apiSupport.incrementalRepair { - incrementalMode = w.target.IncrementalMode + p.Incremental = w.target.IncrementalMode } - id, err := w.client.TabletRepair(ctx, keyspace, table, host, w.target.DC, hostFilter, incrementalMode) + err = w.client.TabletRepair(ctx, keyspace, table, p) if err != nil { convertedErr := w.convertColocatedTableRepairErr(err) if convertedErr == nil { @@ -192,28 +200,7 @@ func (w *worker) fullTabletTableRepair(ctx context.Context, keyspace, table, hos } return errors.Wrap(convertedErr, "schedule tablet repair task") } - - w.logger.Info(ctx, "Repairing entire tablet table", - "keyspace", keyspace, - "table", table, - "task ID", id, - ) - - status, err := w.client.ScyllaWaitTask(ctx, host, id, 0) - if err != nil { - w.scyllaAbortTask(host, id) - return errors.Wrap(err, "get tablet repair task status") - } - - switch scyllaclient.ScyllaTaskState(status.State) { - case scyllaclient.ScyllaTaskStateDone: - return nil - case scyllaclient.ScyllaTaskStateFailed: - return errors.Errorf("tablet repair task finished with status %q", scyllaclient.ScyllaTaskStateFailed) - default: - w.scyllaAbortTask(host, id) - return errors.Errorf("unexpected tablet repair task status %q", status.State) - } + return nil } func (w *worker) hostFilter(ctx context.Context) ([]string, error) { @@ -251,6 +238,7 @@ func (w *worker) convertColocatedTableRepairErr(scheduleTabletRepairErr error) e return errors.Wrap(scheduleTabletRepairErr, "base table is not repaired, use --keyspace flag to either include it or exclude its colocated tables") } +// TODO: abort ongoing tablet repair tasks on start but only for planned tables func (w *worker) scyllaAbortTask(host, id string) { if err := w.client.ScyllaAbortTask(context.Background(), host, id); err != nil { w.logger.Error(context.Background(), "Failed to abort task",