Skip to content

Commit 238b999

Browse files
committed
try fix
1 parent 9bca9cb commit 238b999

File tree

2 files changed

+382
-20
lines changed

2 files changed

+382
-20
lines changed

logservice/schemastore/persist_storage_ddl_handlers.go

Lines changed: 291 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package schemastore
1515

1616
import (
17+
"encoding/json"
1718
"errors"
1819
"fmt"
1920
"strings"
@@ -800,58 +801,328 @@ func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncA
800801
return event
801802
}
802803

804+
type renameTableQueryInfo struct {
805+
oldSchemaName string
806+
oldTableName string
807+
newSchemaName string
808+
newTableName string
809+
}
810+
811+
func getRenameTablesArgsSafely(job *model.Job) (_ *model.RenameTablesArgs, err error) {
812+
defer func() {
813+
if r := recover(); r != nil {
814+
err = fmt.Errorf("panic when get rename tables args: %v", r)
815+
}
816+
}()
817+
return model.GetRenameTablesArgs(job)
818+
}
819+
820+
func decodeRenameTablesArgsFromRawArgs(job *model.Job) (*model.RenameTablesArgs, error) {
821+
if len(job.RawArgs) == 0 {
822+
return nil, fmt.Errorf("job raw args is empty")
823+
}
824+
825+
if job.Version == model.JobVersion2 {
826+
var args model.RenameTablesArgs
827+
if err := json.Unmarshal(job.RawArgs, &args); err != nil {
828+
return nil, err
829+
}
830+
return &args, nil
831+
}
832+
833+
var rawArgs []json.RawMessage
834+
if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil {
835+
return nil, err
836+
}
837+
838+
var (
839+
oldSchemaIDs []int64
840+
newSchemaIDs []int64
841+
newTableNames []ast.CIStr
842+
tableIDs []int64
843+
oldSchemaNames []ast.CIStr
844+
oldTableNames []ast.CIStr
845+
)
846+
if len(rawArgs) > 0 {
847+
if err := json.Unmarshal(rawArgs[0], &oldSchemaIDs); err != nil {
848+
return nil, err
849+
}
850+
}
851+
if len(rawArgs) > 1 {
852+
if err := json.Unmarshal(rawArgs[1], &newSchemaIDs); err != nil {
853+
return nil, err
854+
}
855+
}
856+
if len(rawArgs) > 2 {
857+
if err := json.Unmarshal(rawArgs[2], &newTableNames); err != nil {
858+
return nil, err
859+
}
860+
}
861+
if len(rawArgs) > 3 {
862+
if err := json.Unmarshal(rawArgs[3], &tableIDs); err != nil {
863+
return nil, err
864+
}
865+
}
866+
if len(rawArgs) > 4 {
867+
if err := json.Unmarshal(rawArgs[4], &oldSchemaNames); err != nil {
868+
return nil, err
869+
}
870+
}
871+
if len(rawArgs) > 5 {
872+
if err := json.Unmarshal(rawArgs[5], &oldTableNames); err != nil {
873+
return nil, err
874+
}
875+
}
876+
877+
n := len(oldSchemaIDs)
878+
if len(newSchemaIDs) > n {
879+
n = len(newSchemaIDs)
880+
}
881+
if len(newTableNames) > n {
882+
n = len(newTableNames)
883+
}
884+
if len(tableIDs) > n {
885+
n = len(tableIDs)
886+
}
887+
888+
infos := make([]*model.RenameTableArgs, 0, n)
889+
for i := 0; i < n; i++ {
890+
info := &model.RenameTableArgs{}
891+
if i < len(oldSchemaIDs) {
892+
info.OldSchemaID = oldSchemaIDs[i]
893+
}
894+
if i < len(newSchemaIDs) {
895+
info.NewSchemaID = newSchemaIDs[i]
896+
}
897+
if i < len(newTableNames) {
898+
info.NewTableName = newTableNames[i]
899+
}
900+
if i < len(tableIDs) {
901+
info.TableID = tableIDs[i]
902+
}
903+
if i < len(oldSchemaNames) {
904+
info.OldSchemaName = oldSchemaNames[i]
905+
}
906+
if i < len(oldTableNames) {
907+
info.OldTableName = oldTableNames[i]
908+
}
909+
infos = append(infos, info)
910+
}
911+
912+
return &model.RenameTablesArgs{RenameTableInfos: infos}, nil
913+
}
914+
915+
func getRenameTablesArgsWithFallback(job *model.Job) (*model.RenameTablesArgs, error) {
916+
renameArgs, err := getRenameTablesArgsSafely(job)
917+
if err == nil {
918+
return renameArgs, nil
919+
}
920+
921+
rawArgs, decodeErr := decodeRenameTablesArgsFromRawArgs(job)
922+
if decodeErr != nil {
923+
return nil, fmt.Errorf("get rename tables args failed: %v, decode from raw args failed: %v", err, decodeErr)
924+
}
925+
926+
log.Warn("get rename tables args failed fallback to decode raw args",
927+
zap.String("query", job.Query),
928+
zap.Error(err),
929+
zap.Int("rawArgLen", len(job.RawArgs)))
930+
return rawArgs, nil
931+
}
932+
933+
func parseRenameTablesQueryInfos(query string) []renameTableQueryInfo {
934+
if query == "" {
935+
return nil
936+
}
937+
stmt, err := parser.New().ParseOneStmt(query, "", "")
938+
if err != nil {
939+
log.Warn("parse rename tables query failed",
940+
zap.String("query", query),
941+
zap.Error(err))
942+
return nil
943+
}
944+
renameStmt, ok := stmt.(*ast.RenameTableStmt)
945+
if !ok {
946+
return nil
947+
}
948+
949+
queryInfos := make([]renameTableQueryInfo, 0, len(renameStmt.TableToTables))
950+
for _, tableToTable := range renameStmt.TableToTables {
951+
queryInfos = append(queryInfos, renameTableQueryInfo{
952+
oldSchemaName: tableToTable.OldTable.Schema.O,
953+
oldTableName: tableToTable.OldTable.Name.O,
954+
newSchemaName: tableToTable.NewTable.Schema.O,
955+
newTableName: tableToTable.NewTable.Name.O,
956+
})
957+
}
958+
return queryInfos
959+
}
960+
961+
func matchRenameQueryInfoByNewTable(
962+
queryInfos []renameTableQueryInfo,
963+
used map[int]struct{},
964+
newSchemaName, newTableName string,
965+
) (renameTableQueryInfo, int, bool) {
966+
for i, info := range queryInfos {
967+
if _, ok := used[i]; ok {
968+
continue
969+
}
970+
if !strings.EqualFold(info.newTableName, newTableName) {
971+
continue
972+
}
973+
if info.newSchemaName == "" {
974+
continue
975+
}
976+
if strings.EqualFold(info.newSchemaName, newSchemaName) {
977+
return info, i, true
978+
}
979+
}
980+
981+
candidateIdx := -1
982+
for i, info := range queryInfos {
983+
if _, ok := used[i]; ok {
984+
continue
985+
}
986+
if !strings.EqualFold(info.newTableName, newTableName) {
987+
continue
988+
}
989+
if info.newSchemaName != "" && !strings.EqualFold(info.newSchemaName, newSchemaName) {
990+
continue
991+
}
992+
if candidateIdx != -1 {
993+
return renameTableQueryInfo{}, -1, false
994+
}
995+
candidateIdx = i
996+
}
997+
if candidateIdx == -1 {
998+
return renameTableQueryInfo{}, -1, false
999+
}
1000+
return queryInfos[candidateIdx], candidateIdx, true
1001+
}
1002+
1003+
func getSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) int64 {
1004+
for schemaID, databaseInfo := range databaseMap {
1005+
if strings.EqualFold(databaseInfo.Name, schemaName) {
1006+
return schemaID
1007+
}
1008+
}
1009+
return 0
1010+
}
1011+
8031012
func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
8041013
// TODO: does rename tables has the same problem(finished ts is not the real commit ts) with rename table?
8051014
event := buildPersistedDDLEventCommon(args)
806-
renameArgs, err := model.GetRenameTablesArgs(args.job)
1015+
renameArgs, err := getRenameTablesArgsWithFallback(args.job)
8071016
if err != nil {
8081017
log.Panic("GetRenameTablesArgs failed",
8091018
zap.String("query", args.job.Query),
8101019
zap.Error(err))
8111020
}
812-
if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) {
813-
log.Panic("should not happen",
814-
zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)),
815-
zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos)))
1021+
renameTableInfos := renameArgs.RenameTableInfos
1022+
multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos
1023+
if len(renameTableInfos) != len(multipleTableInfos) {
1024+
minLen := len(renameTableInfos)
1025+
if len(multipleTableInfos) < minLen {
1026+
minLen = len(multipleTableInfos)
1027+
}
1028+
log.Warn("rename tables args length mismatch with table infos use min length",
1029+
zap.Int("renameArgsLen", len(renameTableInfos)),
1030+
zap.Int("multipleTableInfosLen", len(multipleTableInfos)),
1031+
zap.Int("minLen", minLen),
1032+
zap.String("query", args.job.Query))
1033+
renameTableInfos = renameTableInfos[:minLen]
1034+
multipleTableInfos = multipleTableInfos[:minLen]
8161035
}
8171036

