Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: arg v2 for flashback cluster and alter table attributes #56290

Merged
merged 10 commits into from
Sep 29, 2024
158 changes: 73 additions & 85 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func closePDSchedule(ctx context.Context) error {
return infosync.SetPDScheduleConfig(ctx, closeMap)
}

func savePDSchedule(ctx context.Context, job *model.Job) error {
func savePDSchedule(ctx context.Context, args *model.FlashbackClusterArgs) error {
retValue, err := infosync.GetPDScheduleConfig(ctx)
if err != nil {
return err
Expand All @@ -90,7 +90,7 @@ func savePDSchedule(ctx context.Context, job *model.Job) error {
for _, key := range pdScheduleKey {
saveValue[key] = retValue[key]
}
job.Args[pdScheduleArgsOffset] = &saveValue
args.PDScheduleValue = saveValue
return nil
}

Expand Down Expand Up @@ -158,40 +158,21 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBTTLJobEnable)
func getGlobalSysVarAsBool(sess sessionctx.Context, name string) (bool, error) {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(name)
if err != nil {
return "", errors.Trace(err)
return false, errors.Trace(err)
}
return val, nil
return variable.TiDBOptOn(val), nil
}

func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
}

func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
}

func getTiDBEnableAutoAnalyze(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAutoAnalyze)
if err != nil {
return "", errors.Trace(err)
func setGlobalSysVarFromBool(ctx context.Context, sess sessionctx.Context, name string, value bool) error {
sv := variable.On
if !value {
sv = variable.Off
}
return val, nil
}

func setTiDBSuperReadOnly(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBSuperReadOnly, value)
}

func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly)
if err != nil {
return "", errors.Trace(err)
}
return val, nil
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, name, sv)
}

