Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
SchemaRule: rule.SchemaRule,
TableRule: rule.TableRule,
})
}
var columnSelectors []*config.ColumnSelector
Expand Down Expand Up @@ -530,6 +532,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
CheckpointInterval: c.SyncedStatus.CheckpointInterval,
}
}

return res
}

Expand Down Expand Up @@ -580,6 +583,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
SchemaRule: rule.SchemaRule,
TableRule: rule.TableRule,
})
}
var columnSelectors []*ColumnSelector
Expand Down Expand Up @@ -854,6 +859,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
CheckpointInterval: cloned.SyncedStatus.CheckpointInterval,
}
}

return res
}

Expand Down Expand Up @@ -994,14 +1000,18 @@ type LargeMessageHandleConfig struct {
ClaimCheckRawValue bool `json:"claim_check_raw_value"`
}

// DispatchRule represents partition rule for a table
// DispatchRule represents dispatch and routing rules for a table.
// For MQ sinks (Kafka, Pulsar): controls topic and partition assignment.
// For MySQL/TiDB sinks: controls schema and table name routing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the schema routing functionality also work for the MQ sinks?

// DispatchRule is the API-layer form of a sink dispatch rule. Its fields are
// copied one-to-one to and from config.DispatchRule when converting between
// the API and internal replica configs. Every field is omitted from the JSON
// encoding when it is empty.
// This is a duplicate of config.DispatchRule
type DispatchRule struct {
// Matcher is serialized as "matcher".
// NOTE(review): presumably table patterns such as "test.*" that select the
// tables this rule applies to - confirm against config.DispatchRule.
Matcher []string `json:"matcher,omitempty"`
// PartitionRule is serialized as "partition"; it is the partition part of
// the MQ-sink (Kafka, Pulsar) dispatch configuration.
PartitionRule string `json:"partition,omitempty"`
// IndexName is serialized as "index".
// NOTE(review): presumably the index used by an index-based partition
// rule - confirm.
IndexName string `json:"index,omitempty"`
// Columns is serialized as "columns".
// NOTE(review): presumably the columns used by a column-based partition
// rule - confirm.
Columns []string `json:"columns,omitempty"`
// TopicRule is serialized as "topic"; it is the topic part of the MQ-sink
// dispatch configuration.
TopicRule string `json:"topic,omitempty"`
// SchemaRule is serialized as "schema"; it is the schema-name routing rule
// used for MySQL/TiDB sinks.
SchemaRule string `json:"schema,omitempty"`
// TableRule is serialized as "table"; it is the table-name routing rule
// used for MySQL/TiDB sinks.
TableRule string `json:"table,omitempty"`
}
Comment on lines 1010 to 1015
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment for DispatchRule is outdated and doesn't reflect the extended functionality. It says "represents partition rule for a table" but it now also handles schema/table routing for MySQL sinks. This should be updated to match the comprehensive comment in pkg/config/sink.go which correctly describes both MQ (topic/partition) and MySQL (schema/table routing) use cases.

Copilot uses AI. Check for mistakes.

// ColumnSelector represents a column selector for a table.
Expand Down
167 changes: 164 additions & 3 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dispatcher"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
Expand Down Expand Up @@ -287,7 +288,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
jobs, err := getAllHistoryDDLJob(store, f)
require.Nil(t, err)

scheamStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID, util.RoleTester, f)
scheamStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID, util.RoleTester, f, nil)
require.Nil(t, err)
for _, job := range jobs {
err := scheamStorage.HandleDDLJob(job)
Expand Down Expand Up @@ -1254,7 +1255,7 @@ func TestDecodeRow(t *testing.T) {
require.NoError(t, err)

schemaStorage, err := NewSchemaStorage(helper.Storage(),
ver.Ver, false, changefeed, util.RoleTester, filter)
ver.Ver, false, changefeed, util.RoleTester, filter, nil)
require.NoError(t, err)

// apply ddl to schemaStorage
Expand Down Expand Up @@ -1335,7 +1336,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
require.Nil(t, err)

schemaStorage, err := NewSchemaStorage(helper.Storage(),
ver.Ver, false, cfID, util.RoleTester, f)
ver.Ver, false, cfID, util.RoleTester, f, nil)
require.Nil(t, err)
// apply ddl to schemaStorage
for _, ddl := range ddls {
Expand Down Expand Up @@ -1693,3 +1694,163 @@ func TestFormatColVal(t *testing.T) {
require.Equal(t, vector, value)
require.Zero(t, warn)
}

// TestMounterWithSinkRouting checks that a row mounted through a schema
// storage built with a sink router carries the routed target schema while the
// source schema and table names stay untouched. The rule maps every table
// matched by "test.*" to schema "test_target" and keeps the table name via
// dispatcher.TablePlaceholder.
func TestMounterWithSinkRouting(t *testing.T) {
	helper := NewSchemaTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test;")

	changefeed := model.DefaultChangeFeedID("changefeed-test-schema-routing")

	ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
	require.NoError(t, err)

	cfg := config.GetDefaultReplicaConfig()

	// Named f (not filter) so the imported filter package is not shadowed.
	f, err := filter.NewFilter(cfg, "")
	require.NoError(t, err)

	// Create sink router: test -> test_target
	sinkRouter, err := dispatcher.NewSinkRouter(&config.ReplicaConfig{
		Sink: &config.SinkConfig{
			DispatchRules: []*config.DispatchRule{
				{
					Matcher:    []string{"test.*"},
					SchemaRule: "test_target",
					TableRule:  dispatcher.TablePlaceholder,
				},
			},
		},
	})
	require.NoError(t, err)

	schemaStorage, err := NewSchemaStorage(helper.Storage(),
		ver.Ver, false, changefeed, util.RoleTester, f, sinkRouter)
	require.NoError(t, err)

	// Create table in test schema
	ddl := "create table test.student(id int primary key, name varchar(50), age int)"
	job := helper.DDL2Job(ddl)
	err = schemaStorage.HandleDDLJob(job)
	require.NoError(t, err)

	ts := schemaStorage.GetLastSnapshot().CurrentTs()
	schemaStorage.AdvanceResolvedTs(ver.Ver)

	// Create mounter (routing is applied via schemaStorage).
	// Named m so the mounter type is not shadowed by the variable.
	m := NewMounter(schemaStorage, changefeed, time.Local, f, cfg.Integrity).(*mounter)

	// Insert a row
	helper.Tk().MustExec(`insert into test.student values(1, "alice", 20)`)

	ctx := context.Background()

	// Mount the insert event and verify TargetSchema is set
	tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
	require.True(t, ok)

	// All routing assertions run inside the callback below; count the
	// invocations so the test cannot pass without checking any row.
	mounted := 0
	walkTableSpanInStore(t, helper.Storage(), tableInfo.ID, func(key []byte, value []byte) {
		rawKV := &model.RawKVEntry{
			OpType:  model.OpTypePut,
			Key:     key,
			Value:   value,
			StartTs: ts - 1,
			CRTs:    ts + 1,
		}

		row, err := m.unmarshalAndMountRowChanged(ctx, rawKV)
		require.NoError(t, err)
		require.NotNil(t, row)

		// Verify that the source schema is "test"
		require.Equal(t, "test", row.TableInfo.TableName.Schema)
		// Verify that the source table is "student"
		require.Equal(t, "student", row.TableInfo.TableName.Table)

		// Verify that the target schema is routed to "test_target"
		require.Equal(t, "test_target", row.TableInfo.TableName.TargetSchema)
		// Verify that the target table is unchanged (TableRule is "{table}")
		require.Equal(t, "student", row.TableInfo.TableName.TargetTable)

		mounted++
	})
	require.NotZero(t, mounted, "expected at least one row of test.student to be mounted")
}

// TestMounterWithSinkRoutingTableRouting checks that a row mounted through a
// schema storage built with a sink router carries both the routed target
// schema and the routed target table, while the source names stay untouched.
// The rule maps test.student to test_target.student_routed.
func TestMounterWithSinkRoutingTableRouting(t *testing.T) {
	helper := NewSchemaTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test;")

	changefeed := model.DefaultChangeFeedID("changefeed-test-table-routing")

	ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
	require.NoError(t, err)

	cfg := config.GetDefaultReplicaConfig()

	// Named f (not filter) so the imported filter package is not shadowed.
	f, err := filter.NewFilter(cfg, "")
	require.NoError(t, err)

	// Create sink router with table routing: test.student -> test_target.student_routed
	sinkRouter, err := dispatcher.NewSinkRouter(&config.ReplicaConfig{
		Sink: &config.SinkConfig{
			DispatchRules: []*config.DispatchRule{
				{
					Matcher:    []string{"test.student"},
					SchemaRule: "test_target",
					TableRule:  "student_routed",
				},
			},
		},
	})
	require.NoError(t, err)

	schemaStorage, err := NewSchemaStorage(helper.Storage(),
		ver.Ver, false, changefeed, util.RoleTester, f, sinkRouter)
	require.NoError(t, err)

	// Create table in test schema
	ddl := "create table test.student(id int primary key, name varchar(50), age int)"
	job := helper.DDL2Job(ddl)
	err = schemaStorage.HandleDDLJob(job)
	require.NoError(t, err)

	ts := schemaStorage.GetLastSnapshot().CurrentTs()
	schemaStorage.AdvanceResolvedTs(ver.Ver)

	// Create mounter (routing is applied via schemaStorage).
	// Named m so the mounter type is not shadowed by the variable.
	m := NewMounter(schemaStorage, changefeed, time.Local, f, cfg.Integrity).(*mounter)

	// Insert a row
	helper.Tk().MustExec(`insert into test.student values(1, "alice", 20)`)

	ctx := context.Background()

	// Mount the insert event and verify both TargetSchema and TargetTable are set
	tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
	require.True(t, ok)

	// All routing assertions run inside the callback below; count the
	// invocations so the test cannot pass without checking any row.
	mounted := 0
	walkTableSpanInStore(t, helper.Storage(), tableInfo.ID, func(key []byte, value []byte) {
		rawKV := &model.RawKVEntry{
			OpType:  model.OpTypePut,
			Key:     key,
			Value:   value,
			StartTs: ts - 1,
			CRTs:    ts + 1,
		}

		row, err := m.unmarshalAndMountRowChanged(ctx, rawKV)
		require.NoError(t, err)
		require.NotNil(t, row)

		// Verify that the source schema is "test"
		require.Equal(t, "test", row.TableInfo.TableName.Schema)
		// Verify that the source table is "student"
		require.Equal(t, "student", row.TableInfo.TableName.Table)

		// Verify that the target schema is routed to "test_target"
		require.Equal(t, "test_target", row.TableInfo.TableName.TargetSchema)
		// Verify that the target table is routed to "student_routed"
		require.Equal(t, "student_routed", row.TableInfo.TableName.TargetTable)

		mounted++
	})
	require.NotZero(t, mounted, "expected at least one row of test.student to be mounted")
}
56 changes: 53 additions & 3 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dispatcher"
"github.com/pingcap/tiflow/pkg/ddl"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -82,13 +83,16 @@ type schemaStorage struct {

id model.ChangeFeedID
role util.Role

sinkRouter *dispatcher.SinkRouter
}

// NewSchemaStorage creates a new schema storage
func NewSchemaStorage(
storage tidbkv.Storage, startTs uint64,
forceReplicate bool, id model.ChangeFeedID,
role util.Role, filter filter.Filter,
sinkRouter *dispatcher.SinkRouter,
) (SchemaStorage, error) {
var (
snap *schema.Snapshot
Expand All @@ -107,13 +111,22 @@ func NewSchemaStorage(
return nil, errors.Trace(err)
}
}

// Apply sink routing to all tables in the initial snapshot
// This ensures DML events for pre-existing tables (tables that existed before start-ts)
// also get the routed schema/table for SQL generation
if sinkRouter != nil {
applySinkRoutingToSnapshot(snap, sinkRouter)
}

return &schemaStorage{
snaps: []*schema.Snapshot{snap},
resolvedTs: startTs,
forceReplicate: forceReplicate,
filter: filter,
id: id,
role: role,
sinkRouter: sinkRouter,
}, nil
}

Expand Down Expand Up @@ -227,6 +240,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
zap.Error(err))
return errors.Trace(err)
}

