Skip to content

Commit

Permalink
ddl: replace OnJobUpdated callback with failpoint (#55091)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
lance6716 authored Aug 1, 2024
1 parent 8c7abde commit 68b529e
Show file tree
Hide file tree
Showing 38 changed files with 257 additions and 576 deletions.
7 changes: 0 additions & 7 deletions pkg/ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type Callback interface {
OnJobRunBefore(job *model.Job)
// OnJobRunAfter is called after running job.
OnJobRunAfter(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
Expand All @@ -50,11 +48,6 @@ func (*BaseCallback) OnJobRunAfter(_ *model.Job) {
// Nothing to do.
}

// OnJobUpdated implements Callback.OnJobUpdated interface.
func (*BaseCallback) OnJobUpdated(_ *model.Job) {
// Nothing to do.
}

// SchemaLoader is used to avoid import loop, the only impl is domain currently.
type SchemaLoader interface {
Reload() error
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ func TestCancel(t *testing.T) {

resetHook := func(h *callback.TestDDLCallback) {
h.OnJobRunBeforeExported = nil
h.OnJobUpdatedExported.Store(nil)
_ = failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/onJobUpdated")
dom.DDL().SetHook(h.Clone())
}
registerHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
if onJobRunBefore {
h.OnJobRunBeforeExported = hookFunc
} else {
h.OnJobUpdatedExported.Store(&hookFunc)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", hookFunc)
}
dom.DDL().SetHook(h.Clone())
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

func TestCancelFlashbackCluster(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
tk := testkit.NewTestKit(t, store)

time.Sleep(10 * time.Millisecond)
Expand All @@ -223,7 +222,7 @@ func TestCancelFlashbackCluster(t *testing.T) {
hook := newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateDeleteOnly
})
dom.DDL().SetHook(hook)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", hook.OnJobUpdated)
tk.MustExec("set global tidb_ttl_job_enable = on")
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)
Expand All @@ -236,16 +235,14 @@ func TestCancelFlashbackCluster(t *testing.T) {
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(hook)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", hook.OnJobUpdated)
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)))
hook.MustCancelFailed(t)

rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS"))
}
18 changes: 7 additions & 11 deletions pkg/ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestColumnAdd(t *testing.T) {
)
first := true
var jobID int64
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
jobID = job.ID
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
Expand All @@ -79,9 +80,7 @@ func TestColumnAdd(t *testing.T) {
publicTable = tbl
require.NoError(t, checkAddPublic(ct, writeOnlyTable, publicTable))
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc.Clone())
})
tk.MustExec("alter table t add column c3 int default 3")
tb := publicTable
v := getSchemaVer(t, tk.Session())
Expand All @@ -94,7 +93,7 @@ func TestColumnAdd(t *testing.T) {
dropCol = tbl.VisibleCols()[2]
}
}
onJobUpdatedExportedFunc2 := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if job.NotStarted() {
return
}
Expand All @@ -105,8 +104,7 @@ func TestColumnAdd(t *testing.T) {
require.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
}
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
})
d.SetHook(tc.Clone())
tk.MustExec("alter table t drop column c3")
v = getSchemaVer(t, tk.Session())
Expand All @@ -115,7 +113,7 @@ func TestColumnAdd(t *testing.T) {

// Add column not default.
first = true
onJobUpdatedExportedFunc3 := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
jobID = job.ID
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
Expand All @@ -133,9 +131,7 @@ func TestColumnAdd(t *testing.T) {
_, err = writeOnlyTable.AddRecord(sess.GetTableCtx(), types.MakeDatums(10, 10))
require.NoError(t, err)
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(tc)
})
tk.MustExec("alter table t add column c3 int")
testCheckJobDone(t, store, jobID, true)
}
Expand Down
27 changes: 9 additions & 18 deletions pkg/ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -477,7 +478,7 @@ func TestTransactionWithWriteOnlyColumn(t *testing.T) {

// For issue #31735.
func TestAddGeneratedColumnAndInsert(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
store := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -487,13 +488,11 @@ func TestAddGeneratedColumnAndInsert(t *testing.T) {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
hook := &callback.TestDDLCallback{Do: dom}
ctx := mock.NewContext()
ctx.Store = store
times := 0
var checkErr error
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if checkErr != nil {
return
}
Expand All @@ -517,26 +516,23 @@ func TestAddGeneratedColumnAndInsert(t *testing.T) {
times++
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(hook)
})

tk.MustExec("alter table t1 add column gc int as ((a+1))")
tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("4 5", "10 11"))
require.NoError(t, checkErr)
}

func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
store := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

hook := &callback.TestDDLCallback{}
var checkErr error
assertChangingColName := "_col$_c2_0"
assertChangingIdxName := "_idx$_idx_0"
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn {
var (
_newCol *model.ColumnInfo
Expand All @@ -559,10 +555,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
checkErr = errors.New("changing index name is incorrect")
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d := dom.DDL()
d.SetHook(hook)
})

