Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2870,15 +2870,9 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
if allFiltered {
return commonEvent.DDLEvent{}, false, err
}
querys, err := commonEvent.SplitQueries(rawEvent.Query)
querys, err := splitCreateTablesQueries(rawEvent)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
if len(querys) != len(rawEvent.MultipleTableInfos) {
log.Panic("query count not match table count",
zap.Int("queryCount", len(querys)),
zap.Int("tableCount", len(rawEvent.MultipleTableInfos)),
zap.String("query", rawEvent.Query))
return commonEvent.DDLEvent{}, false, err
}
ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount)
addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount)
Expand Down Expand Up @@ -2938,6 +2932,24 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
return ddlEvent, true, err
}

func splitCreateTablesQueries(rawEvent *PersistedDDLEvent) ([]string, error) {
querys, err := commonEvent.SplitQueries(rawEvent.Query)
if err == nil && len(querys) == len(rawEvent.MultipleTableInfos) {
return querys, nil
}
fields := []zap.Field{
zap.Int("tableCount", len(rawEvent.MultipleTableInfos)),
zap.String("query", rawEvent.Query),
}
if err != nil {
fields = append(fields, zap.Error(err))
} else {
fields = append(fields, zap.Int("queryCount", len(querys)))
}
log.Warn("create tables query count not match table count", fields...)
return nil, cerror.WrapError(cerror.ErrTiDBUnexpectedJobMeta, errors.New("create tables query count not match table count"))
}

func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -3654,6 +3655,47 @@ func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T)
require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs)
}

func TestBuildDDLEventForCreateTablesQueryCountMismatch(t *testing.T) {
tableInfo1 := newEligibleTableInfoForTest(201, "t_26")
tableInfo2 := newEligibleTableInfoForTest(202, "t_27")
tableInfo1.State = model.StatePublic
tableInfo2.State = model.StatePublic
tableInfo1.Columns[0].AddFlag(mysql.NotNullFlag)
tableInfo2.Columns[0].AddFlag(mysql.NotNullFlag)
for _, column := range tableInfo1.Columns {
column.State = model.StatePublic
}
for _, column := range tableInfo2.Columns {
column.State = model.StatePublic
}

job := &model.Job{
Type: model.ActionCreateTables,
SchemaID: 100,
Query: "CREATE TABLE `t_26` (`a` INT PRIMARY KEY,`b` INT)",
BinlogInfo: &model.HistoryInfo{
MultipleTableInfos: []*model.TableInfo{
tableInfo1,
tableInfo2,
},
},
}
originalQueries, err := commonEvent.SplitQueries(job.Query)
require.NoError(t, err)
require.Len(t, originalQueries, 1)

rawEvent := buildPersistedDDLEventForCreateTables(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{100: {Name: "test"}},
})
require.Equal(t, job.Query, rawEvent.Query)

ddlEvent, ok, err := buildDDLEventForCreateTables(&rawEvent, nil, 0)
require.Error(t, err)
require.False(t, ok)
require.Equal(t, commonEvent.DDLEvent{}, ddlEvent)
}

func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) {
args := updateDDLHistoryFuncArgs{
ddlEvent: &PersistedDDLEvent{
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ function skip_if_tidb_version_less_than() {
fi

if tidb_version_less_than "$tidbReleaseVersion" "$minVersion"; then
stop_tidb_cluster
echo "[$(date)] <<<<<< skip test case ${testName}, TiDB version ${tidbReleaseVersion} is less than ${minVersion} >>>>>>"
exit 0
fi
Expand Down
10 changes: 9 additions & 1 deletion tests/integration_tests/batch_add_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,15 @@ function run_without_fast_create_table() {

trap 'stop_test $WORK_DIR' EXIT
run_without_fast_create_table $*
tidb_release_version=$(get_tidb_release_version || true)
stop_tidb_cluster
run_with_fast_create_table $*
if [ -z "$tidb_release_version" ]; then
echo "[$(date)] failed to parse TiDB release version, continue fast create table test case"
run_with_fast_create_table $*
elif tidb_version_less_than "$tidb_release_version" "v8.5.0"; then
echo "[$(date)] <<<<<< skip fast create table part of $TEST_NAME, TiDB version ${tidb_release_version} is less than v8.5.0 >>>>>>"
else
run_with_fast_create_table $*
fi
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions tests/integration_tests/generate_column/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
skip_if_tidb_version_less_than "v8.5.0"

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/partial_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
skip_if_tidb_version_less_than "v8.5.0"

cd $WORK_DIR

Expand Down
24 changes: 24 additions & 0 deletions tests/integration_tests/same_upstream_downstream/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function skip_if_not_tidb_8_5() {
local tidbReleaseVersion
tidbReleaseVersion=$(get_tidb_release_version || true)
if [ -z "$tidbReleaseVersion" ]; then
echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME"
return
fi

local versionTriplet
if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then
echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME"
return
fi

local major minor patch
read -r major minor patch <<<"$versionTriplet"
if [ "$major" != "8" ] || [ "$minor" != "5" ]; then
stop_tidb_cluster
echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>"
exit 0
fi
}

function run() {
# Only meaningful for MySQL/TiDB sink.
if [ "$SINK_TYPE" != "mysql" ]; then
Expand All @@ -19,6 +42,7 @@ function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
skip_if_not_tidb_8_5
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

UP_SINK_URI="mysql://root@${UP_TIDB_HOST}:${UP_TIDB_PORT}/"
Expand Down
24 changes: 24 additions & 0 deletions tests/integration_tests/tidb_mysql_test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,35 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function skip_if_not_tidb_8_5() {
local tidbReleaseVersion
tidbReleaseVersion=$(get_tidb_release_version || true)
if [ -z "$tidbReleaseVersion" ]; then
echo "[$(date)] failed to parse TiDB release version, continue test case $TEST_NAME"
return
fi

local versionTriplet
if ! versionTriplet=$(normalize_tidb_semver_triplet "$tidbReleaseVersion"); then
echo "[$(date)] failed to normalize TiDB release version ${tidbReleaseVersion}, continue test case $TEST_NAME"
return
fi

local major minor patch
read -r major minor patch <<<"$versionTriplet"
if [ "$major" != "8" ] || [ "$minor" != "5" ]; then
stop_tidb_cluster
echo "[$(date)] <<<<<< skip test case $TEST_NAME, TiDB version ${tidbReleaseVersion} is not in 8.5.x >>>>>>"
exit 0
fi
}

function prepare() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
stop_tidb_cluster

start_tidb_cluster --workdir $WORK_DIR
skip_if_not_tidb_8_5

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
Expand Down
Loading