1037+
queryInfos := parseRenameTablesQueryInfos(args.job.Query)
1038+
usedQueryInfos := make(map[int]struct{}, len(queryInfos))
8181039
var querys []string
819-
for _, info := range renameArgs.RenameTableInfos {
1040+
for _, info := range renameTableInfos {
8201041
oldSchemaID := info.OldSchemaID
8211042
oldSchemaName := info.OldSchemaName.O
8221043
oldTableName := info.OldTableName.O
823-
if oldSchemaName == "" || oldTableName == "" || oldSchemaID == 0 {
824-
if tableInfo, ok := args.tableMap[info.TableID]; ok {
825-
if oldSchemaID == 0 {
826-
oldSchemaID = tableInfo.SchemaID
827-
}
1044+
newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
1045+
1046+
if oldSchemaName == "" || oldTableName == "" {
1047+
queryInfo, queryInfoIndex, ok := matchRenameQueryInfoByNewTable(
1048+
queryInfos,
1049+
usedQueryInfos,
1050+
newSchemaName,
1051+
info.NewTableName.O,
1052+
)
1053+
if ok {
1054+
usedQueryInfos[queryInfoIndex] = struct{}{}
8281055
if oldSchemaName == "" {
829-
oldSchemaName = getSchemaName(args.databaseMap, tableInfo.SchemaID)
1056+
oldSchemaName = queryInfo.oldSchemaName
8301057
}
8311058
if oldTableName == "" {
832-
oldTableName = tableInfo.Name
1059+
oldTableName = queryInfo.oldTableName
1060+
}
1061+
if oldSchemaID == 0 && oldSchemaName != "" {
1062+
oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName)
8331063
}
834-
log.Warn("rename tables args miss old table identifiers fallback to schema store metadata",
1064+
log.Warn("rename tables args miss old table identifiers fallback to query",
8351065
zap.Int64("tableID", info.TableID),
8361066
zap.Int64("oldSchemaIDInArgs", info.OldSchemaID),
8371067
zap.String("oldSchemaNameInArgs", info.OldSchemaName.O),
8381068
zap.String("oldTableNameInArgs", info.OldTableName.O),
839-
zap.Int64("oldSchemaIDInStore", tableInfo.SchemaID),
840-
zap.String("oldTableNameInStore", tableInfo.Name))
1069+
zap.String("newSchemaNameInArgs", newSchemaName),
1070+
zap.String("newTableNameInArgs", info.NewTableName.O),
1071+
zap.String("oldSchemaNameInQuery", queryInfo.oldSchemaName),
1072+
zap.String("oldTableNameInQuery", queryInfo.oldTableName))
8411073
}
8421074
}
8431075

1076+
if tableInfo, ok := args.tableMap[info.TableID]; ok {
1077+
oldSchemaNameInStore := getSchemaName(args.databaseMap, tableInfo.SchemaID)
1078+
if oldSchemaID == 0 {
1079+
oldSchemaID = tableInfo.SchemaID
1080+
}
1081+
if oldSchemaName == "" {
1082+
oldSchemaName = oldSchemaNameInStore
1083+
}
1084+
if oldTableName == "" {
1085+
oldTableName = tableInfo.Name
1086+
} else if tableInfo.Name != "" &&
1087+
!strings.EqualFold(oldTableName, tableInfo.Name) {
1088+
// For cyclic rename statements, the old table name parsed from query can be
1089+
// a temporary name. Prefer schema store metadata to keep the table lifecycle correct.
1090+
log.Warn("rename tables query old table name mismatch with schema metadata prefer schema metadata",
1091+
zap.Int64("tableID", info.TableID),
1092+
zap.String("oldTableNameInQuery", oldTableName),
1093+
zap.String("oldTableNameInStore", tableInfo.Name),
1094+
zap.String("query", args.job.Query))
1095+
oldTableName = tableInfo.Name
1096+
oldSchemaName = oldSchemaNameInStore
1097+
oldSchemaID = tableInfo.SchemaID
1098+
}
1099+
}
1100+
if oldSchemaID == 0 && oldSchemaName != "" {
1101+
oldSchemaID = getSchemaIDByName(args.databaseMap, oldSchemaName)
1102+
}
1103+
if oldSchemaName == "" && oldSchemaID != 0 {
1104+
oldSchemaName = getSchemaName(args.databaseMap, oldSchemaID)
1105+
}
1106+
if oldSchemaID == 0 {
1107+
oldSchemaID = info.NewSchemaID
1108+
}
1109+
if oldSchemaName == "" {
1110+
oldSchemaName = newSchemaName
1111+
}
1112+
if oldTableName == "" {
1113+
oldTableName = info.NewTableName.O
1114+
}
1115+
8441116
event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID)
8451117
event.ExtraSchemaNames = append(event.ExtraSchemaNames, oldSchemaName)
8461118
event.ExtraTableNames = append(event.ExtraTableNames, oldTableName)
8471119
event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID)
848-
SchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
849-
event.SchemaNames = append(event.SchemaNames, SchemaName)
850-
querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, SchemaName, info.NewTableName.O))
1120+
event.SchemaNames = append(event.SchemaNames, newSchemaName)
1121+
querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", oldSchemaName, oldTableName, newSchemaName, info.NewTableName.O))
8511122
}
8521123

8531124
event.Query = strings.Join(querys, "")
854-
event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos
1125+
event.MultipleTableInfos = multipleTableInfos
8551126
// we have to reverse MultipleTableInfos to get correct schema name
8561127
// see https://github.com/pingcap/tidb/issues/63710
8571128
//

0 commit comments

Comments
 (0)