Skip to content

Commit e44fadc

Browse files
authored
ddl: use real start ts for recover table snapshot (#68229)
close #68222
1 parent 5634bd6 commit e44fadc

4 files changed

Lines changed: 243 additions & 19 deletions

File tree

pkg/ddl/ddl.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,19 +1331,30 @@ var (
13311331
RunInGoTest bool
13321332
)
13331333

1334+
// GetRecoverSnapshotTS returns the snapshot timestamp for reconstructing
1335+
// metadata from a finished drop/truncate table or drop schema job.
1336+
func GetRecoverSnapshotTS(job *model.Job) uint64 {
1337+
if job.RealStartTS != 0 {
1338+
return job.RealStartTS
1339+
}
1340+
return job.StartTS
1341+
}
1342+
13341343
// GetDropOrTruncateTableInfoFromJobsByStore implements GetDropOrTruncateTableInfoFromJobs
13351344
func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
13361345
for _, job := range jobs {
1346+
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
1347+
continue
1348+
}
1349+
1350+
snapshotTS := GetRecoverSnapshotTS(job)
13371351
// Check GC safe point for getting snapshot infoSchema.
1338-
err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
1352+
err := gcutil.ValidateSnapshotWithGCSafePoint(snapshotTS, gcSafePoint)
13391353
if err != nil {
13401354
return false, err
13411355
}
1342-
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
1343-
continue
1344-
}
13451356

