Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 120 additions & 24 deletions pkg/scyllaclient/client_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -634,40 +635,135 @@ const (
IncrementalModeDisabled IncrementalMode = "disabled"
)

// TabletRepair schedules Scylla repair tablet table task and returns its ID.
// All tablets will be repaired with just a single task. It repairs all hosts
// by default, but it's possible to filter them by DC or host ID.
// Master is optional, as we can query any node for table repair task status.
func (c *Client) TabletRepair(ctx context.Context, keyspace, table, master string, dcs, hostIDs []string, incrementalMode IncrementalMode) (string, error) {
const allTablets = "all"
dontAwaitCompletion := "false"
// TabletRepairParams specifies optional parameters of tablet repair task.
// The zero value is valid: TabletRepair then sets no DC or host filter and
// no incremental mode on the request, and reports no progress.
type TabletRepairParams struct {
// DCs, when non-empty, is joined with commas and set as the DC
// filter of the repair request.
DCs []string
// HostIDs, when non-empty, is joined with commas and set as the
// hosts filter of the repair request.
HostIDs []string
// Incremental is set as the incremental mode of the repair request
// only when it is non-empty.
Incremental IncrementalMode
// LongPollingSeconds specifies intervals in which Callback is
// called with current task progress. Zero results in calling
// Callback only after task is finished.
// Periodic progress reporting is started only when this value is
// greater than zero and Callback is set.
LongPollingSeconds int64
// Callback allowing to track task progress while it is running.
// When detailed progress is not available, Callback might
// be called with current and total set to 0.
// A nil Callback disables progress reporting.
Callback func(status ScyllaTaskState, current, total int64)
}

// TabletRepair schedules tablet repair task and waits for it to finish.
// See TabletRepairParams for optional parameters.
func (c *Client) TabletRepair(ctx context.Context, ks, tab string, params TabletRepairParams) error {
const repairEntireTable = "all"
ctx = withShouldRetryHandler(ctx, tabletRepairShouldRetryHandler)
if master != "" {
ctx = forceHost(ctx, master)
}
p := operations.StorageServiceTabletsRepairPostParams{
Context: ctx,
Ks: keyspace,
Table: table,
Tokens: allTablets,
AwaitCompletion: &dontAwaitCompletion,
}
if len(dcs) > 0 {
merged := strings.Join(dcs, ",")
Context: ctx,
Ks: ks,
Table: tab,
Tokens: repairEntireTable,
}
if len(params.DCs) > 0 {
merged := strings.Join(params.DCs, ",")
p.SetDcsFilter(&merged)
}
if len(hostIDs) > 0 {
merged := strings.Join(hostIDs, ",")
if len(params.HostIDs) > 0 {
merged := strings.Join(params.HostIDs, ",")
p.SetHostsFilter(&merged)
}
if incrementalMode != "" {
p.SetIncrementalMode(&incrementalMode)
if params.Incremental != "" {
p.SetIncrementalMode(&params.Incremental)
}

// Start by scheduling async tablet repair task
// and getting its ID.
p.AwaitCompletion = pointer.StringPtr("false")
resp, err := c.scyllaOps.StorageServiceTabletsRepairPost(&p)
if err != nil {
return "", err
return errors.Wrap(err, "schedule async tablet repair")
}
taskID := resp.GetPayload().TabletTaskID

// Spawn goroutine responsible for calling callback
// with current task progress. It's not needed when
// either long polling or callback are not set.
// It's also not needed to record errors from such
// goroutine, as the final error will come from
// the synchronous wait below. Goroutine exits
// on error, task finish, or context cancellation.
progressCtx, progressCtxCancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
if params.LongPollingSeconds > 0 && params.Callback != nil {
wg.Go(func() {
for {
taskStatus, err := c.ScyllaWaitTask(progressCtx, "", taskID, params.LongPollingSeconds)
if err != nil {
return
}
// It's safe to cast progress to int64,
// as it is representing tablet count.
params.Callback(
ScyllaTaskState(taskStatus.State),
int64(taskStatus.ProgressCompleted),
int64(taskStatus.ProgressTotal),
)
if ScyllaTaskState(taskStatus.State) == ScyllaTaskStateDone ||
ScyllaTaskState(taskStatus.State) == ScyllaTaskStateFailed {
return
}
}
})
}
return resp.GetPayload().TabletTaskID, nil

// Synchronously wait for task to finish.
// Information about tablet repair task progress is not kept on
// scylla side after task is finished. That's why we need synchronous
// waiting alongside long polling, so that we avoid scenario where
// tablet repair task finished in between long polling calls and
// the next one returns HTTP 404 error claiming that tablet repair
// task with given ID does not exist. Status returned from such call
// is also the final status on which callback is called.
taskStatus, err := c.ScyllaWaitTask(ctx, "", taskID, 0)
// Make sure that progress goroutine returns before proceeding
progressCtxCancel()
wg.Wait()

if err != nil {
// Even with synchronous waiting, it's still possible for the same
// problem to occur, if tablet repair task finishes before we start
// waiting synchronously for it. In such case, we can retry tablet
// repair by synchronously waiting on the call for scheduling tablet
// repair task. We prefer not to do it, as in this case, we don't
// get tablet task ID and can't track its progress while it's running,
// but it's better than failing the whole repair. Also, this problem
// usually occurs for empty/small tables, so this retry shouldn't take
// much time.
if httpStatus, _ := StatusCodeAndMessageOf(err); httpStatus != http.StatusNotFound {
return errors.Wrap(err, "sync wait for tablet repair task")
}
p.AwaitCompletion = pointer.StringPtr("true")
_, err = c.scyllaOps.StorageServiceTabletsRepairPost(&p)
if err != nil {
return errors.Wrap(err, "schedule sync tablet repair")
}
// We don't have progress information because of the synchronous waiting
// on tablet repair schedule, so we can just report status done.
if params.Callback != nil {
params.Callback(ScyllaTaskStateDone, 0, 0)
}
return nil
}

// Final callback on progress obtained from synchronous waiting
if params.Callback != nil {
params.Callback(
ScyllaTaskState(taskStatus.State),
int64(taskStatus.ProgressCompleted),
int64(taskStatus.ProgressTotal),
)
}
if ScyllaTaskState(taskStatus.State) != ScyllaTaskStateDone {
return errors.Errorf("tablet repair task finished with state %q", ScyllaTaskStateFailed)
}
return nil
}

// Regex of schedule colocated table repair error. Taken from:
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/repair/tablet/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestTabletRepairIntegration(t *testing.T) {
h.Hrt.SetInterceptor(nil)
// Start lots of scylla tablet repair tasks
for ft := range allTables {
_, _ = h.Client.TabletRepair(testCtx, ft.ks, ft.tab, "", nil, nil, scyllaclient.IncrementalModeDisabled)
_ = h.Client.TabletRepair(testCtx, ft.ks, ft.tab, scyllaclient.TabletRepairParams{})
}
// Make sure that they are visible on all nodes
for _, host := range ManagedClusterHosts() {
Expand Down
22 changes: 3 additions & 19 deletions pkg/service/repair/tablet/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,17 @@ func (w *worker) repairTable(ctx context.Context, client *scyllaclient.Client, k
}(start)

// Use scylla side default incremental mode (#4683)
id, err := client.TabletRepair(ctx, ks, tab, "", nil, nil, "")
err = client.TabletRepair(ctx, ks, tab, scyllaclient.TabletRepairParams{})
if err != nil {
if _, _, ok := scyllaclient.IsColocatedTableErr(err); ok {
// Since we always repair all tablet tables,
// we can always skip colocated tablet repair error.
w.logger.Info(ctx, "Skipping repair of colocated tablet table", "keyspace", ks, "table", tab, "error", err)
return nil
}
return errors.Wrapf(err, "schedule tablet repair")
}

w.logger.Info(ctx, "Scheduled tablet repair", "keyspace", ks, "table", tab, "task ID", id)
w.upsertTableProgress(ctx, pr)

status, err := client.ScyllaWaitTask(ctx, "", id, 0)
if err != nil {
w.abortRepairTask(context.Background(), id)
return errors.Wrap(err, "get tablet repair task status")
}
switch scyllaclient.ScyllaTaskState(status.State) {
case scyllaclient.ScyllaTaskStateDone:
return nil
case scyllaclient.ScyllaTaskStateFailed:
return errors.Errorf("tablet repair task finished with status %q", scyllaclient.ScyllaTaskStateFailed)
default:
return errors.Errorf("unexpected tablet repair task status %q", status.State)
return errors.Wrapf(err, "tablet repair")
}
return nil
}

func (w *worker) abortRepairTask(ctx context.Context, id string) {
Expand Down
42 changes: 15 additions & 27 deletions pkg/service/repair/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
var ranges []scyllaclient.TokenRange
switch j.jobType {
case tabletJobType:
return w.fullTabletTableRepair(ctx, j.keyspace, j.table, j.master.String())
return w.fullTabletTableRepair(ctx, j.keyspace, j.table)
case smallTableJobType:
ranges = nil
case mergeRangesJobType:
Expand Down Expand Up @@ -170,17 +170,25 @@
return !exists
}

func (w *worker) fullTabletTableRepair(ctx context.Context, keyspace, table, host string) error {
func (w *worker) fullTabletTableRepair(ctx context.Context, keyspace, table string) error {
hostFilter, err := w.hostFilter(ctx)
if err != nil {
return errors.Wrap(err, "create host filter")
}

var incrementalMode string
w.logger.Info(ctx, "Repairing entire tablet table",
"keyspace", keyspace,
"table", table,
)

p := scyllaclient.TabletRepairParams{
DCs: w.target.DC,
HostIDs: hostFilter,
}
if w.apiSupport.incrementalRepair {
incrementalMode = w.target.IncrementalMode
p.Incremental = w.target.IncrementalMode
}
id, err := w.client.TabletRepair(ctx, keyspace, table, host, w.target.DC, hostFilter, incrementalMode)
err = w.client.TabletRepair(ctx, keyspace, table, p)
if err != nil {
convertedErr := w.convertColocatedTableRepairErr(err)
if convertedErr == nil {
Expand All @@ -192,28 +200,7 @@
}
return errors.Wrap(convertedErr, "schedule tablet repair task")
}

w.logger.Info(ctx, "Repairing entire tablet table",
"keyspace", keyspace,
"table", table,
"task ID", id,
)

status, err := w.client.ScyllaWaitTask(ctx, host, id, 0)
if err != nil {
w.scyllaAbortTask(host, id)
return errors.Wrap(err, "get tablet repair task status")
}

switch scyllaclient.ScyllaTaskState(status.State) {
case scyllaclient.ScyllaTaskStateDone:
return nil
case scyllaclient.ScyllaTaskStateFailed:
return errors.Errorf("tablet repair task finished with status %q", scyllaclient.ScyllaTaskStateFailed)
default:
w.scyllaAbortTask(host, id)
return errors.Errorf("unexpected tablet repair task status %q", status.State)
}
return nil
}

func (w *worker) hostFilter(ctx context.Context) ([]string, error) {
Expand Down Expand Up @@ -251,7 +238,8 @@
return errors.Wrap(scheduleTabletRepairErr, "base table is not repaired, use --keyspace flag to either include it or exclude its colocated tables")
}

// TODO: abort ongoing tablet repair tasks on start but only for planned tables.

Check failure on line 241 in pkg/service/repair/worker.go

View workflow job for this annotation

GitHub Actions / Various checks

Comment should end in a period (godot)
func (w *worker) scyllaAbortTask(host, id string) {

Check failure on line 242 in pkg/service/repair/worker.go

View workflow job for this annotation

GitHub Actions / Various checks

func (*worker).scyllaAbortTask is unused (unused)
if err := w.client.ScyllaAbortTask(context.Background(), host, id); err != nil {
w.logger.Error(context.Background(), "Failed to abort task",
"host", host,
Expand Down
Loading