// Apply sink routing to the TableInfo if sinkRouter is configured
// This ensures DML events get the routed schema/table for SQL generation
if s.sinkRouter != nil && job.TableID > 0 {
applySinkRoutingToTable(snap, job.TableID, s.sinkRouter)
}

s.snaps = append(s.snaps, snap)
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
log.Info("schemaStorage: update snapshot by the DDL job",
Expand Down Expand Up @@ -410,7 +430,9 @@ func (s *schemaStorage) BuildDDLEvents(
newTableInfo := model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, tableInfo)
job.Query = querys[index]
event := new(model.DDLEvent)
event.FromJob(job, nil, newTableInfo)
if err := event.FromJob(job, nil, newTableInfo, s.sinkRouter); err != nil {
return nil, errors.Trace(err)
}
ddlEvents = append(ddlEvents, event)
}
} else {
Expand Down Expand Up @@ -459,7 +481,9 @@ func (s *schemaStorage) BuildDDLEvents(
}
}
event := new(model.DDLEvent)
event.FromJob(job, preTableInfo, tableInfo)
if err := event.FromJob(job, preTableInfo, tableInfo, s.sinkRouter); err != nil {
return nil, errors.Trace(err)
}
ddlEvents = append(ddlEvents, event)
}
return ddlEvents, nil
Expand Down Expand Up @@ -518,7 +542,9 @@ func (s *schemaStorage) buildRenameEvents(

tableInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName,
job.BinlogInfo.FinishedTS, tableInfo)
event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName)
if err := event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName, s.sinkRouter); err != nil {
return nil, errors.Trace(err)
}
event.Seq = uint64(i)
ddlEvents = append(ddlEvents, event)
}
Expand Down Expand Up @@ -581,3 +607,27 @@ func (s *MockSchemaStorage) ResolvedTs() uint64 {
func (s *MockSchemaStorage) DoGC(ts uint64) uint64 {
	// The mock does no garbage collection here: ts is ignored and the
	// current value of s.Resolved is read atomically and handed back.
	resolved := atomic.LoadUint64(&s.Resolved)
	return resolved
}

// applySinkRoutingToTable looks up tableID in snap via PhysicalTableByID and,
// when sinkRouter maps the table's schema/table pair to a different pair,
// records the result in TableName.TargetSchema and TableName.TargetTable.
// Nothing is written when the table is not found or the routed pair equals
// the source pair.
func applySinkRoutingToTable(snap *schema.Snapshot, tableID int64, sinkRouter *dispatcher.SinkRouter) {
	info, found := snap.PhysicalTableByID(tableID)
	if !found {
		return
	}

	srcSchema, srcTable := info.TableName.Schema, info.TableName.Table
	dstSchema, dstTable := sinkRouter.Route(srcSchema, srcTable)
	if dstSchema == srcSchema && dstTable == srcTable {
		// Routing is a no-op for this table; leave the target fields alone.
		return
	}

	info.TableName.TargetSchema = dstSchema
	info.TableName.TargetTable = dstTable
}

// applySinkRoutingToSnapshot visits every table that snap.IterTables(true, ...)
// yields and, for each one whose schema/table pair is mapped by sinkRouter to
// a different pair, records the result in TableName.TargetSchema and
// TableName.TargetTable. Tables whose routed pair equals the source pair are
// left unchanged.
func applySinkRoutingToSnapshot(snap *schema.Snapshot, sinkRouter *dispatcher.SinkRouter) {
	routeOne := func(info *model.TableInfo) {
		srcSchema, srcTable := info.TableName.Schema, info.TableName.Table
		dstSchema, dstTable := sinkRouter.Route(srcSchema, srcTable)
		if dstSchema == srcSchema && dstTable == srcTable {
			// Routing is a no-op for this table; leave the target fields alone.
			return
		}
		info.TableName.TargetSchema = dstSchema
		info.TableName.TargetTable = dstTable
	}

	snap.IterTables(true, routeOne)
}
Loading