Skip to content
128 changes: 108 additions & 20 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 {
// schemaName should be "Name.O"
func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) (int64, bool) {
for id, info := range databaseMap {
if info.Name == schemaName {
if strings.EqualFold(info.Name, schemaName) {
return id, true
}
}
Expand All @@ -530,7 +530,7 @@ func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName str
// tableName should be "Name.O"
func findTableIDByName(tableMap map[int64]*BasicTableInfo, schemaID int64, tableName string) (int64, bool) {
for id, info := range tableMap {
if info.SchemaID == schemaID && info.Name == tableName {
if info.SchemaID == schemaID && strings.EqualFold(info.Name, tableName) {
return id, true
}
}
Expand Down Expand Up @@ -600,7 +600,7 @@ func buildPersistedDDLEventForDropView(args buildPersistedDDLEventFuncArgs) Pers
// We don't store the relationship: view_id -> table_name, get table name from args.job
event.TableName = args.job.TableName
// The query in job maybe "DROP VIEW test1.view1, test2.view2", we need rebuild it here.
event.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", event.SchemaName, event.TableName)
event.Query = fmt.Sprintf("DROP VIEW %s", common.QuoteSchema(event.SchemaName, event.TableName))
return event
}

Expand Down Expand Up @@ -662,7 +662,7 @@ func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) Per
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
event.TableName = getTableName(args.tableMap, event.TableID)
// The query in job maybe "DROP TABLE test1.table1, test2.table2", we need rebuild it here.
event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.SchemaName, event.TableName)
event.Query = fmt.Sprintf("DROP TABLE %s", common.QuoteSchema(event.SchemaName, event.TableName))
return event
}

Expand Down Expand Up @@ -723,6 +723,11 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
//
// InvolvingSchemaInfo returns the schema info involved in the job.
// The value should be stored in lower case.
//
// InvolvingSchemaInfo may store normalized lower-case names,
// while the original query can keep user-provided identifier case.
// Prefer names parsed from the original query whenever possible.
// See https://github.com/pingcap/ticdc/pull/2218 for background.
oldSchemaName := args.job.InvolvingSchemaInfo[0].Database
oldTableName := args.job.InvolvingSchemaInfo[0].Table
stmt, err := parser.New().ParseOneStmt(args.job.Query, "", "")
Expand All @@ -744,9 +749,9 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
log.Error("unknown stmt type", zap.String("query", args.job.Query), zap.Any("stmt", stmt))
}
}
event.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
oldSchemaName, oldTableName,
event.SchemaName, event.TableName)
event.Query = fmt.Sprintf("RENAME TABLE %s TO %s",
common.QuoteSchema(oldSchemaName, oldTableName),
common.QuoteSchema(event.SchemaName, event.TableName))
}
return event
}
Expand Down Expand Up @@ -787,8 +792,10 @@ func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncA
// Note that partition name should be parsed from original query, not the upperQuery.
partName := strings.TrimSpace(event.Query[idx1:idx2])
partName = strings.Replace(partName, "`", "", -1)
event.Query = fmt.Sprintf("ALTER TABLE `%s`.`%s` EXCHANGE PARTITION `%s` WITH TABLE `%s`.`%s`",
event.ExtraSchemaName, event.ExtraTableName, partName, event.SchemaName, event.TableName)
event.Query = fmt.Sprintf("ALTER TABLE %s EXCHANGE PARTITION %s WITH TABLE %s",
common.QuoteSchema(event.ExtraSchemaName, event.ExtraTableName),
common.QuoteName(partName),
common.QuoteSchema(event.SchemaName, event.TableName))

if strings.HasSuffix(upperQuery, "WITHOUT VALIDATION") {
event.Query += " WITHOUT VALIDATION"
Expand All @@ -800,6 +807,41 @@ func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncA
return event
}

type renameTableQueryInfo struct {
oldSchemaName string
oldTableName string
newSchemaName string
newTableName string
}

func parseRenameTablesQueryInfos(query string) ([]renameTableQueryInfo, bool) {
if query == "" {
return nil, false
}
stmt, err := parser.New().ParseOneStmt(query, "", "")
if err != nil {
log.Warn("parse rename tables query failed",
zap.String("query", query),
zap.Error(err))
return nil, false
}
renameStmt, ok := stmt.(*ast.RenameTableStmt)
if !ok {
return nil, false
}

queryInfos := make([]renameTableQueryInfo, 0, len(renameStmt.TableToTables))
for _, tableToTable := range renameStmt.TableToTables {
queryInfos = append(queryInfos, renameTableQueryInfo{
oldSchemaName: tableToTable.OldTable.Schema.O,
oldTableName: tableToTable.OldTable.Name.O,
newSchemaName: tableToTable.NewTable.Schema.O,
newTableName: tableToTable.NewTable.Name.O,
})
}
return queryInfos, true
}