1346-
tbl, err := getTable(job.StartTS, job.SchemaID, job.TableID)
1357+
tbl, err := getTable(snapshotTS, job.SchemaID, job.TableID)
13471358
if err != nil {
13481359
if meta.ErrDBNotExists.Equal(err) {
13491360
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,

pkg/ddl/tests/serial/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ go_test(
88
"serial_test.go",
99
],
1010
flaky = True,
11-
shard_count = 20,
11+
shard_count = 22,
1212
deps = [
1313
"//pkg/config",
1414
"//pkg/config/kerneltype",

pkg/ddl/tests/serial/serial_test.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"math"
21+
"strconv"
2122
"strings"
2223
"sync"
2324
"sync/atomic"
@@ -643,6 +644,213 @@ func TestRecoverTableByJobID(t *testing.T) {
643644
require.Equal(t, false, gcEnable)
644645
}
645646

647+
func TestRecoverTableUsesRealStartTSForQueuedDropTable(t *testing.T) {
648+
store := createMockStore(t)
649+
tk := testkit.NewTestKit(t, store)
650+
tk.MustExec("create database if not exists test_recover")
651+
tk.MustExec("use test_recover")
652+
tk.MustExec("drop table if exists t_recover_snapshot")
653+
tk.MustExec("create table t_recover_snapshot (id int primary key, col_a int, col_b int)")
654+
tk.MustExec("insert into t_recover_snapshot values (1, 11, 21)")
655+
656+
defer func(originGC bool) {
657+
if originGC {
658+
util.EmulatorGCEnable()
659+
} else {
660+
util.EmulatorGCDisable()
661+
}
662+
}(util.IsEmulatorGCEnable())
663+
util.EmulatorGCDisable()
664+
665+
var pauseSchedule atomic.Bool
666+
waitSchCh := make(chan struct{})
667+
var closeSchedule sync.Once
668+
releaseSchedule := func() {
669+
pauseSchedule.Store(false)
670+
closeSchedule.Do(func() { close(waitSchCh) })
671+
}
672+
t.Cleanup(releaseSchedule)
673+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadAndDeliverJobs", func() {
674+
if pauseSchedule.Load() {
675+
<-waitSchCh
676+
}
677+
})
678+
pauseSchedule.Store(true)
679+
680+
submittedCh := make(chan struct{}, 2)
681+
submitGate := make(chan struct{})
682+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted", func() {
683+
submittedCh <- struct{}{}
684+
<-submitGate
685+
})
686+
waitSubmitted := func() {
687+
select {
688+
case <-submittedCh:
689+
submitGate <- struct{}{}
690+
case <-time.After(5 * time.Second):
691+
require.FailNow(t, "DDL job was not submitted")
692+
}
693+
}
694+
695+
// Two independent sessions are needed so the drop-table job can be queued
696+
// after drop-column is submitted but before drop-column has changed metadata.
697+
tkAlter := testkit.NewTestKit(t, store)
698+
tkAlter.MustExec("use test_recover")
699+
alterDoneCh := make(chan error, 1)
700+
go func() {
701+
_, err := tkAlter.Exec("alter table t_recover_snapshot drop column col_a")
702+
alterDoneCh <- err
703+
}()
704+
waitSubmitted()
705+
706+
tkDrop := testkit.NewTestKit(t, store)
707+
tkDrop.MustExec("use test_recover")
708+
dropDoneCh := make(chan error, 1)
709+
go func() {
710+
_, err := tkDrop.Exec("drop table t_recover_snapshot")
711+
dropDoneCh <- err
712+
}()
713+
waitSubmitted()
714+
715+
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted")
716+
releaseSchedule()
717+
require.NoError(t, <-alterDoneCh)
718+
require.NoError(t, <-dropDoneCh)
719+
720+
getHistoryJobID := func(jobType string) int64 {
721+
rows := tk.MustQuery(fmt.Sprintf(
722+
"admin show ddl jobs where db_name = 'test_recover' and table_name = 't_recover_snapshot' and job_type = '%s'",
723+
jobType,
724+
)).Rows()
725+
require.NotEmpty(t, rows)
726+
jobID, err := strconv.ParseInt(rows[0][0].(string), 10, 64)
727+
require.NoError(t, err)
728+
return jobID
729+
}
730+
731+
dropJobID := getHistoryJobID("drop table")
732+
dropJob, err := ddl.GetHistoryJobByID(tk.Session(), dropJobID)
733+
require.NoError(t, err)
734+
require.NotNil(t, dropJob)
735+
require.Greater(t, dropJob.RealStartTS, dropJob.StartTS)
736+
737+
gcTimeFormat := "20060102-15:04:05 -0700 MST"
738+
timeBeforeDrop := time.Now().Add(-48 * time.Hour).Format(gcTimeFormat)
739+
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
740+
ON DUPLICATE KEY
741+
UPDATE variable_value = '%[1]s'`
742+
tk.MustExec("delete from mysql.tidb where variable_name in ('tikv_gc_safe_point','tikv_gc_enable')")
743+
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
744+
require.NoError(t, gcutil.EnableGC(tk.Session()))
745+
746+
tk.MustExec(fmt.Sprintf("recover table by job %d", dropJobID))
747+
tk.MustQuery("select column_name from information_schema.columns where table_schema = 'test_recover' and table_name = 't_recover_snapshot' order by ordinal_position").Check(testkit.Rows("id", "col_b"))
748+
tk.MustQuery("select id, col_b from t_recover_snapshot").Check(testkit.Rows("1 21"))
749+
750+
recoverJobID := getHistoryJobID("recover table")
751+
recoverJob, err := ddl.GetHistoryJobByID(tk.Session(), recoverJobID)
752+
require.NoError(t, err)
753+
require.NotNil(t, recoverJob)
754+
require.NotNil(t, recoverJob.BinlogInfo.TableInfo)
755+
colNames := make([]string, 0, len(recoverJob.BinlogInfo.TableInfo.Columns))
756+
for _, col := range recoverJob.BinlogInfo.TableInfo.Columns {
757+
colNames = append(colNames, col.Name.L)
758+
}
759+
require.Equal(t, []string{"id", "col_b"}, colNames)
760+
}
761+
762+
func TestFlashbackDatabaseUsesRealStartTSForQueuedDropSchema(t *testing.T) {
763+
store := createMockStore(t)
764+
tk := testkit.NewTestKit(t, store)
765+
tk.MustExec("drop database if exists test_recover_schema_snapshot")
766+
tk.MustExec("create database test_recover_schema_snapshot")
767+
tk.MustExec("create table test_recover_schema_snapshot.t (id int primary key, col_a int, col_b int)")
768+
tk.MustExec("insert into test_recover_schema_snapshot.t values (1, 11, 21)")
769+
770+
defer func(originGC bool) {
771+
if originGC {
772+
util.EmulatorGCEnable()
773+
} else {
774+
util.EmulatorGCDisable()
775+
}
776+
}(util.IsEmulatorGCEnable())
777+
util.EmulatorGCDisable()
778+
779+
var pauseSchedule atomic.Bool
780+
waitSchCh := make(chan struct{})
781+
var closeSchedule sync.Once
782+
releaseSchedule := func() {
783+
pauseSchedule.Store(false)
784+
closeSchedule.Do(func() { close(waitSchCh) })
785+
}
786+
t.Cleanup(releaseSchedule)
787+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadAndDeliverJobs", func() {
788+
if pauseSchedule.Load() {
789+
<-waitSchCh
790+
}
791+
})
792+
pauseSchedule.Store(true)
793+
794+
submittedCh := make(chan struct{}, 2)
795+
submitGate := make(chan struct{})
796+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted", func() {
797+
submittedCh <- struct{}{}
798+
<-submitGate
799+
})
800+
waitSubmitted := func() {
801+
select {
802+
case <-submittedCh:
803+
submitGate <- struct{}{}
804+
case <-time.After(5 * time.Second):
805+
require.FailNow(t, "DDL job was not submitted")
806+
}
807+
}
808+
809+
tkAlter := testkit.NewTestKit(t, store)
810+
tkAlter.MustExec("use test_recover_schema_snapshot")
811+
alterDoneCh := make(chan error, 1)
812+
go func() {
813+
_, err := tkAlter.Exec("alter table t drop column col_a")
814+
alterDoneCh <- err
815+
}()
816+
waitSubmitted()
817+
818+
tkDrop := testkit.NewTestKit(t, store)
819+
dropDoneCh := make(chan error, 1)
820+
go func() {
821+
_, err := tkDrop.Exec("drop database test_recover_schema_snapshot")
822+
dropDoneCh <- err
823+
}()
824+
waitSubmitted()
825+
826+
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted")
827+
releaseSchedule()
828+
require.NoError(t, <-alterDoneCh)
829+
require.NoError(t, <-dropDoneCh)
830+
831+
rows := tk.MustQuery("admin show ddl jobs where db_name = 'test_recover_schema_snapshot' and job_type = 'drop schema'").Rows()
832+
require.NotEmpty(t, rows)
833+
dropJobID, err := strconv.ParseInt(rows[0][0].(string), 10, 64)
834+
require.NoError(t, err)
835+
dropJob, err := ddl.GetHistoryJobByID(tk.Session(), dropJobID)
836+
require.NoError(t, err)
837+
require.NotNil(t, dropJob)
838+
require.Greater(t, dropJob.RealStartTS, dropJob.StartTS)
839+
840+
gcTimeFormat := "20060102-15:04:05 -0700 MST"
841+
timeBeforeDrop := time.Now().Add(-48 * time.Hour).Format(gcTimeFormat)
842+
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
843+
ON DUPLICATE KEY
844+
UPDATE variable_value = '%[1]s'`
845+
tk.MustExec("delete from mysql.tidb where variable_name in ('tikv_gc_safe_point','tikv_gc_enable')")
846+
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
847+
require.NoError(t, gcutil.EnableGC(tk.Session()))
848+
849+
tk.MustExec("flashback database test_recover_schema_snapshot")
850+
tk.MustQuery("select column_name from information_schema.columns where table_schema = 'test_recover_schema_snapshot' and table_name = 't' order by ordinal_position").Check(testkit.Rows("id", "col_b"))
851+
tk.MustQuery("select id, col_b from test_recover_schema_snapshot.t").Check(testkit.Rows("1 21"))
852+
}
853+
646854
func TestRecoverTableByJobIDFail(t *testing.T) {
647855
store := createMockStore(t)
648856
tk := testkit.NewTestKit(t, store)

pkg/executor/ddl.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
449449
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been recover to '%-.192s', can't be recover repeatedly", tblInfo.Name.O, tbl.Meta().Name.O)
450450
}
451451

