ddl: integrate fast create table into normal general DDL workflow (#5…
D3Hunter authored Jul 31, 2024
1 parent b944d5d commit 41bb8f5
Showing 17 changed files with 457 additions and 420 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
@@ -189,7 +189,6 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
@@ -225,6 +224,7 @@ go_test(
"ddl_running_jobs_test.go",
"ddl_test.go",
"ddl_workerpool_test.go",
"executor_nokit_test.go",
"executor_test.go",
"export_test.go",
"fail_test.go",
208 changes: 30 additions & 178 deletions pkg/ddl/ddl.go
@@ -66,7 +66,6 @@ import (
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -83,7 +82,7 @@ const (

shardRowIDBitsMax = 15

batchAddingJobs = 10
batchAddingJobs = 100

reorgWorkerCnt = 10
generalWorkerCnt = 10
@@ -199,6 +198,15 @@ type DDL interface {
GetMinJobIDRefresher() *systable.MinJobIDRefresher
}

type jobSubmitResult struct {
err error
jobID int64
// merged indicates whether the job is merged into another job together with
// other jobs. we only merge multiple create table jobs into one job when fast
// create table is enabled.
merged bool
}

// JobWrapper is used to wrap a job and some other information.
// exported for testing.
type JobWrapper struct {
@@ -207,9 +215,9 @@ type JobWrapper struct {
// exported for test.
IDAllocated bool
// job submission is run in async, we use this channel to notify the caller.
// for local job we might combine multiple jobs into one, append the ErrChs to
// this slice.
ErrChs []chan error
// when fast create table enabled, we might combine multiple jobs into one, and
// append the channel to this slice.
ResultCh []chan jobSubmitResult
cacheErr error
}

@@ -219,14 +227,19 @@ func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper {
return &JobWrapper{
Job: job,
IDAllocated: idAllocated,
ErrChs: []chan error{make(chan error)},
ResultCh: []chan jobSubmitResult{make(chan jobSubmitResult)},
}
}

// NotifyError notifies the error to all error channels.
func (t *JobWrapper) NotifyError(err error) {
for _, errCh := range t.ErrChs {
errCh <- err
// NotifyResult notifies the job submit result.
func (t *JobWrapper) NotifyResult(err error) {
merged := len(t.ResultCh) > 1
for _, resultCh := range t.ResultCh {
resultCh <- jobSubmitResult{
err: err,
jobID: t.ID,
merged: merged,
}
}
}
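
With fast create table enabled, several CREATE TABLE statements can be folded into a single job, so one JobWrapper may carry multiple result channels and NotifyResult fans the same result out to all of them. A hypothetical same-package sketch of the caller side (waitSubmitted is illustrative and not part of this commit):

// Hypothetical caller-side sketch (same package): each submitting session
// owns one channel in ResultCh and blocks until the job has been queued.
func waitSubmitted(jw *JobWrapper) (int64, error) {
	res := <-jw.ResultCh[0]
	if res.err != nil {
		return 0, res.err
	}
	// res.merged means the statement was folded into another create-table
	// job; res.jobID identifies the job that actually carries it.
	return res.jobID, nil
}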

@@ -235,8 +248,6 @@ type ddl struct {
m sync.RWMutex
wg tidbutil.WaitGroupWrapper // It's only used to deal with data race in restart_test.
limitJobCh chan *JobWrapper
// limitJobChV2 is used to limit the number of jobs being executed in local worker.
limitJobChV2 chan *JobWrapper

*ddlCtx
sessPool *sess.Pool
@@ -361,32 +372,17 @@ type hookStruct struct {
hook Callback
}

// the schema synchronization mechanism now requires strict incremental schema versions.
// Therefore, we require a distributed lock to ensure the sequential commit of schema diffs from different TiDB nodes.
type etcdLockInfo struct {
se *concurrency.Session
mu *concurrency.Mutex
}

// schemaVersionManager is used to manage the schema version. To prevent the conflicts on this key between different DDL job,
// we use another transaction to update the schema version, so that we need to lock the schema version and unlock it until the job is committed.
// for version2, we use etcd lock to lock the schema version between TiDB nodes now.
type schemaVersionManager struct {
schemaVersionMu sync.Mutex
// lockOwner stores the job ID that is holding the lock.
lockOwner atomicutil.Int64

ctx context.Context
etcdClient *clientv3.Client
lockInfoMaps map[int64]*etcdLockInfo
}

func newSchemaVersionManager(ctx context.Context, etcdClient *clientv3.Client) *schemaVersionManager {
return &schemaVersionManager{
ctx: ctx,
etcdClient: etcdClient,
lockInfoMaps: make(map[int64]*etcdLockInfo),
}
func newSchemaVersionManager() *schemaVersionManager {
return &schemaVersionManager{}
}

func (sv *schemaVersionManager) setSchemaVersion(job *model.Job, store kv.Storage) (schemaVersion int64, err error) {
@@ -411,17 +407,6 @@ func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) error {
if ownerID != jobID {
sv.schemaVersionMu.Lock()
sv.lockOwner.Store(jobID)
if sv.etcdClient != nil && variable.EnableFastCreateTable.Load() {
se, err := concurrency.NewSession(sv.etcdClient)
if err != nil {
return errors.Trace(err)
}
mu := concurrency.NewMutex(se, ddlSchemaVersionKeyLock)
if err := mu.Lock(sv.ctx); err != nil {
return errors.Trace(err)
}
sv.lockInfoMaps[jobID] = &etcdLockInfo{se: se, mu: mu}
}
}
return nil
}
@@ -430,24 +415,6 @@ func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) error {
func (sv *schemaVersionManager) unlockSchemaVersion(jobID int64) {
ownerID := sv.lockOwner.Load()
if ownerID == jobID {
if lockInfo, ok := sv.lockInfoMaps[jobID]; ok {
delete(sv.lockInfoMaps, jobID)
err := lockInfo.mu.Unlock(sv.ctx)
outer:
for err != nil {
logutil.DDLLogger().Error("unlock schema version", zap.Error(err))
select {
case <-sv.ctx.Done():
break outer
case <-time.After(time.Second):
}
// retry unlock
err = lockInfo.mu.Unlock(sv.ctx)
}
if err := lockInfo.se.Close(); err != nil {
logutil.DDLLogger().Error("close etcd session", zap.Error(err))
}
}
sv.lockOwner.Store(0)
sv.schemaVersionMu.Unlock()
}
@@ -588,11 +555,10 @@ func (dc *ddlCtx) initJobDoneCh(jobID int64) {
}

func (dc *ddlCtx) notifyJobDone(jobID int64) {
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
select {
case ch <- struct{}{}:
default:
}
if ch, ok := dc.ddlJobDoneChMap.Delete(jobID); ok {
// broadcast done event as we might merge multiple jobs into one when fast
// create table is enabled.
close(ch)
}
}
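
Since a merged job can have several sessions waiting on it, notifyJobDone now closes the done channel instead of doing a single non-blocking send; closing a channel wakes every receiver. A small standalone illustration of that property (not code from the repository):

// Standalone illustration: closing a channel releases all waiters at once,
// whereas a single send would only release one of them.
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // every waiter unblocks once done is closed
			fmt.Println("session", id, "sees the DDL job as finished")
		}(i)
	}
	close(done) // broadcast to all three goroutines
	wg.Wait()
}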

@@ -708,12 +674,11 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
ddlCtx.mu.hook = opt.Hook
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
ddlCtx.schemaVersionManager = newSchemaVersionManager(ddlCtx.ctx, opt.EtcdCli)
ddlCtx.schemaVersionManager = newSchemaVersionManager()

d := &ddl{
ddlCtx: ddlCtx,
limitJobCh: make(chan *JobWrapper, batchAddingJobs),
limitJobChV2: make(chan *JobWrapper, batchAddingJobs),
enableTiFlashPoll: atomicutil.NewBool(true),
ddlJobNotifyCh: make(chan struct{}, 100),
localJobCh: make(chan *JobWrapper, 1),
@@ -734,7 +699,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
variable.EnableDDL = d.EnableDDL
variable.DisableDDL = d.DisableDDL
variable.SwitchMDL = d.SwitchMDL
variable.SwitchFastCreateTable = d.SwitchFastCreateTable

e := &executor{
ctx: d.ctx,
@@ -747,7 +711,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
schemaLoader: d.schemaLoader,
lease: d.lease,
ownerManager: d.ownerManager,
limitJobChV2: d.limitJobChV2,
ddlJobDoneChMap: &d.ddlJobDoneChMap,
ddlJobNotifyCh: d.ddlJobNotifyCh,
mu: &d.mu,
@@ -819,9 +782,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobCh, d.addBatchDDLJobsV1)
})
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs)
})
d.wg.Run(func() {
d.minJobIDRefresher.Start(d.ctx)
})
@@ -1078,114 +1038,6 @@ func (d *ddl) SwitchMDL(enable bool) error {
return nil
}

// SwitchFastCreateTable switch fast create table
func (d *ddl) SwitchFastCreateTable(val bool) error {
old := variable.EnableFastCreateTable.Load()
if old == val {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

// Check if there is any DDL running.
// This check can not cover every corner cases, so users need to guarantee that there is no DDL running by themselves.
sessCtx, err := d.sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer d.sessPool.Put(sessCtx)
se := sess.NewSession(sessCtx)
rows, err := se.Execute(ctx, "select 1 from mysql.tidb_ddl_job", "check job")
if err != nil {
return errors.Trace(err)
}
if len(rows) != 0 {
return errors.New("please wait for all jobs done")
}

if err := d.switchFastCreateTable(val); err != nil {
return errors.Trace(err)
}

variable.EnableFastCreateTable.Store(val)
logutil.DDLLogger().Info("switch fast create table", zap.Bool("val", val))
return nil
}

// disableFastCreateTable disable fast create table feature.
func (*ddl) disableFastCreateTable(m *meta.Meta) error {
fastCreateTableInitialized, err := m.GetFastCreateTableInitialized()
if err != nil {
return errors.Trace(err)
}
if !fastCreateTableInitialized {
return nil
}
if err := m.ClearAllDatabaseNames(); err != nil {
return errors.Trace(err)
}
if err := m.ClearAllTableNames(); err != nil {
return errors.Trace(err)
}
return errors.Trace(m.SetFastCreateTableInitialized(false))
}

// enableFastCreateTable enable fast create table feature.
func (*ddl) enableFastCreateTable(m *meta.Meta) error {
fastCreateTableInitialized, err := m.GetFastCreateTableInitialized()
if err != nil {
return errors.Trace(err)
}
if fastCreateTableInitialized {
return nil
}

if err := m.ClearAllDatabaseNames(); err != nil {
return errors.Trace(err)
}
if err := m.ClearAllTableNames(); err != nil {
return errors.Trace(err)
}

dbs, err := m.ListDatabases()
if err != nil {
return errors.Trace(err)
}

for _, dbInfo := range dbs {
if err := m.CreateDatabaseName(dbInfo.Name.L, dbInfo.ID); err != nil {
return errors.Trace(err)
}
}

for _, dbInfo := range dbs {
tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return errors.Trace(err)
}
for _, tableInfo := range tables {
if err := m.CreateTableName(dbInfo.Name.L, tableInfo.Name.L, tableInfo.ID); err != nil {
return errors.Trace(err)
}
}
}

return errors.Trace(m.SetFastCreateTableInitialized(true))
}

func (d *ddl) switchFastCreateTable(val bool) (err error) {
return kv.RunInNewTxn(kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL), d.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)

if val {
err = d.enableFastCreateTable(m)
} else {
err = d.disableFastCreateTable(m)
}
return errors.Trace(err)
})
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64