From 313fe33c6f920d6288058681bbe9bd9f8917ed53 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 10 Mar 2026 10:50:55 +0800 Subject: [PATCH 01/11] alpha version --- .../persist_storage_ddl_handlers.go | 51 ++++++++++++++++--- .../schemastore/persist_storage_test.go | 38 ++++++++++++++ 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index b06d69cb2f..26dfa37c83 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -14,6 +14,7 @@ package schemastore import ( + "bytes" "errors" "fmt" "strings" @@ -23,9 +24,12 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/util/mock" "go.uber.org/zap" ) @@ -2870,15 +2874,9 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte if allFiltered { return commonEvent.DDLEvent{}, false, err } - querys, err := commonEvent.SplitQueries(rawEvent.Query) + querys, err := splitCreateTablesQueries(rawEvent) if err != nil { - log.Panic("split queries failed", zap.Error(err)) - } - if len(querys) != len(rawEvent.MultipleTableInfos) { - log.Panic("query count not match table count", - zap.Int("queryCount", len(querys)), - zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), - zap.String("query", rawEvent.Query)) + return commonEvent.DDLEvent{}, false, err } ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount) addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount) @@ -2938,6 +2936,43 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte return ddlEvent, true, err } +func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { + querys, err := commonEvent.SplitQueries(rawEvent.Query) + if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) { + return querys, nil + } + + fields := []zap.Field{ + zap.Int("queryCount", len(querys)), + zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), + zap.String("query", rawEvent.Query), + } + if err != nil { + fields = append(fields, zap.Error(err)) + } + log.Warn("create tables query count not match table count rebuild queries by table info", fields...) + + ctx := mock.NewContext() + rebuiltQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos)) + for _, tableInfo := range rawEvent.MultipleTableInfos { + if tableInfo == nil { + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil table info in create tables ddl")) + } + + var queryBuilder bytes.Buffer + if err := executor.ConstructResultOfShowCreateTable(ctx, tableInfo, autoid.Allocators{}, &queryBuilder); err != nil { + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) + } + + query := queryBuilder.String() + if !strings.HasSuffix(query, ";") { + query += ";" + } + rebuiltQuerys = append(rebuiltQuerys, query) + } + return rebuiltQuerys, nil +} + func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) if err != nil { diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index b85a56cd57..886997277e 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3654,6 +3654,44 @@ func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T) require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs) } +func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { + tableInfo1 := newEligibleTableInfoForTest(201, "t_26") + tableInfo2 := newEligibleTableInfoForTest(202, "t_27") + tableInfo1.State = model.StatePublic + tableInfo2.State = model.StatePublic + for _, column := range tableInfo1.Columns { + column.State = model.StatePublic + } + for _, column := range tableInfo2.Columns { + column.State = model.StatePublic + } + + rawEvent := &PersistedDDLEvent{ + Type: byte(model.ActionCreateTables), + SchemaID: 100, + SchemaName: "test", + Query: "CREATE TABLE `t_26` (`a` INT PRIMARY KEY,`b` INT)", + MultipleTableInfos: []*model.TableInfo{ + tableInfo1, + tableInfo2, + }, + } + + ddlEvent, ok, err := buildDDLEventForCreateTables(rawEvent, nil, 0) + require.NoError(t, err) + require.True(t, ok) + + querys, err := commonEvent.SplitQueries(ddlEvent.Query) + require.NoError(t, err) + require.Len(t, querys, 2) + require.Contains(t, querys[0], "CREATE TABLE `t_26`") + require.Contains(t, querys[1], "CREATE TABLE `t_27`") + + require.Len(t, ddlEvent.NeedAddedTables, 2) + require.Equal(t, int64(201), ddlEvent.NeedAddedTables[0].TableID) + require.Equal(t, int64(202), ddlEvent.NeedAddedTables[1].TableID) +} + func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) { args := updateDDLHistoryFuncArgs{ ddlEvent: &PersistedDDLEvent{ From 7dd16a5ab064189e4c1b1018c32e3aee54ba052b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 14:56:52 +0800 Subject: [PATCH 02/11] fix --- logservice/schemastore/persist_storage.go | 9 ++-- .../persist_storage_ddl_handlers.go | 42 +++++++++++++------ .../schemastore/persist_storage_test.go | 32 +++++++++++++- 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index d772415187..657321a508 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -459,7 +459,7 @@ func (p *persistentStorage) fetchTableDDLEvents(dispatcherID common.DispatcherID events := make([]commonEvent.DDLEvent, 0, len(allTargetTs)) for _, ts := range allTargetTs { rawEvent := readPersistedDDLEvent(storageSnap, ts) - ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, tableID) + ddlEvent, ok, err := p.buildDDLEvent(&rawEvent, tableFilter, tableID) if err != nil { return nil, errors.Trace(err) } @@ -530,7 +530,7 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter for _, ts := range allTargetTs { rawEvent := readPersistedDDLEvent(storageSnap, ts) // the tableID of buildDDLEvent is not used in this function, set it to 0 - ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, 0) + ddlEvent, ok, err := p.buildDDLEvent(&rawEvent, tableFilter, 0) if err != nil { return nil, errors.Trace(err) } @@ -881,10 +881,13 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool { // NOTE: tableID is only used in fetchTableDDLEvents to fetch exchange table partition and rename tables DDL // for the corresponding dispatcher. // It's not used in fetchTableTriggerDDLEvents, so it can be 0. -func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { +func (p *persistentStorage) buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { handler, ok := allDDLHandlers[model.ActionType(rawEvent.Type)] if !ok { log.Panic("unknown ddl type", zap.Any("ddlType", rawEvent.Type), zap.String("query", rawEvent.Query)) } + if handler.buildDDLEventWithStorageFunc != nil { + return handler.buildDDLEventWithStorageFunc(rawEvent, tableFilter, tableID, p.kvStorage) + } return handler.buildDDLEventFunc(rawEvent, tableFilter, tableID) } diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 26dfa37c83..c88ac14090 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -25,11 +25,12 @@ import ( cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/util/mock" + "github.com/pingcap/tidb/pkg/session" "go.uber.org/zap" ) @@ -119,6 +120,8 @@ type persistStorageDDLHandler struct { // see the details in buildDDLEventForExchangeTablePartition and buildDDLEventForRenameTables. // For other DDLs, tableID is not used and can be set to 0. buildDDLEventFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) + // buildDDLEventWithStorageFunc is used by DDLs whose event construction needs access to kv storage. + buildDDLEventWithStorageFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64, storage kv.Storage) (commonEvent.DDLEvent, bool, error) } var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ @@ -402,13 +405,14 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForRenameTables, }, model.ActionCreateTables: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTables, - updateDDLHistoryFunc: updateDDLHistoryForCreateTables, - updateFullTableInfoFunc: updateFullTableInfoForMultiTablesDDL, - updateSchemaMetadataFunc: updateSchemaMetadataForCreateTables, - iterateEventTablesFunc: iterateEventTablesForCreateTables, - extractTableInfoFunc: extractTableInfoFuncForCreateTables, - buildDDLEventFunc: buildDDLEventForCreateTables, + buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTables, + updateDDLHistoryFunc: updateDDLHistoryForCreateTables, + updateFullTableInfoFunc: updateFullTableInfoForMultiTablesDDL, + updateSchemaMetadataFunc: updateSchemaMetadataForCreateTables, + iterateEventTablesFunc: iterateEventTablesForCreateTables, + extractTableInfoFunc: extractTableInfoFuncForCreateTables, + buildDDLEventFunc: buildDDLEventForCreateTables, + buildDDLEventWithStorageFunc: buildDDLEventForCreateTablesWithStorage, }, model.ActionMultiSchemaChange: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, @@ -2847,6 +2851,12 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { + return buildDDLEventForCreateTablesWithStorage(rawEvent, tableFilter, tableID, nil) +} + +func buildDDLEventForCreateTablesWithStorage( + rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64, storage kv.Storage, +) (commonEvent.DDLEvent, bool, error) { ddlEvent, _, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err @@ -2874,7 +2884,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte if allFiltered { return commonEvent.DDLEvent{}, false, err } - querys, err := splitCreateTablesQueries(rawEvent) + querys, err := splitCreateTablesQueries(rawEvent, storage) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2936,7 +2946,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte return ddlEvent, true, err } -func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { +func splitCreateTablesQueries(rawEvent *PersistedDDLEvent, storage kv.Storage) ([]string, error) { querys, err := commonEvent.SplitQueries(rawEvent.Query) if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) { return querys, nil @@ -2952,7 +2962,15 @@ func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { } log.Warn("create tables query count not match table count rebuild queries by table info", fields...) - ctx := mock.NewContext() + if storage == nil { + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil kv storage when rebuilding create tables queries")) + } + se, err := session.CreateSession(storage) + if err != nil { + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) + } + defer se.Close() + rebuiltQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos)) for _, tableInfo := range rawEvent.MultipleTableInfos { if tableInfo == nil { @@ -2960,7 +2978,7 @@ func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { } var queryBuilder bytes.Buffer - if err := executor.ConstructResultOfShowCreateTable(ctx, tableInfo, autoid.Allocators{}, &queryBuilder); err != nil { + if err := executor.ConstructResultOfShowCreateTable(se, tableInfo, autoid.Allocators{}, &queryBuilder); err != nil { return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 886997277e..d9cc12c706 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -27,8 +27,13 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -3659,6 +3664,8 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { tableInfo2 := newEligibleTableInfoForTest(202, "t_27") tableInfo1.State = model.StatePublic tableInfo2.State = model.StatePublic + tableInfo1.Columns[0].AddFlag(mysql.NotNullFlag) + tableInfo2.Columns[0].AddFlag(mysql.NotNullFlag) for _, column := range tableInfo1.Columns { column.State = model.StatePublic } @@ -3676,8 +3683,20 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { tableInfo2, }, } + originalQueries, err := commonEvent.SplitQueries(rawEvent.Query) + require.NoError(t, err) + require.Len(t, originalQueries, 1) + + store, err := mockstore.NewMockStore() + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + dom, err := session.BootstrapSession(store) + require.NoError(t, err) + defer dom.Close() - ddlEvent, ok, err := buildDDLEventForCreateTables(rawEvent, nil, 0) + ddlEvent, ok, err := buildDDLEventForCreateTablesWithStorage(rawEvent, nil, 0, store) require.NoError(t, err) require.True(t, ok) @@ -3690,6 +3709,17 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { require.Len(t, ddlEvent.NeedAddedTables, 2) require.Equal(t, int64(201), ddlEvent.NeedAddedTables[0].TableID) require.Equal(t, int64(202), ddlEvent.NeedAddedTables[1].TableID) + parser := parser.New() + for i, query := range querys { + tableInfo, err := dbutiltest.GetTableInfoBySQL(query, parser) + require.NoError(t, err) + require.Equal(t, rawEvent.MultipleTableInfos[i].Name.O, tableInfo.Name.O) + require.Len(t, tableInfo.Columns, 2) + require.Equal(t, "a", tableInfo.Columns[0].Name.O) + require.Equal(t, "b", tableInfo.Columns[1].Name.O) + require.NotNil(t, tableInfo.GetPkColInfo()) + require.Equal(t, "a", tableInfo.GetPkColInfo().Name.O) + } } func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) { From 2c3a1d10ac2f03e19c9d5a7a4c0febc0e3cdc3bf Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 15:35:06 +0800 Subject: [PATCH 03/11] fix --- logservice/schemastore/persist_storage.go | 10 ++-- .../persist_storage_ddl_handlers.go | 53 ++++++++++--------- .../schemastore/persist_storage_test.go | 27 ++++++---- 3 files changed, 49 insertions(+), 41 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 657321a508..867e483e85 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -459,7 +459,7 @@ func (p *persistentStorage) fetchTableDDLEvents(dispatcherID common.DispatcherID events := make([]commonEvent.DDLEvent, 0, len(allTargetTs)) for _, ts := range allTargetTs { rawEvent := readPersistedDDLEvent(storageSnap, ts) - ddlEvent, ok, err := p.buildDDLEvent(&rawEvent, tableFilter, tableID) + ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, tableID) if err != nil { return nil, errors.Trace(err) } @@ -530,7 +530,7 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter for _, ts := range allTargetTs { rawEvent := readPersistedDDLEvent(storageSnap, ts) // the tableID of buildDDLEvent is not used in this function, set it to 0 - ddlEvent, ok, err := p.buildDDLEvent(&rawEvent, tableFilter, 0) + ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, 0) if err != nil { return nil, errors.Trace(err) } @@ -747,6 +747,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { ddlEvent := handler.buildPersistedDDLEventFunc(buildPersistedDDLEventFuncArgs{ job: job, + kvStorage: p.kvStorage, databaseMap: p.databaseMap, tableMap: p.tableMap, partitionMap: p.partitionMap, @@ -881,13 +882,10 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool { // NOTE: tableID is only used in fetchTableDDLEvents to fetch exchange table partition and rename tables DDL // for the corresponding dispatcher. // It's not used in fetchTableTriggerDDLEvents, so it can be 0. -func (p *persistentStorage) buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { +func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { handler, ok := allDDLHandlers[model.ActionType(rawEvent.Type)] if !ok { log.Panic("unknown ddl type", zap.Any("ddlType", rawEvent.Type), zap.String("query", rawEvent.Query)) } - if handler.buildDDLEventWithStorageFunc != nil { - return handler.buildDDLEventWithStorageFunc(rawEvent, tableFilter, tableID, p.kvStorage) - } return handler.buildDDLEventFunc(rawEvent, tableFilter, tableID) } diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index c88ac14090..e17e378558 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -43,6 +43,7 @@ const ( type buildPersistedDDLEventFuncArgs struct { job *model.Job + kvStorage kv.Storage databaseMap map[int64]*BasicDatabaseInfo tableMap map[int64]*BasicTableInfo partitionMap map[int64]BasicPartitionInfo @@ -120,8 +121,6 @@ type persistStorageDDLHandler struct { // see the details in buildDDLEventForExchangeTablePartition and buildDDLEventForRenameTables. // For other DDLs, tableID is not used and can be set to 0. buildDDLEventFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) - // buildDDLEventWithStorageFunc is used by DDLs whose event construction needs access to kv storage. - buildDDLEventWithStorageFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64, storage kv.Storage) (commonEvent.DDLEvent, bool, error) } var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ @@ -405,14 +404,13 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForRenameTables, }, model.ActionCreateTables: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTables, - updateDDLHistoryFunc: updateDDLHistoryForCreateTables, - updateFullTableInfoFunc: updateFullTableInfoForMultiTablesDDL, - updateSchemaMetadataFunc: updateSchemaMetadataForCreateTables, - iterateEventTablesFunc: iterateEventTablesForCreateTables, - extractTableInfoFunc: extractTableInfoFuncForCreateTables, - buildDDLEventFunc: buildDDLEventForCreateTables, - buildDDLEventWithStorageFunc: buildDDLEventForCreateTablesWithStorage, + buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTables, + updateDDLHistoryFunc: updateDDLHistoryForCreateTables, + updateFullTableInfoFunc: updateFullTableInfoForMultiTablesDDL, + updateSchemaMetadataFunc: updateSchemaMetadataForCreateTables, + iterateEventTablesFunc: iterateEventTablesForCreateTables, + extractTableInfoFunc: extractTableInfoFuncForCreateTables, + buildDDLEventFunc: buildDDLEventForCreateTables, }, model.ActionMultiSchemaChange: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, @@ -1002,6 +1000,14 @@ func buildPersistedDDLEventForCreateTables(args buildPersistedDDLEventFuncArgs) event := buildPersistedDDLEventCommon(args) event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos + querys, err := commonEvent.SplitQueries(event.Query) + if err != nil || len(querys) != len(event.MultipleTableInfos) { + rebuiltQuerys, rebuildErr := rebuildCreateTablesQueries(event.Query, event.MultipleTableInfos, args.kvStorage) + if rebuildErr != nil { + log.Panic("rebuild create tables queries failed", zap.Error(rebuildErr), zap.String("query", event.Query)) + } + event.Query = strings.Join(rebuiltQuerys, "") + } return event } @@ -2851,12 +2857,6 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - return buildDDLEventForCreateTablesWithStorage(rawEvent, tableFilter, tableID, nil) -} - -func buildDDLEventForCreateTablesWithStorage( - rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64, storage kv.Storage, -) (commonEvent.DDLEvent, bool, error) { ddlEvent, _, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err @@ -2884,7 +2884,7 @@ func buildDDLEventForCreateTablesWithStorage( if allFiltered { return commonEvent.DDLEvent{}, false, err } - querys, err := splitCreateTablesQueries(rawEvent, storage) + querys, err := splitCreateTablesQueries(rawEvent) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2946,22 +2946,25 @@ func buildDDLEventForCreateTablesWithStorage( return ddlEvent, true, err } -func splitCreateTablesQueries(rawEvent *PersistedDDLEvent, storage kv.Storage) ([]string, error) { +func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { querys, err := commonEvent.SplitQueries(rawEvent.Query) if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) { return querys, nil } + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("create tables query count not match table count")) +} +func rebuildCreateTablesQueries(query string, tableInfos []*model.TableInfo, storage kv.Storage) ([]string, error) { fields := []zap.Field{ - zap.Int("queryCount", len(querys)), - zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), - zap.String("query", rawEvent.Query), + zap.Int("tableCount", len(tableInfos)), + zap.String("query", query), } - if err != nil { + if splitQueries, err := commonEvent.SplitQueries(query); err != nil { fields = append(fields, zap.Error(err)) + } else { + fields = append(fields, zap.Int("queryCount", len(splitQueries))) } log.Warn("create tables query count not match table count rebuild queries by table info", fields...) - if storage == nil { return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil kv storage when rebuilding create tables queries")) } @@ -2971,8 +2974,8 @@ func splitCreateTablesQueries(rawEvent *PersistedDDLEvent, storage kv.Storage) ( } defer se.Close() - rebuiltQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos)) - for _, tableInfo := range rawEvent.MultipleTableInfos { + rebuiltQuerys := make([]string, 0, len(tableInfos)) + for _, tableInfo := range tableInfos { if tableInfo == nil { return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil table info in create tables ddl")) } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index d9cc12c706..7c184b1c0d 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3673,17 +3673,18 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { column.State = model.StatePublic } - rawEvent := &PersistedDDLEvent{ - Type: byte(model.ActionCreateTables), - SchemaID: 100, - SchemaName: "test", - Query: "CREATE TABLE `t_26` (`a` INT PRIMARY KEY,`b` INT)", - MultipleTableInfos: []*model.TableInfo{ - tableInfo1, - tableInfo2, + job := &model.Job{ + Type: model.ActionCreateTables, + SchemaID: 100, + Query: "CREATE TABLE `t_26` (`a` INT PRIMARY KEY,`b` INT)", + BinlogInfo: &model.HistoryInfo{ + MultipleTableInfos: []*model.TableInfo{ + tableInfo1, + tableInfo2, + }, }, } - originalQueries, err := commonEvent.SplitQueries(rawEvent.Query) + originalQueries, err := commonEvent.SplitQueries(job.Query) require.NoError(t, err) require.Len(t, originalQueries, 1) @@ -3696,7 +3697,13 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { require.NoError(t, err) defer dom.Close() - ddlEvent, ok, err := buildDDLEventForCreateTablesWithStorage(rawEvent, nil, 0, store) + rawEvent := buildPersistedDDLEventForCreateTables(buildPersistedDDLEventFuncArgs{ + job: job, + kvStorage: store, + databaseMap: map[int64]*BasicDatabaseInfo{100: {Name: "test"}}, + }) + + ddlEvent, ok, err := buildDDLEventForCreateTables(&rawEvent, nil, 0) require.NoError(t, err) require.True(t, ok) From 57c292cd419c4d2976e470d3458e3447dad8f2ff Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 16:00:29 +0800 Subject: [PATCH 04/11] fix --- logservice/schemastore/persist_storage_ddl_handlers.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index e17e378558..f7d22ede1b 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "strings" + "unicode" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" @@ -2985,7 +2986,7 @@ func rebuildCreateTablesQueries(query string, tableInfos []*model.TableInfo, sto return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) } - query := queryBuilder.String() + query := strings.TrimRightFunc(queryBuilder.String(), unicode.IsSpace) if !strings.HasSuffix(query, ";") { query += ";" } From d785f8782fc243ea05d3c920f1400d7e7f32f446 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 17:15:15 +0800 Subject: [PATCH 05/11] fix --- logservice/schemastore/persist_storage.go | 1 - .../persist_storage_ddl_handlers.go | 57 ++----------------- .../schemastore/persist_storage_test.go | 41 ++----------- tests/integration_tests/_utils/run_sql | 2 +- tests/integration_tests/_utils/run_sql_file | 2 +- .../_utils/run_sql_ignore_error | 2 +- .../integration_tests/batch_add_table/run.sh | 16 ++++++ 7 files changed, 29 insertions(+), 92 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 867e483e85..d772415187 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -747,7 +747,6 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { ddlEvent := handler.buildPersistedDDLEventFunc(buildPersistedDDLEventFuncArgs{ job: job, - kvStorage: p.kvStorage, databaseMap: p.databaseMap, tableMap: p.tableMap, partitionMap: p.partitionMap, diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index f7d22ede1b..2eb09fef4f 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -14,24 +14,18 @@ package schemastore import ( - "bytes" "errors" "fmt" "strings" - "unicode" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" - "github.com/pingcap/tidb/pkg/executor" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/session" "go.uber.org/zap" ) @@ -44,7 +38,6 @@ const ( type buildPersistedDDLEventFuncArgs struct { job *model.Job - kvStorage kv.Storage databaseMap map[int64]*BasicDatabaseInfo tableMap map[int64]*BasicTableInfo partitionMap map[int64]BasicPartitionInfo @@ -1001,14 +994,6 @@ func buildPersistedDDLEventForCreateTables(args buildPersistedDDLEventFuncArgs) event := buildPersistedDDLEventCommon(args) event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos - querys, err := commonEvent.SplitQueries(event.Query) - if err != nil || len(querys) != len(event.MultipleTableInfos) { - rebuiltQuerys, rebuildErr := rebuildCreateTablesQueries(event.Query, event.MultipleTableInfos, args.kvStorage) - if rebuildErr != nil { - log.Panic("rebuild create tables queries failed", zap.Error(rebuildErr), zap.String("query", event.Query)) - } - event.Query = strings.Join(rebuiltQuerys, "") - } return event } @@ -2952,47 +2937,17 @@ func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) { return querys, nil } - return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("create tables query count not match table count")) -} - -func rebuildCreateTablesQueries(query string, tableInfos []*model.TableInfo, storage kv.Storage) ([]string, error) { fields := []zap.Field{ - zap.Int("tableCount", len(tableInfos)), - zap.String("query", query), + zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), + zap.String("query", rawEvent.Query), } - if splitQueries, err := commonEvent.SplitQueries(query); err != nil { + if err != nil { fields = append(fields, zap.Error(err)) } else { - fields = append(fields, zap.Int("queryCount", len(splitQueries))) - } - log.Warn("create tables query count not match table count rebuild queries by table info", fields...) - if storage == nil { - return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil kv storage when rebuilding create tables queries")) - } - se, err := session.CreateSession(storage) - if err != nil { - return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) + fields = append(fields, zap.Int("queryCount", len(querys))) } - defer se.Close() - - rebuiltQuerys := make([]string, 0, len(tableInfos)) - for _, tableInfo := range tableInfos { - if tableInfo == nil { - return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("nil table info in create tables ddl")) - } - - var queryBuilder bytes.Buffer - if err := executor.ConstructResultOfShowCreateTable(se, tableInfo, autoid.Allocators{}, &queryBuilder); err != nil { - return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, err) - } - - query := strings.TrimRightFunc(queryBuilder.String(), unicode.IsSpace) - if !strings.HasSuffix(query, ";") { - query += ";" - } - rebuiltQuerys = append(rebuiltQuerys, query) - } - return rebuiltQuerys, nil + log.Warn("create tables query count not match table count", fields...) + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("create tables query count not match table count")) } func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 7c184b1c0d..db7676c93a 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -27,13 +27,9 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/session" - "github.com/pingcap/tidb/pkg/store/mockstore" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -3688,45 +3684,16 @@ func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { require.NoError(t, err) require.Len(t, originalQueries, 1) - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - require.NoError(t, store.Close()) - }() - dom, err := session.BootstrapSession(store) - require.NoError(t, err) - defer dom.Close() - rawEvent := buildPersistedDDLEventForCreateTables(buildPersistedDDLEventFuncArgs{ job: job, - kvStorage: store, databaseMap: map[int64]*BasicDatabaseInfo{100: {Name: "test"}}, }) + require.Equal(t, job.Query, rawEvent.Query) ddlEvent, ok, err := buildDDLEventForCreateTables(&rawEvent, nil, 0) - require.NoError(t, err) - require.True(t, ok) - - querys, err := commonEvent.SplitQueries(ddlEvent.Query) - require.NoError(t, err) - require.Len(t, querys, 2) - require.Contains(t, querys[0], "CREATE TABLE `t_26`") - require.Contains(t, querys[1], "CREATE TABLE `t_27`") - - require.Len(t, ddlEvent.NeedAddedTables, 2) - require.Equal(t, int64(201), ddlEvent.NeedAddedTables[0].TableID) - require.Equal(t, int64(202), ddlEvent.NeedAddedTables[1].TableID) - parser := parser.New() - for i, query := range querys { - tableInfo, err := dbutiltest.GetTableInfoBySQL(query, parser) - require.NoError(t, err) - require.Equal(t, rawEvent.MultipleTableInfos[i].Name.O, tableInfo.Name.O) - require.Len(t, tableInfo.Columns, 2) - require.Equal(t, "a", tableInfo.Columns[0].Name.O) - require.Equal(t, "b", tableInfo.Columns[1].Name.O) - require.NotNil(t, tableInfo.GetPkColInfo()) - require.Equal(t, "a", tableInfo.GetPkColInfo().Name.O) - } + require.Error(t, err) + require.False(t, ok) + require.Equal(t, commonEvent.DDLEvent{}, ddlEvent) } func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) { diff --git a/tests/integration_tests/_utils/run_sql b/tests/integration_tests/_utils/run_sql index e3d84b271d..6c4b174f9e 100755 --- a/tests/integration_tests/_utils/run_sql +++ b/tests/integration_tests/_utils/run_sql @@ -25,7 +25,7 @@ if [ $# -gt 1 ]; then other=${*} fi -prepare="set global tidb_enable_clustered_index = 'int_only';" +prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" diff --git a/tests/integration_tests/_utils/run_sql_file b/tests/integration_tests/_utils/run_sql_file index 4d1f1421e7..65af628ce4 100755 --- a/tests/integration_tests/_utils/run_sql_file +++ b/tests/integration_tests/_utils/run_sql_file @@ -5,7 +5,7 @@ set -eu -prepare="set global tidb_enable_clustered_index = 'int_only';" +prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" diff --git a/tests/integration_tests/_utils/run_sql_ignore_error b/tests/integration_tests/_utils/run_sql_ignore_error index a959dd2b18..b039980312 100755 --- a/tests/integration_tests/_utils/run_sql_ignore_error +++ b/tests/integration_tests/_utils/run_sql_ignore_error @@ -23,7 +23,7 @@ if [ $# -gt 1 ]; then other=${*} fi -prepare="set global tidb_enable_clustered_index = 'int_only';" +prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh index e50b367c97..9c35c41ab7 100755 --- a/tests/integration_tests/batch_add_table/run.sh +++ b/tests/integration_tests/batch_add_table/run.sh @@ -8,6 +8,16 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function get_tidb_release_version() { + mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e "select tidb_version()\G" | grep "Release Version:" | awk -F': ' '{print $2}' | tr -d '[:space:]' +} + +function version_lt() { + local lhs=$(echo "$1" | sed -E 's/^v//; s/[^0-9.].*$//') + local rhs=$(echo "$2" | sed -E 's/^v//; s/[^0-9.].*$//') + [[ "$lhs" != "$rhs" && "$(printf '%s\n%s\n' "$lhs" "$rhs" | sort -V | head -n1)" == "$lhs" ]] +} + function run_with_fast_create_table() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -66,6 +76,12 @@ function run_without_fast_create_table() { start_tidb_cluster --workdir $WORK_DIR + local tidb_release_version=$(get_tidb_release_version) + if version_lt "$tidb_release_version" "v8.5.0"; then + echo "[$(date)] skip $TEST_NAME because TiDB release version $tidb_release_version is smaller than v8.5.0" + exit 0 + fi + run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY From cbb382e53b37b0f438a430b0df4d104edbaee5a8 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 17:31:11 +0800 Subject: [PATCH 06/11] fix --- tests/integration_tests/batch_add_table/run.sh | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh index 9c35c41ab7..b2113c45e6 100755 --- a/tests/integration_tests/batch_add_table/run.sh +++ b/tests/integration_tests/batch_add_table/run.sh @@ -8,16 +8,6 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -function get_tidb_release_version() { - mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e "select tidb_version()\G" | grep "Release Version:" | awk -F': ' '{print $2}' | tr -d '[:space:]' -} - -function version_lt() { - local lhs=$(echo "$1" | sed -E 's/^v//; s/[^0-9.].*$//') - local rhs=$(echo "$2" | sed -E 's/^v//; s/[^0-9.].*$//') - [[ "$lhs" != "$rhs" && "$(printf '%s\n%s\n' "$lhs" "$rhs" | sort -V | head -n1)" == "$lhs" ]] -} - function run_with_fast_create_table() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -76,11 +66,7 @@ function run_without_fast_create_table() { start_tidb_cluster --workdir $WORK_DIR - local tidb_release_version=$(get_tidb_release_version) - if version_lt "$tidb_release_version" "v8.5.0"; then - echo "[$(date)] skip $TEST_NAME because TiDB release version $tidb_release_version is smaller than v8.5.0" - exit 0 - fi + skip_if_tidb_version_less_than "v8.5.0" run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT} From 293c9919536c1f965bf74da43a8831c089954e12 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 17:40:11 +0800 Subject: [PATCH 07/11] fix --- tests/integration_tests/_utils/run_sql | 2 +- tests/integration_tests/_utils/run_sql_file | 2 +- tests/integration_tests/_utils/run_sql_ignore_error | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/_utils/run_sql b/tests/integration_tests/_utils/run_sql index 6c4b174f9e..e3d84b271d 100755 --- a/tests/integration_tests/_utils/run_sql +++ b/tests/integration_tests/_utils/run_sql @@ -25,7 +25,7 @@ if [ $# -gt 1 ]; then other=${*} fi -prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" +prepare="set global tidb_enable_clustered_index = 'int_only';" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" diff --git a/tests/integration_tests/_utils/run_sql_file b/tests/integration_tests/_utils/run_sql_file index 65af628ce4..4d1f1421e7 100755 --- a/tests/integration_tests/_utils/run_sql_file +++ b/tests/integration_tests/_utils/run_sql_file @@ -5,7 +5,7 @@ set -eu -prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" +prepare="set global tidb_enable_clustered_index = 'int_only';" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" diff --git a/tests/integration_tests/_utils/run_sql_ignore_error b/tests/integration_tests/_utils/run_sql_ignore_error index b039980312..a959dd2b18 100755 --- a/tests/integration_tests/_utils/run_sql_ignore_error +++ b/tests/integration_tests/_utils/run_sql_ignore_error @@ -23,7 +23,7 @@ if [ $# -gt 1 ]; then other=${*} fi -prepare="set global tidb_enable_clustered_index = 'int_only'; set global tidb_enable_fast_create_table = OFF;" +prepare="set global tidb_enable_clustered_index = 'int_only';" # Check if OUT_DIR contains TEST_NAME, if so, truncate OUT_DIR to the parent directory ADJUSTED_OUT_DIR="$OUT_DIR" From 1949467b6a60e7a996af5cf6da7eaa772d9158a8 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 17:58:14 +0800 Subject: [PATCH 08/11] fix --- tests/integration_tests/batch_add_table/run.sh | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh index b2113c45e6..0f196802b3 100755 --- a/tests/integration_tests/batch_add_table/run.sh +++ b/tests/integration_tests/batch_add_table/run.sh @@ -66,8 +66,6 @@ function run_without_fast_create_table() { start_tidb_cluster --workdir $WORK_DIR - skip_if_tidb_version_less_than "v8.5.0" - run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY @@ -101,7 +99,15 @@ function run_without_fast_create_table() { trap 'stop_test $WORK_DIR' EXIT run_without_fast_create_table $* +tidb_release_version=$(get_tidb_release_version || true) stop_tidb_cluster -run_with_fast_create_table $* +if [ -z "$tidb_release_version" ]; then + echo "[$(date)] failed to parse TiDB release version, continue fast create table test case" + run_with_fast_create_table $* +elif tidb_version_less_than "$tidb_release_version" "v8.5.0"; then + echo "[$(date)] <<<<<< skip fast create table part of $TEST_NAME, TiDB version ${tidb_release_version} is less than v8.5.0 >>>>>>" +else + run_with_fast_create_table $* +fi check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 0ef363d7178f90bc5eec96bd9201bab747952a0c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 20:19:06 +0800 Subject: [PATCH 09/11] fix --- tests/integration_tests/_utils/test_prepare | 1 + tests/integration_tests/partial_index/run.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/integration_tests/_utils/test_prepare b/tests/integration_tests/_utils/test_prepare index 4d087151fd..48a7dbea36 100644 --- a/tests/integration_tests/_utils/test_prepare +++ b/tests/integration_tests/_utils/test_prepare @@ -162,6 +162,7 @@ function skip_if_tidb_version_less_than() { fi if tidb_version_less_than "$tidbReleaseVersion" "$minVersion"; then + stop_tidb_cluster echo "[$(date)] <<<<<< skip test case ${testName}, TiDB version ${tidbReleaseVersion} is less than ${minVersion} >>>>>>" exit 0 fi diff --git a/tests/integration_tests/partial_index/run.sh b/tests/integration_tests/partial_index/run.sh index abc1d99c64..17fa4e3f45 100644 --- a/tests/integration_tests/partial_index/run.sh +++ b/tests/integration_tests/partial_index/run.sh @@ -12,6 +12,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_tidb_version_less_than "v8.5.0" cd $WORK_DIR From b0990eada4831de9a65b5e892a6e26af41ca5f1d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 12 Mar 2026 23:24:53 +0800 Subject: [PATCH 10/11] try fix --- .../same_upstream_downstream/run.sh | 24 +++++++++++++++++++ .../integration_tests/tidb_mysql_test/run.sh | 24 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/tests/integration_tests/same_upstream_downstream/run.sh b/tests/integration_tests/same_upstream_downstream/run.sh index e8d126e45f..d4f5c4f366 100644 --- a/tests/integration_tests/same_upstream_downstream/run.sh +++ b/tests/integration_tests/same_upstream_downstream/run.sh @@ -10,6 +10,29 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function skip_if_not_tidb_8_5() { + local tidbReleaseVersion + tidbReleaseVersion=$(get_tidb_release_version || true) + if [ -z "$tidbReleaseVersion" ]; then + echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME" + return + fi + + local versionTriplet + if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then + echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME" + return + fi + + local major minor patch + read -r major minor patch <<<"$versionTriplet" + if [ "$major" != "8" ] || [ "$minor" != "5" ]; then + stop_tidb_cluster + echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>" + exit 0 + fi +} + function run() { # Only meaningful for MySQL/TiDB sink. if [ "$SINK_TYPE" != "mysql" ]; then @@ -19,6 +42,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_not_tidb_8_5 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY UP_SINK_URI="mysql://root@${UP_TIDB_HOST}:${UP_TIDB_PORT}/" diff --git a/tests/integration_tests/tidb_mysql_test/run.sh b/tests/integration_tests/tidb_mysql_test/run.sh index d8b2ea75fa..fbe80981d7 100755 --- a/tests/integration_tests/tidb_mysql_test/run.sh +++ b/tests/integration_tests/tidb_mysql_test/run.sh @@ -8,11 +8,35 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function skip_if_not_tidb_8_5() { + local tidbReleaseVersion + tidbReleaseVersion=$(get_tidb_release_version || true) + if [ -z "$tidbReleaseVersion" ]; then + echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME" + return + fi + + local versionTriplet + if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then + echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME" + return + fi + + local major minor patch + read -r major minor patch <<<"$versionTriplet" + if [ "$major" != "8" ] || [ "$minor" != "5" ]; then + stop_tidb_cluster + echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>" + exit 0 + fi +} + function prepare() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR stop_tidb_cluster start_tidb_cluster --workdir $WORK_DIR + skip_if_not_tidb_8_5 # record tso before we create tables to skip the system table DDLs start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) From 63803ebab88cbbd6c71960fdc1090bf169479913 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 14 Mar 2026 17:02:01 +0800 Subject: [PATCH 11/11] fix --- tests/integration_tests/generate_column/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration_tests/generate_column/run.sh b/tests/integration_tests/generate_column/run.sh index 44e8f629b1..fe55ea9009 100755 --- a/tests/integration_tests/generate_column/run.sh +++ b/tests/integration_tests/generate_column/run.sh @@ -12,6 +12,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_tidb_version_less_than "v8.5.0" # record tso before we create tables to skip the system table DDLs start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})