Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
TargetSchema: rule.TargetSchema,
TargetTable: rule.TargetTable,
})
}
var columnSelectors []*config.ColumnSelector
Expand Down Expand Up @@ -700,6 +702,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
TargetSchema: rule.TargetSchema,
TargetTable: rule.TargetTable,
})
}
var columnSelectors []*ColumnSelector
Expand Down Expand Up @@ -1190,6 +1194,21 @@ type DispatchRule struct {
IndexName string `json:"index,omitempty"`
Columns []string `json:"columns,omitempty"`
TopicRule string `json:"topic,omitempty"`

// TargetSchema sets the routed downstream schema name.
// Leave it empty to keep the source schema name.
// For example, if the source table is `sales`.`orders`, `target-schema = "sales_bak"`
// writes to `sales_bak`.`orders`.
// You can also use placeholders. For example, `target-schema = "{schema}_bak"`
// the target schema becomes `sales_bak`.
TargetSchema string `json:"target-schema,omitempty"`
// TargetTable sets the routed downstream table name.
// Leave it empty to keep the source table name.
// For example, if the source table is `sales`.`orders`, `target-table = "orders_bak"`
// writes to `sales`.`orders_bak`.
// You can also use placeholders. For example, `target-table = "{schema}_{table}"`
// becomes `sales_orders`.
TargetTable string `json:"target-table,omitempty"`
}

// ColumnSelector represents a column selector for a table.
Expand Down
146 changes: 132 additions & 14 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"go.uber.org/zap"
)

Expand All @@ -41,14 +43,22 @@ type DDLEvent struct {
SchemaID int64 `json:"schema_id"`
SchemaName string `json:"schema_name"`
TableName string `json:"table_name"`
// TargetSchemaName and TargetTableName carry routed names for sink output paths.
// They are runtime-only fields and are not serialized.
TargetSchemaName string `json:"-"`
TargetTableName string `json:"-"`
// the following two fields are just used for RenameTable,
// they are the old schema/table name of the table
ExtraSchemaName string `json:"extra_schema_name"`
ExtraTableName string `json:"extra_table_name"`
Query string `json:"query"`
TableInfo *common.TableInfo `json:"-"`
StartTs uint64 `json:"start_ts"`
FinishedTs uint64 `json:"finished_ts"`
ExtraSchemaName string `json:"extra_schema_name"`
ExtraTableName string `json:"extra_table_name"`
// TargetExtraSchemaName and TargetExtraTableName carry routed old names for rename DDLs.
// They are runtime-only fields and are not serialized.
TargetExtraSchemaName string `json:"-"`
TargetExtraTableName string `json:"-"`
Query string `json:"query"`
TableInfo *common.TableInfo `json:"-"`
StartTs uint64 `json:"start_ts"`
FinishedTs uint64 `json:"finished_ts"`
// The seq of the event. It is set by event service.
Seq uint64 `json:"seq"`
// The epoch of the event. It is set by event service.
Expand Down Expand Up @@ -189,18 +199,62 @@ func (d *DDLEvent) GetSchemaName() string {
return d.SchemaName
}

func (d *DDLEvent) GetSourceSchemaName() string {
return d.SchemaName
}

func (d *DDLEvent) GetTableName() string {
return d.TableName
}

func (d *DDLEvent) GetSourceTableName() string {
return d.TableName
}

func (d *DDLEvent) GetExtraSchemaName() string {
return d.ExtraSchemaName
}

func (d *DDLEvent) GetSourceExtraSchemaName() string {
return d.ExtraSchemaName
}

func (d *DDLEvent) GetExtraTableName() string {
return d.ExtraTableName
}

func (d *DDLEvent) GetSourceExtraTableName() string {
return d.ExtraTableName
}

func (d *DDLEvent) GetTargetSchemaName() string {
if d.TargetSchemaName != "" {
return d.TargetSchemaName
}
return d.SchemaName
}

func (d *DDLEvent) GetTargetTableName() string {
if d.TargetTableName != "" {
return d.TargetTableName
}
return d.TableName
}

func (d *DDLEvent) GetTargetExtraSchemaName() string {
if d.TargetExtraSchemaName != "" {
return d.TargetExtraSchemaName
}
return d.ExtraSchemaName
}

func (d *DDLEvent) GetTargetExtraTableName() string {
if d.TargetExtraTableName != "" {
return d.TargetExtraTableName
}
return d.ExtraTableName
}

// GetTableID returns the logic table ID of the event.
// it returns 0 when there is no tableinfo
func (d *DDLEvent) GetTableID() int64 {
Expand Down Expand Up @@ -230,18 +284,23 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
}
for i, info := range d.MultipleTableInfos {
event := &DDLEvent{
Version: d.Version,
Type: byte(t),
SchemaName: info.GetSchemaName(),
TableName: info.GetTableName(),
TableInfo: info,
Query: queries[i],
StartTs: d.StartTs,
FinishedTs: d.FinishedTs,
Version: d.Version,
Type: byte(t),
SchemaName: info.GetSchemaName(),
TableName: info.GetTableName(),
TargetSchemaName: info.GetTargetSchemaName(),
TargetTableName: info.GetTargetTableName(),
TableInfo: info,
Query: queries[i],
StartTs: d.StartTs,
FinishedTs: d.FinishedTs,
}
if model.ActionType(d.Type) == model.ActionRenameTables {
event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName
event.ExtraTableName = d.TableNameChange.DropName[i].TableName
targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i])
event.TargetExtraSchemaName = targetExtraSchemaName
event.TargetExtraTableName = targetExtraTableName
}
events = append(events, event)
}
Expand All @@ -251,6 +310,19 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
return []*DDLEvent{d}
}