452-
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
452+
snapshotTS := ddl.GetRecoverSnapshotTS(job)
453+
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(snapshotTS)
453454
autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
454455
if err != nil {
455456
return err
@@ -459,7 +460,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
459460
SchemaID: job.SchemaID,
460461
TableInfo: tblInfo,
461462
DropJobID: job.ID,
462-
SnapshotTS: job.StartTS,
463+
SnapshotTS: snapshotTS,
463464
AutoIDs: autoIDs,
464465
OldSchemaName: job.SchemaName,
465466
OldTableName: tblInfo.Name.L,
@@ -487,14 +488,15 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, dom *domain.Do
487488
return nil, nil, errors.Errorf("Job %v type is %v, not dropped/truncated table", job.ID, job.Type)
488489
}
489490

491+
snapshotTS := ddl.GetRecoverSnapshotTS(job)
490492
// Check GC safe point for getting snapshot infoSchema.
491-
err = gcutil.ValidateSnapshot(e.Ctx(), job.StartTS)
493+
err = gcutil.ValidateSnapshot(e.Ctx(), snapshotTS)
492494
if err != nil {
493495
return nil, nil, err
494496
}
495497

496498
// Get the snapshot infoSchema before drop table.
497-
snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS)
499+
snapInfo, err := dom.GetSnapshotInfoSchema(snapshotTS)
498500
if err != nil {
499501
return nil, nil, err
500502
}
@@ -514,7 +516,7 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, dom *domain.Do
514516
}
515517

