Skip to content

Commit adc9684

Browse files
feat(tablet_repair): handle scylla tablet repair task aborts
We need to make sure that leftover scylla tablet repair tasks are not running, as scheduling a new scylla tablet repair task on a table with an ongoing tablet repair ends with an error. On the other hand, this is just best effort, because we can't prevent new scylla tablet repair tasks from being created in the meantime, and the task that we are trying to abort might have just finished on its own.
1 parent f44dece commit adc9684

File tree

3 files changed

+79
-0
lines changed

3 files changed

+79
-0
lines changed

pkg/scyllaclient/client_scylla.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,3 +1547,30 @@ func (c *Client) ScyllaSetUserTaskTTL(ctx context.Context, host string, ttlSecon
15471547
})
15481548
return err
15491549
}
1550+
1551+
// ScyllaTaskModule is the name of a Scylla task manager module.
// It is the module argument accepted by ScyllaListTasks.
1552+
type ScyllaTaskModule string
1553+
1554+
// ScyllaTaskModuleTablets is the ScyllaTaskModule with the value "tablets".
1555+
const ScyllaTaskModuleTablets ScyllaTaskModule = "tablets"
1556+
1557+
// ScyllaTaskType is the string type used for Scylla task type names.
1558+
type ScyllaTaskType string
1559+
1560+
// ScyllaTaskTypeUserRepair is the ScyllaTaskType with the value "user_repair".
1561+
const ScyllaTaskTypeUserRepair ScyllaTaskType = "user_repair"
1562+
1563+
// ScyllaListTasks lists Scylla tasks of given module.
1564+
func (c *Client) ScyllaListTasks(ctx context.Context, host string, module ScyllaTaskModule) ([]*models.TaskStats, error) {
1565+
if host != "" {
1566+
ctx = forceHost(ctx, host)
1567+
}
1568+
tasks, err := c.scyllaOps.TaskManagerListModuleTasksModuleGet(&operations.TaskManagerListModuleTasksModuleGetParams{
1569+
Context: ctx,
1570+
Module: string(module),
1571+
})
1572+
if err != nil {
1573+
return nil, err
1574+
}
1575+
return tasks.GetPayload(), nil
1576+
}

pkg/service/repair/tablet/service_integration_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,4 +357,22 @@ func TestTabletRepairIntegration(t *testing.T) {
357357
t.Fatal(err)
358358
}
359359
})
360+
361+
t.Run("Ongoing scylla tablet repair tasks", func(t *testing.T) {
362+
testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Minute)
363+
defer testCancel()
364+
365+
h.Hrt.SetInterceptor(nil)
366+
// Start lots of scylla tablet repair tasks
367+
for ft := range allTables {
368+
_, err = h.Client.TabletRepair(testCtx, ft.ks, ft.tab, "", nil, nil, scyllaclient.IncrementalModeDisabled)
369+
if err != nil {
370+
if _, _, ok := scyllaclient.IsColocatedTableErr(err); !ok {
371+
t.Fatal(err)
372+
}
373+
}
374+
}
375+
// Verify that those scylla tablet repair tasks won't break SM tablet repair task
376+
smokeTest(t, testCtx)
377+
})
360378
}

pkg/service/repair/tablet/worker.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func (s *Service) newWorker(ctx context.Context, clusterID, taskID, runID uuid.U
5656

5757
func (w *worker) repairAll(ctx context.Context, target Target) error {
5858
w.init(ctx, target)
59+
// We need to make sure that leftover scylla tablet repair tasks are not running,
60+
// as scheduling new scylla tablet repair tasks on a table with an ongoing tablet repair
61+
// ends with an error. On the other hand, this is just best effort, because we can't
62+
// ensure that new scylla tablet repair tasks won't be created in the meantime or that
63+
// the task that we were trying to abort has just finished on its own.
64+
w.abortAllRepairTasks(ctx)
5965

6066
for ks, tabs := range target.KsTabs {
6167
for _, tab := range tabs {
@@ -78,6 +84,22 @@ func (w *worker) init(ctx context.Context, target Target) {
7884
}
7985
}
8086

87+
func (w *worker) abortAllRepairTasks(ctx context.Context) {
88+
tasks, err := w.client.ScyllaListTasks(ctx, "", scyllaclient.ScyllaTaskModuleTablets)
89+
if err != nil {
90+
w.logger.Error(ctx, "Failed to list scylla tablet tasks", "error", err)
91+
return
92+
}
93+
for _, t := range tasks {
94+
if t == nil {
95+
continue
96+
}
97+
if scyllaclient.ScyllaTaskType(t.Type) == scyllaclient.ScyllaTaskTypeUserRepair {
98+
w.abortRepairTask(ctx, t.TaskID)
99+
}
100+
}
101+
}
102+
81103
func (w *worker) repairTable(ctx context.Context, client *scyllaclient.Client, ks, tab string) (err error) {
82104
pr := newRunProgress(w.clusterID, w.taskID, w.runID, ks, tab)
83105

@@ -115,6 +137,7 @@ func (w *worker) repairTable(ctx context.Context, client *scyllaclient.Client, k
115137

116138
status, err := client.ScyllaWaitTask(ctx, "", id, 0)
117139
if err != nil {
140+
w.abortRepairTask(context.Background(), id)
118141
return errors.Wrap(err, "get tablet repair task status")
119142
}
120143
switch scyllaclient.ScyllaTaskState(status.State) {
@@ -134,6 +157,17 @@ func (w *worker) incrementalRepairMode() scyllaclient.IncrementalMode {
134157
return ""
135158
}
136159

160+
func (w *worker) abortRepairTask(ctx context.Context, id string) {
161+
if err := w.client.ScyllaAbortTask(ctx, "", id); err != nil {
162+
w.logger.Error(context.Background(), "Failed to abort scylla repair task",
163+
"id", id,
164+
"error", err,
165+
)
166+
} else {
167+
w.logger.Info(ctx, "Aborted scylla tablet repair task", "id", id)
168+
}
169+
}
170+
137171
func (w *worker) upsertTableProgress(ctx context.Context, pr RunProgress) {
138172
q := table.TabletRepairRunProgress.InsertQuery(w.smSession)
139173
defer q.Release()

0 commit comments

Comments
 (0)