From e33dbbc53afdbfb55e6e43f49dbed8d9f9612bec Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 17 Jul 2024 18:31:30 +0800 Subject: [PATCH] ddl: combine table/partition/db id allocation with job id (#54669) ref pingcap/tidb#54436 --- br/pkg/glue/glue.go | 4 +- br/pkg/gluetidb/glue.go | 4 +- br/pkg/gluetidb/mock/mock.go | 4 +- br/pkg/restore/internal/prealloc_db/db.go | 41 ++-- br/pkg/task/restore_test.go | 2 +- pkg/ddl/db_table_test.go | 14 +- pkg/ddl/ddl.go | 141 +++++++----- pkg/ddl/ddl_api.go | 132 ++++------- pkg/ddl/ddl_worker.go | 258 ++++++++++++++-------- pkg/ddl/ddl_worker_test.go | 4 +- pkg/ddl/executor_test.go | 247 +++++++++++++++++++-- pkg/ddl/foreign_key_test.go | 4 +- pkg/ddl/job_table.go | 26 ++- pkg/ddl/partition_test.go | 4 +- pkg/ddl/placement_policy_test.go | 4 +- pkg/ddl/placement_sql_test.go | 4 +- pkg/ddl/restart_test.go | 2 +- pkg/ddl/schema_test.go | 10 +- pkg/ddl/schematracker/checker.go | 9 +- pkg/ddl/schematracker/dm_tracker.go | 15 +- pkg/ddl/table_test.go | 24 +- pkg/executor/brie.go | 4 +- pkg/executor/brie_utils.go | 10 +- pkg/executor/brie_utils_test.go | 20 +- 24 files changed, 624 insertions(+), 363 deletions(-) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 482f8cf1e5d1b..5d5f611fa39e6 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -44,7 +44,7 @@ type Session interface { ExecuteInternal(ctx context.Context, sql string, args ...any) error CreateDatabase(ctx context.Context, schema *model.DBInfo) error CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, - cs ...ddl.CreateTableWithInfoConfigurier) error + cs ...ddl.CreateTableOption) error CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error Close() GetGlobalVariable(name string) (string, error) @@ -54,7 +54,7 @@ type Session interface { // BatchCreateTableSession is an interface to batch create table parallelly type BatchCreateTableSession interface { CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, - cs ...ddl.CreateTableWithInfoConfigurier) error + cs ...ddl.CreateTableOption) error } // Progress is an interface recording the current execution progress. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 4c308d58afaa7..95ac50220f88c 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -212,13 +212,13 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model. // CreateTables implements glue.BatchCreateTableSession. func (gs *tidbSession) CreateTables(_ context.Context, - tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + tables map[string][]*model.TableInfo, cs ...ddl.CreateTableOption) error { return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...)) } // CreateTable implements glue.Session. func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr, - table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + table *model.TableInfo, cs ...ddl.CreateTableOption) error { return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...)) } diff --git a/br/pkg/gluetidb/mock/mock.go b/br/pkg/gluetidb/mock/mock.go index 0fe4615519e4f..42cbd4a814657 100644 --- a/br/pkg/gluetidb/mock/mock.go +++ b/br/pkg/gluetidb/mock/mock.go @@ -79,14 +79,14 @@ func (*mockSession) CreatePlacementPolicy(_ context.Context, _ *model.PolicyInfo // CreateTables implements glue.BatchCreateTableSession. func (*mockSession) CreateTables(_ context.Context, _ map[string][]*model.TableInfo, - _ ...ddl.CreateTableWithInfoConfigurier) error { + _ ...ddl.CreateTableOption) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } // CreateTable implements glue.Session. func (*mockSession) CreateTable(_ context.Context, _ model.CIStr, - _ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error { + _ *model.TableInfo, _ ...ddl.CreateTableOption) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } diff --git a/br/pkg/restore/internal/prealloc_db/db.go b/br/pkg/restore/internal/prealloc_db/db.go index 7c9a661d8b895..8664771b486f9 100644 --- a/br/pkg/restore/internal/prealloc_db/db.go +++ b/br/pkg/restore/internal/prealloc_db/db.go @@ -280,26 +280,29 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, return nil } -func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf { - return func(ti *model.TableInfo) bool { - if db.preallocedIDs == nil { - return true - } - prealloced := db.preallocedIDs.PreallocedFor(ti) - if prealloced { - log.Info("reusing table ID", zap.Stringer("table", ti.Name)) - } - return !prealloced +func (db *DB) canReuseTableID(ti *model.TableInfo) bool { + if db.preallocedIDs == nil { + return false + } + prealloced := db.preallocedIDs.PreallocedFor(ti) + if prealloced { + log.Info("reusing table ID", zap.Stringer("table", ti.Name)) } + return prealloced } // CreateTables execute a internal CREATE TABLES. func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ddlTables map[restore.UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error { if batchSession, ok := db.se.(glue.BatchCreateTableSession); ok { - m := map[string][]*model.TableInfo{} + idReusableTbls := map[string][]*model.TableInfo{} + idNonReusableTbls := map[string][]*model.TableInfo{} for _, table := range tables { - m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info) + if db.canReuseTableID(table.Info) { + idReusableTbls[table.DB.Name.L] = append(idReusableTbls[table.DB.Name.L], table.Info) + } else { + idNonReusableTbls[table.DB.Name.L] = append(idNonReusableTbls[table.DB.Name.L], table.Info) + } if !supportPolicy { log.Info("set placementPolicyRef to nil when target tidb not support policy", zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name)) @@ -314,8 +317,15 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ttlInfo.Enable = false } } - if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil { - return err + if len(idReusableTbls) > 0 { + if err := batchSession.CreateTables(ctx, idReusableTbls, ddl.WithIDAllocated(true)); err != nil { + return err + } + } + if len(idNonReusableTbls) > 0 { + if err := batchSession.CreateTables(ctx, idNonReusableTbls); err != nil { + return err + } } for _, table := range tables { @@ -345,7 +355,8 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ttlInfo.Enable = false } - err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter()) + reuseID := db.canReuseTableID(table.Info) + err := db.se.CreateTable(ctx, table.DB.Name, table.Info, ddl.WithIDAllocated(reuseID)) if err != nil { log.Error("create table failed", zap.Stringer("db", table.DB.Name), diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 4f9098a27fdb9..4f50946acd449 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -134,7 +134,7 @@ func TestPreCheckTableClusterIndex(t *testing.T) { Collate: "utf8mb4_bin", }, } - err = se.CreateTable(ctx, tables[i].DB.Name, tables[i].Info, ddl.OnExistIgnore) + err = se.CreateTable(ctx, tables[i].DB.Name, tables[i].Info, ddl.WithOnExist(ddl.OnExistIgnore)) require.NoError(t, err) } diff --git a/pkg/ddl/db_table_test.go b/pkg/ddl/db_table_test.go index 07a524a3c348a..f24e20eefd400 100644 --- a/pkg/ddl/db_table_test.go +++ b/pkg/ddl/db_table_test.go @@ -322,9 +322,7 @@ func TestCreateTableWithInfo(t *testing.T) { Name: model.NewCIStr("t"), }} - require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - }))) + require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.WithOnExist(ddl.OnExistError), ddl.WithIDAllocated(true))) tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 't'").Check(testkit.Rows("42042")) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) @@ -342,9 +340,7 @@ func TestCreateTableWithInfo(t *testing.T) { Name: model.NewCIStr("tt"), }} tk.Session().SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return true - }))) + require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.WithOnExist(ddl.OnExistError))) idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tt'").Rows()[0][0].(string) require.True(t, ok) idGenNum, err := strconv.ParseInt(idGen, 10, 64) @@ -374,7 +370,7 @@ func TestBatchCreateTable(t *testing.T) { // correct name tk.Session().SetValue(sessionctx.QueryString, "skip") - err := d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.OnExistError) + err := d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.WithOnExist(ddl.OnExistError)) require.NoError(t, err) tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) @@ -389,7 +385,7 @@ func TestBatchCreateTable(t *testing.T) { // duplicated name infos[1].Name = model.NewCIStr("tables_1") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.OnExistError) + err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.WithOnExist(ddl.OnExistError)) require.True(t, terror.ErrorEqual(err, infoschema.ErrTableExists)) newinfo := &model.TableInfo{ @@ -418,7 +414,7 @@ func TestBatchCreateTable(t *testing.T) { tk.Session().SetValue(sessionctx.QueryString, "skip") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), []*model.TableInfo{newinfo}, ddl.OnExistError) + err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), []*model.TableInfo{newinfo}, ddl.WithOnExist(ddl.OnExistError)) require.NoError(t, err) } diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 36dd46691f0f5..e4e6061d1df1c 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -104,45 +104,44 @@ const ( // OnExist specifies what to do when a new object has a name collision. type OnExist uint8 -// AllocTableIDIf specifies whether to retain the old table ID. -// If this returns "false", then we would assume the table ID has been -// allocated before calling `CreateTableWithInfo` family. -type AllocTableIDIf func(*model.TableInfo) bool - -// CreateTableWithInfoConfig is the configuration of `CreateTableWithInfo`. -type CreateTableWithInfoConfig struct { - OnExist OnExist - ShouldAllocTableID AllocTableIDIf +// CreateTableConfig is the configuration of `CreateTableWithInfo`. +type CreateTableConfig struct { + OnExist OnExist + // IDAllocated indicates whether the job has allocated all IDs for tables affected + // in the job, if true, DDL will not allocate IDs for them again, it's only used + // by BR now. By reusing IDs BR can save a lot of works such as rewriting table + // IDs in backed up KVs. + IDAllocated bool } -// CreateTableWithInfoConfigurier is the "diff" which can be applied to the -// CreateTableWithInfoConfig, currently implementations are "OnExist" and "AllocTableIDIf". -type CreateTableWithInfoConfigurier interface { - // Apply the change over the config. - Apply(*CreateTableWithInfoConfig) -} +// CreateTableOption is the option for creating table. +type CreateTableOption func(*CreateTableConfig) -// GetCreateTableWithInfoConfig applies the series of configurier from default config +// GetCreateTableConfig applies the series of config options from default config // and returns the final config. -func GetCreateTableWithInfoConfig(cs []CreateTableWithInfoConfigurier) CreateTableWithInfoConfig { - config := CreateTableWithInfoConfig{} +func GetCreateTableConfig(cs []CreateTableOption) CreateTableConfig { + cfg := CreateTableConfig{} for _, c := range cs { - c.Apply(&config) + c(&cfg) } - if config.ShouldAllocTableID == nil { - config.ShouldAllocTableID = func(*model.TableInfo) bool { return true } - } - return config + return cfg } -// Apply implements Configurier. -func (o OnExist) Apply(c *CreateTableWithInfoConfig) { - c.OnExist = o +// WithOnExist applies the OnExist option. +func WithOnExist(o OnExist) CreateTableOption { + return func(cfg *CreateTableConfig) { + cfg.OnExist = o + } } -// Apply implements Configurier. -func (a AllocTableIDIf) Apply(c *CreateTableWithInfoConfig) { - c.ShouldAllocTableID = a +// WithIDAllocated applies the IDAllocated option. +// WARNING!!!: if idAllocated == true, DDL will NOT allocate IDs by itself. That +// means if the caller can not promise ID is unique, then we got inconsistency. +// This option is only exposed to be used by BR. +func WithIDAllocated(idAllocated bool) CreateTableOption { + return func(cfg *CreateTableConfig) { + cfg.IDAllocated = idAllocated + } } const ( @@ -216,13 +215,13 @@ type DDL interface { schema model.CIStr, info *model.TableInfo, involvingRef []model.InvolvingSchemaInfo, - cs ...CreateTableWithInfoConfigurier) error + cs ...CreateTableOption) error // BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables. BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, - cs ...CreateTableWithInfoConfigurier) error + cs ...CreateTableOption) error // CreatePlacementPolicyWithInfo creates a placement policy // @@ -263,18 +262,39 @@ type DDL interface { GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema // DoDDLJob does the DDL job, it's exported for test. DoDDLJob(ctx sessionctx.Context, job *model.Job) error + // DoDDLJobWrapper similar to DoDDLJob, but with JobWrapper as input. + // exported for testing. + // TODO remove it after decouple components of DDL. + DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) error +} + +// JobWrapper is used to wrap a job and some other information. +// exported for testing. +type JobWrapper struct { + *model.Job + // IDAllocated see config of same name in CreateTableConfig. + // exported for test. + IDAllocated bool + // job submission is run in async, we use this channel to notify the caller. + // for local job we might combine multiple jobs into one, append the ErrChs to + // this slice. + ErrChs []chan error + cacheErr error } -type limitJobTask struct { - job *model.Job - // when we combine multiple jobs into one task, - // append the errChs to this slice. - errChs []chan error - cacheErr error +// NewJobWrapper creates a new JobWrapper. +// exported for testing. +func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper { + return &JobWrapper{ + Job: job, + IDAllocated: idAllocated, + ErrChs: []chan error{make(chan error)}, + } } -func (t *limitJobTask) NotifyError(err error) { - for _, errCh := range t.errChs { +// NotifyError notifies the error to all error channels. +func (t *JobWrapper) NotifyError(err error) { + for _, errCh := range t.ErrChs { errCh <- err } } @@ -283,9 +303,9 @@ func (t *limitJobTask) NotifyError(err error) { type ddl struct { m sync.RWMutex wg tidbutil.WaitGroupWrapper // It's only used to deal with data race in restart_test. - limitJobCh chan *limitJobTask + limitJobCh chan *JobWrapper // limitJobChV2 is used to limit the number of jobs being executed in local worker. - limitJobChV2 chan *limitJobTask + limitJobChV2 chan *JobWrapper *ddlCtx sessPool *sess.Pool @@ -297,7 +317,7 @@ type ddl struct { ddlJobNotifyCh chan struct{} // localJobCh is used to delivery job in local TiDB nodes. - localJobCh chan *limitJobTask + localJobCh chan *JobWrapper // globalIDLocal locks global id to reduce write conflict. globalIDLock sync.Mutex } @@ -756,11 +776,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl { d := &ddl{ ddlCtx: ddlCtx, - limitJobCh: make(chan *limitJobTask, batchAddingJobs), - limitJobChV2: make(chan *limitJobTask, batchAddingJobs), + limitJobCh: make(chan *JobWrapper, batchAddingJobs), + limitJobChV2: make(chan *JobWrapper, batchAddingJobs), enableTiFlashPoll: atomicutil.NewBool(true), ddlJobNotifyCh: make(chan struct{}, 100), - localJobCh: make(chan *limitJobTask, 1), + localJobCh: make(chan *JobWrapper, 1), } taskexecutor.RegisterTaskType(proto.Backfill, @@ -1003,12 +1023,6 @@ func (d *ddl) genGlobalIDs(count int) ([]int64, error) { d.globalIDLock.Lock() defer d.globalIDLock.Unlock() err := kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { - failpoint.Inject("mockGenGlobalIDFail", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(errors.New("gofail genGlobalIDs error")) - } - }) - m := meta.NewMeta(txn) var err error ret, err = m.GenGlobalIDs(count) @@ -1152,8 +1166,8 @@ func setDDLJobMode(job *model.Job) { job.LocalMode = false } -func (d *ddl) deliverJobTask(task *limitJobTask) { - if task.job.LocalMode { +func (d *ddl) deliverJobTask(task *JobWrapper) { + if task.LocalMode { d.limitJobChV2 <- task } else { d.limitJobCh <- task @@ -1165,6 +1179,11 @@ func (d *ddl) deliverJobTask(task *limitJobTask) { // - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel // - other: found in history DDL job and return that job error func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { + return d.DoDDLJobWrapper(ctx, NewJobWrapper(job, false)) +} + +func (d *ddl) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) error { + job := jobW.Job job.TraceInfo = &model.TraceInfo{ ConnectionID: ctx.GetSessionVars().ConnectionID, SessionAlias: ctx.GetSessionVars().SessionAlias, @@ -1177,23 +1196,23 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { // Get a global job ID and put the DDL job in the queue. setDDLJobQuery(ctx, job) setDDLJobMode(job) - task := &limitJobTask{job, []chan error{make(chan error)}, nil} - d.deliverJobTask(task) + d.deliverJobTask(jobW) failpoint.Inject("mockParallelSameDDLJobTwice", func(val failpoint.Value) { if val.(bool) { - <-task.errChs[0] + <-jobW.ErrChs[0] // The same job will be put to the DDL queue twice. job = job.Clone() - task1 := &limitJobTask{job, []chan error{make(chan error)}, nil} - d.deliverJobTask(task1) + newJobW := NewJobWrapper(job, jobW.IDAllocated) + d.deliverJobTask(newJobW) // The second job result is used for test. - task = task1 + jobW = newJobW } }) - // worker should restart to continue handling tasks in limitJobCh, and send back through task.err - err := <-task.errChs[0] + // worker should restart to continue handling tasks in limitJobCh, and send back through jobW.err + err := <-jobW.ErrChs[0] + // job.ID must be allocated after previous channel receive returns nil. defer d.delJobDoneCh(job.ID) if err != nil { // The transaction of enqueuing job is failed. diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index 08230de72fc7f..d64a75339007b 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -209,15 +209,7 @@ func (d *ddl) CreateSchemaWithInfo( return errors.Trace(err) } - // FIXME: support `tryRetainID`. - genIDs, err := d.genGlobalIDs(1) - if err != nil { - return errors.Trace(err) - } - dbInfo.ID = genIDs[0] - job := &model.Job{ - SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, Type: model.ActionCreateSchema, BinlogInfo: &model.HistoryInfo{}, @@ -236,7 +228,7 @@ func (d *ddl) CreateSchemaWithInfo( }) } - err = d.DoDDLJob(ctx, job) + err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) if infoschema.ErrDatabaseExists.Equal(err) && onExist == OnExistIgnore { @@ -2716,15 +2708,6 @@ func BuildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh return tbInfo, nil } -func (d *ddl) assignTableID(tbInfo *model.TableInfo) error { - genIDs, err := d.genGlobalIDs(1) - if err != nil { - return errors.Trace(err) - } - tbInfo.ID = genIDs[0] - return nil -} - func (d *ddl) assignPartitionIDs(defs []model.PartitionDefinition) error { genIDs, err := d.genGlobalIDs(len(defs)) if err != nil { @@ -2788,7 +2771,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, involvingRef, onExist) + return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, involvingRef, WithOnExist(onExist)) } func setTemporaryType(_ sessionctx.Context, tbInfo *model.TableInfo, s *ast.CreateTableStmt) error { @@ -2809,15 +2792,12 @@ func setTemporaryType(_ sessionctx.Context, tbInfo *model.TableInfo, s *ast.Crea // createTableWithInfoJob returns the table creation job. // WARNING: it may return a nil job, which means you don't need to submit any DDL job. -// WARNING!!!: if retainID == true, it will not allocate ID by itself. That means if the caller -// can not promise ID is unique, then we got inconsistency. func (d *ddl) createTableWithInfoJob( ctx sessionctx.Context, dbName model.CIStr, tbInfo *model.TableInfo, involvingRef []model.InvolvingSchemaInfo, onExist OnExist, - retainID bool, ) (job *model.Job, err error) { is := d.GetInfoSchemaWithInterceptor(ctx) schema, ok := is.SchemaByName(dbName) @@ -2852,18 +2832,6 @@ func (d *ddl) createTableWithInfoJob( } } - if !retainID { - if err := d.assignTableID(tbInfo); err != nil { - return nil, errors.Trace(err) - } - - if tbInfo.Partition != nil { - if err := d.assignPartitionIDs(tbInfo.Partition.Definitions); err != nil { - return nil, errors.Trace(err) - } - } - } - if err := checkTableInfoValidExtra(tbInfo); err != nil { return nil, err } @@ -2896,7 +2864,6 @@ func (d *ddl) createTableWithInfoJob( job = &model.Job{ SchemaID: schema.ID, - TableID: tbInfo.ID, SchemaName: schema.Name.L, TableName: tbInfo.Name.L, Type: actionType, @@ -2972,12 +2939,12 @@ func (d *ddl) CreateTableWithInfo( dbName model.CIStr, tbInfo *model.TableInfo, involvingRef []model.InvolvingSchemaInfo, - cs ...CreateTableWithInfoConfigurier, + cs ...CreateTableOption, ) (err error) { - c := GetCreateTableWithInfoConfig(cs) + c := GetCreateTableConfig(cs) job, err := d.createTableWithInfoJob( - ctx, dbName, tbInfo, involvingRef, c.OnExist, !c.ShouldAllocTableID(tbInfo), + ctx, dbName, tbInfo, involvingRef, c.OnExist, ) if err != nil { return err @@ -2986,7 +2953,9 @@ func (d *ddl) CreateTableWithInfo( return nil } - err = d.DoDDLJob(ctx, job) + jobW := NewJobWrapper(job, c.IDAllocated) + + err = d.DoDDLJobWrapper(ctx, jobW) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. if c.OnExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { @@ -3004,7 +2973,7 @@ func (d *ddl) CreateTableWithInfo( func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, dbName model.CIStr, infos []*model.TableInfo, - cs ...CreateTableWithInfoConfigurier, + cs ...CreateTableOption, ) error { failpoint.Inject("RestoreBatchCreateTableEntryTooLarge", func(val failpoint.Value) { injectBatchSize := val.(int) @@ -3012,21 +2981,23 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, failpoint.Return(kv.ErrEntryTooLarge) } }) - c := GetCreateTableWithInfoConfig(cs) + c := GetCreateTableConfig(cs) - jobs := &model.Job{ - BinlogInfo: &model.HistoryInfo{}, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, - } + jobW := NewJobWrapper( + &model.Job{ + BinlogInfo: &model.HistoryInfo{}, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + SQLMode: ctx.GetSessionVars().SQLMode, + }, + c.IDAllocated, + ) args := make([]*model.TableInfo, 0, len(infos)) var err error - // 1. counts how many IDs are there - // 2. if there is any duplicated table name - totalID := 0 + // check if there are any duplicated table names duplication := make(map[string]struct{}) + // TODO filter those duplicated info out. for _, info := range infos { if _, ok := duplication[info.Name.L]; ok { err = infoschema.ErrTableExists.FastGenByArgs("can not batch create tables with same name") @@ -3040,31 +3011,10 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, } duplication[info.Name.L] = struct{}{} - - totalID++ - parts := info.GetPartitionInfo() - if parts != nil { - totalID += len(parts.Definitions) - } - } - - genIDs, err := d.genGlobalIDs(totalID) - if err != nil { - return errors.Trace(err) } for _, info := range infos { - if c.ShouldAllocTableID(info) { - info.ID, genIDs = genIDs[0], genIDs[1:] - - if parts := info.GetPartitionInfo(); parts != nil { - for i := range parts.Definitions { - parts.Definitions[i].ID, genIDs = genIDs[0], genIDs[1:] - } - } - } - - job, err := d.createTableWithInfoJob(ctx, dbName, info, nil, c.OnExist, true) + job, err := d.createTableWithInfoJob(ctx, dbName, info, nil, c.OnExist) if err != nil { return errors.Trace(err) } @@ -3072,12 +3022,12 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, continue } - // if jobs.Type == model.ActionCreateTables, it is initialized - // if not, initialize jobs by job.XXXX - if jobs.Type != model.ActionCreateTables { - jobs.Type = model.ActionCreateTables - jobs.SchemaID = job.SchemaID - jobs.SchemaName = job.SchemaName + // if jobW.Type == model.ActionCreateTables, it is initialized + // if not, initialize jobW by job.XXXX + if jobW.Type != model.ActionCreateTables { + jobW.Type = model.ActionCreateTables + jobW.SchemaID = job.SchemaID + jobW.SchemaName = job.SchemaName } // append table job args @@ -3086,37 +3036,37 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, return errors.Trace(fmt.Errorf("except table info")) } args = append(args, info) + jobW.InvolvingSchemaInfo = append(jobW.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ + Database: dbName.L, + Table: info.Name.L, + }) if sharedInv := getSharedInvolvingSchemaInfo(info); len(sharedInv) > 0 { - jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ - Database: dbName.L, - Table: info.Name.L, - }) - jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, sharedInv...) + jobW.InvolvingSchemaInfo = append(jobW.InvolvingSchemaInfo, sharedInv...) } } if len(args) == 0 { return nil } - jobs.Args = append(jobs.Args, args) - jobs.Args = append(jobs.Args, ctx.GetSessionVars().ForeignKeyChecks) + jobW.Args = append(jobW.Args, args) + jobW.Args = append(jobW.Args, ctx.GetSessionVars().ForeignKeyChecks) - err = d.DoDDLJob(ctx, jobs) + err = d.DoDDLJobWrapper(ctx, jobW) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. if c.OnExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } - return errors.Trace(d.callHookOnChanged(jobs, err)) + return errors.Trace(d.callHookOnChanged(jobW.Job, err)) } for j := range args { - if err = d.createTableWithInfoPost(ctx, args[j], jobs.SchemaID); err != nil { - return errors.Trace(d.callHookOnChanged(jobs, err)) + if err = d.createTableWithInfoPost(ctx, args[j], jobW.SchemaID); err != nil { + return errors.Trace(d.callHookOnChanged(jobW.Job, err)) } } - return d.callHookOnChanged(jobs, err) + return d.callHookOnChanged(jobW.Job, err) } // BuildQueryStringFromJobs takes a slice of Jobs and concatenates their @@ -3389,7 +3339,7 @@ func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err err onExist = OnExistReplace } - return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, onExist) + return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, WithOnExist(onExist)) } // BuildViewInfo builds a ViewInfo structure from an ast.CreateViewStmt. @@ -8769,7 +8719,7 @@ func (d *ddl) CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStm onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, nil, onExist) + return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, nil, WithOnExist(onExist)) } func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error { diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 97a705761f02a..111aeca1946d3 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -216,21 +216,21 @@ func asyncNotify(ch chan struct{}) { } } -func (d *ddl) limitDDLJobs(ch chan *limitJobTask, handler func(tasks []*limitJobTask)) { +func (d *ddl) limitDDLJobs(ch chan *JobWrapper, handler func([]*JobWrapper)) { defer tidbutil.Recover(metrics.LabelDDL, "limitDDLJobs", nil, true) - tasks := make([]*limitJobTask, 0, batchAddingJobs) + jobWs := make([]*JobWrapper, 0, batchAddingJobs) for { select { // the channel is never closed - case task := <-ch: - tasks = tasks[:0] + case jobW := <-ch: + jobWs = jobWs[:0] jobLen := len(ch) - tasks = append(tasks, task) + jobWs = append(jobWs, jobW) for i := 0; i < jobLen; i++ { - tasks = append(tasks, <-ch) + jobWs = append(jobWs, <-ch) } - handler(tasks) + handler(jobWs) case <-d.ctx.Done(): return } @@ -238,51 +238,51 @@ func (d *ddl) limitDDLJobs(ch chan *limitJobTask, handler func(tasks []*limitJob } // addBatchDDLJobsV1 gets global job IDs and puts the DDL jobs in the DDL queue. -func (d *ddl) addBatchDDLJobsV1(tasks []*limitJobTask) { +func (d *ddl) addBatchDDLJobsV1(jobWs []*JobWrapper) { startTime := time.Now() var err error // DDLForce2Queue is a flag to tell DDL worker to always push the job to the DDL queue. toTable := !variable.DDLForce2Queue.Load() if toTable { - err = d.addBatchDDLJobs(tasks) + err = d.addBatchDDLJobs(jobWs) } else { - err = d.addBatchDDLJobs2Queue(tasks) + err = d.addBatchDDLJobs2Queue(jobWs) } var jobs string - for _, task := range tasks { + for _, jobW := range jobWs { if err == nil { - err = task.cacheErr + err = jobW.cacheErr } - task.NotifyError(err) - jobs += task.job.String() + "; " - metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(), + jobW.NotifyError(err) + jobs += jobW.Job.String() + "; " + metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, jobW.Job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) } if err != nil { logutil.DDLLogger().Warn("add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err)) } else { logutil.DDLLogger().Info("add DDL jobs", - zap.Int("batch count", len(tasks)), + zap.Int("batch count", len(jobWs)), zap.String("jobs", jobs), zap.Bool("table", toTable)) } } // addBatchLocalDDLJobs gets global job IDs and delivery the DDL jobs to local TiDB -func (d *ddl) addBatchLocalDDLJobs(tasks []*limitJobTask) { - if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil { - tasks = newTasks +func (d *ddl) addBatchLocalDDLJobs(jobWs []*JobWrapper) { + if newJobWs, err := combineBatchCreateTableJobs(jobWs); err == nil { + jobWs = newJobWs } - err := d.addBatchDDLJobs(tasks) + err := d.addBatchDDLJobs(jobWs) if err != nil { - for _, task := range tasks { - task.NotifyError(err) + for _, jobW := range jobWs { + jobW.NotifyError(err) } logutil.DDLLogger().Error("add DDL jobs failed", zap.Bool("local_mode", true), zap.Error(err)) } else { logutil.DDLLogger().Info("add DDL jobs", zap.Bool("local_mode", true), - zap.Int("batch count", len(tasks))) + zap.Int("batch count", len(jobWs))) } } @@ -319,14 +319,14 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error { return nil } -func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error { +func (d *ddl) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) // lock to reduce conflict d.globalIDLock.Lock() defer d.globalIDLock.Unlock() return kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) - ids, err := t.GenGlobalIDs(len(tasks)) + ids, err := t.GenGlobalIDs(len(jobWs)) if err != nil { return errors.Trace(err) } @@ -335,8 +335,8 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error { return errors.Trace(err) } - for i, task := range tasks { - job := task.job + for i, jobW := range jobWs { + job := jobW.Job job.Version = currentVersion job.StartTS = txn.StartTS() job.ID = ids[i] @@ -348,7 +348,6 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error { if job.MayNeedReorg() { jobListKey = meta.AddIndexJobListKey } - injectModifyJobArgFailPoint(job) if err = t.EnQueueDDLJob(job, jobListKey); err != nil { return errors.Trace(err) } @@ -375,16 +374,21 @@ func (*ddl) checkFlashbackJobInQueue(t *meta.Meta) error { return nil } -func injectModifyJobArgFailPoint(job *model.Job) { +// TODO this failpoint is only checking how job scheduler handle +// corrupted job args, we should test it there by UT, not here. +func injectModifyJobArgFailPoint(jobWs []*JobWrapper) { failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { if val.(bool) { - // Corrupt the DDL job argument. - if job.Type == model.ActionMultiSchemaChange { - if len(job.MultiSchemaInfo.SubJobs) > 0 && len(job.MultiSchemaInfo.SubJobs[0].Args) > 0 { - job.MultiSchemaInfo.SubJobs[0].Args[0] = 1 + for _, jobW := range jobWs { + job := jobW.Job + // Corrupt the DDL job argument. + if job.Type == model.ActionMultiSchemaChange { + if len(job.MultiSchemaInfo.SubJobs) > 0 && len(job.MultiSchemaInfo.SubJobs[0].Args) > 0 { + job.MultiSchemaInfo.SubJobs[0].Args[0] = 1 + } + } else if len(job.Args) > 0 { + job.Args[0] = 1 } - } else if len(job.Args) > 0 { - job.Args[0] = 1 } } }) @@ -400,10 +404,10 @@ func setJobStateToQueueing(job *model.Job) { } // addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL job table or local worker. -func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { +func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error { var err error - if len(tasks) == 0 { + if len(jobWs) == 0 { return nil } @@ -448,9 +452,8 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { return errors.Trace(err) } - jobs := make([]*model.Job, 0, len(tasks)) - for _, task := range tasks { - job := task.job + for _, jobW := range jobWs { + job := jobW.Job job.Version = currentVersion job.StartTS = startTS job.BDRRole = bdrRole @@ -474,75 +477,152 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) && !job.LocalMode { if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil { logutil.DDLUpgradingLogger().Warn("pause user DDL by system failed", zap.Stringer("job", job), zap.Error(err)) - task.cacheErr = err + jobW.cacheErr = err continue } logutil.DDLUpgradingLogger().Info("pause user DDL by system successful", zap.Stringer("job", job)) } - - if _, err := job.Encode(true); err != nil { - return err - } - - jobs = append(jobs, job) - injectModifyJobArgFailPoint(job) } se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) ddlSe := sess.NewSession(se) - localMode := tasks[0].job.LocalMode + localMode := jobWs[0].Job.LocalMode if localMode { - if err = fillJobIDs(ctx, ddlSe, tasks); err != nil { + if err = fillJobRelatedGIDs(ctx, ddlSe, jobWs); err != nil { return err } - for _, task := range tasks { - d.localJobCh <- task + for _, jobW := range jobWs { + if _, err := jobW.Encode(true); err != nil { + return err + } + d.localJobCh <- jobW } return nil } - if err = GenIDAndInsertJobsWithRetry(ctx, ddlSe, jobs); err != nil { + if err = GenGIDAndInsertJobsWithRetry(ctx, ddlSe, jobWs); err != nil { return errors.Trace(err) } - for _, job := range jobs { - d.initJobDoneCh(job.ID) + for _, jobW := range jobWs { + d.initJobDoneCh(jobW.ID) } return nil } -// GenIDAndInsertJobsWithRetry generate job ID and inserts DDL jobs to the DDL job +// GenGIDAndInsertJobsWithRetry generate job related global ID and inserts DDL jobs to the DDL job // table with retry. job id allocation and job insertion are in the same transaction, // as we want to make sure DDL jobs are inserted in id order, then we can query from // a min job ID when scheduling DDL jobs to mitigate https://github.com/pingcap/tidb/issues/52905. -// so this function has side effect, it will set the job id of 'jobs'. -func GenIDAndInsertJobsWithRetry(ctx context.Context, ddlSe *sess.Session, jobs []*model.Job) error { - return genIDAndCallWithRetry(ctx, ddlSe, len(jobs), func(ids []int64) error { - for idx := range jobs { - jobs[idx].ID = ids[idx] - } - return insertDDLJobs2Table(ctx, ddlSe, jobs...) +// so this function has side effect, it will set table/db/job id of 'jobs'. +func GenGIDAndInsertJobsWithRetry(ctx context.Context, ddlSe *sess.Session, jobWs []*JobWrapper) error { + count := getRequiredGIDCount(jobWs) + return genGIDAndCallWithRetry(ctx, ddlSe, count, func(ids []int64) error { + failpoint.Inject("mockGenGlobalIDFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.New("gofail genGlobalIDs error")) + } + }) + assignGIDsForJobs(jobWs, ids) + injectModifyJobArgFailPoint(jobWs) + return insertDDLJobs2Table(ctx, ddlSe, jobWs...) }) } -func fillJobIDs(ctx context.Context, ddlSe *sess.Session, tasks []*limitJobTask) error { +// fillJobRelatedGIDs similar to GenGIDAndInsertJobsWithRetry, but only fill job related global IDs. +func fillJobRelatedGIDs(ctx context.Context, ddlSe *sess.Session, jobWs []*JobWrapper) error { var allocatedIDs []int64 - if err := genIDAndCallWithRetry(ctx, ddlSe, len(tasks), func(ids []int64) error { + count := getRequiredGIDCount(jobWs) + if err := genGIDAndCallWithRetry(ctx, ddlSe, count, func(ids []int64) error { allocatedIDs = ids return nil }); err != nil { return errors.Trace(err) } - for i, task := range tasks { - task.job.ID = allocatedIDs[i] - } + assignGIDsForJobs(jobWs, allocatedIDs) return nil } -// genIDAndCallWithRetry generates global IDs and calls the function with retry. +// getRequiredGIDCount returns the count of required global IDs for the jobs. it's calculated +// as: the count of jobs + the count of IDs for the jobs which do NOT have pre-allocated ID. +func getRequiredGIDCount(jobWs []*JobWrapper) int { + count := len(jobWs) + idCountForTable := func(info *model.TableInfo) int { + c := 1 + if partitionInfo := info.GetPartitionInfo(); partitionInfo != nil { + c += len(partitionInfo.Definitions) + } + return c + } + for _, jobW := range jobWs { + if jobW.IDAllocated { + continue + } + switch jobW.Type { + case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: + info := jobW.Args[0].(*model.TableInfo) + count += idCountForTable(info) + case model.ActionCreateTables: + infos := jobW.Args[0].([]*model.TableInfo) + for _, info := range infos { + count += idCountForTable(info) + } + case model.ActionCreateSchema: + count++ + } + // TODO support other type of jobs + } + return count +} + +// assignGIDsForJobs should be used with getRequiredGIDCount, and len(ids) must equal +// what getRequiredGIDCount returns. +func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { + idx := 0 + + assignIDsForTable := func(info *model.TableInfo) { + info.ID = ids[idx] + idx++ + if partitionInfo := info.GetPartitionInfo(); partitionInfo != nil { + for i := range partitionInfo.Definitions { + partitionInfo.Definitions[i].ID = ids[idx] + idx++ + } + } + } + for _, jobW := range jobWs { + switch jobW.Type { + case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: + info := jobW.Args[0].(*model.TableInfo) + if !jobW.IDAllocated { + assignIDsForTable(info) + } + jobW.TableID = info.ID + case model.ActionCreateTables: + if !jobW.IDAllocated { + infos := jobW.Args[0].([]*model.TableInfo) + for _, info := range infos { + assignIDsForTable(info) + } + } + case model.ActionCreateSchema: + dbInfo := jobW.Args[0].(*model.DBInfo) + if !jobW.IDAllocated { + dbInfo.ID = ids[idx] + idx++ + } + jobW.SchemaID = dbInfo.ID + } + // TODO support other type of jobs + jobW.ID = ids[idx] + idx++ + } +} + +// genGIDAndCallWithRetry generates global IDs and calls the function with retry. // generate ID and call function runs in the same transaction. -func genIDAndCallWithRetry(ctx context.Context, ddlSe *sess.Session, count int, fn func(ids []int64) error) error { +func genGIDAndCallWithRetry(ctx context.Context, ddlSe *sess.Session, count int, fn func(ids []int64) error) error { var resErr error for i := uint(0); i < kv.MaxRetryCnt; i++ { resErr = func() (err error) { @@ -631,36 +711,40 @@ func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transactio // combineBatchCreateTableJobs combine batch jobs to another batch jobs. // currently it only support combine CreateTable to CreateTables. -func combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) { - if len(tasks) <= 1 { - return tasks, nil +func combineBatchCreateTableJobs(jobWs []*JobWrapper) ([]*JobWrapper, error) { + if len(jobWs) <= 1 { + return jobWs, nil } var schemaName string - jobs := make([]*model.Job, 0, len(tasks)) - for i, task := range tasks { - if task.job.Type != model.ActionCreateTable { - return tasks, nil + jobs := make([]*model.Job, 0, len(jobWs)) + for i, jobW := range jobWs { + // we don't merge jobs with ID pre-allocated. + if jobW.Job.Type != model.ActionCreateTable || jobW.IDAllocated { + return jobWs, nil } if i == 0 { - schemaName = task.job.SchemaName - } else if task.job.SchemaName != schemaName { - return tasks, nil + schemaName = jobW.Job.SchemaName + } else if jobW.Job.SchemaName != schemaName { + return jobWs, nil } - jobs = append(jobs, task.job) + jobs = append(jobs, jobW.Job) } job, err := BatchCreateTableWithJobs(jobs) if err != nil { - return tasks, err + return jobWs, err } - logutil.DDLLogger().Info("combine jobs to batch create table job", zap.Int("len", len(tasks))) + logutil.DDLLogger().Info("combine jobs to batch create table job", zap.Int("len", len(jobWs))) - jobTask := &limitJobTask{job, []chan error{}, nil} + newJobW := &JobWrapper{ + Job: job, + ErrChs: []chan error{}, + } // combine the error chans. - for _, j := range tasks { - jobTask.errChs = append(jobTask.errChs, j.errChs...) + for _, j := range jobWs { + newJobW.ErrChs = append(newJobW.ErrChs, j.ErrChs...) } - return []*limitJobTask{jobTask}, nil + return []*JobWrapper{newJobW}, nil } func injectFailPointForGetJob(job *model.Job) { diff --git a/pkg/ddl/ddl_worker_test.go b/pkg/ddl/ddl_worker_test.go index 9fbf8d092e26f..89ff93d93e7c2 100644 --- a/pkg/ddl/ddl_worker_test.go +++ b/pkg/ddl/ddl_worker_test.go @@ -54,7 +54,7 @@ func TestInvalidDDLJob(t *testing.T) { } ctx := testNewContext(store) ctx.SetValue(sessionctx.QueryString, "skip") - err := dom.DDL().DoDDLJob(ctx, job) + err := dom.DDL().DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.ErrorContains(t, err, "[ddl:8204]invalid ddl job type: none") } @@ -66,7 +66,7 @@ func TestAddBatchJobError(t *testing.T) { // Test the job runner should not hang forever. job := &model.Job{SchemaID: 1, TableID: 1} ctx.SetValue(sessionctx.QueryString, "skip") - err := dom.DDL().DoDDLJob(ctx, job) + err := dom.DDL().DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.Error(t, err) require.Equal(t, err.Error(), "mockAddBatchDDLJobsErr") require.Nil(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockAddBatchDDLJobsErr")) diff --git a/pkg/ddl/executor_test.go b/pkg/ddl/executor_test.go index c2b99d70ea505..1b862339b2eaf 100644 --- a/pkg/ddl/executor_test.go +++ b/pkg/ddl/executor_test.go @@ -35,6 +35,18 @@ import ( "github.com/stretchr/testify/require" ) +func getGlobalID(ctx context.Context, t *testing.T, store kv.Storage) int64 { + res := int64(0) + require.NoError(t, kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + id, err := m.GetGlobalID() + require.NoError(t, err) + res = id + return nil + })) + return res +} + func TestGenIDAndInsertJobsWithRetry(t *testing.T) { store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore)) // disable DDL to avoid it interfere the test @@ -50,40 +62,227 @@ func TestGenIDAndInsertJobsWithRetry(t *testing.T) { kv.MaxRetryCnt = bak }) - jobs := []*model.Job{ - { + jobs := []*ddl.JobWrapper{{ + Job: &model.Job{ Type: model.ActionCreateTable, SchemaName: "test", TableName: "t1", + Args: []any{&model.TableInfo{}}, }, - } + }} + initialGID := getGlobalID(ctx, t, store) + threads, iterations := 10, 500 var wg util.WaitGroupWrapper - for i := 0; i < 10; i++ { + for i := 0; i < threads; i++ { wg.Run(func() { kit := testkit.NewTestKit(t, store) ddlSe := sess.NewSession(kit.Session()) - for i := 0; i < 1000; i++ { - require.NoError(t, ddl.GenIDAndInsertJobsWithRetry(ctx, ddlSe, jobs)) + for j := 0; j < iterations; j++ { + require.NoError(t, ddl.GenGIDAndInsertJobsWithRetry(ctx, ddlSe, jobs)) } }) } wg.Wait() - jobs, err := ddl.GetAllDDLJobs(tk.Session()) + jobCount := threads * iterations + gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) require.NoError(t, err) - require.Len(t, jobs, 10000) - var maxID, currUsedGID int64 - for _, j := range jobs { - maxID = max(maxID, j.ID) + require.Len(t, gotJobs, jobCount) + currGID := getGlobalID(ctx, t, store) + require.Greater(t, currGID-initialGID, int64(jobCount)) + uniqueJobIDs := make(map[int64]struct{}, jobCount) + for _, j := range gotJobs { + require.Greater(t, j.ID, initialGID) + uniqueJobIDs[j.ID] = struct{}{} } - require.NoError(t, kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { - m := meta.NewMeta(txn) - currUsedGID, err = m.GetGlobalID() + require.Len(t, uniqueJobIDs, jobCount) +} + +type idAllocationCase struct { + jobW *ddl.JobWrapper + requiredIDCount int +} + +func TestCombinedIDAllocation(t *testing.T) { + store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore)) + // disable DDL to avoid it interfere the test + tk := testkit.NewTestKit(t, store) + dom := domain.GetDomain(tk.Session()) + dom.DDL().OwnerManager().CampaignCancel() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + + // avoid outer retry + bak := kv.MaxRetryCnt + kv.MaxRetryCnt = 1 + t.Cleanup(func() { + kv.MaxRetryCnt = bak + }) + + genTblInfo := func(partitionCnt int) *model.TableInfo { + info := &model.TableInfo{Partition: &model.PartitionInfo{}} + for i := 0; i < partitionCnt; i++ { + info.Partition.Enable = true + info.Partition.Definitions = append(info.Partition.Definitions, model.PartitionDefinition{}) + } + return info + } + + genCreateTblJob := func(tp model.ActionType, partitionCnt int) *model.Job { + return &model.Job{ + Type: tp, + Args: []any{genTblInfo(partitionCnt)}, + } + } + + genCreateTblsJob := func(partitionCounts ...int) *model.Job { + infos := make([]*model.TableInfo, 0, len(partitionCounts)) + for _, c := range partitionCounts { + infos = append(infos, genTblInfo(c)) + } + return &model.Job{ + Type: model.ActionCreateTables, + Args: []any{infos}, + } + } + + genCreateDBJob := func() *model.Job { + info := &model.DBInfo{} + return &model.Job{ + Type: model.ActionCreateSchema, + Args: []any{info}, + } + } + + cases := []idAllocationCase{ + { + jobW: ddl.NewJobWrapper(genCreateTblsJob(1, 2, 0), false), + requiredIDCount: 1 + 3 + 1 + 2, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblsJob(3, 4), true), + requiredIDCount: 1, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 3), false), + requiredIDCount: 1 + 1 + 3, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 0), false), + requiredIDCount: 1 + 1, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 8), true), + requiredIDCount: 1, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateSequence, 0), false), + requiredIDCount: 2, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateSequence, 0), true), + requiredIDCount: 1, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateView, 0), false), + requiredIDCount: 2, + }, + { + jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateView, 0), true), + requiredIDCount: 1, + }, + { + jobW: ddl.NewJobWrapper(genCreateDBJob(), false), + requiredIDCount: 2, + }, + { + jobW: ddl.NewJobWrapper(genCreateDBJob(), true), + requiredIDCount: 1, + }, + } + + t.Run("process one by one", func(t *testing.T) { + tk.MustExec("delete from mysql.tidb_ddl_job") + for _, c := range cases { + currentGlobalID := getGlobalID(ctx, t, store) + require.NoError(t, ddl.GenGIDAndInsertJobsWithRetry(ctx, sess.NewSession(tk.Session()), []*ddl.JobWrapper{c.jobW})) + require.Equal(t, currentGlobalID+int64(c.requiredIDCount), getGlobalID(ctx, t, store)) + } + gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) require.NoError(t, err) - return nil - })) - require.Greater(t, currUsedGID, int64(10000)) - require.Equal(t, currUsedGID, maxID) + require.Len(t, gotJobs, len(cases)) + }) + + t.Run("process together", func(t *testing.T) { + tk.MustExec("delete from mysql.tidb_ddl_job") + + totalRequiredCnt := 0 + jobWs := make([]*ddl.JobWrapper, 0, len(cases)) + for _, c := range cases { + totalRequiredCnt += c.requiredIDCount + jobWs = append(jobWs, c.jobW) + } + currentGlobalID := getGlobalID(ctx, t, store) + require.NoError(t, ddl.GenGIDAndInsertJobsWithRetry(ctx, sess.NewSession(tk.Session()), jobWs)) + require.Equal(t, currentGlobalID+int64(totalRequiredCnt), getGlobalID(ctx, t, store)) + + gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + require.NoError(t, err) + require.Len(t, gotJobs, len(cases)) + }) + + t.Run("process IDAllocated = false", func(t *testing.T) { + tk.MustExec("delete from mysql.tidb_ddl_job") + + initialGlobalID := getGlobalID(ctx, t, store) + allocIDCaseCount, allocatedIDCount := 0, 0 + for _, c := range cases { + if !c.jobW.IDAllocated { + allocIDCaseCount++ + allocatedIDCount += c.requiredIDCount + require.NoError(t, ddl.GenGIDAndInsertJobsWithRetry(ctx, sess.NewSession(tk.Session()), []*ddl.JobWrapper{c.jobW})) + } + } + require.EqualValues(t, 6, allocIDCaseCount) + uniqueIDs := make(map[int64]struct{}, len(cases)) + checkTableInfo := func(info *model.TableInfo) { + uniqueIDs[info.ID] = struct{}{} + require.Greater(t, info.ID, initialGlobalID) + if pInfo := info.GetPartitionInfo(); pInfo != nil { + for _, def := range pInfo.Definitions { + uniqueIDs[def.ID] = struct{}{} + require.Greater(t, def.ID, initialGlobalID) + } + } + } + gotJobs, err := ddl.GetAllDDLJobs(tk.Session()) + require.NoError(t, err) + require.Len(t, gotJobs, allocIDCaseCount) + for _, j := range gotJobs { + uniqueIDs[j.ID] = struct{}{} + require.Greater(t, j.ID, initialGlobalID) + switch j.Type { + case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence: + require.Greater(t, j.TableID, initialGlobalID) + info := &model.TableInfo{} + require.NoError(t, j.DecodeArgs(info)) + require.Equal(t, j.TableID, info.ID) + checkTableInfo(info) + case model.ActionCreateTables: + var infos []*model.TableInfo + require.NoError(t, j.DecodeArgs(&infos)) + for _, info := range infos { + checkTableInfo(info) + } + case model.ActionCreateSchema: + require.Greater(t, j.SchemaID, initialGlobalID) + info := &model.DBInfo{} + require.NoError(t, j.DecodeArgs(info)) + uniqueIDs[info.ID] = struct{}{} + require.Equal(t, j.SchemaID, info.ID) + } + } + require.Len(t, uniqueIDs, allocatedIDCount) + }) } var ( @@ -102,15 +301,15 @@ func TestGenIDAndInsertJobsWithRetryQPS(t *testing.T) { dom.DDL().OwnerManager().CampaignCancel() ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - payload := []byte(strings.Repeat("a", payloadSize)) - jobs := []*model.Job{ - { + payload := strings.Repeat("a", payloadSize) + jobs := []*ddl.JobWrapper{{ + Job: &model.Job{ Type: model.ActionCreateTable, SchemaName: "test", TableName: "t1", - Args: []any{payload}, + Args: []any{&model.TableInfo{Comment: payload}}, }, - } + }} counters := make([]atomic.Int64, thread+1) var wg util.WaitGroupWrapper for i := 0; i < thread; i++ { @@ -119,7 +318,7 @@ func TestGenIDAndInsertJobsWithRetryQPS(t *testing.T) { kit := testkit.NewTestKit(t, store) ddlSe := sess.NewSession(kit.Session()) for i := 0; i < iterationPerThread; i++ { - require.NoError(t, ddl.GenIDAndInsertJobsWithRetry(ctx, ddlSe, jobs)) + require.NoError(t, ddl.GenGIDAndInsertJobsWithRetry(ctx, ddlSe, jobs)) counters[0].Add(1) counters[index+1].Add(1) diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index 24e0459ae231c..663348ae99422 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -68,7 +68,7 @@ func testCreateForeignKey(t *testing.T, d ddl.DDL, ctx sessionctx.Context, dbInf err := sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) return job } @@ -84,7 +84,7 @@ func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo Args: []any{model.NewCIStr(foreignKeyName)}, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index b265b2ae952ae..5276657a25704 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -252,11 +252,11 @@ func (d *ddl) startLocalWorkerLoop() { select { case <-d.ctx.Done(): return - case task, ok := <-d.localJobCh: + case jobW, ok := <-d.localJobCh: if !ok { return } - d.delivery2LocalWorker(d.localWorkerPool, task) + d.delivery2LocalWorker(d.localWorkerPool, jobW) } } } @@ -469,11 +469,11 @@ func (s *jobScheduler) mustReloadSchemas() { // delivery2LocalWorker runs the DDL job of v2 in local. // send the result to the error channels in the task. // delivery2Localworker owns the worker, need to put it back to the pool in this function. -func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) { - job := task.job +func (d *ddl) delivery2LocalWorker(pool *workerPool, jobW *JobWrapper) { + job := jobW.Job wk, err := pool.get() if err != nil { - task.NotifyError(err) + jobW.NotifyError(err) return } for wk == nil { @@ -484,7 +484,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) { } wk, err = pool.get() if err != nil { - task.NotifyError(err) + jobW.NotifyError(err) return } } @@ -507,7 +507,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) { if err != nil { logutil.DDLLogger().Info("handle ddl job failed", zap.Error(err), zap.Stringer("job", job)) } - task.NotifyError(err) + jobW.NotifyError(err) }) } @@ -667,26 +667,28 @@ const ( updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d" ) -func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobs ...*model.Job) error { +func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error { failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) { if val.(bool) { failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr")) } }) - if len(jobs) == 0 { + if len(jobWs) == 0 { return nil } var sql bytes.Buffer sql.WriteString(addDDLJobSQL) - for i, job := range jobs { - b, err := job.Encode(true) + for i, jobW := range jobWs { + b, err := jobW.Encode(true) if err != nil { return err } if i != 0 { sql.WriteString(",") } - fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", job.ID, job.MayNeedReorg(), strconv.Quote(job2SchemaIDs(job)), strconv.Quote(job2TableIDs(job)), util.WrapKey2String(b), job.Type, !job.NotStarted()) + fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(), + strconv.Quote(job2SchemaIDs(jobW.Job)), strconv.Quote(job2TableIDs(jobW.Job)), + util.WrapKey2String(b), jobW.Type, !jobW.NotStarted()) } se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) _, err := se.Execute(ctx, sql.String(), "insert_job") diff --git a/pkg/ddl/partition_test.go b/pkg/ddl/partition_test.go index 1bcd9dc642166..487cd497d16c9 100644 --- a/pkg/ddl/partition_test.go +++ b/pkg/ddl/partition_test.go @@ -121,7 +121,7 @@ func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partN func testDropPartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job { job := buildDropPartitionJob(dbInfo, tblInfo, partNames) ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) @@ -144,7 +144,7 @@ func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, p func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64, newIDs []int64) *model.Job { job := buildTruncatePartitionJob(dbInfo, tblInfo, pids, newIDs) ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 9acf217f41e3f..9251635ed4565 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -770,7 +770,7 @@ func TestCreateTableWithInfoPlacement(t *testing.T) { tk.MustExec("drop placement policy p1") tk.MustExec("create placement policy p1 followers=2") tk.Session().SetValue(sessionctx.QueryString, "skip") - require.Nil(t, dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl, nil, ddl.OnExistError)) + require.Nil(t, dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl, nil, ddl.WithOnExist(ddl.OnExistError))) tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) @@ -791,7 +791,7 @@ func TestCreateTableWithInfoPlacement(t *testing.T) { tbl2.Name = model.NewCIStr("t3") tbl2.PlacementPolicyRef.Name = model.NewCIStr("pxx") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl2, nil, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl2, nil, ddl.WithOnExist(ddl.OnExistError)) require.Equal(t, "[schema:8239]Unknown placement policy 'pxx'", err.Error()) } diff --git a/pkg/ddl/placement_sql_test.go b/pkg/ddl/placement_sql_test.go index 6d56ca874bdc5..62f16cd69ad65 100644 --- a/pkg/ddl/placement_sql_test.go +++ b/pkg/ddl/placement_sql_test.go @@ -535,7 +535,7 @@ func TestPlacementMode(t *testing.T) { require.NotNil(t, tbl.PlacementPolicyRef) tbl.Name = model.NewCIStr("t2") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.WithOnExist(ddl.OnExistError)) require.NoError(t, err) tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" + " `id` int(11) DEFAULT NULL\n" + @@ -549,7 +549,7 @@ func TestPlacementMode(t *testing.T) { tbl.Name = model.NewCIStr("t2") tbl.PlacementPolicyRef.Name = model.NewCIStr("pxx") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.WithOnExist(ddl.OnExistError)) require.NoError(t, err) tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" + " `id` int(11) DEFAULT NULL\n" + diff --git a/pkg/ddl/restart_test.go b/pkg/ddl/restart_test.go index ee55e18cb07a8..56b4fa4de742e 100644 --- a/pkg/ddl/restart_test.go +++ b/pkg/ddl/restart_test.go @@ -67,7 +67,7 @@ func runInterruptedJob(t *testing.T, store kv.Storage, d ddl.DDL, job *model.Job ctx := testkit.NewTestKit(t, store).Session() ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) if errors.Is(err, context.Canceled) { endlessLoopTime := time.Now().Add(time.Minute) for history == nil { diff --git a/pkg/ddl/schema_test.go b/pkg/ddl/schema_test.go index 2134a1590fa79..5a3ef801ba94a 100644 --- a/pkg/ddl/schema_test.go +++ b/pkg/ddl/schema_test.go @@ -49,7 +49,7 @@ func testCreateTable(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *mo Args: []any{tblInfo}, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -146,7 +146,7 @@ func testCreateSchema(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *m }}, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v := getSchemaVer(t, ctx) dbInfo.State = model.StatePublic @@ -171,7 +171,7 @@ func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job { func testDropSchema(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbInfo *model.DBInfo) (*model.Job, int64) { job := buildDropSchemaJob(dbInfo) ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) ver := getSchemaVer(t, ctx) return job, ver @@ -277,7 +277,7 @@ func TestSchema(t *testing.T) { } ctx := testkit.NewTestKit(t, store).Session() ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.True(t, terror.ErrorEqual(err, infoschema.ErrDatabaseDropExists), "err %v", err) // Drop a database without a table. @@ -355,7 +355,7 @@ func doDDLJobErr( } // TODO: check error detail ctx.SetValue(sessionctx.QueryString, "skip") - require.Error(t, d.DoDDLJob(ctx, job)) + require.Error(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) testCheckJobCancelled(t, store, job, nil) return job diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index c19082b0847ad..04135e4a191bc 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -463,13 +463,13 @@ func (d *Checker) CreateSchemaWithInfo(ctx sessionctx.Context, info *model.DBInf } // CreateTableWithInfo implements the DDL interface. -func (*Checker) CreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ *model.TableInfo, _ []model.InvolvingSchemaInfo, _ ...ddl.CreateTableWithInfoConfigurier) error { +func (*Checker) CreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ *model.TableInfo, _ []model.InvolvingSchemaInfo, _ ...ddl.CreateTableOption) error { //TODO implement me panic("implement me") } // BatchCreateTableWithInfo implements the DDL interface. -func (*Checker) BatchCreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ []*model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error { +func (*Checker) BatchCreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ []*model.TableInfo, _ ...ddl.CreateTableOption) error { //TODO implement me panic("implement me") } @@ -560,6 +560,11 @@ func (d *Checker) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return d.realDDL.DoDDLJob(ctx, job) } +// DoDDLJobWrapper implements the DDL interface. +func (d *Checker) DoDDLJobWrapper(ctx sessionctx.Context, jobW *ddl.JobWrapper) error { + return d.realDDL.DoDDLJobWrapper(ctx, jobW) +} + type storageAndMore interface { kv.Storage kv.EtcdBackend diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index c31b5269acaae..19c5836a99fd7 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -230,7 +230,7 @@ func (d SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStm onExist = ddl.OnExistIgnore } - return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, nil, onExist) + return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, nil, ddl.WithOnExist(onExist)) } // CreateTableWithInfo implements the DDL interface. @@ -239,9 +239,9 @@ func (d SchemaTracker) CreateTableWithInfo( dbName model.CIStr, info *model.TableInfo, _ []model.InvolvingSchemaInfo, - cs ...ddl.CreateTableWithInfoConfigurier, + cs ...ddl.CreateTableOption, ) error { - c := ddl.GetCreateTableWithInfoConfig(cs) + c := ddl.GetCreateTableConfig(cs) schema := d.SchemaByName(dbName) if schema == nil { @@ -291,7 +291,7 @@ func (d SchemaTracker) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) onExist = ddl.OnExistReplace } - return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, onExist) + return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, ddl.WithOnExist(onExist)) } // DropTable implements the DDL interface. @@ -1188,7 +1188,7 @@ func (SchemaTracker) AlterResourceGroup(_ sessionctx.Context, _ *ast.AlterResour } // BatchCreateTableWithInfo implements the DDL interface, it will call CreateTableWithInfo for each table. -func (d SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { +func (d SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableOption) error { for _, tableInfo := range info { if err := d.CreateTableWithInfo(ctx, schema, tableInfo, nil, cs...); err != nil { return err @@ -1275,3 +1275,8 @@ func (SchemaTracker) GetInfoSchemaWithInterceptor(_ sessionctx.Context) infosche func (SchemaTracker) DoDDLJob(_ sessionctx.Context, _ *model.Job) error { return nil } + +// DoDDLJobWrapper implements the DDL interface, it's no-op in DM's case. +func (SchemaTracker) DoDDLJobWrapper(_ sessionctx.Context, _ *ddl.JobWrapper) error { + return nil +} diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index e96fdcdf1c7f6..be89bc9d4ff49 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -65,7 +65,7 @@ func testRenameTable( }, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v := getSchemaVer(t, ctx) tblInfo.State = model.StatePublic @@ -86,7 +86,7 @@ func testRenameTables(t *testing.T, ctx sessionctx.Context, d ddl.DDL, oldSchema }, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil}) @@ -120,7 +120,7 @@ func testLockTable( }, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -162,7 +162,7 @@ func testTruncateTable(t *testing.T, ctx sessionctx.Context, store kv.Storage, d Args: []any{newTableID}, } ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -296,7 +296,7 @@ func TestCreateView(t *testing.T) { Args: []any{newTblInfo0}, } ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -319,7 +319,7 @@ func TestCreateView(t *testing.T) { Args: []any{newTblInfo1, true, newTblInfo0.ID}, } ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v = getSchemaVer(t, ctx) @@ -342,7 +342,7 @@ func TestCreateView(t *testing.T) { Args: []any{newTblInfo2, true, newTblInfo0.ID}, } ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) // The non-existing table id in job args will not be considered anymore. require.NoError(t, err) } @@ -391,7 +391,7 @@ func testAlterCacheTable( }, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -418,7 +418,7 @@ func testAlterNoCacheTable( }, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v}) @@ -508,7 +508,7 @@ func TestCreateTables(t *testing.T) { *errP = errors.New("mock get job by ID failed") }) }) - err = d.DoDDLJob(ctx, job) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) require.NoError(t, err) testGetTable(t, domain, genIDs[0]) @@ -567,7 +567,7 @@ func TestAlterTTL(t *testing.T) { }}, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v := getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil}) @@ -588,7 +588,7 @@ func TestAlterTTL(t *testing.T) { Args: []any{true}, } ctx.SetValue(sessionctx.QueryString, "skip") - require.NoError(t, d.DoDDLJob(ctx, job)) + require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) v = getSchemaVer(t, ctx) checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil}) diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index d876e5ee43e79..6744477d4ad0e 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -781,13 +781,13 @@ func (gs *tidbGlueSession) CreateDatabase(_ context.Context, schema *model.DBInf } // CreateTable implements glue.Session -func (gs *tidbGlueSession) CreateTable(_ context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { +func (gs *tidbGlueSession) CreateTable(_ context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableOption) error { return BRIECreateTable(gs.se, dbName, table, "", cs...) } // CreateTables implements glue.BatchCreateTableSession. func (gs *tidbGlueSession) CreateTables(_ context.Context, - tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + tables map[string][]*model.TableInfo, cs ...ddl.CreateTableOption) error { return BRIECreateTables(gs.se, tables, "", cs...) } diff --git a/pkg/executor/brie_utils.go b/pkg/executor/brie_utils.go index e7511b755ba79..57941f00e80b6 100644 --- a/pkg/executor/brie_utils.go +++ b/pkg/executor/brie_utils.go @@ -90,7 +90,7 @@ func BRIECreateTable( dbName model.CIStr, table *model.TableInfo, brComment string, - cs ...ddl.CreateTableWithInfoConfigurier, + cs ...ddl.CreateTableOption, ) error { d := domain.GetDomain(sctx).DDL() query, err := showRestoredCreateTable(sctx, table, brComment) @@ -109,7 +109,7 @@ func BRIECreateTable( table = table.Clone() - return d.CreateTableWithInfo(sctx, dbName, table, nil, append(cs, ddl.OnExistIgnore)...) + return d.CreateTableWithInfo(sctx, dbName, table, nil, append(cs, ddl.WithOnExist(ddl.OnExistIgnore))...) } // BRIECreateTables creates the tables with OnExistIgnore option in batch @@ -117,7 +117,7 @@ func BRIECreateTables( sctx sessionctx.Context, tables map[string][]*model.TableInfo, brComment string, - cs ...ddl.CreateTableWithInfoConfigurier, + cs ...ddl.CreateTableOption, ) error { // Disable foreign key check when batch create tables. originForeignKeyChecks := sctx.GetSessionVars().ForeignKeyChecks @@ -159,10 +159,10 @@ func BRIECreateTables( // The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation // TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all. func splitBatchCreateTable(sctx sessionctx.Context, schema model.CIStr, - infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + infos []*model.TableInfo, cs ...ddl.CreateTableOption) error { var err error d := domain.GetDomain(sctx).DDL() - err = d.BatchCreateTableWithInfo(sctx, schema, infos, append(cs, ddl.OnExistIgnore)...) + err = d.BatchCreateTableWithInfo(sctx, schema, infos, append(cs, ddl.WithOnExist(ddl.OnExistIgnore))...) if kv.ErrEntryTooLarge.Equal(err) { log.Info("entry too large, split batch create table", zap.Int("num table", len(infos))) if len(infos) == 1 { diff --git a/pkg/executor/brie_utils_test.go b/pkg/executor/brie_utils_test.go index 4489c1d280158..761eaf0c1fabc 100644 --- a/pkg/executor/brie_utils_test.go +++ b/pkg/executor/brie_utils_test.go @@ -59,9 +59,7 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) { // keep/reused table id verification sctx.SetValue(sessionctx.QueryString, "skip") - err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos1, ddl.WithIDAllocated(true)) require.NoError(t, err) require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) @@ -90,9 +88,7 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) { }) tk.Session().SetValue(sessionctx.QueryString, "skip") - err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return true - })) + err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos2) require.NoError(t, err) require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) @@ -108,9 +104,7 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) { infos3 := []*model.TableInfo{} originQueryString := sctx.Value(sessionctx.QueryString) - err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) + err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos3, ddl.WithIDAllocated(true)) require.NoError(t, err) require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString)) } @@ -146,9 +140,7 @@ func TestSplitBatchCreateTable(t *testing.T) { // keep/reused table id verification tk.Session().SetValue(sessionctx.QueryString, "skip") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)")) - err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.WithIDAllocated(true)) require.NoError(t, err) require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) @@ -214,9 +206,7 @@ func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) { tk.Session().SetValue(sessionctx.QueryString, "skip") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)")) - err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return true - })) + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos) require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) require.True(t, kv.ErrEntryTooLarge.Equal(err))