Skip to content

Commit

Permalink
ddl: combine table/partition/db id allocation with job id (#54669)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Jul 17, 2024
1 parent 2088b34 commit e33dbbc
Show file tree
Hide file tree
Showing 24 changed files with 624 additions and 363 deletions.
4 changes: 2 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Session interface {
ExecuteInternal(ctx context.Context, sql string, args ...any) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
cs ...ddl.CreateTableOption) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -54,7 +54,7 @@ type Session interface {
// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
cs ...ddl.CreateTableOption) error
}

// Progress is an interface recording the current execution progress.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableOption) error {
return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...))
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
table *model.TableInfo, cs ...ddl.CreateTableOption) error {
return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...))
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/gluetidb/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (*mockSession) CreatePlacementPolicy(_ context.Context, _ *model.PolicyInfo

// CreateTables implements glue.BatchCreateTableSession.
func (*mockSession) CreateTables(_ context.Context, _ map[string][]*model.TableInfo,
_ ...ddl.CreateTableWithInfoConfigurier) error {
_ ...ddl.CreateTableOption) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (*mockSession) CreateTable(_ context.Context, _ model.CIStr,
_ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error {
_ *model.TableInfo, _ ...ddl.CreateTableOption) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
41 changes: 26 additions & 15 deletions br/pkg/restore/internal/prealloc_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,26 +280,29 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table,
return nil
}

func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
return func(ti *model.TableInfo) bool {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
return !prealloced
func (db *DB) canReuseTableID(ti *model.TableInfo) bool {
if db.preallocedIDs == nil {
return false
}
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
return prealloced
}

// CreateTables execute a internal CREATE TABLES.
func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
ddlTables map[restore.UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
if batchSession, ok := db.se.(glue.BatchCreateTableSession); ok {
m := map[string][]*model.TableInfo{}
idReusableTbls := map[string][]*model.TableInfo{}
idNonReusableTbls := map[string][]*model.TableInfo{}
for _, table := range tables {
m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info)
if db.canReuseTableID(table.Info) {
idReusableTbls[table.DB.Name.L] = append(idReusableTbls[table.DB.Name.L], table.Info)
} else {
idNonReusableTbls[table.DB.Name.L] = append(idNonReusableTbls[table.DB.Name.L], table.Info)
}
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
Expand All @@ -314,8 +317,15 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
ttlInfo.Enable = false
}
}
if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil {
return err
if len(idReusableTbls) > 0 {
if err := batchSession.CreateTables(ctx, idReusableTbls, ddl.WithIDAllocated(true)); err != nil {
return err
}
}
if len(idNonReusableTbls) > 0 {
if err := batchSession.CreateTables(ctx, idNonReusableTbls); err != nil {
return err
}
}

for _, table := range tables {
Expand Down Expand Up @@ -345,7 +355,8 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
ttlInfo.Enable = false
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter())
reuseID := db.canReuseTableID(table.Info)
err := db.se.CreateTable(ctx, table.DB.Name, table.Info, ddl.WithIDAllocated(reuseID))
if err != nil {
log.Error("create table failed",
zap.Stringer("db", table.DB.Name),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestPreCheckTableClusterIndex(t *testing.T) {
Collate: "utf8mb4_bin",
},
}
err = se.CreateTable(ctx, tables[i].DB.Name, tables[i].Info, ddl.OnExistIgnore)
err = se.CreateTable(ctx, tables[i].DB.Name, tables[i].Info, ddl.WithOnExist(ddl.OnExistIgnore))
require.NoError(t, err)
}

Expand Down
14 changes: 5 additions & 9 deletions pkg/ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func TestCreateTableWithInfo(t *testing.T) {
Name: model.NewCIStr("t"),
}}

require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return false
})))
require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.WithOnExist(ddl.OnExistError), ddl.WithIDAllocated(true)))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 't'").Check(testkit.Rows("42042"))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)

Expand All @@ -342,9 +340,7 @@ func TestCreateTableWithInfo(t *testing.T) {
Name: model.NewCIStr("tt"),
}}
tk.Session().SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return true
})))
require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.WithOnExist(ddl.OnExistError)))
idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tt'").Rows()[0][0].(string)
require.True(t, ok)
idGenNum, err := strconv.ParseInt(idGen, 10, 64)
Expand Down Expand Up @@ -374,7 +370,7 @@ func TestBatchCreateTable(t *testing.T) {

// correct name
tk.Session().SetValue(sessionctx.QueryString, "skip")
err := d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.OnExistError)
err := d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.WithOnExist(ddl.OnExistError))
require.NoError(t, err)

tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3"))
Expand All @@ -389,7 +385,7 @@ func TestBatchCreateTable(t *testing.T) {
// duplicated name
infos[1].Name = model.NewCIStr("tables_1")
tk.Session().SetValue(sessionctx.QueryString, "skip")
err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.OnExistError)
err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), infos, ddl.WithOnExist(ddl.OnExistError))
require.True(t, terror.ErrorEqual(err, infoschema.ErrTableExists))

newinfo := &model.TableInfo{
Expand Down Expand Up @@ -418,7 +414,7 @@ func TestBatchCreateTable(t *testing.T) {

tk.Session().SetValue(sessionctx.QueryString, "skip")
tk.Session().SetValue(sessionctx.QueryString, "skip")
err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), []*model.TableInfo{newinfo}, ddl.OnExistError)
err = d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), []*model.TableInfo{newinfo}, ddl.WithOnExist(ddl.OnExistError))
require.NoError(t, err)
}

Expand Down
Loading

0 comments on commit e33dbbc

Please sign in to comment.