2 changes: 1 addition & 1 deletion cdc/owner/ddl_sink.go
@@ -244,7 +244,7 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
if err = s.makeSinkReady(ctx); err == nil {
err = s.sink.WriteDDLEvent(ctx, ddl)
failpoint.Inject("InjectChangefeedDDLError", func() {
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
err = cerror.ErrExecDDLFailed.GenWithStackByArgs(ddl.Query)
})
}
if err != nil {
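For context on this first change: passing ddl.Query into GenWithStackByArgs makes the injected error carry the offending statement. Below is a minimal sketch of how such a failpoint could be switched on in a test, assuming the usual pingcap/failpoint convention of package-path-qualified names; the full failpoint path is an assumption, and the PR's own unit test (next file) injects the error through a mock sink instead.

```go
package owner_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// withInjectedDDLError is a hypothetical helper, not part of the PR: it
// enables the InjectChangefeedDDLError failpoint so writeDDLEvent fails with
// ErrExecDDLFailed, runs fn, then disables the failpoint again. The full
// failpoint path below is assumed from the package layout.
func withInjectedDDLError(t *testing.T, fn func()) {
	const fp = "github.com/pingcap/tiflow/cdc/owner/InjectChangefeedDDLError"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	fn()
}
```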
2 changes: 1 addition & 1 deletion cdc/owner/ddl_sink_test.go
@@ -155,8 +155,8 @@ func TestExecDDLError(t *testing.T) {

ddlSink.run(ctx)

mSink.ddlError = cerror.ErrExecDDLFailed.GenWithStackByArgs()
ddl2 := &model.DDLEvent{CommitTs: 2, Query: "create table t2(id int)"}
mSink.ddlError = cerror.ErrExecDDLFailed.GenWithStackByArgs(ddl2.Query)
for {
done, err := ddlSink.emitDDLEvent(ctx, ddl2)
require.Nil(t, err)
91 changes: 10 additions & 81 deletions cdc/sink/ddlsink/mysql/async_ddl.go
@@ -25,77 +25,15 @@ import (
"go.uber.org/zap"
)

const timeout = 5 * time.Second

// TODO: Use the following SQL to check the ddl job status after tidb optimizes
// the information_schema.ddl_jobs table. Ref: https://github.com/pingcap/tidb/issues/55725
//
// SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
// FROM information_schema.ddl_jobs
var checkRunningAddIndexSQL = `
ADMIN SHOW DDL JOBS 1
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
FROM information_schema.ddl_jobs
WHERE DB_NAME = "%s"
AND TABLE_NAME = "%s"
AND JOB_TYPE LIKE "add index%%"
AND (STATE = "running" OR STATE = "queueing");
`
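A side note on this template: the placeholders are filled with plain fmt.Sprintf rather than bound SQL parameters, which is why the LIKE pattern uses %% (it collapses to a single % after formatting). A minimal sketch of that usage follows, assuming the standard context, database/sql and fmt imports; hasRunningAddIndexJob is a hypothetical helper mirroring the doCheck call further down, not code from the PR.

```go
// hasRunningAddIndexJob is illustrative only. It formats checkRunningAddIndexSQL
// for one table and reports whether any add-index job is still running or
// queueing. Schema/table names are assumed to be trusted identifiers, since
// they are interpolated directly into the SQL text.
func hasRunningAddIndexJob(ctx context.Context, db *sql.DB, schema, table string) (bool, error) {
	rows, err := db.QueryContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, schema, table))
	if err != nil {
		return false, err
	}
	defer rows.Close()
	running := rows.Next() // at least one row means a job is still in flight
	return running, rows.Err()
}
```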

func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool {
return m.cfg.IsTiDB && ddl.Type == timodel.ActionAddIndex
}

// asyncExecDDL executes ddl in async mode.
// this function only works in TiDB, because TiDB will save ddl jobs
// and execute them asynchronously even if ticdc crashed.
func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error {
done := make(chan error, 1)
// Use a longer timeout to ensure the add index ddl is sent to tidb before executing the next ddl.
tick := time.NewTimer(10 * time.Second)
defer tick.Stop()
log.Info("async exec add index ddl start",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
go func() {
if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
log.Error("async exec add index ddl failed",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- err
return
}
log.Info("async exec add index ddl done",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- nil
}()

select {
case <-ctx.Done():
// if the ddl is canceled, we just return nil; if the ddl was not received by tidb,
// the downstream ddl is lost, because the checkpoint ts is forwarded.
log.Info("async add index ddl exits as canceled",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
case err := <-done:
// if the ddl is executed within 10 seconds, we just return the result to the caller.
return err
case <-tick.C:
// if the ddl is still running, we just return nil;
// then if the ddl fails, the downstream ddl is lost,
// because the checkpoint ts is forwarded.
log.Info("async add index ddl is still running",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
}
}

func (m *DDLSink) needWaitAsyncExecDone(t timodel.ActionType) bool {
if !m.cfg.IsTiDB {
return false
@@ -110,7 +48,7 @@ func (m *DDLSink) needWaitAsyncExecDone(t timodel.ActionType) bool {
}
}

// Should always wait for async ddl done before executing the next ddl.
// wait for the previous asynchronous DDL to finish before executing the next ddl.
func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
if !m.needWaitAsyncExecDone(ddl.Type) {
return
@@ -124,10 +62,11 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
tables[ddl.PreTableInfo.TableName] = struct{}{}
}

log.Debug("wait async exec ddl done",
log.Debug("Wait for the previous asynchronous DDL to finish",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Any("tables", tables),
zap.Any("tableInfo", ddl.TableInfo),
zap.Any("preTableInfo", ddl.PreTableInfo),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) {
@@ -150,7 +89,7 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
}

func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
ctx, cancel := context.WithTimeout(ctx, timeout)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

Review comment (medium): The timeout value 5*time.Second is used as a magic number. It's better to define a constant for it at the top of the file to improve readability and maintainability, for example const checkAsyncExecDDLDoneTimeout = 5 * time.Second.
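A sketch of the refactor the comment suggests; the constant name comes from the review comment, not from the PR itself.

```go
// Declared once near the top of async_ddl.go ...
const checkAsyncExecDDLDoneTimeout = 5 * time.Second

// ... and used in place of the literal inside checkAsyncExecDDLDone:
ctx, cancel := context.WithTimeout(ctx, checkAsyncExecDDLDoneTimeout)
defer cancel()
```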
defer cancel()
for table := range tables {
done := m.doCheck(ctx, table)
@@ -163,16 +102,6 @@ func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.Ta

func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) {
start := time.Now()
if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok {
ddlType := v.(timodel.ActionType)
if ddlType == timodel.ActionAddIndex {
log.Panic("invalid ddl type in lastExecutedNormalDDLCache",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("ddlType", ddlType.String()))
}
return true
}

rows, err := m.db.QueryContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
defer func() {
@@ -181,15 +110,15 @@ func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool
}
}()
if err != nil {
log.Error("check async exec ddl failed",
log.Error("check previous asynchronous ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
return true
}
rets, err := export.GetSpecifiedColumnValuesAndClose(rows, "JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE")
if err != nil {
log.Error("check async exec ddl failed",
log.Error("check previous asynchronous ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
Expand All @@ -201,7 +130,7 @@ func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool
}
ret := rets[0]
jobID, jobType, schemaState, state := ret[0], ret[1], ret[2], ret[3]
log.Info("async ddl is still running",
log.Info("The previous asynchronous ddl is still running",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Duration("checkDuration", time.Since(start)),
70 changes: 1 addition & 69 deletions cdc/sink/ddlsink/mysql/async_ddl_test.go
@@ -81,22 +81,8 @@ func TestWaitAsynExecDone(t *testing.T) {
tables := make(map[model.TableName]struct{})
tables[table] = struct{}{}

// Test fast path, ddlSink.lastExecutedNormalDDLCache meet panic
ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionAddIndex)
require.Panics(t, func() {
ddlSink.checkAsyncExecDDLDone(ctx, tables)
})

// Test fast path, ddlSink.lastExecutedNormalDDLCache is hit
ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionCreateTable)
done := ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.True(t, done)

// Cleanup the cache, always check the async running state
ddlSink.lastExecutedNormalDDLCache.Remove(table)

// Test has running async ddl job
done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
done := ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.False(t, done)

// Test no running async ddl job
@@ -110,60 +96,6 @@
ddlSink.Close()
}
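The sqlmock expectations that drive the two checks in TestWaitAsynExecDone are elided by the diff; below is a purely hypothetical sketch of what they might look like, assuming a regexp query matcher, matching only on a fragment of the check SQL, and assuming mock is the sqlmock handle created in the test's connection factory. Column names follow the GetSpecifiedColumnValuesAndClose call in async_ddl.go.

```go
// Illustrative only - not the PR's actual test setup.
cols := []string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE"}
// First check: one add-index job still running, so checkAsyncExecDDLDone returns false.
mock.ExpectQuery("add index").WillReturnRows(
	sqlmock.NewRows(cols).AddRow("1", "add index", "write reorganization", "running"))
// Second check: no rows, so the wait finishes.
mock.ExpectQuery("add index").WillReturnRows(sqlmock.NewRows(cols))
```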

func TestAsyncExecAddIndex(t *testing.T) {
ddlExecutionTime := time.Second * 15
dbConnFactory := pmysql.NewDBConnectionFactoryForTest()
dbConnFactory.SetStandardConnectionFactory(func(ctx context.Context, dsnStr string) (*sql.DB, error) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("Create index idx1 on test.t1(a)").
WillDelayFor(ddlExecutionTime).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
mock.ExpectClose()
return db, nil
})
GetDBConnImpl = dbConnFactory

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
changefeed := "test-changefeed"
sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID(changefeed), sinkURI, rc)

require.Nil(t, err)

ddl1 := &model.DDLEvent{
StartTs: 1000,
CommitTs: 1010,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "test",
Table: "t1",
},
},
Type: timodel.ActionAddIndex,
Query: "Create index idx1 on test.t1(a)",
}
start := time.Now()
err = sink.WriteDDLEvent(ctx, ddl1)
require.Nil(t, err)
require.True(t, time.Since(start) < ddlExecutionTime)
require.True(t, time.Since(start) >= 10*time.Second)
sink.Close()
}

func TestNeedWaitAsyncExecDone(t *testing.T) {
sink := &DDLSink{
cfg: &pmysql.Config{