diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index b06d69cb2f..2eb09fef4f 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -2870,15 +2870,9 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte if allFiltered { return commonEvent.DDLEvent{}, false, err } - querys, err := commonEvent.SplitQueries(rawEvent.Query) + querys, err := splitCreateTablesQueries(rawEvent) if err != nil { - log.Panic("split queries failed", zap.Error(err)) - } - if len(querys) != len(rawEvent.MultipleTableInfos) { - log.Panic("query count not match table count", - zap.Int("queryCount", len(querys)), - zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), - zap.String("query", rawEvent.Query)) + return commonEvent.DDLEvent{}, false, err } ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount) addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount) @@ -2938,6 +2932,24 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte return ddlEvent, true, err } +func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) { + querys, err := commonEvent.SplitQueries(rawEvent.Query) + if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) { + return querys, nil + } + fields := []zap.Field{ + zap.Int("tableCount", len(rawEvent.MultipleTableInfos)), + zap.String("query", rawEvent.Query), + } + if err != nil { + fields = append(fields, zap.Error(err)) + } else { + fields = append(fields, zap.Int("queryCount", len(querys))) + } + log.Warn("create tables query count not match table count", fields...) + return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("create tables query count not match table count")) +} + func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) if err != nil { diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index b85a56cd57..db7676c93a 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -3654,6 +3655,47 @@ func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T) require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs) } +func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) { + tableInfo1 := newEligibleTableInfoForTest(201, "t_26") + tableInfo2 := newEligibleTableInfoForTest(202, "t_27") + tableInfo1.State = model.StatePublic + tableInfo2.State = model.StatePublic + tableInfo1.Columns[0].AddFlag(mysql.NotNullFlag) + tableInfo2.Columns[0].AddFlag(mysql.NotNullFlag) + for _, column := range tableInfo1.Columns { + column.State = model.StatePublic + } + for _, column := range tableInfo2.Columns { + column.State = model.StatePublic + } + + job := &model.Job{ + Type: model.ActionCreateTables, + SchemaID: 100, + Query: "CREATE TABLE `t_26` (`a` INT PRIMARY KEY,`b` INT)", + BinlogInfo: &model.HistoryInfo{ + MultipleTableInfos: []*model.TableInfo{ + tableInfo1, + tableInfo2, + }, + }, + } + originalQueries, err := commonEvent.SplitQueries(job.Query) + require.NoError(t, err) + require.Len(t, originalQueries, 1) + + rawEvent := buildPersistedDDLEventForCreateTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{100: {Name: "test"}}, + }) + require.Equal(t, job.Query, rawEvent.Query) + + ddlEvent, ok, err := buildDDLEventForCreateTables(&rawEvent, nil, 0) + require.Error(t, err) + require.False(t, ok) + require.Equal(t, commonEvent.DDLEvent{}, ddlEvent) +} + func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) { args := updateDDLHistoryFuncArgs{ ddlEvent: &PersistedDDLEvent{ diff --git a/tests/integration_tests/_utils/test_prepare b/tests/integration_tests/_utils/test_prepare index 4d087151fd..48a7dbea36 100644 --- a/tests/integration_tests/_utils/test_prepare +++ b/tests/integration_tests/_utils/test_prepare @@ -162,6 +162,7 @@ function skip_if_tidb_version_less_than() { fi if tidb_version_less_than "$tidbReleaseVersion" "$minVersion"; then + stop_tidb_cluster echo "[$(date)] <<<<<< skip test case ${testName}, TiDB version ${tidbReleaseVersion} is less than ${minVersion} >>>>>>" exit 0 fi diff --git a/tests/integration_tests/batch_add_table/run.sh b/tests/integration_tests/batch_add_table/run.sh index e50b367c97..0f196802b3 100755 --- a/tests/integration_tests/batch_add_table/run.sh +++ b/tests/integration_tests/batch_add_table/run.sh @@ -99,7 +99,15 @@ function run_without_fast_create_table() { trap 'stop_test $WORK_DIR' EXIT run_without_fast_create_table $* +tidb_release_version=$(get_tidb_release_version || true) stop_tidb_cluster -run_with_fast_create_table $* +if [ -z "$tidb_release_version" ]; then + echo "[$(date)] failed to parse TiDB release version, continue fast create table test case" + run_with_fast_create_table $* +elif tidb_version_less_than "$tidb_release_version" "v8.5.0"; then + echo "[$(date)] <<<<<< skip fast create table part of $TEST_NAME, TiDB version ${tidb_release_version} is less than v8.5.0 >>>>>>" +else + run_with_fast_create_table $* +fi check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/generate_column/run.sh b/tests/integration_tests/generate_column/run.sh index 44e8f629b1..fe55ea9009 100755 --- a/tests/integration_tests/generate_column/run.sh +++ b/tests/integration_tests/generate_column/run.sh @@ -12,6 +12,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_tidb_version_less_than "v8.5.0" # record tso before we create tables to skip the system table DDLs start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) diff --git a/tests/integration_tests/partial_index/run.sh b/tests/integration_tests/partial_index/run.sh index abc1d99c64..17fa4e3f45 100644 --- a/tests/integration_tests/partial_index/run.sh +++ b/tests/integration_tests/partial_index/run.sh @@ -12,6 +12,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_tidb_version_less_than "v8.5.0" cd $WORK_DIR diff --git a/tests/integration_tests/same_upstream_downstream/run.sh b/tests/integration_tests/same_upstream_downstream/run.sh index e8d126e45f..d4f5c4f366 100644 --- a/tests/integration_tests/same_upstream_downstream/run.sh +++ b/tests/integration_tests/same_upstream_downstream/run.sh @@ -10,6 +10,29 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function skip_if_not_tidb_8_5() { + local tidbReleaseVersion + tidbReleaseVersion=$(get_tidb_release_version || true) + if [ -z "$tidbReleaseVersion" ]; then + echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME" + return + fi + + local versionTriplet + if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then + echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME" + return + fi + + local major minor patch + read -r major minor patch <<<"$versionTriplet" + if [ "$major" != "8" ] || [ "$minor" != "5" ]; then + stop_tidb_cluster + echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>" + exit 0 + fi +} + function run() { # Only meaningful for MySQL/TiDB sink. if [ "$SINK_TYPE" != "mysql" ]; then @@ -19,6 +42,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_not_tidb_8_5 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY UP_SINK_URI="mysql://root@${UP_TIDB_HOST}:${UP_TIDB_PORT}/" diff --git a/tests/integration_tests/tidb_mysql_test/run.sh b/tests/integration_tests/tidb_mysql_test/run.sh index d8b2ea75fa..fbe80981d7 100755 --- a/tests/integration_tests/tidb_mysql_test/run.sh +++ b/tests/integration_tests/tidb_mysql_test/run.sh @@ -8,11 +8,35 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function skip_if_not_tidb_8_5() { + local tidbReleaseVersion + tidbReleaseVersion=$(get_tidb_release_version || true) + if [ -z "$tidbReleaseVersion" ]; then + echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME" + return + fi + + local versionTriplet + if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then + echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME" + return + fi + + local major minor patch + read -r major minor patch <<<"$versionTriplet" + if [ "$major" != "8" ] || [ "$minor" != "5" ]; then + stop_tidb_cluster + echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>" + exit 0 + fi +} + function prepare() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR stop_tidb_cluster start_tidb_cluster --workdir $WORK_DIR + skip_if_not_tidb_8_5 # record tso before we create tables to skip the system table DDLs start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})