From 6845700af69ccbb6e4fbd3cff7a8dea7217eebf4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 7 Mar 2026 21:37:52 +0800 Subject: [PATCH 01/19] schemastore: fix rename tables for v8.1.2 --- .../persist_storage_ddl_handlers.go | 32 ++++++++++++++++--- .../schemastore/persist_storage_test.go | 32 +++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 7296c2d550..37ef774d89 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -817,13 +817,37 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) var querys []string for _, info := range renameArgs.RenameTableInfos { - event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, info.OldSchemaID) - event.ExtraSchemaNames = append(event.ExtraSchemaNames, info.OldSchemaName.O) - event.ExtraTableNames = append(event.ExtraTableNames, info.OldTableName.O) + oldSchemaID := info.OldSchemaID + oldSchemaName := info.OldSchemaName.O + oldTableName := info.OldTableName.O + if oldSchemaName == "" || oldTableName == "" || oldSchemaID == 0 { + if tableInfo, ok := args.tableMap[info.TableID]; ok { + if oldSchemaID == 0 { + oldSchemaID = tableInfo.SchemaID + } + if oldSchemaName == "" { + oldSchemaName = getSchemaName(args.databaseMap, tableInfo.SchemaID) + } + if oldTableName == "" { + oldTableName = tableInfo.Name + } + log.Warn("rename tables args miss old table identifiers fallback to schema store metadata", + zap.Int64("tableID", info.TableID), + zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), + zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), + zap.String("oldTableNameInArgs", info.OldTableName.O), + zap.Int64("oldSchemaIDInStore", tableInfo.SchemaID), + zap.String("oldTableNameInStore", tableInfo.Name)) + } + } + + event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) + event.ExtraSchemaNames = append(event.ExtraSchemaNames, oldSchemaName) + event.ExtraTableNames = append(event.ExtraTableNames, oldTableName) event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID) SchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) event.SchemaNames = append(event.SchemaNames, SchemaName) - querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", info.OldSchemaName.O, info.OldTableName.O, SchemaName, info.NewTableName.O)) + querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, SchemaName, info.NewTableName.O)) } event.Query = strings.Join(querys, "") diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index fd22723db9..1702e42f85 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3130,6 +3130,38 @@ func TestRenameTable(t *testing.T) { assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query) } +func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"test_predelete", "test_predelete"}, + []string{"", ""}, + []string{"orders", "users"}, + 1010, + ) + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "test_predelete", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "sc_predeleted_test_predelete", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "orders"}, + 201: {SchemaID: 100, Name: "users"}, + }, + }) + + assert.Equal(t, + "RENAME TABLE `test_predelete`.`orders` TO `sc_predeleted_test_predelete`.`orders`;"+ + "RENAME TABLE `test_predelete`.`users` TO `sc_predeleted_test_predelete`.`users`;", + ddl.Query) + assert.Equal(t, []string{"orders", "users"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"test_predelete", "test_predelete"}, ddl.ExtraSchemaNames) + assert.Equal(t, []string{"sc_predeleted_test_predelete", "sc_predeleted_test_predelete"}, ddl.SchemaNames) +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 9bca9cbc4a4e071fa4fc5af4f9b0faab42d3507f Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 7 Mar 2026 21:56:25 +0800 Subject: [PATCH 02/19] improve test --- .../schemastore/persist_storage_test.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 1702e42f85..6c677c3bf8 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3135,31 +3135,31 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) []int64{100, 100}, []int64{105, 105}, []int64{200, 201}, - []string{"test_predelete", "test_predelete"}, + []string{"source_db", "source_db"}, []string{"", ""}, - []string{"orders", "users"}, + []string{"target_t1", "target_t2"}, 1010, ) ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ job: job, databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "test_predelete", Tables: map[int64]bool{200: true, 201: true}}, - 105: {Name: "sc_predeleted_test_predelete", Tables: map[int64]bool{200: true, 201: true}}, + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, }, tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "orders"}, - 201: {SchemaID: 100, Name: "users"}, + 200: {SchemaID: 100, Name: "source_t1"}, + 201: {SchemaID: 100, Name: "source_t2"}, }, }) assert.Equal(t, - "RENAME TABLE `test_predelete`.`orders` TO `sc_predeleted_test_predelete`.`orders`;"+ - "RENAME TABLE `test_predelete`.`users` TO `sc_predeleted_test_predelete`.`users`;", + "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2` TO `target_db`.`target_t2`;", ddl.Query) - assert.Equal(t, []string{"orders", "users"}, ddl.ExtraTableNames) - assert.Equal(t, []string{"test_predelete", "test_predelete"}, ddl.ExtraSchemaNames) - assert.Equal(t, []string{"sc_predeleted_test_predelete", "sc_predeleted_test_predelete"}, ddl.SchemaNames) + assert.Equal(t, []string{"source_t1", "source_t2"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) + assert.Equal(t, []string{"target_db", "target_db"}, ddl.SchemaNames) } func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { From 238b9990fd102e2d3e098c7ee9da4d9decdb90e6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 7 Mar 2026 22:55:50 +0800 Subject: [PATCH 03/19] try fix --- .../persist_storage_ddl_handlers.go | 311 ++++++++++++++++-- .../schemastore/persist_storage_test.go | 91 +++++ 2 files changed, 382 insertions(+), 20 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 37ef774d89..7e8fd8c07f 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -14,6 +14,7 @@ package schemastore import ( + "encoding/json" "errors" "fmt" "strings" @@ -800,58 +801,328 @@ func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncA return event } +type renameTableQueryInfo struct { + oldSchemaName string + oldTableName string + newSchemaName string + newTableName string +} + +func getRenameTablesArgsSafely(job *model.Job) (_ *model.RenameTablesArgs, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic when get rename tables args: %v", r) + } + }() + return model.GetRenameTablesArgs(job) +} + +func decodeRenameTablesArgsFromRawArgs(job *model.Job) (*model.RenameTablesArgs, error) { + if len(job.RawArgs) == 0 { + return nil, fmt.Errorf("job raw args is empty") + } + + if job.Version == model.JobVersion2 { + var args model.RenameTablesArgs + if err := json.Unmarshal(job.RawArgs, &args); err != nil { + return nil, err + } + return &args, nil + } + + var rawArgs []json.RawMessage + if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil { + return nil, err + } + + var ( + oldSchemaIDs []int64 + newSchemaIDs []int64 + newTableNames []ast.CIStr + tableIDs []int64 + oldSchemaNames []ast.CIStr + oldTableNames []ast.CIStr + ) + if len(rawArgs) > 0 { + if err := json.Unmarshal(rawArgs[0], &oldSchemaIDs); err != nil { + return nil, err + } + } + if len(rawArgs) > 1 { + if err := json.Unmarshal(rawArgs[1], &newSchemaIDs); err != nil { + return nil, err + } + } + if len(rawArgs) > 2 { + if err := json.Unmarshal(rawArgs[2], &newTableNames); err != nil { + return nil, err + } + } + if len(rawArgs) > 3 { + if err := json.Unmarshal(rawArgs[3], &tableIDs); err != nil { + return nil, err + } + } + if len(rawArgs) > 4 { + if err := json.Unmarshal(rawArgs[4], &oldSchemaNames); err != nil { + return nil, err + } + } + if len(rawArgs) > 5 { + if err := json.Unmarshal(rawArgs[5], &oldTableNames); err != nil { + return nil, err + } + } + + n := len(oldSchemaIDs) + if len(newSchemaIDs) > n { + n = len(newSchemaIDs) + } + if len(newTableNames) > n { + n = len(newTableNames) + } + if len(tableIDs) > n { + n = len(tableIDs) + } + + infos := make([]*model.RenameTableArgs, 0, n) + for i := 0; i < n; i++ { + info := &model.RenameTableArgs{} + if i < len(oldSchemaIDs) { + info.OldSchemaID = oldSchemaIDs[i] + } + if i < len(newSchemaIDs) { + info.NewSchemaID = newSchemaIDs[i] + } + if i < len(newTableNames) { + info.NewTableName = newTableNames[i] + } + if i < len(tableIDs) { + info.TableID = tableIDs[i] + } + if i < len(oldSchemaNames) { + info.OldSchemaName = oldSchemaNames[i] + } + if i < len(oldTableNames) { + info.OldTableName = oldTableNames[i] + } + infos = append(infos, info) + } + + return &model.RenameTablesArgs{RenameTableInfos: infos}, nil +} + +func getRenameTablesArgsWithFallback(job *model.Job) (*model.RenameTablesArgs, error) { + renameArgs, err := getRenameTablesArgsSafely(job) + if err == nil { + return renameArgs, nil + } + + rawArgs, decodeErr := decodeRenameTablesArgsFromRawArgs(job) + if decodeErr != nil { + return nil, fmt.Errorf("get rename tables args failed: %v, decode from raw args failed: %v", err, decodeErr) + } + + log.Warn("get rename tables args failed fallback to decode raw args", + zap.String("query", job.Query), + zap.Error(err), + zap.Int("rawArgLen", len(job.RawArgs))) + return rawArgs, nil +} + +func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo { + if query == "" { + return nil + } + stmt, err := parser.New().ParseOneStmt(query, "", "") + if err != nil { + log.Warn("parse rename tables query failed", + zap.String("query", query), + zap.Error(err)) + return nil + } + renameStmt, ok := stmt.(*ast.RenameTableStmt) + if !ok { + return nil + } + + queryInfos := make([]renameTableQueryInfo, 0, len(renameStmt.TableToTables)) + for _, tableToTable := range renameStmt.TableToTables { + queryInfos = append(queryInfos, renameTableQueryInfo{ + oldSchemaName: tableToTable.OldTable.Schema.O, + oldTableName: tableToTable.OldTable.Name.O, + newSchemaName: tableToTable.NewTable.Schema.O, + newTableName: tableToTable.NewTable.Name.O, + }) + } + return queryInfos +} + +func matchRenameQueryInfoByNewTable( + queryInfos []renameTableQueryInfo, + used map[int]struct{}, + newSchemaName, newTableName string, +) (renameTableQueryInfo, int, bool) { + for i, info := range queryInfos { + if _, ok := used[i]; ok { + continue + } + if !strings.EqualFold(info.newTableName, newTableName) { + continue + } + if info.newSchemaName == "" { + continue + } + if strings.EqualFold(info.newSchemaName, newSchemaName) { + return info, i, true + } + } + + candidateIdx := -1 + for i, info := range queryInfos { + if _, ok := used[i]; ok { + continue + } + if !strings.EqualFold(info.newTableName, newTableName) { + continue + } + if info.newSchemaName != "" && !strings.EqualFold(info.newSchemaName, newSchemaName) { + continue + } + if candidateIdx != -1 { + return renameTableQueryInfo{}, -1, false + } + candidateIdx = i + } + if candidateIdx == -1 { + return renameTableQueryInfo{}, -1, false + } + return queryInfos[candidateIdx], candidateIdx, true +} + +func getSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) int64 { + for schemaID, databaseInfo := range databaseMap { + if strings.EqualFold(databaseInfo.Name, schemaName) { + return schemaID + } + } + return 0 +} + func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { // TODO: does rename tables has the same problem(finished ts is not the real commit ts) with rename table? event := buildPersistedDDLEventCommon(args) - renameArgs, err := model.GetRenameTablesArgs(args.job) + renameArgs, err := getRenameTablesArgsWithFallback(args.job) if err != nil { log.Panic("GetRenameTablesArgs failed", zap.String("query", args.job.Query), zap.Error(err)) } - if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) { - log.Panic("should not happen", - zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)), - zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos))) + renameTableInfos := renameArgs.RenameTableInfos + multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos + if len(renameTableInfos) != len(multipleTableInfos) { + minLen := len(renameTableInfos) + if len(multipleTableInfos) < minLen { + minLen = len(multipleTableInfos) + } + log.Warn("rename tables args length mismatch with table infos use min length", + zap.Int("renameArgsLen", len(renameTableInfos)), + zap.Int("multipleTableInfosLen", len(multipleTableInfos)), + zap.Int("minLen", minLen), + zap.String("query", args.job.Query)) + renameTableInfos = renameTableInfos[:minLen] + multipleTableInfos = multipleTableInfos[:minLen] } + queryInfos := parseRenameTablesQueryInfos(args.job.Query) + usedQueryInfos := make(map[int]struct{}, len(queryInfos)) var querys []string - for _, info := range renameArgs.RenameTableInfos { + for _, info := range renameTableInfos { oldSchemaID := info.OldSchemaID oldSchemaName := info.OldSchemaName.O oldTableName := info.OldTableName.O - if oldSchemaName == "" || oldTableName == "" || oldSchemaID == 0 { - if tableInfo, ok := args.tableMap[info.TableID]; ok { - if oldSchemaID == 0 { - oldSchemaID = tableInfo.SchemaID - } + newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) + + if oldSchemaName == "" || oldTableName == "" { + queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable( + queryInfos, + usedQueryInfos, + newSchemaName, + info.NewTableName.O, + ) + if ok { + usedQueryInfos[queryInfoIndex] = struct{}{} if oldSchemaName == "" { - oldSchemaName = getSchemaName(args.databaseMap, tableInfo.SchemaID) + oldSchemaName = queryInfo.oldSchemaName } if oldTableName == "" { - oldTableName = tableInfo.Name + oldTableName = queryInfo.oldTableName + } + if oldSchemaID == 0 && oldSchemaName != "" { + oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName) } - log.Warn("rename tables args miss old table identifiers fallback to schema store metadata", + log.Warn("rename tables args miss old table identifiers fallback to query", zap.Int64("tableID", info.TableID), zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), zap.String("oldTableNameInArgs", info.OldTableName.O), - zap.Int64("oldSchemaIDInStore", tableInfo.SchemaID), - zap.String("oldTableNameInStore", tableInfo.Name)) + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), + zap.String("oldTableNameInQuery", queryInfo.oldTableName)) } } + if tableInfo, ok := args.tableMap[info.TableID]; ok { + oldSchemaNameInStore := getSchemaName(args.databaseMap, tableInfo.SchemaID) + if oldSchemaID == 0 { + oldSchemaID = tableInfo.SchemaID + } + if oldSchemaName == "" { + oldSchemaName = oldSchemaNameInStore + } + if oldTableName == "" { + oldTableName = tableInfo.Name + } else if tableInfo.Name != "" && + !strings.EqualFold(oldTableName, tableInfo.Name) { + // For cyclic rename statements, the old table name parsed from query can be + // a temporary name. Prefer schema store metadata to keep the table lifecycle correct. + log.Warn("rename tables query old table name mismatch with schema metadata prefer schema metadata", + zap.Int64("tableID", info.TableID), + zap.String("oldTableNameInQuery", oldTableName), + zap.String("oldTableNameInStore", tableInfo.Name), + zap.String("query", args.job.Query)) + oldTableName = tableInfo.Name + oldSchemaName = oldSchemaNameInStore + oldSchemaID = tableInfo.SchemaID + } + } + if oldSchemaID == 0 && oldSchemaName != "" { + oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName) + } + if oldSchemaName == "" && oldSchemaID != 0 { + oldSchemaName = getSchemaName(args.databaseMap, oldSchemaID) + } + if oldSchemaID == 0 { + oldSchemaID = info.NewSchemaID + } + if oldSchemaName == "" { + oldSchemaName = newSchemaName + } + if oldTableName == "" { + oldTableName = info.NewTableName.O + } + event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) event.ExtraSchemaNames = append(event.ExtraSchemaNames, oldSchemaName) event.ExtraTableNames = append(event.ExtraTableNames, oldTableName) event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID) - SchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) - event.SchemaNames = append(event.SchemaNames, SchemaName) - querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, SchemaName, info.NewTableName.O)) + event.SchemaNames = append(event.SchemaNames, newSchemaName) + querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, newSchemaName, info.NewTableName.O)) } event.Query = strings.Join(querys, "") - event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos + event.MultipleTableInfos = multipleTableInfos // we have to reverse MultipleTableInfos to get correct schema name // see https://github.com/pingcap/tidb/issues/63710 // diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 6c677c3bf8..cc0a24df2e 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3162,6 +3162,97 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) assert.Equal(t, []string{"target_db", "target_db"}, ddl.SchemaNames) } +func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"", ""}, + []string{"", ""}, + []string{"target_t1", "target_t2"}, + 1010, + ) + job.Query = "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`" + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1"}, + }, + }) + + assert.Equal(t, + "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2` TO `target_db`.`target_t2`;", + ddl.Query) + assert.Equal(t, []string{"source_t1", "source_t2"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) +} + +func TestBuildPersistedDDLEventForRenameTablesRecoverGetArgsPanic(t *testing.T) { + renameArgs := &model.RenameTablesArgs{ + RenameTableInfos: []*model.RenameTableArgs{ + { + OldSchemaID: 100, + NewSchemaID: 105, + TableID: 200, + NewTableName: ast.NewCIStr("target_t1"), + }, + { + OldSchemaID: 100, + NewSchemaID: 105, + TableID: 201, + NewTableName: ast.NewCIStr("target_t2"), + }, + }, + } + job := &model.Job{ + Type: model.ActionRenameTables, + Version: model.JobVersion1, + Query: "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`", + BinlogInfo: &model.HistoryInfo{ + MultipleTableInfos: []*model.TableInfo{ + newEligibleTableInfoForTest(200, "target_t1"), + newEligibleTableInfoForTest(201, "target_t2"), + }, + FinishedTS: 1010, + }, + } + job.FillArgs(renameArgs) + model.UpdateJobArgsForTest(job, func(args []any) []any { + // Simulate old TiDB rename tables args shape that drops old schema names and old table names. + return args[:4] + }) + _, err := job.Encode(true) + require.NoError(t, err) + + var ddl PersistedDDLEvent + require.NotPanics(t, func() { + ddl = buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1"}, + 201: {SchemaID: 100, Name: "source_t2"}, + }, + }) + }) + + assert.Equal(t, + "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2` TO `target_db`.`target_t2`;", + ddl.Query) + assert.Equal(t, []string{"source_t1", "source_t2"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 61294c11d8ea6985d09e567b05a88589251d8165 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 8 Mar 2026 22:48:19 +0800 Subject: [PATCH 04/19] f --- .../persist_storage_ddl_handlers.go | 125 +----------------- .../schemastore/persist_storage_test.go | 60 --------- 2 files changed, 1 insertion(+), 184 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 7e8fd8c07f..86e4774dcf 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -14,7 +14,6 @@ package schemastore import ( - "encoding/json" "errors" "fmt" "strings" @@ -808,128 +807,6 @@ type renameTableQueryInfo struct { newTableName string } -func getRenameTablesArgsSafely(job *model.Job) (_ *model.RenameTablesArgs, err error) { - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("panic when get rename tables args: %v", r) - } - }() - return model.GetRenameTablesArgs(job) -} - -func decodeRenameTablesArgsFromRawArgs(job *model.Job) (*model.RenameTablesArgs, error) { - if len(job.RawArgs) == 0 { - return nil, fmt.Errorf("job raw args is empty") - } - - if job.Version == model.JobVersion2 { - var args model.RenameTablesArgs - if err := json.Unmarshal(job.RawArgs, &args); err != nil { - return nil, err - } - return &args, nil - } - - var rawArgs []json.RawMessage - if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil { - return nil, err - } - - var ( - oldSchemaIDs []int64 - newSchemaIDs []int64 - newTableNames []ast.CIStr - tableIDs []int64 - oldSchemaNames []ast.CIStr - oldTableNames []ast.CIStr - ) - if len(rawArgs) > 0 { - if err := json.Unmarshal(rawArgs[0], &oldSchemaIDs); err != nil { - return nil, err - } - } - if len(rawArgs) > 1 { - if err := json.Unmarshal(rawArgs[1], &newSchemaIDs); err != nil { - return nil, err - } - } - if len(rawArgs) > 2 { - if err := json.Unmarshal(rawArgs[2], &newTableNames); err != nil { - return nil, err - } - } - if len(rawArgs) > 3 { - if err := json.Unmarshal(rawArgs[3], &tableIDs); err != nil { - return nil, err - } - } - if len(rawArgs) > 4 { - if err := json.Unmarshal(rawArgs[4], &oldSchemaNames); err != nil { - return nil, err - } - } - if len(rawArgs) > 5 { - if err := json.Unmarshal(rawArgs[5], &oldTableNames); err != nil { - return nil, err - } - } - - n := len(oldSchemaIDs) - if len(newSchemaIDs) > n { - n = len(newSchemaIDs) - } - if len(newTableNames) > n { - n = len(newTableNames) - } - if len(tableIDs) > n { - n = len(tableIDs) - } - - infos := make([]*model.RenameTableArgs, 0, n) - for i := 0; i < n; i++ { - info := &model.RenameTableArgs{} - if i < len(oldSchemaIDs) { - info.OldSchemaID = oldSchemaIDs[i] - } - if i < len(newSchemaIDs) { - info.NewSchemaID = newSchemaIDs[i] - } - if i < len(newTableNames) { - info.NewTableName = newTableNames[i] - } - if i < len(tableIDs) { - info.TableID = tableIDs[i] - } - if i < len(oldSchemaNames) { - info.OldSchemaName = oldSchemaNames[i] - } - if i < len(oldTableNames) { - info.OldTableName = oldTableNames[i] - } - infos = append(infos, info) - } - - return &model.RenameTablesArgs{RenameTableInfos: infos}, nil -} - -func getRenameTablesArgsWithFallback(job *model.Job) (*model.RenameTablesArgs, error) { - renameArgs, err := getRenameTablesArgsSafely(job) - if err == nil { - return renameArgs, nil - } - - rawArgs, decodeErr := decodeRenameTablesArgsFromRawArgs(job) - if decodeErr != nil { - return nil, fmt.Errorf("get rename tables args failed: %v, decode from raw args failed: %v", err, decodeErr) - } - - log.Warn("get rename tables args failed fallback to decode raw args", - zap.String("query", job.Query), - zap.Error(err), - zap.Int("rawArgLen", len(job.RawArgs))) - return rawArgs, nil -} - func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo { if query == "" { return nil @@ -1012,7 +889,7 @@ func getSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName stri func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { // TODO: does rename tables has the same problem(finished ts is not the real commit ts) with rename table? event := buildPersistedDDLEventCommon(args) - renameArgs, err := getRenameTablesArgsWithFallback(args.job) + renameArgs, err := model.GetRenameTablesArgs(args.job) if err != nil { log.Panic("GetRenameTablesArgs failed", zap.String("query", args.job.Query), diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index cc0a24df2e..3475f102d9 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3193,66 +3193,6 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testi assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) } -func TestBuildPersistedDDLEventForRenameTablesRecoverGetArgsPanic(t *testing.T) { - renameArgs := &model.RenameTablesArgs{ - RenameTableInfos: []*model.RenameTableArgs{ - { - OldSchemaID: 100, - NewSchemaID: 105, - TableID: 200, - NewTableName: ast.NewCIStr("target_t1"), - }, - { - OldSchemaID: 100, - NewSchemaID: 105, - TableID: 201, - NewTableName: ast.NewCIStr("target_t2"), - }, - }, - } - job := &model.Job{ - Type: model.ActionRenameTables, - Version: model.JobVersion1, - Query: "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`", - BinlogInfo: &model.HistoryInfo{ - MultipleTableInfos: []*model.TableInfo{ - newEligibleTableInfoForTest(200, "target_t1"), - newEligibleTableInfoForTest(201, "target_t2"), - }, - FinishedTS: 1010, - }, - } - job.FillArgs(renameArgs) - model.UpdateJobArgsForTest(job, func(args []any) []any { - // Simulate old TiDB rename tables args shape that drops old schema names and old table names. - return args[:4] - }) - _, err := job.Encode(true) - require.NoError(t, err) - - var ddl PersistedDDLEvent - require.NotPanics(t, func() { - ddl = buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, - 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "source_t1"}, - 201: {SchemaID: 100, Name: "source_t2"}, - }, - }) - }) - - assert.Equal(t, - "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`;"+ - "RENAME TABLE `source_db`.`source_t2` TO `target_db`.`target_t2`;", - ddl.Query) - assert.Equal(t, []string{"source_t1", "source_t2"}, ddl.ExtraTableNames) - assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) -} - func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 5fd62820c027fdc13e8827f3af8e6720f12e044b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 11:13:49 +0800 Subject: [PATCH 05/19] fix --- .../persist_storage_ddl_handlers.go | 9 +---- .../schemastore/persist_storage_test.go | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 86e4774dcf..f7975c3ebb 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -898,17 +898,10 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) renameTableInfos := renameArgs.RenameTableInfos multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos if len(renameTableInfos) != len(multipleTableInfos) { - minLen := len(renameTableInfos) - if len(multipleTableInfos) < minLen { - minLen = len(multipleTableInfos) - } - log.Warn("rename tables args length mismatch with table infos use min length", + log.Panic("should not happen", zap.Int("renameArgsLen", len(renameTableInfos)), zap.Int("multipleTableInfosLen", len(multipleTableInfos)), - zap.Int("minLen", minLen), zap.String("query", args.job.Query)) - renameTableInfos = renameTableInfos[:minLen] - multipleTableInfos = multipleTableInfos[:minLen] } queryInfos := parseRenameTablesQueryInfos(args.job.Query) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 3475f102d9..5eb6c01d56 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3193,6 +3193,39 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testi assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) } +func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t *testing.T) { + // Simulate: rename table a to c, b to a, c to b; + // c only exists as a temporary name inside this statement. + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{100, 100}, + []int64{200, 201}, + []string{"", ""}, + []string{"", ""}, + []string{"b", "a"}, + 1010, + ) + job.Query = "RENAME TABLE `test`.`a` TO `test`.`c`, `test`.`b` TO `test`.`a`, `test`.`c` TO `test`.`b`" + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "test", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "a"}, + 201: {SchemaID: 100, Name: "b"}, + }, + }) + + assert.Equal(t, + "RENAME TABLE `test`.`a` TO `test`.`b`;"+ + "RENAME TABLE `test`.`b` TO `test`.`a`;", + ddl.Query) + assert.Equal(t, []string{"a", "b"}, ddl.ExtraTableNames) + assert.NotContains(t, ddl.Query, "`c`") +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 2cbc40d4de03f5cbcc2bfc8f9b1f12b6a375409d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 11:31:23 +0800 Subject: [PATCH 06/19] fix --- .../persist_storage_ddl_handlers.go | 5 +++- .../schemastore/persist_storage_test.go | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index f7975c3ebb..3a826bd495 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -944,10 +944,13 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) } if tableInfo, ok := args.tableMap[info.TableID]; ok { - oldSchemaNameInStore := getSchemaName(args.databaseMap, tableInfo.SchemaID) if oldSchemaID == 0 { oldSchemaID = tableInfo.SchemaID } + oldSchemaNameInStore := getSchemaName(args.databaseMap, oldSchemaID) + if oldSchemaNameInStore == "" { + oldSchemaNameInStore = getSchemaName(args.databaseMap, tableInfo.SchemaID) + } if oldSchemaName == "" { oldSchemaName = oldSchemaNameInStore } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 5eb6c01d56..2cf8bec725 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3226,6 +3226,34 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t * assert.NotContains(t, ddl.Query, "`c`") } +func TestBuildPersistedDDLEventForRenameTablesUseOldSchemaIDForSchemaNameLookup(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{101}, + []int64{105}, + []int64{200}, + []string{""}, + []string{"source_t1"}, + []string{"target_t1"}, + 1010, + ) + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db_from_table_map", Tables: map[int64]bool{200: true}}, + 101: {Name: "source_db_from_args", Tables: map[int64]bool{200: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1"}, + }, + }) + + assert.Equal(t, []int64{101}, ddl.ExtraSchemaIDs) + assert.Equal(t, []string{"source_db_from_args"}, ddl.ExtraSchemaNames) + assert.Equal(t, "RENAME TABLE `source_db_from_args`.`source_t1` TO `target_db`.`target_t1`;", ddl.Query) +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 0ba4977c5d932b484e80f7b7887cedb16d502280 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 11:51:46 +0800 Subject: [PATCH 07/19] fix --- .../persist_storage_ddl_handlers.go | 35 ++++++++++++++----- .../schemastore/persist_storage_test.go | 28 +++++++++++++++ 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 3a826bd495..957d1804e6 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -911,27 +911,33 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldSchemaID := info.OldSchemaID oldSchemaName := info.OldSchemaName.O oldTableName := info.OldTableName.O + oldTableNameFromQuery := false newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) - if oldSchemaName == "" || oldTableName == "" { + if oldTableName == "" { queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable( queryInfos, usedQueryInfos, newSchemaName, info.NewTableName.O, ) - if ok { + queryOldSchemaID := int64(0) + if queryInfo.oldSchemaName != "" { + queryOldSchemaID = getSchemaIDByName(args.databaseMap, queryInfo.oldSchemaName) + } + if ok && queryInfo.oldTableName != "" && + (oldSchemaName == "" || queryInfo.oldSchemaName == "" || strings.EqualFold(oldSchemaName, queryInfo.oldSchemaName)) && + (oldSchemaID == 0 || queryOldSchemaID == 0 || oldSchemaID == queryOldSchemaID) { usedQueryInfos[queryInfoIndex] = struct{}{} + oldTableName = queryInfo.oldTableName + oldTableNameFromQuery = true if oldSchemaName == "" { oldSchemaName = queryInfo.oldSchemaName } - if oldTableName == "" { - oldTableName = queryInfo.oldTableName - } - if oldSchemaID == 0 && oldSchemaName != "" { - oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName) + if oldSchemaID == 0 && queryOldSchemaID != 0 { + oldSchemaID = queryOldSchemaID } - log.Warn("rename tables args miss old table identifiers fallback to query", + log.Warn("rename tables args miss old table name fallback to query", zap.Int64("tableID", info.TableID), zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), @@ -940,6 +946,16 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("newTableNameInArgs", info.NewTableName.O), zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), zap.String("oldTableNameInQuery", queryInfo.oldTableName)) + } else if ok { + log.Warn("rename tables query is inconsistent with args skip query fallback", + zap.Int64("tableID", info.TableID), + zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), + zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), + zap.String("oldTableNameInQuery", queryInfo.oldTableName), + zap.Int64("oldSchemaIDInQuery", queryOldSchemaID)) } } @@ -956,7 +972,8 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) } if oldTableName == "" { oldTableName = tableInfo.Name - } else if tableInfo.Name != "" && + } else if oldTableNameFromQuery && + tableInfo.Name != "" && !strings.EqualFold(oldTableName, tableInfo.Name) { // For cyclic rename statements, the old table name parsed from query can be // a temporary name. Prefer schema store metadata to keep the table lifecycle correct. diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 2cf8bec725..402914ee86 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3254,6 +3254,34 @@ func TestBuildPersistedDDLEventForRenameTablesUseOldSchemaIDForSchemaNameLookup( assert.Equal(t, "RENAME TABLE `source_db_from_args`.`source_t1` TO `target_db`.`target_t1`;", ddl.Query) } +func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100}, + []int64{105}, + []int64{200}, + []string{"source_db"}, + []string{"source_t1_from_args"}, + []string{"target_t1"}, + 1010, + ) + job.Query = "RENAME TABLE `source_db`.`source_t1_from_query` TO `target_db`.`target_t1`" + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db", Tables: map[int64]bool{200: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1_from_store"}, + }, + }) + + assert.Equal(t, []string{"source_t1_from_args"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db"}, ddl.ExtraSchemaNames) + assert.Equal(t, "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;", ddl.Query) +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From 27494c2ed8bbac1eb32d20703a2c840b19754228 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 12:21:53 +0800 Subject: [PATCH 08/19] fix --- .../persist_storage_ddl_handlers.go | 65 +++++++------------ 1 file changed, 24 insertions(+), 41 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 957d1804e6..1dd1e70e8e 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -900,8 +900,7 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) if len(renameTableInfos) != len(multipleTableInfos) { log.Panic("should not happen", zap.Int("renameArgsLen", len(renameTableInfos)), - zap.Int("multipleTableInfosLen", len(multipleTableInfos)), - zap.String("query", args.job.Query)) + zap.Int("multipleTableInfosLen", len(multipleTableInfos))) } queryInfos := parseRenameTablesQueryInfos(args.job.Query) @@ -913,6 +912,9 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldTableName := info.OldTableName.O oldTableNameFromQuery := false newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) + if oldSchemaName == "" { + oldSchemaName = getSchemaName(args.databaseMap, oldSchemaID) + } if oldTableName == "" { queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable( @@ -921,26 +923,29 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) newSchemaName, info.NewTableName.O, ) - queryOldSchemaID := int64(0) - if queryInfo.oldSchemaName != "" { - queryOldSchemaID = getSchemaIDByName(args.databaseMap, queryInfo.oldSchemaName) + if ok && queryInfo.oldSchemaName != "" { + queryOldSchemaID := getSchemaIDByName(args.databaseMap, queryInfo.oldSchemaName) + if !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) || + (queryOldSchemaID != 0 && queryOldSchemaID != oldSchemaID) { + log.Warn("rename tables query schema is inconsistent with args", + zap.Int64("tableID", info.TableID), + zap.Int64("oldSchemaIDInArgs", oldSchemaID), + zap.String("oldSchemaNameInArgs", oldSchemaName), + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), + zap.Int64("oldSchemaIDInQuery", queryOldSchemaID), + zap.String("query", args.job.Query)) + } } - if ok && queryInfo.oldTableName != "" && - (oldSchemaName == "" || queryInfo.oldSchemaName == "" || strings.EqualFold(oldSchemaName, queryInfo.oldSchemaName)) && - (oldSchemaID == 0 || queryOldSchemaID == 0 || oldSchemaID == queryOldSchemaID) { + if ok && queryInfo.oldTableName != "" { usedQueryInfos[queryInfoIndex] = struct{}{} oldTableName = queryInfo.oldTableName oldTableNameFromQuery = true - if oldSchemaName == "" { - oldSchemaName = queryInfo.oldSchemaName - } - if oldSchemaID == 0 && queryOldSchemaID != 0 { - oldSchemaID = queryOldSchemaID - } log.Warn("rename tables args miss old table name fallback to query", zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), - zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), + zap.Int64("oldSchemaIDInArgs", oldSchemaID), + zap.String("oldSchemaNameInArgs", oldSchemaName), zap.String("oldTableNameInArgs", info.OldTableName.O), zap.String("newSchemaNameInArgs", newSchemaName), zap.String("newTableNameInArgs", info.NewTableName.O), @@ -949,27 +954,16 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) } else if ok { log.Warn("rename tables query is inconsistent with args skip query fallback", zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", info.OldSchemaID), - zap.String("oldSchemaNameInArgs", info.OldSchemaName.O), + zap.Int64("oldSchemaIDInArgs", oldSchemaID), + zap.String("oldSchemaNameInArgs", oldSchemaName), zap.String("newSchemaNameInArgs", newSchemaName), zap.String("newTableNameInArgs", info.NewTableName.O), zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.String("oldTableNameInQuery", queryInfo.oldTableName), - zap.Int64("oldSchemaIDInQuery", queryOldSchemaID)) + zap.String("oldTableNameInQuery", queryInfo.oldTableName)) } } if tableInfo, ok := args.tableMap[info.TableID]; ok { - if oldSchemaID == 0 { - oldSchemaID = tableInfo.SchemaID - } - oldSchemaNameInStore := getSchemaName(args.databaseMap, oldSchemaID) - if oldSchemaNameInStore == "" { - oldSchemaNameInStore = getSchemaName(args.databaseMap, tableInfo.SchemaID) - } - if oldSchemaName == "" { - oldSchemaName = oldSchemaNameInStore - } if oldTableName == "" { oldTableName = tableInfo.Name } else if oldTableNameFromQuery && @@ -983,19 +977,8 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("oldTableNameInStore", tableInfo.Name), zap.String("query", args.job.Query)) oldTableName = tableInfo.Name - oldSchemaName = oldSchemaNameInStore - oldSchemaID = tableInfo.SchemaID } } - if oldSchemaID == 0 && oldSchemaName != "" { - oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName) - } - if oldSchemaName == "" && oldSchemaID != 0 { - oldSchemaName = getSchemaName(args.databaseMap, oldSchemaID) - } - if oldSchemaID == 0 { - oldSchemaID = info.NewSchemaID - } if oldSchemaName == "" { oldSchemaName = newSchemaName } From fba42299391afc3dcd2d8fd26d66ce8216fbb4d2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 14:07:17 +0800 Subject: [PATCH 09/19] fix --- .../persist_storage_ddl_handlers.go | 19 +++-------- .../schemastore/persist_storage_test.go | 32 ++----------------- 2 files changed, 7 insertions(+), 44 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 1dd1e70e8e..22d43e616e 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -895,13 +895,13 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("query", args.job.Query), zap.Error(err)) } - renameTableInfos := renameArgs.RenameTableInfos - multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos - if len(renameTableInfos) != len(multipleTableInfos) { + if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) { log.Panic("should not happen", - zap.Int("renameArgsLen", len(renameTableInfos)), - zap.Int("multipleTableInfosLen", len(multipleTableInfos))) + zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)), + zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos))) } + renameTableInfos := renameArgs.RenameTableInfos + multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos queryInfos := parseRenameTablesQueryInfos(args.job.Query) usedQueryInfos := make(map[int]struct{}, len(queryInfos)) @@ -912,9 +912,6 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldTableName := info.OldTableName.O oldTableNameFromQuery := false newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) - if oldSchemaName == "" { - oldSchemaName = getSchemaName(args.databaseMap, oldSchemaID) - } if oldTableName == "" { queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable( @@ -979,12 +976,6 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldTableName = tableInfo.Name } } - if oldSchemaName == "" { - oldSchemaName = newSchemaName - } - if oldTableName == "" { - oldTableName = info.NewTableName.O - } event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) event.ExtraSchemaNames = append(event.ExtraSchemaNames, oldSchemaName) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 402914ee86..3fe0b11c8a 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3167,7 +3167,7 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testi []int64{100, 100}, []int64{105, 105}, []int64{200, 201}, - []string{"", ""}, + []string{"source_db", "source_db"}, []string{"", ""}, []string{"target_t1", "target_t2"}, 1010, @@ -3200,7 +3200,7 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t * []int64{100, 100}, []int64{100, 100}, []int64{200, 201}, - []string{"", ""}, + []string{"test", "test"}, []string{"", ""}, []string{"b", "a"}, 1010, @@ -3226,34 +3226,6 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t * assert.NotContains(t, ddl.Query, "`c`") } -func TestBuildPersistedDDLEventForRenameTablesUseOldSchemaIDForSchemaNameLookup(t *testing.T) { - job := buildRenameTablesJobForTest( - []int64{101}, - []int64{105}, - []int64{200}, - []string{""}, - []string{"source_t1"}, - []string{"target_t1"}, - 1010, - ) - - ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "source_db_from_table_map", Tables: map[int64]bool{200: true}}, - 101: {Name: "source_db_from_args", Tables: map[int64]bool{200: true}}, - 105: {Name: "target_db", Tables: map[int64]bool{200: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "source_t1"}, - }, - }) - - assert.Equal(t, []int64{101}, ddl.ExtraSchemaIDs) - assert.Equal(t, []string{"source_db_from_args"}, ddl.ExtraSchemaNames) - assert.Equal(t, "RENAME TABLE `source_db_from_args`.`source_t1` TO `target_db`.`target_t1`;", ddl.Query) -} - func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { job := buildRenameTablesJobForTest( []int64{100}, From e93808ba4f74315f7f7eab8f2a9663025ebec050 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 15:31:06 +0800 Subject: [PATCH 10/19] fix --- .../persist_storage_ddl_handlers.go | 144 +++++------------- .../schemastore/persist_storage_test.go | 1 + 2 files changed, 43 insertions(+), 102 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 22d43e616e..615a1f58e1 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -835,57 +835,6 @@ func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo { return queryInfos } -func matchRenameQueryInfoByNewTable( - queryInfos []renameTableQueryInfo, - used map[int]struct{}, - newSchemaName, newTableName string, -) (renameTableQueryInfo, int, bool) { - for i, info := range queryInfos { - if _, ok := used[i]; ok { - continue - } - if !strings.EqualFold(info.newTableName, newTableName) { - continue - } - if info.newSchemaName == "" { - continue - } - if strings.EqualFold(info.newSchemaName, newSchemaName) { - return info, i, true - } - } - - candidateIdx := -1 - for i, info := range queryInfos { - if _, ok := used[i]; ok { - continue - } - if !strings.EqualFold(info.newTableName, newTableName) { - continue - } - if info.newSchemaName != "" && !strings.EqualFold(info.newSchemaName, newSchemaName) { - continue - } - if candidateIdx != -1 { - return renameTableQueryInfo{}, -1, false - } - candidateIdx = i - } - if candidateIdx == -1 { - return renameTableQueryInfo{}, -1, false - } - return queryInfos[candidateIdx], candidateIdx, true -} - -func getSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) int64 { - for schemaID, databaseInfo := range databaseMap { - if strings.EqualFold(databaseInfo.Name, schemaName) { - return schemaID - } - } - return 0 -} - func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { // TODO: does rename tables has the same problem(finished ts is not the real commit ts) with rename table? event := buildPersistedDDLEventCommon(args) @@ -903,42 +852,59 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) renameTableInfos := renameArgs.RenameTableInfos multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos - queryInfos := parseRenameTablesQueryInfos(args.job.Query) - usedQueryInfos := make(map[int]struct{}, len(queryInfos)) + needFillOldTableNamesFromQuery := len(renameTableInfos) > 0 && renameTableInfos[0].OldTableName.O == "" + queryInfos := make([]renameTableQueryInfo, 0) + if needFillOldTableNamesFromQuery { + queryInfos = parseRenameTablesQueryInfos(args.job.Query) + if len(queryInfos) != len(renameTableInfos) { + log.Warn("rename tables query info length is inconsistent with args", + zap.Int("queryInfosLen", len(queryInfos)), + zap.Int("renameArgsLen", len(renameTableInfos)), + zap.String("query", args.job.Query)) + } + } + var querys []string - for _, info := range renameTableInfos { + for i, info := range renameTableInfos { oldSchemaID := info.OldSchemaID oldSchemaName := info.OldSchemaName.O oldTableName := info.OldTableName.O - oldTableNameFromQuery := false newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) - if oldTableName == "" { - queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable( - queryInfos, - usedQueryInfos, - newSchemaName, - info.NewTableName.O, - ) - if ok && queryInfo.oldSchemaName != "" { - queryOldSchemaID := getSchemaIDByName(args.databaseMap, queryInfo.oldSchemaName) - if !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) || - (queryOldSchemaID != 0 && queryOldSchemaID != oldSchemaID) { - log.Warn("rename tables query schema is inconsistent with args", - zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("newSchemaNameInArgs", newSchemaName), + if needFillOldTableNamesFromQuery && oldTableName == "" { + if i >= len(queryInfos) { + log.Warn("rename tables query info is missing for old table name fallback", + zap.Int("index", i), + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("query", args.job.Query)) + } else { + queryInfo := queryInfos[i] + if !strings.EqualFold(queryInfo.newTableName, info.NewTableName.O) { + log.Warn("rename tables query new table name is inconsistent with args", + zap.Int("index", i), zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("newTableNameInQuery", queryInfo.newTableName), + zap.String("query", args.job.Query)) + } + if queryInfo.newSchemaName != "" && + !strings.EqualFold(queryInfo.newSchemaName, newSchemaName) { + log.Warn("rename tables query new schema name is inconsistent with args", + zap.Int("index", i), + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newSchemaNameInQuery", queryInfo.newSchemaName), + zap.String("query", args.job.Query)) + } + if queryInfo.oldSchemaName != "" && + !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) { + log.Warn("rename tables query old schema name is inconsistent with args", + zap.Int("index", i), + zap.String("oldSchemaNameInArgs", oldSchemaName), zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.Int64("oldSchemaIDInQuery", queryOldSchemaID), + zap.Int64("oldSchemaIDInArgs", oldSchemaID), zap.String("query", args.job.Query)) } - } - if ok && queryInfo.oldTableName != "" { - usedQueryInfos[queryInfoIndex] = struct{}{} oldTableName = queryInfo.oldTableName - oldTableNameFromQuery = true log.Warn("rename tables args miss old table name fallback to query", zap.Int64("tableID", info.TableID), zap.Int64("oldSchemaIDInArgs", oldSchemaID), @@ -948,32 +914,6 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("newTableNameInArgs", info.NewTableName.O), zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), zap.String("oldTableNameInQuery", queryInfo.oldTableName)) - } else if ok { - log.Warn("rename tables query is inconsistent with args skip query fallback", - zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("newSchemaNameInArgs", newSchemaName), - zap.String("newTableNameInArgs", info.NewTableName.O), - zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.String("oldTableNameInQuery", queryInfo.oldTableName)) - } - } - - if tableInfo, ok := args.tableMap[info.TableID]; ok { - if oldTableName == "" { - oldTableName = tableInfo.Name - } else if oldTableNameFromQuery && - tableInfo.Name != "" && - !strings.EqualFold(oldTableName, tableInfo.Name) { - // For cyclic rename statements, the old table name parsed from query can be - // a temporary name. Prefer schema store metadata to keep the table lifecycle correct. - log.Warn("rename tables query old table name mismatch with schema metadata prefer schema metadata", - zap.Int64("tableID", info.TableID), - zap.String("oldTableNameInQuery", oldTableName), - zap.String("oldTableNameInStore", tableInfo.Name), - zap.String("query", args.job.Query)) - oldTableName = tableInfo.Name } } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 3fe0b11c8a..10e067fb08 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3140,6 +3140,7 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) []string{"target_t1", "target_t2"}, 1010, ) + job.Query = "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`" ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ job: job, From e1b688da5d1b28392103828855fc316deb53a809 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 15:58:30 +0800 Subject: [PATCH 11/19] fix --- .../persist_storage_ddl_handlers.go | 70 +++++++++---------- .../schemastore/persist_storage_test.go | 29 ++++---- 2 files changed, 45 insertions(+), 54 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 615a1f58e1..32ef94d7d6 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -852,12 +852,14 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) renameTableInfos := renameArgs.RenameTableInfos multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos + // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. + // See https://github.com/pingcap/tidb/pull/64421 for the upstream fix. needFillOldTableNamesFromQuery := len(renameTableInfos) > 0 && renameTableInfos[0].OldTableName.O == "" queryInfos := make([]renameTableQueryInfo, 0) if needFillOldTableNamesFromQuery { queryInfos = parseRenameTablesQueryInfos(args.job.Query) if len(queryInfos) != len(renameTableInfos) { - log.Warn("rename tables query info length is inconsistent with args", + log.Panic("rename tables query info length is inconsistent with args", zap.Int("queryInfosLen", len(queryInfos)), zap.Int("renameArgsLen", len(renameTableInfos)), zap.String("query", args.job.Query)) @@ -872,49 +874,41 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) if needFillOldTableNamesFromQuery && oldTableName == "" { - if i >= len(queryInfos) { - log.Warn("rename tables query info is missing for old table name fallback", + queryInfo := queryInfos[i] + if !strings.EqualFold(queryInfo.newTableName, info.NewTableName.O) { + log.Panic("rename tables query new table name is inconsistent with args", zap.Int("index", i), - zap.String("newSchemaNameInArgs", newSchemaName), zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("newTableNameInQuery", queryInfo.newTableName), zap.String("query", args.job.Query)) - } else { - queryInfo := queryInfos[i] - if !strings.EqualFold(queryInfo.newTableName, info.NewTableName.O) { - log.Warn("rename tables query new table name is inconsistent with args", - zap.Int("index", i), - zap.String("newTableNameInArgs", info.NewTableName.O), - zap.String("newTableNameInQuery", queryInfo.newTableName), - zap.String("query", args.job.Query)) - } - if queryInfo.newSchemaName != "" && - !strings.EqualFold(queryInfo.newSchemaName, newSchemaName) { - log.Warn("rename tables query new schema name is inconsistent with args", - zap.Int("index", i), - zap.String("newSchemaNameInArgs", newSchemaName), - zap.String("newSchemaNameInQuery", queryInfo.newSchemaName), - zap.String("query", args.job.Query)) - } - if queryInfo.oldSchemaName != "" && - !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) { - log.Warn("rename tables query old schema name is inconsistent with args", - zap.Int("index", i), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("query", args.job.Query)) - } - oldTableName = queryInfo.oldTableName - log.Warn("rename tables args miss old table name fallback to query", - zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("oldTableNameInArgs", info.OldTableName.O), + } + if queryInfo.newSchemaName != "" && + !strings.EqualFold(queryInfo.newSchemaName, newSchemaName) { + log.Panic("rename tables query new schema name is inconsistent with args", + zap.Int("index", i), zap.String("newSchemaNameInArgs", newSchemaName), - zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("newSchemaNameInQuery", queryInfo.newSchemaName), + zap.String("query", args.job.Query)) + } + if queryInfo.oldSchemaName != "" && + !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) { + log.Panic("rename tables query old schema name is inconsistent with args", + zap.Int("index", i), + zap.String("oldSchemaNameInArgs", oldSchemaName), zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.String("oldTableNameInQuery", queryInfo.oldTableName)) + zap.Int64("oldSchemaIDInArgs", oldSchemaID), + zap.String("query", args.job.Query)) } + oldTableName = queryInfo.oldTableName + log.Info("rename tables args miss old table name fallback to query", + zap.Int64("tableID", info.TableID), + zap.Int64("oldSchemaIDInArgs", oldSchemaID), + zap.String("oldSchemaNameInArgs", oldSchemaName), + zap.String("oldTableNameInArgs", info.OldTableName.O), + zap.String("newSchemaNameInArgs", newSchemaName), + zap.String("newTableNameInArgs", info.NewTableName.O), + zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), + zap.String("oldTableNameInQuery", queryInfo.oldTableName)) } event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 10e067fb08..c545422869 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3197,6 +3197,8 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testi func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t *testing.T) { // Simulate: rename table a to c, b to a, c to b; // c only exists as a temporary name inside this statement. + // The query has 3 rename clauses, while rename table infos contain 2 entries. + // This should panic due to the strict consistency check. job := buildRenameTablesJobForTest( []int64{100, 100}, []int64{100, 100}, @@ -3208,23 +3210,18 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t * ) job.Query = "RENAME TABLE `test`.`a` TO `test`.`c`, `test`.`b` TO `test`.`a`, `test`.`c` TO `test`.`b`" - ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "test", Tables: map[int64]bool{200: true, 201: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "a"}, - 201: {SchemaID: 100, Name: "b"}, - }, + require.Panics(t, func() { + _ = buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "test", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "a"}, + 201: {SchemaID: 100, Name: "b"}, + }, + }) }) - - assert.Equal(t, - "RENAME TABLE `test`.`a` TO `test`.`b`;"+ - "RENAME TABLE `test`.`b` TO `test`.`a`;", - ddl.Query) - assert.Equal(t, []string{"a", "b"}, ddl.ExtraTableNames) - assert.NotContains(t, ddl.Query, "`c`") } func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { From 1b3f9453c516de4036031d0d37209492ad921bad Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 16:08:00 +0800 Subject: [PATCH 12/19] fix --- .../persist_storage_ddl_handlers.go | 16 ++--- .../schemastore/persist_storage_test.go | 61 +++++++++++++++++++ 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 32ef94d7d6..dd74be77e9 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -844,13 +844,13 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("query", args.job.Query), zap.Error(err)) } + renameTableInfos := renameArgs.RenameTableInfos + multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) { log.Panic("should not happen", zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)), zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos))) } - renameTableInfos := renameArgs.RenameTableInfos - multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. // See https://github.com/pingcap/tidb/pull/64421 for the upstream fix. @@ -864,6 +864,9 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.Int("renameArgsLen", len(renameTableInfos)), zap.String("query", args.job.Query)) } + log.Info("rename tables args miss old table name fallback to query", + zap.Int("tableCount", len(renameTableInfos)), + zap.String("query", args.job.Query)) } var querys []string @@ -900,15 +903,6 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("query", args.job.Query)) } oldTableName = queryInfo.oldTableName - log.Info("rename tables args miss old table name fallback to query", - zap.Int64("tableID", info.TableID), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("oldTableNameInArgs", info.OldTableName.O), - zap.String("newSchemaNameInArgs", newSchemaName), - zap.String("newTableNameInArgs", info.NewTableName.O), - zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.String("oldTableNameInQuery", queryInfo.oldTableName)) } event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index c545422869..e3fbd34122 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3252,6 +3252,67 @@ func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t assert.Equal(t, "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;", ddl.Query) } +func TestParseRenameTablesQueryInfos(t *testing.T) { + cases := []struct { + name string + query string + expected []renameTableQueryInfo + }{ + { + name: "multiple tables with schema", + query: "RENAME TABLE `db1`.`t1` TO `db2`.`t2`, `db1`.`t3` TO `db2`.`t4`", + expected: []renameTableQueryInfo{ + { + oldSchemaName: "db1", + oldTableName: "t1", + newSchemaName: "db2", + newTableName: "t2", + }, + { + oldSchemaName: "db1", + oldTableName: "t3", + newSchemaName: "db2", + newTableName: "t4", + }, + }, + }, + { + name: "without schema names", + query: "RENAME TABLE `t1` TO `t2`", + expected: []renameTableQueryInfo{ + { + oldSchemaName: "", + oldTableName: "t1", + newSchemaName: "", + newTableName: "t2", + }, + }, + }, + { + name: "empty query", + query: "", + expected: nil, + }, + { + name: "non rename statement", + query: "CREATE TABLE t(a INT)", + expected: nil, + }, + { + name: "invalid sql", + query: "RENAME TABLE", + expected: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseRenameTablesQueryInfos(tc.query) + assert.Equal(t, tc.expected, got) + }) + } +} + func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { cases := []struct { name string From bcd3b11686b2563fa4ca778d8c6f4e06fc8de32d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 16:35:23 +0800 Subject: [PATCH 13/19] fix unit test --- .../persist_storage_ddl_handlers.go | 27 +-- .../schemastore/persist_storage_test.go | 165 +++++++++++++++--- 2 files changed, 152 insertions(+), 40 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index dd74be77e9..91bb5f8ca3 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -600,7 +600,7 @@ func buildPersistedDDLEventForDropView(args buildPersistedDDLEventFuncArgs) Pers // We don't store the relationship: view_id -> table_name, get table name from args.job event.TableName = args.job.TableName // The query in job maybe "DROP VIEW test1.view1, test2.view2", we need rebuild it here. - event.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", event.SchemaName, event.TableName) + event.Query = fmt.Sprintf("DROP VIEW %s", common.QuoteSchema(event.SchemaName, event.TableName)) return event } @@ -662,7 +662,7 @@ func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) Per event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) event.TableName = getTableName(args.tableMap, event.TableID) // The query in job maybe "DROP TABLE test1.table1, test2.table2", we need rebuild it here. - event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.SchemaName, event.TableName) + event.Query = fmt.Sprintf("DROP TABLE %s", common.QuoteSchema(event.SchemaName, event.TableName)) return event } @@ -744,9 +744,9 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P log.Error("unknown stmt type", zap.String("query", args.job.Query), zap.Any("stmt", stmt)) } } - event.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`", - oldSchemaName, oldTableName, - event.SchemaName, event.TableName) + event.Query = fmt.Sprintf("RENAME TABLE %s TO %s", + common.QuoteSchema(oldSchemaName, oldTableName), + common.QuoteSchema(event.SchemaName, event.TableName)) } return event } @@ -787,8 +787,10 @@ func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncA // Note that partition name should be parsed from original query, not the upperQuery. partName := strings.TrimSpace(event.Query[idx1:idx2]) partName = strings.Replace(partName, "`", "", -1) - event.Query = fmt.Sprintf("ALTER TABLE `%s`.`%s` EXCHANGE PARTITION `%s` WITH TABLE `%s`.`%s`", - event.ExtraSchemaName, event.ExtraTableName, partName, event.SchemaName, event.TableName) + event.Query = fmt.Sprintf("ALTER TABLE %s EXCHANGE PARTITION %s WITH TABLE %s", + common.QuoteSchema(event.ExtraSchemaName, event.ExtraTableName), + common.QuoteName(partName), + common.QuoteSchema(event.SchemaName, event.TableName)) if strings.HasSuffix(upperQuery, "WITHOUT VALIDATION") { event.Query += " WITHOUT VALIDATION" @@ -846,10 +848,11 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) } renameTableInfos := renameArgs.RenameTableInfos multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos - if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) { + if len(renameTableInfos) != len(multipleTableInfos) { log.Panic("should not happen", - zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)), - zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos))) + zap.Int("renameArgsLen", len(renameTableInfos)), + zap.Int("multipleTableInfosLen", len(multipleTableInfos)), + zap.String("query", args.job.Query)) } // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. @@ -910,7 +913,9 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) event.ExtraTableNames = append(event.ExtraTableNames, oldTableName) event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID) event.SchemaNames = append(event.SchemaNames, newSchemaName) - querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, newSchemaName, info.NewTableName.O)) + querys = append(querys, fmt.Sprintf("RENAME TABLE %s TO %s;", + common.QuoteSchema(oldSchemaName, oldTableName), + common.QuoteSchema(newSchemaName, info.NewTableName.O))) } event.Query = strings.Join(querys, "") diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index e3fbd34122..b0fa0ec910 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3194,11 +3194,8 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testi assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) } -func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t *testing.T) { - // Simulate: rename table a to c, b to a, c to b; - // c only exists as a temporary name inside this statement. - // The query has 3 rename clauses, while rename table infos contain 2 entries. - // This should panic due to the strict consistency check. +func TestBuildPersistedDDLEventForRenameTablesCyclicRenameNormal(t *testing.T) { + // Simulate: rename table a to b, b to a. job := buildRenameTablesJobForTest( []int64{100, 100}, []int64{100, 100}, @@ -3208,48 +3205,158 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameWithTemporaryTable(t * []string{"b", "a"}, 1010, ) - job.Query = "RENAME TABLE `test`.`a` TO `test`.`c`, `test`.`b` TO `test`.`a`, `test`.`c` TO `test`.`b`" + job.Query = "RENAME TABLE `test`.`a` TO `test`.`b`, `test`.`b` TO `test`.`a`" - require.Panics(t, func() { - _ = buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "test", Tables: map[int64]bool{200: true, 201: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "a"}, - 201: {SchemaID: 100, Name: "b"}, - }, - }) + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "test", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "a"}, + 201: {SchemaID: 100, Name: "b"}, + }, }) + + assert.Equal(t, + "RENAME TABLE `test`.`a` TO `test`.`b`;"+ + "RENAME TABLE `test`.`b` TO `test`.`a`;", + ddl.Query) + assert.Equal(t, []string{"a", "b"}, ddl.ExtraTableNames) } func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { job := buildRenameTablesJobForTest( - []int64{100}, - []int64{105}, - []int64{200}, - []string{"source_db"}, - []string{"source_t1_from_args"}, - []string{"target_t1"}, + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"source_db", "source_db"}, + []string{"source_t1_from_args", "source_t2_from_args"}, + []string{"target_t1", "target_t2"}, 1010, ) - job.Query = "RENAME TABLE `source_db`.`source_t1_from_query` TO `target_db`.`target_t1`" + job.Query = "RENAME TABLE `source_db`.`source_t1_from_query` TO `target_db`.`target_t1`, `source_db`.`source_t2_from_query` TO `target_db`.`target_t2`" ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ job: job, databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "source_db", Tables: map[int64]bool{200: true}}, - 105: {Name: "target_db", Tables: map[int64]bool{200: true}}, + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, }, tableMap: map[int64]*BasicTableInfo{ 200: {SchemaID: 100, Name: "source_t1_from_store"}, + 201: {SchemaID: 100, Name: "source_t2_from_store"}, + }, + }) + + assert.Equal(t, []string{"source_t1_from_args", "source_t2_from_args"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) + assert.Equal(t, + "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2_from_args` TO `target_db`.`target_t2`;", + ddl.Query) +} + +func TestBuildPersistedDDLEventForRenameTablesEscapesIdentifiers(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"source`db", "source`db"}, + []string{"source`t1", "source`t2"}, + []string{"target`t1", "target`t2"}, + 1010, + ) + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source`db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target`db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source`t1"}, + 201: {SchemaID: 100, Name: "source`t2"}, }, }) - assert.Equal(t, []string{"source_t1_from_args"}, ddl.ExtraTableNames) - assert.Equal(t, []string{"source_db"}, ddl.ExtraSchemaNames) - assert.Equal(t, "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;", ddl.Query) + assert.Equal(t, + "RENAME TABLE `source``db`.`source``t1` TO `target``db`.`target``t1`;"+ + "RENAME TABLE `source``db`.`source``t2` TO `target``db`.`target``t2`;", + ddl.Query) +} + +func TestBuildPersistedDDLEventForRenameTableEscapesIdentifiers(t *testing.T) { + job := buildRenameTableJobForTest(100, 101, "target`t", 100, &model.InvolvingSchemaInfo{ + Database: "source`db", + Table: "source`t", + }) + // Keep empty to force using InvolvingSchemaInfo as source name. + job.Query = "" + + ddl := buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "target`db", Tables: map[int64]bool{101: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 101: {SchemaID: 100, Name: "source`t"}, + }, + }) + + assert.Equal(t, "RENAME TABLE `source``db`.`source``t` TO `target``db`.`target``t`", ddl.Query) +} + +func TestBuildPersistedDDLEventForDropTableEscapesIdentifiers(t *testing.T) { + job := buildDropTableJobForTest(100, 200, 1000) + ddl := buildPersistedDDLEventForDropTable(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "schema`x", Tables: map[int64]bool{200: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "table`x"}, + }, + }) + assert.Equal(t, "DROP TABLE `schema``x`.`table``x`", ddl.Query) +} + +func TestBuildPersistedDDLEventForDropViewEscapesIdentifiers(t *testing.T) { + job := buildDropViewJobForTest(100, 1000) + job.TableName = "view`x" + ddl := buildPersistedDDLEventForDropView(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "schema`x", Tables: map[int64]bool{}}, + }, + }) + assert.Equal(t, "DROP VIEW `schema``x`.`view``x`", ddl.Query) +} + +func TestBuildPersistedDDLEventForExchangePartitionEscapesIdentifiers(t *testing.T) { + job := buildExchangePartitionJobForTest(100, 200, 300, "pt`x", []int64{301}, 1000) + job.Query = "ALTER TABLE `ignored`.`ignored` EXCHANGE PARTITION `p0` WITH TABLE `ignored2`.`ignored2` WITHOUT VALIDATION" + + ddl := buildPersistedDDLEventForExchangePartition(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "normal`db", Tables: map[int64]bool{200: true}}, + 101: {Name: "part`db", Tables: map[int64]bool{300: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "normal`t"}, + 300: {SchemaID: 101, Name: "pt`x"}, + }, + partitionMap: map[int64]BasicPartitionInfo{ + 300: { + 301: nil, + }, + }, + }) + + assert.Equal(t, + "ALTER TABLE `part``db`.`pt``x` EXCHANGE PARTITION `p0` WITH TABLE `normal``db`.`normal``t` WITHOUT VALIDATION", + ddl.Query) } func TestParseRenameTablesQueryInfos(t *testing.T) { From 5f1e4047e6ddc7869a3a0693f289be9eae846141 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 16:45:20 +0800 Subject: [PATCH 14/19] fix --- .../persist_storage_ddl_handlers.go | 4 +- .../schemastore/persist_storage_test.go | 244 ++++++++---------- 2 files changed, 116 insertions(+), 132 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 91bb5f8ca3..20405fb5ad 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -520,7 +520,7 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 { // schemaName should be "Name.O" func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) (int64, bool) { for id, info := range databaseMap { - if info.Name == schemaName { + if strings.EqualFold(info.Name, schemaName) { return id, true } } @@ -530,7 +530,7 @@ func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName str // tableName should be "Name.O" func findTableIDByName(tableMap map[int64]*BasicTableInfo, schemaID int64, tableName string) (int64, bool) { for id, info := range tableMap { - if info.SchemaID == schemaID && info.Name == tableName { + if info.SchemaID == schemaID && strings.EqualFold(info.Name, tableName) { return id, true } } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index b0fa0ec910..48d1040481 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3150,7 +3150,6 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) }, tableMap: map[int64]*BasicTableInfo{ 200: {SchemaID: 100, Name: "source_t1"}, - 201: {SchemaID: 100, Name: "source_t2"}, }, }) @@ -3163,49 +3162,19 @@ func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) assert.Equal(t, []string{"target_db", "target_db"}, ddl.SchemaNames) } -func TestBuildPersistedDDLEventForRenameTablesFallbackQueryOldTableName(t *testing.T) { +func TestBuildPersistedDDLEventForRenameTablesCyclicRename(t *testing.T) { + // Simulate: rename table a to c, b to a, c to b. + // Table c only exists as a temporary name inside this statement. job := buildRenameTablesJobForTest( - []int64{100, 100}, - []int64{105, 105}, - []int64{200, 201}, - []string{"source_db", "source_db"}, - []string{"", ""}, - []string{"target_t1", "target_t2"}, + []int64{100, 100, 100}, + []int64{100, 100, 100}, + []int64{200, 201, 201}, + []string{"test", "test", "test"}, + []string{"", "", ""}, + []string{"c", "a", "b"}, 1010, ) - job.Query = "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`" - - ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, - 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "source_t1"}, - }, - }) - - assert.Equal(t, - "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`;"+ - "RENAME TABLE `source_db`.`source_t2` TO `target_db`.`target_t2`;", - ddl.Query) - assert.Equal(t, []string{"source_t1", "source_t2"}, ddl.ExtraTableNames) - assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) -} - -func TestBuildPersistedDDLEventForRenameTablesCyclicRenameNormal(t *testing.T) { - // Simulate: rename table a to b, b to a. - job := buildRenameTablesJobForTest( - []int64{100, 100}, - []int64{100, 100}, - []int64{200, 201}, - []string{"test", "test"}, - []string{"", ""}, - []string{"b", "a"}, - 1010, - ) - job.Query = "RENAME TABLE `test`.`a` TO `test`.`b`, `test`.`b` TO `test`.`a`" + job.Query = "RENAME TABLE `test`.`a` TO `test`.`c`, `test`.`b` TO `test`.`a`, `test`.`c` TO `test`.`b`" ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ job: job, @@ -3219,10 +3188,11 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRenameNormal(t *testing.T) { }) assert.Equal(t, - "RENAME TABLE `test`.`a` TO `test`.`b`;"+ - "RENAME TABLE `test`.`b` TO `test`.`a`;", + "RENAME TABLE `test`.`a` TO `test`.`c`;"+ + "RENAME TABLE `test`.`b` TO `test`.`a`;"+ + "RENAME TABLE `test`.`c` TO `test`.`b`;", ddl.Query) - assert.Equal(t, []string{"a", "b"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"a", "b", "c"}, ddl.ExtraTableNames) } func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { @@ -3257,106 +3227,108 @@ func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t ddl.Query) } -func TestBuildPersistedDDLEventForRenameTablesEscapesIdentifiers(t *testing.T) { - job := buildRenameTablesJobForTest( - []int64{100, 100}, - []int64{105, 105}, - []int64{200, 201}, - []string{"source`db", "source`db"}, - []string{"source`t1", "source`t2"}, - []string{"target`t1", "target`t2"}, - 1010, - ) +func TestBuildPersistedDDLEventEscapesIdentifiers(t *testing.T) { + t.Run("rename tables", func(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"source`db", "source`db"}, + []string{"source`t1", "source`t2"}, + []string{"target`t1", "target`t2"}, + 1010, + ) + + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source`db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target`db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source`t1"}, + 201: {SchemaID: 100, Name: "source`t2"}, + }, + }) - ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "source`db", Tables: map[int64]bool{200: true, 201: true}}, - 105: {Name: "target`db", Tables: map[int64]bool{200: true, 201: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "source`t1"}, - 201: {SchemaID: 100, Name: "source`t2"}, - }, + assert.Equal(t, + "RENAME TABLE `source``db`.`source``t1` TO `target``db`.`target``t1`;"+ + "RENAME TABLE `source``db`.`source``t2` TO `target``db`.`target``t2`;", + ddl.Query) }) - assert.Equal(t, - "RENAME TABLE `source``db`.`source``t1` TO `target``db`.`target``t1`;"+ - "RENAME TABLE `source``db`.`source``t2` TO `target``db`.`target``t2`;", - ddl.Query) -} + t.Run("rename table", func(t *testing.T) { + job := buildRenameTableJobForTest(100, 101, "target`t", 100, &model.InvolvingSchemaInfo{ + Database: "source`db", + Table: "source`t", + }) + // Keep empty to force using InvolvingSchemaInfo as source name. + job.Query = "" -func TestBuildPersistedDDLEventForRenameTableEscapesIdentifiers(t *testing.T) { - job := buildRenameTableJobForTest(100, 101, "target`t", 100, &model.InvolvingSchemaInfo{ - Database: "source`db", - Table: "source`t", - }) - // Keep empty to force using InvolvingSchemaInfo as source name. - job.Query = "" + ddl := buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "target`db", Tables: map[int64]bool{101: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 101: {SchemaID: 100, Name: "source`t"}, + }, + }) - ddl := buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "target`db", Tables: map[int64]bool{101: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 101: {SchemaID: 100, Name: "source`t"}, - }, + assert.Equal(t, "RENAME TABLE `source``db`.`source``t` TO `target``db`.`target``t`", ddl.Query) }) - assert.Equal(t, "RENAME TABLE `source``db`.`source``t` TO `target``db`.`target``t`", ddl.Query) -} - -func TestBuildPersistedDDLEventForDropTableEscapesIdentifiers(t *testing.T) { - job := buildDropTableJobForTest(100, 200, 1000) - ddl := buildPersistedDDLEventForDropTable(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "schema`x", Tables: map[int64]bool{200: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "table`x"}, - }, + t.Run("drop table", func(t *testing.T) { + job := buildDropTableJobForTest(100, 200, 1000) + ddl := buildPersistedDDLEventForDropTable(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "schema`x", Tables: map[int64]bool{200: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "table`x"}, + }, + }) + assert.Equal(t, "DROP TABLE `schema``x`.`table``x`", ddl.Query) }) - assert.Equal(t, "DROP TABLE `schema``x`.`table``x`", ddl.Query) -} -func TestBuildPersistedDDLEventForDropViewEscapesIdentifiers(t *testing.T) { - job := buildDropViewJobForTest(100, 1000) - job.TableName = "view`x" - ddl := buildPersistedDDLEventForDropView(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "schema`x", Tables: map[int64]bool{}}, - }, + t.Run("drop view", func(t *testing.T) { + job := buildDropViewJobForTest(100, 1000) + job.TableName = "view`x" + ddl := buildPersistedDDLEventForDropView(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "schema`x", Tables: map[int64]bool{}}, + }, + }) + assert.Equal(t, "DROP VIEW `schema``x`.`view``x`", ddl.Query) }) - assert.Equal(t, "DROP VIEW `schema``x`.`view``x`", ddl.Query) -} -func TestBuildPersistedDDLEventForExchangePartitionEscapesIdentifiers(t *testing.T) { - job := buildExchangePartitionJobForTest(100, 200, 300, "pt`x", []int64{301}, 1000) - job.Query = "ALTER TABLE `ignored`.`ignored` EXCHANGE PARTITION `p0` WITH TABLE `ignored2`.`ignored2` WITHOUT VALIDATION" + t.Run("exchange partition", func(t *testing.T) { + job := buildExchangePartitionJobForTest(100, 200, 300, "pt`x", []int64{301}, 1000) + job.Query = "ALTER TABLE `ignored`.`ignored` EXCHANGE PARTITION `p0` WITH TABLE `ignored2`.`ignored2` WITHOUT VALIDATION" - ddl := buildPersistedDDLEventForExchangePartition(buildPersistedDDLEventFuncArgs{ - job: job, - databaseMap: map[int64]*BasicDatabaseInfo{ - 100: {Name: "normal`db", Tables: map[int64]bool{200: true}}, - 101: {Name: "part`db", Tables: map[int64]bool{300: true}}, - }, - tableMap: map[int64]*BasicTableInfo{ - 200: {SchemaID: 100, Name: "normal`t"}, - 300: {SchemaID: 101, Name: "pt`x"}, - }, - partitionMap: map[int64]BasicPartitionInfo{ - 300: { - 301: nil, + ddl := buildPersistedDDLEventForExchangePartition(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "normal`db", Tables: map[int64]bool{200: true}}, + 101: {Name: "part`db", Tables: map[int64]bool{300: true}}, }, - }, - }) + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "normal`t"}, + 300: {SchemaID: 101, Name: "pt`x"}, + }, + partitionMap: map[int64]BasicPartitionInfo{ + 300: { + 301: nil, + }, + }, + }) - assert.Equal(t, - "ALTER TABLE `part``db`.`pt``x` EXCHANGE PARTITION `p0` WITH TABLE `normal``db`.`normal``t` WITHOUT VALIDATION", - ddl.Query) + assert.Equal(t, + "ALTER TABLE `part``db`.`pt``x` EXCHANGE PARTITION `p0` WITH TABLE `normal``db`.`normal``t` WITHOUT VALIDATION", + ddl.Query) + }) } func TestParseRenameTablesQueryInfos(t *testing.T) { @@ -3479,6 +3451,18 @@ func TestBuildPersistedDDLEventForCreateTableLikeSetsReferTableID(t *testing.T) partitionIDs: []int64{111, 112}, expectedReferID: 101, }, + { + name: "refer table name with different case", + query: "CREATE TABLE `b` LIKE `A`", + partitionIDs: nil, + expectedReferID: 101, + }, + { + name: "refer schema and table names with different case", + query: "CREATE TABLE `b` LIKE `TeSt`.`A`", + partitionIDs: nil, + expectedReferID: 101, + }, } for _, tc := range cases { From 1098b7b329aa5f80d219454775acee50acdad178 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 17:35:45 +0800 Subject: [PATCH 15/19] fix --- .../persist_storage_ddl_handlers.go | 70 +++++++++---------- .../schemastore/persist_storage_test.go | 8 +-- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 20405fb5ad..9fb159b381 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -723,6 +723,9 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P // // InvolvingSchemaInfo returns the schema info involved in the job. // The value should be stored in lower case. + // + // Prefer names parsed from the original query whenever possible. + // See https://github.com/pingcap/ticdc/pull/2218 for background. oldSchemaName := args.job.InvolvingSchemaInfo[0].Database oldTableName := args.job.InvolvingSchemaInfo[0].Table stmt, err := parser.New().ParseOneStmt(args.job.Query, "", "") @@ -855,21 +858,29 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("query", args.job.Query)) } + // Prefer names parsed from the original query whenever possible. + // See https://github.com/pingcap/ticdc/pull/2218 for background. + // // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. // See https://github.com/pingcap/tidb/pull/64421 for the upstream fix. - needFillOldTableNamesFromQuery := len(renameTableInfos) > 0 && renameTableInfos[0].OldTableName.O == "" - queryInfos := make([]renameTableQueryInfo, 0) - if needFillOldTableNamesFromQuery { - queryInfos = parseRenameTablesQueryInfos(args.job.Query) - if len(queryInfos) != len(renameTableInfos) { - log.Panic("rename tables query info length is inconsistent with args", - zap.Int("queryInfosLen", len(queryInfos)), - zap.Int("renameArgsLen", len(renameTableInfos)), + queryInfos := parseRenameTablesQueryInfos(args.job.Query) + useQueryInfos := len(queryInfos) > 0 + if useQueryInfos && len(queryInfos) != len(renameTableInfos) { + log.Panic("rename tables query info length is inconsistent with args", + zap.Int("queryInfosLen", len(queryInfos)), + zap.Int("renameArgsLen", len(renameTableInfos)), + zap.String("query", args.job.Query)) + } + if len(renameTableInfos) > 0 && renameTableInfos[0].OldTableName.O == "" { + if useQueryInfos { + log.Info("rename tables args miss old table name fallback to query", + zap.Int("tableCount", len(renameTableInfos)), + zap.String("query", args.job.Query)) + } else { + log.Panic("rename tables args miss old table name and query is not parseable", + zap.Int("tableCount", len(renameTableInfos)), zap.String("query", args.job.Query)) } - log.Info("rename tables args miss old table name fallback to query", - zap.Int("tableCount", len(renameTableInfos)), - zap.String("query", args.job.Query)) } var querys []string @@ -878,34 +889,21 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldSchemaName := info.OldSchemaName.O oldTableName := info.OldTableName.O newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) - - if needFillOldTableNamesFromQuery && oldTableName == "" { + newTableName := info.NewTableName.O + if useQueryInfos { queryInfo := queryInfos[i] - if !strings.EqualFold(queryInfo.newTableName, info.NewTableName.O) { - log.Panic("rename tables query new table name is inconsistent with args", - zap.Int("index", i), - zap.String("newTableNameInArgs", info.NewTableName.O), - zap.String("newTableNameInQuery", queryInfo.newTableName), - zap.String("query", args.job.Query)) + if queryInfo.oldSchemaName != "" { + oldSchemaName = queryInfo.oldSchemaName + } + if queryInfo.oldTableName != "" { + oldTableName = queryInfo.oldTableName } - if queryInfo.newSchemaName != "" && - !strings.EqualFold(queryInfo.newSchemaName, newSchemaName) { - log.Panic("rename tables query new schema name is inconsistent with args", - zap.Int("index", i), - zap.String("newSchemaNameInArgs", newSchemaName), - zap.String("newSchemaNameInQuery", queryInfo.newSchemaName), - zap.String("query", args.job.Query)) + if queryInfo.newSchemaName != "" { + newSchemaName = queryInfo.newSchemaName } - if queryInfo.oldSchemaName != "" && - !strings.EqualFold(queryInfo.oldSchemaName, oldSchemaName) { - log.Panic("rename tables query old schema name is inconsistent with args", - zap.Int("index", i), - zap.String("oldSchemaNameInArgs", oldSchemaName), - zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName), - zap.Int64("oldSchemaIDInArgs", oldSchemaID), - zap.String("query", args.job.Query)) + if queryInfo.newTableName != "" { + newTableName = queryInfo.newTableName } - oldTableName = queryInfo.oldTableName } event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID) @@ -915,7 +913,7 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) event.SchemaNames = append(event.SchemaNames, newSchemaName) querys = append(querys, fmt.Sprintf("RENAME TABLE %s TO %s;", common.QuoteSchema(oldSchemaName, oldTableName), - common.QuoteSchema(newSchemaName, info.NewTableName.O))) + common.QuoteSchema(newSchemaName, newTableName))) } event.Query = strings.Join(querys, "") diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 48d1040481..0ddc492a21 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3195,7 +3195,7 @@ func TestBuildPersistedDDLEventForRenameTablesCyclicRename(t *testing.T) { assert.Equal(t, []string{"a", "b", "c"}, ddl.ExtraTableNames) } -func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t *testing.T) { +func TestBuildPersistedDDLEventForRenameTablesPreferQueryNames(t *testing.T) { job := buildRenameTablesJobForTest( []int64{100, 100}, []int64{105, 105}, @@ -3219,11 +3219,11 @@ func TestBuildPersistedDDLEventForRenameTablesDoNotOverrideExistingArgsByQuery(t }, }) - assert.Equal(t, []string{"source_t1_from_args", "source_t2_from_args"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_t1_from_query", "source_t2_from_query"}, ddl.ExtraTableNames) assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) assert.Equal(t, - "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;"+ - "RENAME TABLE `source_db`.`source_t2_from_args` TO `target_db`.`target_t2`;", + "RENAME TABLE `source_db`.`source_t1_from_query` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2_from_query` TO `target_db`.`target_t2`;", ddl.Query) } From 4d618831cb6479ce3a0e7f3c8a5d7de192b1c4eb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 17:46:25 +0800 Subject: [PATCH 16/19] add some comment --- logservice/schemastore/persist_storage_ddl_handlers.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 9fb159b381..bf92222598 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -724,6 +724,8 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P // InvolvingSchemaInfo returns the schema info involved in the job. // The value should be stored in lower case. // + // InvolvingSchemaInfo may store normalized lower-case names, + // while the original query can keep user-provided identifier case. // Prefer names parsed from the original query whenever possible. // See https://github.com/pingcap/ticdc/pull/2218 for background. oldSchemaName := args.job.InvolvingSchemaInfo[0].Database @@ -858,6 +860,8 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) zap.String("query", args.job.Query)) } + // RenameTableInfos may store normalized lower-case names, + // while the original query can keep user-provided identifier case. // Prefer names parsed from the original query whenever possible. // See https://github.com/pingcap/ticdc/pull/2218 for background. // From aee42789e730089eb047176466db7210740e2ca7 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 19:12:33 +0800 Subject: [PATCH 17/19] refactor --- .../persist_storage_ddl_handlers.go | 38 ++++----- .../schemastore/persist_storage_test.go | 78 +++++++++++++++++-- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index bf92222598..2b917737d9 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -814,20 +814,20 @@ type renameTableQueryInfo struct { newTableName string } -func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo { +func parseRenameTablesQueryInfos(query string) ([]renameTableQueryInfo, bool) { if query == "" { - return nil + return nil, false } stmt, err := parser.New().ParseOneStmt(query, "", "") if err != nil { log.Warn("parse rename tables query failed", zap.String("query", query), zap.Error(err)) - return nil + return nil, false } renameStmt, ok := stmt.(*ast.RenameTableStmt) if !ok { - return nil + return nil, false } queryInfos := make([]renameTableQueryInfo, 0, len(renameStmt.TableToTables)) @@ -839,7 +839,7 @@ func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo { newTableName: tableToTable.NewTable.Name.O, }) } - return queryInfos + return queryInfos, true } func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { @@ -864,27 +864,21 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) // while the original query can keep user-provided identifier case. // Prefer names parsed from the original query whenever possible. // See https://github.com/pingcap/ticdc/pull/2218 for background. - // - // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. - // See https://github.com/pingcap/tidb/pull/64421 for the upstream fix. - queryInfos := parseRenameTablesQueryInfos(args.job.Query) - useQueryInfos := len(queryInfos) > 0 - if useQueryInfos && len(queryInfos) != len(renameTableInfos) { + queryInfos, queryParsed := parseRenameTablesQueryInfos(args.job.Query) + if queryParsed && len(queryInfos) != len(renameTableInfos) { log.Panic("rename tables query info length is inconsistent with args", zap.Int("queryInfosLen", len(queryInfos)), zap.Int("renameArgsLen", len(renameTableInfos)), zap.String("query", args.job.Query)) } - if len(renameTableInfos) > 0 && renameTableInfos[0].OldTableName.O == "" { - if useQueryInfos { - log.Info("rename tables args miss old table name fallback to query", - zap.Int("tableCount", len(renameTableInfos)), - zap.String("query", args.job.Query)) - } else { - log.Panic("rename tables args miss old table name and query is not parseable", - zap.Int("tableCount", len(renameTableInfos)), - zap.String("query", args.job.Query)) - } + + // TiDB <= v8.1 may emit empty old table names for RENAME TABLES args. + // See https://github.com/pingcap/tidb/pull/64421 for the upstream fix. + if !queryParsed && renameTableInfos[0].OldTableName.O == "" { + // TODO: return error instead of falling back to args once builder supports error propagation. + log.Warn("rename tables args miss old table name and query is unavailable, keep args as-is", + zap.Int("tableCount", len(renameTableInfos)), + zap.String("query", args.job.Query)) } var querys []string @@ -894,7 +888,7 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) oldTableName := info.OldTableName.O newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID) newTableName := info.NewTableName.O - if useQueryInfos { + if queryParsed { queryInfo := queryInfos[i] if queryInfo.oldSchemaName != "" { oldSchemaName = queryInfo.oldSchemaName diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 0ddc492a21..bb52e1b5a3 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3227,6 +3227,67 @@ func TestBuildPersistedDDLEventForRenameTablesPreferQueryNames(t *testing.T) { ddl.Query) } +func TestBuildPersistedDDLEventForRenameTablesKeepArgsWhenQueryUnavailable(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"source_db", "source_db"}, + []string{"source_t1_from_args", "source_t2_from_args"}, + []string{"target_t1", "target_t2"}, + 1010, + ) + job.Query = "RENAME TABLE" + + require.NotPanics(t, func() { + ddl := buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1_from_store"}, + 201: {SchemaID: 100, Name: "source_t2_from_store"}, + }, + }) + + assert.Equal(t, []string{"source_t1_from_args", "source_t2_from_args"}, ddl.ExtraTableNames) + assert.Equal(t, []string{"source_db", "source_db"}, ddl.ExtraSchemaNames) + assert.Equal(t, + "RENAME TABLE `source_db`.`source_t1_from_args` TO `target_db`.`target_t1`;"+ + "RENAME TABLE `source_db`.`source_t2_from_args` TO `target_db`.`target_t2`;", + ddl.Query) + }) +} + +func TestBuildPersistedDDLEventForRenameTablesPanicOnQueryInfoLengthMismatch(t *testing.T) { + job := buildRenameTablesJobForTest( + []int64{100, 100}, + []int64{105, 105}, + []int64{200, 201}, + []string{"source_db", "source_db"}, + []string{"source_t1", "source_t2"}, + []string{"target_t1", "target_t2"}, + 1010, + ) + job.Query = "RENAME TABLE `source_db`.`source_t1` TO `target_db`.`target_t1`, `source_db`.`source_t2` TO `target_db`.`target_t2`, `source_db`.`source_t3` TO `target_db`.`target_t3`" + + require.Panics(t, func() { + _ = buildPersistedDDLEventForRenameTables(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "source_db", Tables: map[int64]bool{200: true, 201: true}}, + 105: {Name: "target_db", Tables: map[int64]bool{200: true, 201: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 200: {SchemaID: 100, Name: "source_t1"}, + 201: {SchemaID: 100, Name: "source_t2"}, + }, + }) + }) +} + func TestBuildPersistedDDLEventEscapesIdentifiers(t *testing.T) { t.Run("rename tables", func(t *testing.T) { job := buildRenameTablesJobForTest( @@ -3335,11 +3396,13 @@ func TestParseRenameTablesQueryInfos(t *testing.T) { cases := []struct { name string query string + parsed bool expected []renameTableQueryInfo }{ { - name: "multiple tables with schema", - query: "RENAME TABLE `db1`.`t1` TO `db2`.`t2`, `db1`.`t3` TO `db2`.`t4`", + name: "multiple tables with schema", + query: "RENAME TABLE `db1`.`t1` TO `db2`.`t2`, `db1`.`t3` TO `db2`.`t4`", + parsed: true, expected: []renameTableQueryInfo{ { oldSchemaName: "db1", @@ -3356,8 +3419,9 @@ func TestParseRenameTablesQueryInfos(t *testing.T) { }, }, { - name: "without schema names", - query: "RENAME TABLE `t1` TO `t2`", + name: "without schema names", + query: "RENAME TABLE `t1` TO `t2`", + parsed: true, expected: []renameTableQueryInfo{ { oldSchemaName: "", @@ -3370,23 +3434,27 @@ func TestParseRenameTablesQueryInfos(t *testing.T) { { name: "empty query", query: "", + parsed: false, expected: nil, }, { name: "non rename statement", query: "CREATE TABLE t(a INT)", + parsed: false, expected: nil, }, { name: "invalid sql", query: "RENAME TABLE", + parsed: false, expected: nil, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - got := parseRenameTablesQueryInfos(tc.query) + got, parsed := parseRenameTablesQueryInfos(tc.query) + assert.Equal(t, tc.parsed, parsed) assert.Equal(t, tc.expected, got) }) } From ddd65a4f3f47af28fa1df2d0e413caccebec4a2d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 21:27:37 +0800 Subject: [PATCH 18/19] f (cherry picked from commit e021af10da381266be4af56e17f379b6a4a2e75b) --- tests/integration_tests/_utils/test_prepare | 72 +++++++++++++++++++++ tests/integration_tests/vector/run.sh | 1 + 2 files changed, 73 insertions(+) diff --git a/tests/integration_tests/_utils/test_prepare b/tests/integration_tests/_utils/test_prepare index 81121ee207..d149188064 100644 --- a/tests/integration_tests/_utils/test_prepare +++ b/tests/integration_tests/_utils/test_prepare @@ -93,3 +93,75 @@ else fi export TEST_OUT_DATA_DIR=test_out_data + +function get_tidb_release_version() { + local tidbHost=${1:-$UP_TIDB_HOST} + local tidbPort=${2:-$UP_TIDB_PORT} + + mysql -uroot -h${tidbHost} -P${tidbPort} --default-character-set utf8mb4 -e "select tidb_version()\G" \ + | awk -F': ' '/Release Version/ {print $2; exit}' | tr -d ' ' +} + +function normalize_tidb_semver_triplet() { + local rawVersion=${1#v} + rawVersion=${rawVersion%%-*} + rawVersion=${rawVersion%%+*} + + local major minor patch + IFS='.' read -r major minor patch <<<"$rawVersion" + major=${major:-0} + minor=${minor:-0} + patch=${patch:-0} + + if ! [[ "$major" =~ ^[0-9]+$ && "$minor" =~ ^[0-9]+$ && "$patch" =~ ^[0-9]+$ ]]; then + return 1 + fi + + echo "$major $minor $patch" +} + +function tidb_version_less_than() { + local currentVersion=$1 + local minVersion=$2 + + local currentTriplet minTriplet + currentTriplet=$(normalize_tidb_semver_triplet "$currentVersion") || return 1 + minTriplet=$(normalize_tidb_semver_triplet "$minVersion") || return 1 + + local -a currentParts minParts + read -r -a currentParts <<<"$currentTriplet" + read -r -a minParts <<<"$minTriplet" + + local idx currentPart minPart + for idx in 0 1 2; do + currentPart=$((10#${currentParts[$idx]})) + minPart=$((10#${minParts[$idx]})) + if ((currentPart < minPart)); then + return 0 + fi + if ((currentPart > minPart)); then + return 1 + fi + done + + return 1 +} + +function skip_if_tidb_version_less_than() { + local minVersion=$1 + local tidbHost=${2:-$UP_TIDB_HOST} + local tidbPort=${3:-$UP_TIDB_PORT} + local testName=${TEST_NAME:-unknown} + + local tidbReleaseVersion + tidbReleaseVersion=$(get_tidb_release_version "$tidbHost" "$tidbPort" || true) + if [ -z "$tidbReleaseVersion" ]; then + echo "[$(date)] failed to parse TiDB release version from ${tidbHost}:${tidbPort}, continue test case ${testName}" + return + fi + + if tidb_version_less_than "$tidbReleaseVersion" "$minVersion"; then + echo "[$(date)] <<<<<< skip test case ${testName}, TiDB version ${tidbReleaseVersion} is less than ${minVersion} >>>>>>" + exit 0 + fi +} diff --git a/tests/integration_tests/vector/run.sh b/tests/integration_tests/vector/run.sh index 4bec91c09d..4ddd2b6b05 100755 --- a/tests/integration_tests/vector/run.sh +++ b/tests/integration_tests/vector/run.sh @@ -12,6 +12,7 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR + skip_if_tidb_version_less_than "v8.5.0" start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY From 0ee7a0a259bab5ce78ba69e50f997c1a03f34d99 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 9 Mar 2026 22:23:04 +0800 Subject: [PATCH 19/19] try --- tests/integration_tests/_utils/test_prepare | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/_utils/test_prepare b/tests/integration_tests/_utils/test_prepare index d149188064..4d087151fd 100644 --- a/tests/integration_tests/_utils/test_prepare +++ b/tests/integration_tests/_utils/test_prepare @@ -99,7 +99,8 @@ function get_tidb_release_version() { local tidbPort=${2:-$UP_TIDB_PORT} mysql -uroot -h${tidbHost} -P${tidbPort} --default-character-set utf8mb4 -e "select tidb_version()\G" \ - | awk -F': ' '/Release Version/ {print $2; exit}' | tr -d ' ' + | sed -nE 's/.*Release Version:[[:space:]]*(v[0-9]+(\.[0-9]+){0,2}([-.+][0-9A-Za-z.-]+)?).*/\1/p' \ + | head -n1 } function normalize_tidb_semver_triplet() {