Skip to content

feat(1-1-restore): set tombstone_gc mode to 'repair' for views #4377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/service/one2onerestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ type node struct {

// Host contains basic information about Scylla node.
type Host struct {
ID string
DC string
Addr string
ShardCount int
ID string
DC string
Addr string
ShardCount int
SafeDescribeMethod scyllaclient.SafeDescribeMethod
}

// ViewType either Materialized View or Secondary Index.
Expand Down
3 changes: 1 addition & 2 deletions pkg/service/one2onerestore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// to show progress can return some information at this point.
// Sets initial values for download_remaining_bytes and view_build_status metrics.
func (w *worker) initProgressAndMetrics(ctx context.Context, workload []hostWorkload) error {
tablesToRestore := getTablesToRestore(workload)
views, err := w.getViews(ctx, tablesToRestore)
views, err := w.getViews(ctx, workload)
if err != nil {
return errors.Wrap(err, "get views")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/service/one2onerestore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (s *Service) newWorker(ctx context.Context, clusterID, taskID, runID uuid.U

client: client,
clusterSession: clusterSession,
sessionFunc: s.clusterSession,

logger: s.logger,
metrics: s.metrics,
Expand Down
56 changes: 48 additions & 8 deletions pkg/service/one2onerestore/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
WriteData(t, clusterSession, ksName, 10)
mvName := "testmv"
CreateMaterializedView(t, clusterSession, ksName, BigTableName, mvName)
siName := "testsi"
siTableName := siName + "_index"
CreateSecondaryIndex(t, clusterSession, ksName, BigTableName, siName)

srcCnt := rowCount(t, clusterSession, ksName, BigTableName)
if srcCnt == 0 {
Expand All @@ -42,6 +45,10 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
if srcCntMV == 0 {
t.Fatalf("Unexpected row count in materialized view: 0")
}
srcCntSI := rowCount(t, clusterSession, ksName, siTableName)
if srcCntSI == 0 {
t.Fatalf("Unexpected row count in secondary index: 0")
}

Print("Run backup")
loc := []backupspec.Location{testLocation("1-1-restore", "")}
Expand Down Expand Up @@ -75,16 +82,30 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
if srcCntMV != dstCntMV {
t.Fatalf("Expected row count in materialized view %d, but got %d", srcCntMV, dstCntMV)
}
dstCntSI := rowCount(t, clusterSession, ksName, siTableName)
if srcCntSI != dstCntSI {
t.Fatalf("Expected row count in secondary index %d, but got %d", srcCntSI, dstCntSI)
}

// Ensure table's tombstone_gc mode is set to 'repair'
w, _ := newTestWorker(t, ManagedClusterHosts())
mode, err := w.getTableTombstoneGCMode(ksName, BigTableName)
if err != nil {
t.Fatalf("Get table tombstone_gc mode: %v", err)
}
if mode != modeRepair {
t.Fatalf("Expected repair mode, but got %s", string(mode))
}
validateTombstoneGCMode(t, []testTable{
{
ks: ksName,
name: BigTableName,
},
{
ks: ksName,
name: mvName,
isView: true,
},
// It turns out, there is no way to create INDEX with tombstone_gc mode repair,
// so for now I'm skipping this check.
//{
// ks: ksName,
// name: siTableName,
// isView: true,
//},
})

Print("Validate progress")
pr, err := h.restoreSvc.GetProgress(context.Background(), h.clusterID, h.taskID, h.runID, h.props)
Expand All @@ -94,6 +115,25 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
validateGetProgress(t, pr)
}

type testTable struct {
ks, name string
isView bool
}

func validateTombstoneGCMode(t *testing.T, tables []testTable) {
t.Helper()
w, _ := newTestWorker(t, ManagedClusterHosts())
for _, table := range tables {
mode, err := w.getTombstoneGCMode(table.ks, table.name, table.isView)
if err != nil {
t.Fatalf("Get table tombstone_gc mode: %v", err)
}
if mode != modeRepair {
t.Fatalf("Expected repair mode, but got %s, table: %s.%s", string(mode), table.ks, table.name)
}
}
}

func truncateAllTablesInKeyspace(tb testing.TB, session gocqlx.Session, ks string) {
tb.Helper()

Expand Down
51 changes: 40 additions & 11 deletions pkg/service/one2onerestore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ import (
"github.com/scylladb/scylla-manager/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
"github.com/scylladb/scylla-manager/v3/pkg/util/version"
"go.uber.org/multierr"
)

type worker struct {
managerSession gocqlx.Session

client *scyllaclient.Client
clusterSession gocqlx.Session
sessionFunc cluster.SessionFunc // is needed to create cql session to single host

logger log.Logger
metrics metrics.One2OneRestoreMetrics
Expand Down Expand Up @@ -69,7 +70,7 @@ func (w *worker) parseTarget(ctx context.Context, properties json.RawMessage) (T
}

// restore is an actual 1-1-restore stages.
func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Target) (err error) {
func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Target) error {
defer func() {
if err := w.setAutoCompaction(context.Background(), workload, true); err != nil {
w.logger.Error(ctx, "Can't enable auto compaction", "err", err)
Expand Down Expand Up @@ -99,16 +100,16 @@ func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Ta
if err != nil {
return errors.Wrap(err, "drop views")
}
defer func() {
if rErr := w.reCreateViews(ctx, views); rErr != nil {
err = multierr.Combine(
err,
errors.Wrap(rErr, "recreate views"),
)
}
}()

return w.restoreTables(ctx, workload, target.Keyspace)
if err := w.restoreTables(ctx, workload, target.Keyspace); err != nil {
return errors.Wrap(err, "restore tables")
}

if err := w.reCreateViews(ctx, views); err != nil {
return errors.Wrap(err, "recreate views")
}

return nil
}

// getAllSnapshotManifestsAndTargetHosts gets backup(source) cluster node represented by manifests and target cluster nodes.
Expand Down Expand Up @@ -200,6 +201,16 @@ func (w *worker) prepareHostWorkload(ctx context.Context, manifests []*backupspe
return errors.Wrap(err, "read manifest content")
}

nodeInfo, err := w.client.NodeInfo(ctx, h.Addr)
if err != nil {
return errors.Wrapf(err, "get node %s info", h.Addr)
}
method, err := nodeInfo.SupportsSafeDescribeSchemaWithInternals()
if err != nil {
return errors.Wrapf(err, "node %s safe describe method", h.Addr)
}
hw.host.SafeDescribeMethod = method

result[i] = hw

return nil
Expand Down Expand Up @@ -237,6 +248,24 @@ func (w *worker) pinAgentCPU(ctx context.Context, workload []hostWorkload, pin b
})
}

func (w *worker) singleHostCQLSession(ctx context.Context, clusterID uuid.UUID, host string) (gocqlx.Session, error) {
session, err := w.sessionFunc(ctx, clusterID, cluster.SingleHostSessionConfigOption(host))
if err != nil {
return gocqlx.Session{}, errors.Wrap(err, "create cql session")
}
return session, nil
}

func (w *worker) raftReadBarrier(ctx context.Context, session gocqlx.Session, host Host) error {
switch host.SafeDescribeMethod {
case scyllaclient.SafeDescribeMethodReadBarrierAPI:
return w.client.RaftReadBarrier(ctx, host.Addr, "")
case scyllaclient.SafeDescribeMethodReadBarrierCQL:
return query.RaftReadBarrier(session)
}
return errors.Errorf("unsupported method: %s", host.SafeDescribeMethod)
}

// alterSchemaRetryWrapper is useful when executing many statements altering schema,
// as it might take more time for Scylla to process them one after another.
// This wrapper exits on: success, context cancel, op returned non-timeout error or after maxTotalTime has passed.
Expand Down
78 changes: 57 additions & 21 deletions pkg/service/one2onerestore/worker_tgc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,68 @@ const (

// setTombstoneGCModeRepair sets tombstone gc mode to repair to avoid data resurrection issues during restore.
func (w *worker) setTombstoneGCModeRepair(ctx context.Context, workload []hostWorkload) error {
type alterTGCTarget struct {
keyspace string
name string
isView bool
}
var targets []alterTGCTarget
for table := range getTablesToRestore(workload) {
mode, err := w.getTableTombstoneGCMode(table.keyspace, table.table)
targets = append(targets, alterTGCTarget{
keyspace: table.keyspace,
name: table.table,
isView: false,
})
}
views, err := w.getViews(ctx, workload)
if err != nil {
return errors.Wrap(err, "get views")
}
for _, view := range views {
targets = append(targets, alterTGCTarget{
keyspace: view.Keyspace,
name: view.View,
isView: true,
})
}

for _, target := range targets {
mode, err := w.getTombstoneGCMode(target.keyspace, target.name, target.isView)
if err != nil {
return errors.Wrap(err, "get tombstone_gc mode")
return errors.Wrapf(err, "get tombstone_gc mode: %s.%s", target.keyspace, target.name)
}
// No need to change tombstone gc mode.
if mode == modeDisabled || mode == modeImmediate || mode == modeRepair {
w.logger.Info(ctx, "Skipping set tombstone_gc mode", "table", table, "mode", mode)
w.logger.Info(ctx, "Skipping set tombstone_gc mode", "name", target.keyspace+"."+target.name, "mode", mode)
continue
}
if err := w.setTableTombstoneGCMode(ctx, table.keyspace, table.table, modeRepair); err != nil {
return errors.Wrap(err, "set tombstone_gc mode repair")
if err := w.setTombstoneGCMode(ctx, target.keyspace, target.name, target.isView, modeRepair); err != nil {
return errors.Wrapf(err, "set tombstone_gc mode repair: %s.%s", target.keyspace, target.name)
}
}

return nil
}

// getTableTombstoneGCMode returns table's tombstone_gc mode.
func (w *worker) getTableTombstoneGCMode(keyspace, table string) (tombstoneGCMode, error) {
// getTombstoneGCMode returns table's tombstone_gc mode.
func (w *worker) getTombstoneGCMode(keyspace, name string, isView bool) (tombstoneGCMode, error) {
systemSchemaTable := "system_schema.tables"
columnName := "table_name"
if isView {
systemSchemaTable = "system_schema.views"
columnName = "view_name"
}
var ext map[string]string
q := qb.Select("system_schema.tables").
q := qb.Select(systemSchemaTable).
Columns("extensions").
Where(qb.Eq("keyspace_name"), qb.Eq("table_name")).
Where(qb.Eq("keyspace_name"), qb.Eq(columnName)).
Query(w.clusterSession).
Bind(keyspace, table)
Bind(keyspace, name)

defer q.Release()
err := q.Scan(&ext)
if err != nil {
return "", err
return "", errors.Wrap(err, "scan")
}

// Timeout (just using gc_grace_seconds) is the default mode
Expand All @@ -73,21 +104,22 @@ func (w *worker) getTableTombstoneGCMode(keyspace, table string) (tombstoneGCMod
return "", errors.Errorf("unrecognized tombstone_gc mode: %s", mode)
}

// setTableTombstoneGCMode alters 'tombstone_gc' mode.
func (w *worker) setTableTombstoneGCMode(ctx context.Context, keyspace, table string, mode tombstoneGCMode) error {
w.logger.Info(ctx, "Alter table's tombstone_gc mode",
"keyspace", keyspace,
"table", table,
)
// setTombstoneGCMode alters 'tombstone_gc' mode.
func (w *worker) setTombstoneGCMode(ctx context.Context, keyspace, name string, isView bool, mode tombstoneGCMode) error {
logger := w.logger.With("keyspace", keyspace, "name", name, "is_view", isView)

logger.Info(ctx, "Alter tombstone_gc mode")

op := func() error {
return w.clusterSession.ExecStmt(alterTableTombstoneGCStmt(keyspace, table, mode))
stmt := alterTableTombstoneGCStmt(keyspace, name, mode)
if isView {
stmt = alterViewTombstoneGCStmt(keyspace, name, mode)
}
return w.clusterSession.ExecStmt(stmt)
}

notify := func(err error, wait time.Duration) {
w.logger.Info(ctx, "Altering table's tombstone_gc mode failed",
"keyspace", keyspace,
"table", table,
logger.Info(ctx, "Altering tombstone_gc mode failed",
"error", err,
"wait", wait,
)
Expand All @@ -99,3 +131,7 @@ func (w *worker) setTableTombstoneGCMode(ctx context.Context, keyspace, table st
func alterTableTombstoneGCStmt(keyspace, table string, mode tombstoneGCMode) string {
return fmt.Sprintf(`ALTER TABLE %q.%q WITH tombstone_gc = {'mode': '%s'}`, keyspace, table, mode)
}

func alterViewTombstoneGCStmt(keyspace, view string, mode tombstoneGCMode) string {
return fmt.Sprintf(`ALTER MATERIALIZED VIEW %q.%q WITH tombstone_gc = {'mode': '%s'}`, keyspace, view, mode)
}
10 changes: 8 additions & 2 deletions pkg/service/one2onerestore/worker_validate_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import (
"testing"

"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/testutils"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
"github.com/scylladb/scylla-manager/v3/pkg/util/httpx"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

func TestWorkerValidateClustersIntegration(t *testing.T) {
Expand Down Expand Up @@ -254,8 +257,11 @@ func newTestWorker(t *testing.T, hosts []string) (*worker, *testutils.HackableRo
managerSession: managerSession,
client: sc,
clusterSession: clusterSession,
logger: log.NopLogger,
metrics: metrics.NewOne2OneRestoreMetrics(),
sessionFunc: func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return db.CreateSession(t, sc), nil
},
logger: log.NopLogger,
metrics: metrics.NewOne2OneRestoreMetrics(),
}
return w, hrt
}
Loading