516518
// GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs,
517-
// it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
519+
// it will use the recover snapshot of DDL job to get the dropped/truncated table information.
518520
func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
519521
getTable := func(startTS uint64, schemaID int64, tableID int64) (*model.TableInfo, error) {
520522
snapMeta := dom.GetSnapshotMeta(startTS)
@@ -613,7 +615,8 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
613615
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been flashback to '%-.192s', can't be flashback repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
614616
}
615617

616-
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
618+
snapshotTS := ddl.GetRecoverSnapshotTS(job)
619+
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(snapshotTS)
617620
autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
618621
if err != nil {
619622
return err
@@ -623,7 +626,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
623626
SchemaID: job.SchemaID,
624627
TableInfo: tblInfo,
625628
DropJobID: job.ID,
626-
SnapshotTS: job.StartTS,
629+
SnapshotTS: snapshotTS,
627630
AutoIDs: autoIDs,
628631
OldSchemaName: job.SchemaName,
629632
OldTableName: s.Table.Name.L,
@@ -672,15 +675,17 @@ func (e *DDLExec) getRecoverDBByName(schemaName ast.CIStr) (recoverSchemaInfo *m
672675
dom := domain.GetDomain(e.Ctx())
673676
fn := func(jobs []*model.Job) (bool, error) {
674677
for _, job := range jobs {
678+
if job.Type != model.ActionDropSchema {
679+
continue
680+
}
681+
682+
snapshotTS := ddl.GetRecoverSnapshotTS(job)
675683
// Check GC safe point for getting snapshot infoSchema.
676-
err = gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
684+
err = gcutil.ValidateSnapshotWithGCSafePoint(snapshotTS, gcSafePoint)
677685
if err != nil {
678686
return false, err
679687
}
680-
if job.Type != model.ActionDropSchema {
681-
continue
682-
}
683-
snapMeta := dom.GetSnapshotMeta(job.StartTS)
688+
snapMeta := dom.GetSnapshotMeta(snapshotTS)
684689
schemaInfo, err := snapMeta.GetDatabase(job.SchemaID)
685690
if err != nil {
686691
return false, err
@@ -698,7 +703,7 @@ func (e *DDLExec) getRecoverDBByName(schemaName ast.CIStr) (recoverSchemaInfo *m
698703
DBInfo: schemaInfo,
699704
LoadTablesOnExecute: true,
700705
DropJobID: job.ID,
701-
SnapshotTS: job.StartTS,
706+
SnapshotTS: snapshotTS,
702707
OldSchemaName: schemaName,
703708
}
704709
return true, nil

0 commit comments

Comments
 (0)