func isFlashbackSupportedDDLAction(action model.ActionType) bool {
Expand Down Expand Up @@ -231,13 +212,13 @@ func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context,
if err = closePDSchedule(ctx); err != nil {
return err
}
if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil {
if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBEnableAutoAnalyze, false); err != nil {
return err
}
if err = setTiDBSuperReadOnly(ctx, se, variable.On); err != nil {
if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBSuperReadOnly, true); err != nil {
return err
}
if err = setTiDBTTLJobEnable(ctx, se, variable.Off); err != nil {
if err = setGlobalSysVarFromBool(ctx, se, variable.TiDBTTLJobEnable, false); err != nil {
return err
}

Expand Down Expand Up @@ -354,16 +335,18 @@ type keyRangeMayExclude struct {
exclude bool
}

// appendContinuousKeyRanges merges not exclude continuous key ranges and appends
// mergeContinuousKeyRanges merges not exclude continuous key ranges and appends
// to given []kv.KeyRange, assuming the gap between key ranges has no data.
//
// Precondition: schemaKeyRanges is sorted by start key. schemaKeyRanges are
// non-overlapping.
func appendContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
func mergeContinuousKeyRanges(schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
var (
continuousStart, continuousEnd kv.Key
)

result := make([]kv.KeyRange, 0, 1)

for _, r := range schemaKeyRanges {
if r.exclude {
if continuousStart != nil {
Expand Down Expand Up @@ -398,9 +381,6 @@ func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashba
is := sess.GetDomainInfoSchema().(infoschema.InfoSchema)
schemas := is.AllSchemas()

// The semantic of keyRanges(output).
keyRanges := make([]kv.KeyRange, 0)

// get snapshot schema IDs.
flashbackSnapshotMeta := meta.NewReader(sess.GetStore().GetSnapshot(kv.NewVersion(flashbackTS)))
snapshotSchemas, err := flashbackSnapshotMeta.ListDatabases()
Expand Down Expand Up @@ -453,7 +433,7 @@ func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashba
return bytes.Compare(a.r.StartKey, b.r.StartKey)
})

keyRanges = appendContinuousKeyRanges(keyRanges, schemaKeyRanges)
keyRanges := mergeContinuousKeyRanges(schemaKeyRanges)

startKey := tablecodec.EncodeMetaKeyPrefix([]byte("DBs"))
keyRanges = append(keyRanges, kv.KeyRange{
Expand Down Expand Up @@ -681,7 +661,7 @@ func flashbackToVersion(
).RunOnRange(ctx, startKey, endKey)
}

func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []kv.KeyRange) {
func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []model.KeyRange) {
if s, ok := store.(kv.SplittableStore); ok {
for _, keys := range keyRanges {
for {
Expand Down Expand Up @@ -713,18 +693,14 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env")
}

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]any
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabledValue bool
var keyRanges []kv.KeyRange
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue, &keyRanges); err != nil {
args, err := model.GetFlashbackClusterArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

var totalRegions, completedRegions atomic.Uint64
totalRegions.Store(lockedRegions)
totalRegions.Store(args.LockedRegionCnt)

sess, err := w.sessPool.Get()
if err != nil {
Expand All @@ -736,54 +712,63 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and update job args.
case model.StateNone:
if err = savePDSchedule(w.ctx, job); err != nil {
if err = savePDSchedule(w.ctx, args); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
gcEnableValue, err := gcutil.CheckGCEnable(sess)

args.EnableGC, err = gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[gcEnabledOffset] = &gcEnableValue
autoAnalyzeValue, err = getTiDBEnableAutoAnalyze(sess)

args.EnableAutoAnalyze, err = getGlobalSysVarAsBool(sess, variable.TiDBEnableAutoAnalyze)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[autoAnalyzeOffset] = &autoAnalyzeValue
readOnlyValue, err = getTiDBSuperReadOnly(sess)

args.SuperReadOnly, err = getGlobalSysVarAsBool(sess, variable.TiDBSuperReadOnly)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[readOnlyOffset] = &readOnlyValue
ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)

args.EnableTTLJob, err = getGlobalSysVarAsBool(sess, variable.TiDBTTLJobEnable)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue

job.FillArgs(args)
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, t, job, flashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, t, job, args.FlashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC.
startTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
args.StartTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[startTSOffset] = startTS
keyRanges, err = getFlashbackKeyRanges(w.ctx, sess, flashbackTS)
keyRanges, err := getFlashbackKeyRanges(w.ctx, sess, args.FlashbackTS)
if err != nil {
return ver, errors.Trace(err)
}
job.Args[keyRangesOffset] = keyRanges
args.FlashbackKeyRanges = make([]model.KeyRange, len(keyRanges))
for i, keyRange := range keyRanges {
args.FlashbackKeyRanges[i] = model.KeyRange{
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
}
}

job.FillArgs(args)
job.SchemaState = model.StateWriteOnly
return updateSchemaVersion(jobCtx, t, job)
// Stage 3, lock related key ranges.
Expand All @@ -794,27 +779,27 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
return updateSchemaVersion(jobCtx, t, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(w.ctx, jobCtx.store, keyRanges)
splitRegionsByKeyRanges(w.ctx, jobCtx.store, args.FlashbackKeyRanges)
totalRegions.Store(0)
for _, r := range keyRanges {
for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := SendPrepareFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), flashbackTS, startTS, r)
stats, err := SendPrepareFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}, r.StartKey, r.EndKey); err != nil {
logutil.DDLLogger().Warn("Get error when do flashback", zap.Error(err))
return ver, err
}
}
job.Args[totalLockedRegionsOffset] = totalRegions.Load()
args.LockedRegionCnt = totalRegions.Load()

// We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC.
commitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
args.CommitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return ver, errors.Trace(err)
}
job.Args[commitTSOffset] = commitTS
job.FillArgs(args)
job.SchemaState = model.StateWriteReorganization
return ver, nil
// Stage 4, get key ranges and send flashback RPC.
Expand All @@ -827,11 +812,11 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *mo
return ver, nil
}

for _, r := range keyRanges {
for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use same startTS as prepare phase to simulate 1PC txn.
stats, err := SendFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), flashbackTS, startTS, commitTS, r)
stats, err := SendFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, args.CommitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
logutil.DDLLogger().Info("flashback cluster stats",
zap.Uint64("complete regions", completedRegions.Load()),
Expand All @@ -858,44 +843,47 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
return nil
}

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]any
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabled bool

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
args, err := model.GetFlashbackClusterArgs(job)
if err != nil {
return errors.Trace(err)
}

sess, err := w.sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.Put(sess)

err = kv.RunInNewTxn(w.ctx, w.store, true, func(context.Context, kv.Transaction) error {
if err = recoverPDSchedule(w.ctx, pdScheduleValue); err != nil {
return err
if err = recoverPDSchedule(w.ctx, args.PDScheduleValue); err != nil {
return errors.Trace(err)
}
if gcEnabled {

if args.EnableGC {
if err = gcutil.EnableGC(sess); err != nil {
return err
return errors.Trace(err)
}
}
if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
return err

if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBSuperReadOnly, args.SuperReadOnly); err != nil {
return errors.Trace(err)
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
return err
if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBTTLJobEnable, args.EnableTTLJob); err != nil {
return errors.Trace(err)
}
}

return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
if err := setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBEnableAutoAnalyze, args.EnableAutoAnalyze); err != nil {
return errors.Trace(err)
}

return nil
})
if err != nil {
return err
return errors.Trace(err)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestGetTableDataKeyRanges(t *testing.T) {
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

func TestAppendContinuousKeyRanges(t *testing.T) {
func TestMergeContinuousKeyRanges(t *testing.T) {
cases := []struct {
input []keyRangeMayExclude
expect []kv.KeyRange
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestAppendContinuousKeyRanges(t *testing.T) {
}

for i, ca := range cases {
ranges := appendContinuousKeyRanges([]kv.KeyRange{}, ca.input)
ranges := mergeContinuousKeyRanges(ca.input)
require.Equal(t, ca.expect, ranges, "case %d", i)
}
}
Expand Down
Loading