tk.MustExec("create table if not exists t(c1 varchar(256), c2 bigint, `_col$_c2` varchar(10), unique _idx$_idx(c1), unique idx(c2));")
tk.MustExec("alter table test.t change column c2 cC2 tinyint after `_col$_c2`")
Expand Down Expand Up @@ -593,7 +586,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
assertChangingColName2 := "_col$__col$__col$_c1_0_1"
query1 := "alter table t modify column _col$_c1 tinyint"
query2 := "alter table t modify column _col$__col$_c1_0 tinyint"
onJobUpdatedExportedFunc2 := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if (job.Query == query1 || job.Query == query2) && job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn {
var (
_newCol *model.ColumnInfo
Expand All @@ -616,9 +609,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
checkErr = errors.New("changing column name is incorrect")
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
d.SetHook(hook)
})

tk.MustExec("drop table if exists t")
tk.MustExec("create table if not exists t(c1 bigint, _col$_c1 bigint, _col$__col$_c1_0 bigint, _col$__col$__col$_c1_0_0 bigint)")
Expand Down
35 changes: 9 additions & 26 deletions pkg/ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -643,8 +644,6 @@ func testGetColumn(t table.Table, name string, isExist bool) error {
func TestAddColumn(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t, mockstore.WithDDLChecker())

d := dom.DDL()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int, c2 int, c3 int);")
Expand Down Expand Up @@ -672,8 +671,7 @@ func TestAddColumn(t *testing.T) {

checkOK := false

tc := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if checkOK {
return
}
Expand All @@ -689,9 +687,7 @@ func TestAddColumn(t *testing.T) {
if newCol.State == model.StatePublic {
checkOK = true
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)
})

jobID := testCreateColumn(tk, t, testkit.NewTestKit(t, store).Session(), tableID, newColName, "", defaultColValue, dom)
testCheckJobDone(t, store, jobID, true)
Expand All @@ -705,8 +701,6 @@ func TestAddColumn(t *testing.T) {
func TestAddColumns(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t, mockstore.WithDDLChecker())

d := dom.DDL()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int, c2 int, c3 int);")
Expand Down Expand Up @@ -740,8 +734,7 @@ func TestAddColumns(t *testing.T) {
err = txn.Commit(context.Background())
require.NoError(t, err)

tc := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -761,9 +754,7 @@ func TestAddColumns(t *testing.T) {
checkOK = true
}
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)
})

jobID := testCreateColumns(tk, t, testkit.NewTestKit(t, store).Session(), tableID, newColNames, positions, defaultColValue, dom)

Expand Down Expand Up @@ -809,9 +800,7 @@ func TestDropColumnInColumnTest(t *testing.T) {
var hookErr error
var mu sync.Mutex

d := dom.DDL()
tc := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -823,9 +812,7 @@ func TestDropColumnInColumnTest(t *testing.T) {
checkOK = true
return
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)
})

jobID := testDropColumnInternal(tk, t, testkit.NewTestKit(t, store).Session(), tableID, colName, false, dom)
testCheckJobDone(t, store, jobID, false)
Expand Down Expand Up @@ -871,9 +858,7 @@ func TestDropColumns(t *testing.T) {
var hookErr error
var mu sync.Mutex

d := dom.DDL()
tc := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -887,9 +872,7 @@ func TestDropColumns(t *testing.T) {
return
}
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)
})

jobID := testDropColumns(tk, t, testkit.NewTestKit(t, store).Session(), tableID, colNames, false, dom)
testCheckJobDone(t, store, jobID, false)
Expand Down
13 changes: 5 additions & 8 deletions pkg/ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -305,7 +306,7 @@ func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) {
// It's good because the insert / update logic will cast the related column to changing column rather than use
// origin default value directly.
func TestChangingColOriginDefaultValue(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
Expand All @@ -319,14 +320,12 @@ func TestChangingColOriginDefaultValue(t *testing.T) {
tk.MustExec("insert into t values(2, 2)")

tbl := external.GetTableByName(t, tk, "test", "t")
originalHook := dom.DDL().GetHook()
hook := &callback.TestDDLCallback{Do: dom}
var (
once bool
checkErr error
)
i := 0
onJobUpdatedExportedFunc := func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
if checkErr != nil {
return
}
Expand Down Expand Up @@ -371,11 +370,9 @@ func TestChangingColOriginDefaultValue(t *testing.T) {
}
i++
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
dom.DDL().SetHook(hook)
})
tk.MustExec("alter table t modify column b tinyint NOT NULL")
dom.DDL().SetHook(originalHook)
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated")
require.NoError(t, checkErr)
// Since getReorgInfo will stagnate StateWriteReorganization for a ddl round, so insert should exec 3 times.
tk.MustQuery("select * from t order by a").Check(testkit.Rows("1 -1", "2 -2", "3 3", "4 4", "5 5"))
Expand Down
Loading

0 comments on commit 68b529e

Please sign in to comment.