func extractRenameTargetExtraFromQuery(query string) (string, string) {
stmt, err := parser.New().ParseOneStmt(query, "", "")
if err != nil {
log.Panic("parse split rename query failed", zap.String("query", query), zap.Error(err))
}
renameStmt, ok := stmt.(*ast.RenameTableStmt)
if !ok || len(renameStmt.TableToTables) == 0 {
log.Panic("unexpected split rename query", zap.String("query", query), zap.Any("stmt", stmt))
}
oldTable := renameStmt.TableToTables[0].OldTable
return oldTable.Schema.O, oldTable.Name.O
}
Comment thread
3AceShowHand marked this conversation as resolved.

func (d *DDLEvent) GetSeq() uint64 {
return d.Seq
}
Expand Down Expand Up @@ -299,6 +371,13 @@ func (e *DDLEvent) GetDDLQuery() string {
return e.Query
}

func (e *DDLEvent) GetDDLSchemaName() string {
if e == nil {
return ""
}
return e.GetTargetSchemaName()
}

func (e *DDLEvent) GetDDLType() model.ActionType {
return model.ActionType(e.Type)
}
Expand Down Expand Up @@ -479,6 +558,45 @@ func (t *DDLEvent) IsPaused() bool {
return false
}

// CloneForRouting creates a shallow copy of the DDLEvent that can safely be mutated
// for table-route purposes without affecting the original event.
//
// The clone shares most read-only fields with the original. Slice fields that can be
// replaced independently downstream are copied so routing can update them without
// mutating shared state.
func (d *DDLEvent) CloneForRouting() *DDLEvent {
if d == nil {
return nil
}

// Create shallow copy
clone := *d

// PostTxnFlushed needs its own backing array to prevent potential races.
// Currently, DDL events arrive with nil PostTxnFlushed (callbacks are added
// downstream by basic_dispatcher.go), so append(nil, f) naturally creates a
// fresh slice. However, we make an explicit copy here for future-proofing:
// if any code path later adds callbacks before cloning, sharing the backing
// array could cause nondeterministic callback visibility or data races.
if d.PostTxnFlushed != nil {
clone.PostTxnFlushed = make([]func(), len(d.PostTxnFlushed))
copy(clone.PostTxnFlushed, d.PostTxnFlushed)
}

// MultipleTableInfos needs a new slice so each dispatcher can independently
// apply routing to its elements without affecting others
if d.MultipleTableInfos != nil {
clone.MultipleTableInfos = make([]*common.TableInfo, len(d.MultipleTableInfos))
copy(clone.MultipleTableInfos, d.MultipleTableInfos)
}

if d.BlockedTableNames != nil {
clone.BlockedTableNames = append([]SchemaTableName(nil), d.BlockedTableNames...)
}

return &clone
}

func (t *DDLEvent) Len() int32 {
return 1
}
Expand Down
Loading
Loading