165 changes: 162 additions & 3 deletions dm/pkg/schema/tracker.go
@@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/sqlexec"
cdcmodel "github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
@@ -83,8 +84,11 @@ type downstreamTracker struct {

// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
WhereHandle *sqlmodel.WhereHandle
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
WhereHandle *sqlmodel.WhereHandle
ForeignKeyRelations []sqlmodel.ForeignKeyCausalityRelation
foreignKeyInitOnce sync.Once
foreignKeyInitErr error
}
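
Reviewer note: the element type of the new ForeignKeyRelations field lives in pkg/sqlmodel and is not part of this diff. Judging only from how it is populated and read below, it carries roughly the following shape (a sketch inferred from usage in this PR, not the authoritative sqlmodel definition):

// Sketch only - fields inferred from buildForeignKeyRelations and the test below.
type ForeignKeyCausalityRelation struct {
	ParentTable    string              // downstream parent table name, e.g. `db`.`a`
	ParentColumns  []*model.ColumnInfo // referenced columns on the (root) parent table
	ChildColumnIdx []int               // offsets of the referencing columns in the child row
}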

type executorContext struct {
@@ -396,7 +400,16 @@ func (tr *Tracker) BatchCreateTableIfNotExist(tablesToCreate map[string]map[stri
// GetDownStreamTableInfo gets downstream table info.
// Note: this function will init the downstreamTracker's table info.
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTI *model.TableInfo) (*DownstreamTableInfo, error) {
return tr.downstreamTracker.getOrInit(tctx, tableID, originTI)
dti, err := tr.downstreamTracker.getOrInit(tctx, tableID, originTI)
if err != nil {
return nil, err
}

if err := dti.initForeignKeyRelations(tr, tctx, tableID, originTI); err != nil {
return nil, err
}

return dti, nil
}

// RemoveDownstreamSchema just remove schema or table in downstreamTrack.
@@ -439,6 +452,19 @@ func (dt *downstreamTracker) getOrInit(tctx *tcontext.Context, tableID string, o
return dti, nil
}

func (dti *DownstreamTableInfo) initForeignKeyRelations(tr *Tracker, tctx *tcontext.Context, tableID string, originTI *model.TableInfo) error {
dti.foreignKeyInitOnce.Do(func() {
relations, err := tr.buildForeignKeyRelations(tctx, tableID, dti.TableInfo, originTI, make(map[string][]sqlmodel.ForeignKeyCausalityRelation), make(map[string]struct{}))
if err != nil {
dti.foreignKeyInitErr = err
return
}
dti.ForeignKeyRelations = relations
})

return dti.foreignKeyInitErr
}
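
Reviewer note: the sync.Once guard latches whichever outcome the first caller sees, so a failed build is not retried on later GetDownStreamTableInfo calls. A minimal, self-contained sketch of that behavior (generic names, not the PR's types):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// lazyResult caches the first build attempt, value and error alike,
// mirroring foreignKeyInitOnce/foreignKeyInitErr above.
type lazyResult struct {
	once sync.Once
	val  []string
	err  error
}

func (l *lazyResult) get(build func() ([]string, error)) ([]string, error) {
	l.once.Do(func() { l.val, l.err = build() })
	return l.val, l.err
}

func main() {
	l := &lazyResult{}
	_, err := l.get(func() ([]string, error) { return nil, errors.New("transient failure") })
	fmt.Println(err) // transient failure
	_, err = l.get(func() ([]string, error) { return []string{"ok"}, nil })
	fmt.Println(err) // still "transient failure": the first outcome stays cached
}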

func (dt *downstreamTracker) remove(tctx *tcontext.Context, targetTable *filter.Table) {
dt.Lock()
defer dt.Unlock()
@@ -497,6 +523,139 @@ func (dt *downstreamTracker) getTableInfoByCreateStmt(tctx *tcontext.Context, ta
return ti, nil
}

func (tr *Tracker) buildForeignKeyRelations(
tctx *tcontext.Context,
tableID string,
downstreamTI *model.TableInfo,
originTI *model.TableInfo,
cache map[string][]sqlmodel.ForeignKeyCausalityRelation,
visiting map[string]struct{},
) ([]sqlmodel.ForeignKeyCausalityRelation, error) {
if relations, ok := cache[tableID]; ok {
return relations, nil
}

if _, ok := visiting[tableID]; ok {
return nil, nil
}
Comment on lines +538 to +540

medium

When a foreign key cycle is detected, the function currently returns nil, nil silently. While this correctly breaks the recursion, it would be beneficial for debugging and observability to log a warning when this happens. This would make it easier to diagnose unexpected causality behavior in schemas with cyclic dependencies.

Suggested change
if _, ok := visiting[tableID]; ok {
return nil, nil
}
if _, ok := visiting[tableID]; ok {
tctx.Logger.Warn("foreign key cycle detected, will be ignored for causality", zap.String("table", tableID))
return nil, nil
}

visiting[tableID] = struct{}{}
defer delete(visiting, tableID)

childNameToIdx := buildColumnIndexMap(originTI)

relations := make([]sqlmodel.ForeignKeyCausalityRelation, 0, len(downstreamTI.ForeignKeys))
for _, fk := range downstreamTI.ForeignKeys {
if len(fk.Cols) == 0 || len(fk.Cols) != len(fk.RefCols) {
continue
}

childIdxs := make([]int, 0, len(fk.Cols))
for _, col := range fk.Cols {
idx, ok := childNameToIdx[col.L]
if !ok {
return nil, dmterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Generatef("column %s not found in table %s", col.L, tableID)
}
childIdxs = append(childIdxs, idx)
}

parentTable := &filter.Table{Schema: fk.RefSchema.O, Name: fk.RefTable.O}
parentTableID := utils.GenTableID(parentTable)
parentTableName := (&cdcmodel.TableName{Schema: fk.RefSchema.O, Table: fk.RefTable.O}).String()

parentOriginTI, err := tr.GetTableInfo(parentTable)
if err != nil {
return nil, err
}
parentDTI, err := tr.downstreamTracker.getOrInit(tctx, parentTableID, parentOriginTI)
if err != nil {
return nil, err
}

parentNameToIdx := buildColumnIndexMap(parentOriginTI)
parentRelations, err := tr.buildForeignKeyRelations(tctx, parentTableID, parentDTI.TableInfo, parentOriginTI, cache, visiting)
if err != nil {
return nil, err
}

if len(parentRelations) == 0 {
parentColumns, err := getColumnsByNames(parentDTI.TableInfo, fk.RefCols)
if err != nil {
return nil, err
}
relations = append(relations, sqlmodel.ForeignKeyCausalityRelation{
ParentTable: parentTableName,
ParentColumns: parentColumns,
ChildColumnIdx: childIdxs,
})
continue
}

parentIndexToChild := make(map[int]int, len(fk.RefCols))
for i, refCol := range fk.RefCols {
if idx, ok := parentNameToIdx[refCol.L]; ok {
parentIndexToChild[idx] = childIdxs[i]
}
}

for _, parentRelation := range parentRelations {
mappedChildIdxs := make([]int, len(parentRelation.ChildColumnIdx))
skip := false
for i, parentIdx := range parentRelation.ChildColumnIdx {
childIdx, ok := parentIndexToChild[parentIdx]
if !ok {
skip = true
break
}
mappedChildIdxs[i] = childIdx
}
if skip {
continue
}

relations = append(relations, sqlmodel.ForeignKeyCausalityRelation{
ParentTable: parentRelation.ParentTable,
ParentColumns: parentRelation.ParentColumns,
ChildColumnIdx: mappedChildIdxs,
})
}
}

cache[tableID] = relations
return relations, nil
}
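
Reviewer note: to make the cycle handling discussed in the inline comment above concrete, here is a minimal, self-contained sketch of the same visiting-set technique: a depth-first walk to root parents that stops as soon as it re-enters a table already on the current path (a generic string graph, not DM's types):

package main

import "fmt"

// rootParents walks parent links depth-first; the visiting set breaks cycles the
// same way buildForeignKeyRelations does, by returning early (the spot where the
// review suggestion above would log a warning).
func rootParents(table string, parents map[string][]string, visiting map[string]struct{}) []string {
	if _, ok := visiting[table]; ok {
		return nil // cycle detected: stop recursing
	}
	visiting[table] = struct{}{}
	defer delete(visiting, table)

	ps := parents[table]
	if len(ps) == 0 {
		return []string{table} // a root parent: no outgoing foreign keys
	}
	var roots []string
	for _, p := range ps {
		roots = append(roots, rootParents(p, parents, visiting)...)
	}
	return roots
}

func main() {
	// c references b, b references a, and a references c again, forming a cycle.
	parents := map[string][]string{"c": {"b"}, "b": {"a"}, "a": {"c"}}
	fmt.Println(rootParents("c", parents, map[string]struct{}{})) // prints []
}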

func buildColumnIndexMap(ti *model.TableInfo) map[string]int {
nameToIdx := make(map[string]int)
valueIdx := 0
for _, col := range ti.Columns {
if col.Hidden {
continue
}
nameToIdx[col.Name.L] = valueIdx
valueIdx++
}
return nameToIdx
}
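
Reviewer note: a quick illustration of buildColumnIndexMap's contract, namely that hidden columns are skipped so the offsets stay contiguous and line up with the visible row values. A small sketch, assuming ColumnInfo.Name is the same ast.CIStr used elsewhere in this file and using hypothetical column names:

// Hand-built table with a hidden column in the middle.
ti := &model.TableInfo{Columns: []*model.ColumnInfo{
	{Name: ast.CIStr{O: "id", L: "id"}},
	{Name: ast.CIStr{O: "_tidb_rowid", L: "_tidb_rowid"}, Hidden: true},
	{Name: ast.CIStr{O: "a_id", L: "a_id"}},
}}
// buildColumnIndexMap(ti) -> map[string]int{"id": 0, "a_id": 1}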

func getColumnsByNames(ti *model.TableInfo, names []ast.CIStr) ([]*model.ColumnInfo, error) {
columns := make([]*model.ColumnInfo, 0, len(names))
for _, name := range names {
found := false
for _, col := range ti.Columns {
if col.Name.L == name.L {
columns = append(columns, col)
found = true
break
}
}
if !found {
return nil, dmterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Generatef("column %s not found in table %s", name.O, ti.Name.O)
}
}

return columns, nil
}

// initDownStreamSQLModeAndParser inits the downstream SQL mode and tracker parser with the default sql_mode.
func (dt *downstreamTracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error {
setSQLMode := fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)
54 changes: 54 additions & 0 deletions dm/pkg/schema/tracker_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/filter"
timock "github.com/pingcap/tidb/pkg/util/mock"
cdcmodel "github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
@@ -620,6 +621,59 @@ func TestGetDownStreamIndexInfo(t *testing.T) {
require.NotNil(t, dti.WhereHandle.UniqueNotNullIdx)
}

func TestForeignKeyRelationBuildsRootParents(t *testing.T) {
ctx := context.Background()
p := parser.New()

dbConn, mock := mockBaseConn(t)
tracker, err := NewTestTracker(ctx, "test-tracker", dbConn, dlog.L())
require.NoError(t, err)
defer tracker.Close()

createAndExec := func(sql string, db string) {
stmt, parseErr := p.ParseOneStmt(sql, "", "")
require.NoError(t, parseErr)
require.NoError(t, tracker.Exec(ctx, db, stmt))
}

createAndExec("create database db", "")
createAndExec("create table a(id int primary key)", "db")
createAndExec("create table b(a_id int primary key, constraint fk_b_a foreign key (a_id) references a(id))", "db")
createAndExec("create table c(b_a_id int primary key, constraint fk_c_b foreign key (b_a_id) references b(a_id))", "db")

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

tableIDA := "`db`.`a`"
tableIDB := "`db`.`b`"
tableIDC := "`db`.`c`"

mock.ExpectQuery("SHOW CREATE TABLE " + tableIDC).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("c", "create table c(b_a_id int primary key, constraint fk_c_b foreign key (b_a_id) references b(a_id))"))
mock.ExpectQuery("SHOW CREATE TABLE " + tableIDB).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("b", "create table b(a_id int primary key, constraint fk_b_a foreign key (a_id) references a(id))"))
mock.ExpectQuery("SHOW CREATE TABLE " + tableIDA).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("a", "create table a(id int primary key)"))

originTI, err := tracker.GetTableInfo(&filter.Table{Schema: "db", Name: "c"})
require.NoError(t, err)
dti, err := tracker.GetDownStreamTableInfo(tcontext.Background(), tableIDC, originTI)
require.NoError(t, err)
require.Len(t, dti.ForeignKeyRelations, 1)

relation := dti.ForeignKeyRelations[0]
require.Equal(t, (&cdcmodel.TableName{Schema: "db", Table: "a"}).String(), relation.ParentTable)
require.Equal(t, []int{0}, relation.ChildColumnIdx)
require.Len(t, relation.ParentColumns, 1)
require.Equal(t, "id", relation.ParentColumns[0].Name.L)

require.NoError(t, mock.ExpectationsWereMet())
}

func TestGetDownStreamIndexInfoExceedsMaxIndexLength(t *testing.T) {
// origin table info
p := parser.New()
1 change: 1 addition & 0 deletions dm/syncer/data_validator.go
@@ -934,6 +934,7 @@ func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *re
nil,
)
rowChange.SetWhereHandle(downstreamTableInfo.WhereHandle)
rowChange.SetForeignKeyRelations(downstreamTableInfo.ForeignKeyRelations)
size := estimatedRowSize
if changeType == rowUpdated && rowChange.IsIdentityUpdated() {
delRow, insRow := rowChange.SplitUpdate()
3 changes: 3 additions & 0 deletions dm/syncer/dml.go
@@ -176,6 +176,7 @@ RowLoop:
s.sessCtx,
)
rowChange.SetWhereHandle(downstreamTableInfo.WhereHandle)
rowChange.SetForeignKeyRelations(downstreamTableInfo.ForeignKeyRelations)
dmls = append(dmls, rowChange)
}

@@ -253,6 +254,7 @@ RowLoop:
s.sessCtx,
)
rowChange.SetWhereHandle(downstreamTableInfo.WhereHandle)
rowChange.SetForeignKeyRelations(downstreamTableInfo.ForeignKeyRelations)
dmls = append(dmls, rowChange)
}

@@ -307,6 +309,7 @@ RowLoop:
s.sessCtx,
)
rowChange.SetWhereHandle(downstreamTableInfo.WhereHandle)
rowChange.SetForeignKeyRelations(downstreamTableInfo.ForeignKeyRelations)
dmls = append(dmls, rowChange)
}

30 changes: 30 additions & 0 deletions dm/tests/foreign_key_multi_worker/conf/diff_config.toml
@@ -0,0 +1,30 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["fk_chain.?*"]


[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/foreign_key_multi_worker/conf/dm-master.toml
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
43 changes: 43 additions & 0 deletions dm/tests/foreign_key_multi_worker/conf/dm-task.yaml
@@ -0,0 +1,43 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
shard-mode: ""

target-database:
  host: "127.0.0.1"
  port: 4000
  user: "root"
  password: ""
  session:
    foreign_key_checks: 1

mysql-instances:
  - source-id: "mysql-replica-01"
    block-allow-list: "instance"
    mydumper-config-name: "global"
    loader-config-name: "global"
    syncer-config-name: "global"

block-allow-list:
  instance:
    do-dbs: ["fk_chain"]

mydumpers:
  global:
    threads: 4
    chunk-filesize: 64
    skip-tz-utc: true
    extra-args: ""

loaders:
  global:
    pool-size: 16
    dir: "./dumped_data"

syncers:
  global:
    worker-count: 4
    batch: 100
    safe-mode: false
2 changes: 2 additions & 0 deletions dm/tests/foreign_key_multi_worker/conf/dm-worker1.toml
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
15 changes: 15 additions & 0 deletions dm/tests/foreign_key_multi_worker/conf/source1.yaml
@@ -0,0 +1,15 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-relay: true
relay-binlog-name: ''
relay-binlog-gtid: ''
from:
  host: 127.0.0.1
  user: root
  password: '123456'
  port: 3306
checker:
  check-enable: true
  backoff-rollback: 5m
  backoff-max: 5m