ddl: arg v2 for flashback cluster and alter table attributes (#56290)
ref #53930
joechenrh authored Sep 29, 2024
1 parent 8f0baf4 commit 3ee8765
Showing 7 changed files with 236 additions and 110 deletions.
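The heart of the change: FLASHBACK CLUSTER moves from v1 positional job args (one `job.Args[offset]` slot per value, indexed by hand-maintained offset constants) to a single typed struct that is decoded once and re-filled before the job is persisted. A minimal sketch of the pattern; the field set below is inferred from the usages in this diff, not copied from the canonical definition in the `model` package:

```go
// Sketch only: fields inferred from this diff.
type FlashbackClusterArgs struct {
	FlashbackTS        uint64
	PDScheduleValue    map[string]any
	EnableGC           bool
	EnableAutoAnalyze  bool
	SuperReadOnly      bool
	LockedRegionCnt    uint64
	StartTS            uint64
	CommitTS           uint64
	EnableTTLJob       bool
	FlashbackKeyRanges []KeyRange
}

// v1: one offset constant per value, easy to misalign on decode.
//   job.Args[startTSOffset] = startTS
// v2: decode once, mutate fields, re-fill before the job is written back.
//   args, err := model.GetFlashbackClusterArgs(job)
//   args.StartTS = startTS
//   job.FillArgs(args)
```

The rest of the diff is largely mechanical: every `job.Args[...Offset]` write becomes a field assignment plus one `job.FillArgs(args)` call per stage. Removed lines below are prefixed with `-`, added lines with `+`.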
158 changes: 73 additions & 85 deletions pkg/ddl/cluster.go
@@ -81,7 +81,7 @@ func closePDSchedule(ctx context.Context) error {
return infosync.SetPDScheduleConfig(ctx, closeMap)
}

-func savePDSchedule(ctx context.Context, job *model.Job) error {
+func savePDSchedule(ctx context.Context, args *model.FlashbackClusterArgs) error {
retValue, err := infosync.GetPDScheduleConfig(ctx)
if err != nil {
return err
@@ -90,7 +90,7 @@ func savePDSchedule(ctx context.Context, job *model.Job) error {
for _, key := range pdScheduleKey {
saveValue[key] = retValue[key]
}
-job.Args[pdScheduleArgsOffset] = &saveValue
+args.PDScheduleValue = saveValue
return nil
}

@@ -158,40 +158,21 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

-func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
-val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBTTLJobEnable)
+func getGlobalSysVarAsBool(sess sessionctx.Context, name string) (bool, error) {
+val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(name)
if err != nil {
-return "", errors.Trace(err)
+return false, errors.Trace(err)
}
-return val, nil
+return variable.TiDBOptOn(val), nil
}

-func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
-return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
-}
-
-func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
-return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
-}
-
-func getTiDBEnableAutoAnalyze(sess sessionctx.Context) (string, error) {
-val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAutoAnalyze)
-if err != nil {
-return "", errors.Trace(err)
+func setGlobalSysVarFromBool(ctx context.Context, sess sessionctx.Context, name string, value bool) error {
+sv := variable.On
+if !value {
+sv = variable.Off
}
-return val, nil
-}
-
-func setTiDBSuperReadOnly(ctx context.Context, sess sessionctx.Context, value string) error {
-return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBSuperReadOnly, value)
-}
-
-func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
-val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly)
-if err != nil {
-return "", errors.Trace(err)
-}
-return val, nil
+return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, name, sv)
}

func isFlashbackSupportedDDLAction(action model.ActionType) bool {
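The six per-variable helpers deleted above collapse into this get/set pair, keyed by variable name and typed as `bool` at the call sites. A rough save-and-restore sketch of how the pair composes (assuming `variable.TiDBOptOn` is TiDB's usual parser that treats "ON"/"1" as true):

```go
// Save the current value as a bool, force read-only on, restore later.
wasReadOnly, err := getGlobalSysVarAsBool(sess, variable.TiDBSuperReadOnly)
if err != nil {
	return errors.Trace(err)
}
if err := setGlobalSysVarFromBool(ctx, sess, variable.TiDBSuperReadOnly, true); err != nil {
	return errors.Trace(err)
}
// ... work that requires read-only mode ...
// The bool maps back to variable.On / variable.Off on the way in.
if err := setGlobalSysVarFromBool(ctx, sess, variable.TiDBSuperReadOnly, wasReadOnly); err != nil {
	return errors.Trace(err)
}
```

Storing plain bools also suits args v2: fields like `SuperReadOnly` and `EnableTTLJob` serialize as booleans instead of "ON"/"OFF" strings.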
@@ -231,13 +212,13 @@ func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context,
if err = closePDSchedule(ctx); err != nil {
return err
}
-if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil {
+if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBEnableAutoAnalyze, false); err != nil {
return err
}
-if err = setTiDBSuperReadOnly(ctx, se, variable.On); err != nil {
+if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBSuperReadOnly, true); err != nil {
return err
}
-if err = setTiDBTTLJobEnable(ctx, se, variable.Off); err != nil {
+if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBTTLJobEnable, false); err != nil {
return err
}

@@ -354,16 +335,18 @@ type keyRangeMayExclude struct {
exclude bool
}

-// appendContinuousKeyRanges merges not exclude continuous key ranges and appends
+// mergeContinuousKeyRanges merges not exclude continuous key ranges and appends
// to given []kv.KeyRange, assuming the gap between key ranges has no data.
//
// Precondition: schemaKeyRanges is sorted by start key. schemaKeyRanges are
// non-overlapping.
-func appendContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
+func mergeContinuousKeyRanges(schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
var (
continuousStart, continuousEnd kv.Key
)

+result := make([]kv.KeyRange, 0, 1)
+
for _, r := range schemaKeyRanges {
if r.exclude {
if continuousStart != nil {
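For intuition, a test-style sketch of the merge semantics (illustrative keys; same shapes as `TestMergeContinuousKeyRanges` in ddl_test.go below): consecutive non-excluded ranges collapse into one span because gaps are assumed empty, and an excluded range terminates the current run.

```go
input := []keyRangeMayExclude{
	{r: kv.KeyRange{StartKey: kv.Key("a"), EndKey: kv.Key("b")}},
	{r: kv.KeyRange{StartKey: kv.Key("c"), EndKey: kv.Key("d")}},                // gap b..c absorbed
	{r: kv.KeyRange{StartKey: kv.Key("e"), EndKey: kv.Key("f")}, exclude: true}, // breaks the run
	{r: kv.KeyRange{StartKey: kv.Key("g"), EndKey: kv.Key("h")}},
}
got := mergeContinuousKeyRanges(input)
// got: [{a, d}, {g, h}] (one merged span per run of non-excluded ranges)
```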
@@ -398,9 +381,6 @@ func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashba
is := sess.GetDomainInfoSchema().(infoschema.InfoSchema)
schemas := is.AllSchemas()

-// The semantic of keyRanges(output).
-keyRanges := make([]kv.KeyRange, 0)
-
// get snapshot schema IDs.
flashbackSnapshotMeta := meta.NewReader(sess.GetStore().GetSnapshot(kv.NewVersion(flashbackTS)))
snapshotSchemas, err := flashbackSnapshotMeta.ListDatabases()
@@ -453,7 +433,7 @@ func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashba
return bytes.Compare(a.r.StartKey, b.r.StartKey)
})

-keyRanges = appendContinuousKeyRanges(keyRanges, schemaKeyRanges)
+keyRanges := mergeContinuousKeyRanges(schemaKeyRanges)

startKey := tablecodec.EncodeMetaKeyPrefix([]byte("DBs"))
keyRanges = append(keyRanges, kv.KeyRange{
@@ -681,7 +661,7 @@ func flashbackToVersion(
).RunOnRange(ctx, startKey, endKey)
}

-func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []kv.KeyRange) {
+func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []model.KeyRange) {
if s, ok := store.(kv.SplittableStore); ok {
for _, keys := range keyRanges {
for {
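Note the parameter type change from `[]kv.KeyRange` to `[]model.KeyRange`: the ranges now come out of the persisted job args, so they need a type that serializes with the args struct. The assumed shape, inferred from the stage-2 conversion later in this diff:

```go
// Assumed definition (hypothetical here); stage 2 below converts
// kv.KeyRange values into it field by field.
type KeyRange struct {
	StartKey []byte
	EndKey   []byte
}
```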
@@ -713,18 +693,14 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env")
}

-var flashbackTS, lockedRegions, startTS, commitTS uint64
-var pdScheduleValue map[string]any
-var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
-var gcEnabledValue bool
-var keyRanges []kv.KeyRange
-if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue, &keyRanges); err != nil {
+args, err := model.GetFlashbackClusterArgs(job)
+if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

var totalRegions, completedRegions atomic.Uint64
-totalRegions.Store(lockedRegions)
+totalRegions.Store(args.LockedRegionCnt)

sess, err := w.sessPool.Get()
if err != nil {
@@ -736,54 +712,63 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and update job args.
case model.StateNone:
-if err = savePDSchedule(w.ctx, job); err != nil {
+if err = savePDSchedule(w.ctx, args); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-gcEnableValue, err := gcutil.CheckGCEnable(sess)
+
+args.EnableGC, err = gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-job.Args[gcEnabledOffset] = &gcEnableValue
-autoAnalyzeValue, err = getTiDBEnableAutoAnalyze(sess)
+
+args.EnableAutoAnalyze, err = getGlobalSysVarAsBool(sess, variable.TiDBEnableAutoAnalyze)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-job.Args[autoAnalyzeOffset] = &autoAnalyzeValue
-readOnlyValue, err = getTiDBSuperReadOnly(sess)
+
+args.SuperReadOnly, err = getGlobalSysVarAsBool(sess, variable.TiDBSuperReadOnly)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-job.Args[readOnlyOffset] = &readOnlyValue
-ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)
+
+args.EnableTTLJob, err = getGlobalSysVarAsBool(sess, variable.TiDBTTLJobEnable)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
+
+job.FillArgs(args)
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
-if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, jobCtx.metaMut, job, flashbackTS); err != nil {
+if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, jobCtx.metaMut, job, args.FlashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC.
-startTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+args.StartTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
-job.Args[startTSOffset] = startTS
-keyRanges, err = getFlashbackKeyRanges(w.ctx, sess, flashbackTS)
+keyRanges, err := getFlashbackKeyRanges(w.ctx, sess, args.FlashbackTS)
if err != nil {
return ver, errors.Trace(err)
}
-job.Args[keyRangesOffset] = keyRanges
+args.FlashbackKeyRanges = make([]model.KeyRange, len(keyRanges))
+for i, keyRange := range keyRanges {
+args.FlashbackKeyRanges[i] = model.KeyRange{
+StartKey: keyRange.StartKey,
+EndKey: keyRange.EndKey,
+}
+}
+
+job.FillArgs(args)
job.SchemaState = model.StateWriteOnly
return updateSchemaVersion(jobCtx, job)
// Stage 3, lock related key ranges.
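Stages 2 and 3 share a crash-safety pattern that survives the v1-to-v2 migration: fetch a TSO once, persist it inside the job args, and let a retried job reuse the persisted value so the prepare/commit RPCs see stable timestamps. Schematically (a sketch of the pattern, not the literal code):

```go
// Fetch once, persist before any RPC is sent; on retry the job re-reads
// args.StartTS instead of fetching a fresh timestamp.
ts, err := store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
	return errors.Trace(err)
}
args.StartTS = ts
job.FillArgs(args)
```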
Expand All @@ -794,27 +779,27 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
return updateSchemaVersion(jobCtx, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
-splitRegionsByKeyRanges(w.ctx, jobCtx.store, keyRanges)
+splitRegionsByKeyRanges(w.ctx, jobCtx.store, args.FlashbackKeyRanges)
totalRegions.Store(0)
-for _, r := range keyRanges {
+for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
-stats, err := SendPrepareFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), flashbackTS, startTS, r)
+stats, err := SendPrepareFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}, r.StartKey, r.EndKey); err != nil {
logutil.DDLLogger().Warn("Get error when do flashback", zap.Error(err))
return ver, err
}
}
-job.Args[totalLockedRegionsOffset] = totalRegions.Load()
+args.LockedRegionCnt = totalRegions.Load()

// We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC.
-commitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+args.CommitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return ver, errors.Trace(err)
}
-job.Args[commitTSOffset] = commitTS
+job.FillArgs(args)
job.SchemaState = model.StateWriteReorganization
return ver, nil
// Stage 4, get key ranges and send flashback RPC.
Expand All @@ -827,11 +812,11 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
return ver, nil
}

-for _, r := range keyRanges {
+for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use same startTS as prepare phase to simulate 1PC txn.
-stats, err := SendFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), flashbackTS, startTS, commitTS, r)
+stats, err := SendFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, args.CommitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
logutil.DDLLogger().Info("flashback cluster stats",
zap.Uint64("complete regions", completedRegions.Load()),
Expand All @@ -858,44 +843,47 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
return nil
}

-var flashbackTS, lockedRegions, startTS, commitTS uint64
-var pdScheduleValue map[string]any
-var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
-var gcEnabled bool
-
-if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
+args, err := model.GetFlashbackClusterArgs(job)
+if err != nil {
return errors.Trace(err)
}

sess, err := w.sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.Put(sess)

err = kv.RunInNewTxn(w.ctx, w.store, true, func(context.Context, kv.Transaction) error {
-if err = recoverPDSchedule(w.ctx, pdScheduleValue); err != nil {
-return err
+if err = recoverPDSchedule(w.ctx, args.PDScheduleValue); err != nil {
+return errors.Trace(err)
}
-if gcEnabled {
+
+if args.EnableGC {
if err = gcutil.EnableGC(sess); err != nil {
-return err
+return errors.Trace(err)
}
}
-if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
-return err
+
+if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBSuperReadOnly, args.SuperReadOnly); err != nil {
+return errors.Trace(err)
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
-if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
-return err
+if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBTTLJobEnable, args.EnableTTLJob); err != nil {
+return errors.Trace(err)
}
}

-return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
+if err := setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBEnableAutoAnalyze, args.EnableAutoAnalyze); err != nil {
+return errors.Trace(err)
+}
+
+return nil
})
if err != nil {
-return err
+return errors.Trace(err)
}

return nil
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_test.go
@@ -342,7 +342,7 @@ func TestGetTableDataKeyRanges(t *testing.T) {
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

-func TestAppendContinuousKeyRanges(t *testing.T) {
+func TestMergeContinuousKeyRanges(t *testing.T) {
cases := []struct {
input []keyRangeMayExclude
expect []kv.KeyRange
@@ -452,7 +452,7 @@ func TestAppendContinuousKeyRanges(t *testing.T) {
}

for i, ca := range cases {
-ranges := appendContinuousKeyRanges([]kv.KeyRange{}, ca.input)
+ranges := mergeContinuousKeyRanges(ca.input)
require.Equal(t, ca.expect, ranges, "case %d", i)
}
}
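Only the function name changes in this test; the table-driven cases still pin down the merge/exclude behavior. A hypothetical extra case in the same shape, showing an excluded range splitting the output in two:

```go
{
	input: []keyRangeMayExclude{
		{r: kv.KeyRange{StartKey: kv.Key("a"), EndKey: kv.Key("b")}},
		{r: kv.KeyRange{StartKey: kv.Key("b"), EndKey: kv.Key("c")}, exclude: true},
		{r: kv.KeyRange{StartKey: kv.Key("c"), EndKey: kv.Key("d")}},
	},
	expect: []kv.KeyRange{
		{StartKey: kv.Key("a"), EndKey: kv.Key("b")},
		{StartKey: kv.Key("c"), EndKey: kv.Key("d")},
	},
},
```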
(5 more changed files not shown)