func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
// TODO: does rename tables has the same problem(finished ts is not the real commit ts) with rename table?
event := buildPersistedDDLEventCommon(args)
Expand All @@ -809,25 +851,71 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs)
zap.String("query", args.job.Query),
zap.Error(err))
}
if len(renameArgs.RenameTableInfos) != len(args.job.BinlogInfo.MultipleTableInfos) {
renameTableInfos := renameArgs.RenameTableInfos
multipleTableInfos := args.job.BinlogInfo.MultipleTableInfos
if len(renameTableInfos) != len(multipleTableInfos) {
log.Panic("should not happen",
zap.Int("renameArgsLen", len(renameArgs.RenameTableInfos)),
zap.Int("multipleTableInfosLen", len(args.job.BinlogInfo.MultipleTableInfos)))
zap.Int("renameArgsLen", len(renameTableInfos)),
zap.Int("multipleTableInfosLen", len(multipleTableInfos)),
zap.String("query", args.job.Query))
}

// RenameTableInfos may store normalized lower-case names,
// while the original query can keep user-provided identifier case.
// Prefer names parsed from the original query whenever possible.
// See https://github.com/pingcap/ticdc/pull/2218 for background.
queryInfos, queryParsed := parseRenameTablesQueryInfos(args.job.Query)
if queryParsed && len(queryInfos) != len(renameTableInfos) {
log.Panic("rename tables query info length is inconsistent with args",
zap.Int("queryInfosLen", len(queryInfos)),
zap.Int("renameArgsLen", len(renameTableInfos)),
zap.String("query", args.job.Query))
}

// TiDB <= v8.1 may emit empty old table names for RENAME TABLES args.
// See https://github.com/pingcap/tidb/pull/64421 for the upstream fix.
if !queryParsed && renameTableInfos[0].OldTableName.O == "" {
// TODO: return error instead of falling back to args once builder supports error propagation.
log.Warn("rename tables args miss old table name and query is unavailable, keep args as-is",
zap.Int("tableCount", len(renameTableInfos)),
zap.String("query", args.job.Query))
}

var querys []string
for _, info := range renameArgs.RenameTableInfos {
event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, info.OldSchemaID)
event.ExtraSchemaNames = append(event.ExtraSchemaNames, info.OldSchemaName.O)
event.ExtraTableNames = append(event.ExtraTableNames, info.OldTableName.O)
for i, info := range renameTableInfos {
oldSchemaID := info.OldSchemaID
oldSchemaName := info.OldSchemaName.O
oldTableName := info.OldTableName.O
newSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
newTableName := info.NewTableName.O
if queryParsed {
queryInfo := queryInfos[i]
if queryInfo.oldSchemaName != "" {
oldSchemaName = queryInfo.oldSchemaName
}
if queryInfo.oldTableName != "" {
oldTableName = queryInfo.oldTableName
}
if queryInfo.newSchemaName != "" {
newSchemaName = queryInfo.newSchemaName
}
if queryInfo.newTableName != "" {
newTableName = queryInfo.newTableName
}
}

event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, oldSchemaID)
event.ExtraSchemaNames = append(event.ExtraSchemaNames, oldSchemaName)
event.ExtraTableNames = append(event.ExtraTableNames, oldTableName)
event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID)
SchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
event.SchemaNames = append(event.SchemaNames, SchemaName)
querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", info.OldSchemaName.O, info.OldTableName.O, SchemaName, info.NewTableName.O))
event.SchemaNames = append(event.SchemaNames, newSchemaName)
querys = append(querys, fmt.Sprintf("RENAME TABLE %s TO %s;",
common.QuoteSchema(oldSchemaName, oldTableName),
common.QuoteSchema(newSchemaName, newTableName)))
}

event.Query = strings.Join(querys, "")
event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos
event.MultipleTableInfos = multipleTableInfos
// we have to reverse MultipleTableInfos to get correct schema name
// see https://github.com/pingcap/tidb/issues/63710
//
Expand Down
Loading