Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,146 @@ func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
return !job.IsDone()
}

<<<<<<< HEAD
=======
// BuildDDLEvents by parsing the DDL job
func (s *schemaStorage) BuildDDLEvents(
ctx context.Context, job *timodel.Job,
) (ddlEvents []*model.DDLEvent, err error) {
switch job.Type {
case timodel.ActionRenameTables:
// The result contains more than one DDLEvent for a rename tables job.
ddlEvents, err = s.buildRenameEvents(ctx, job)
if err != nil {
return nil, errors.Trace(err)
}
case timodel.ActionCreateTables:
if job.BinlogInfo != nil && job.BinlogInfo.MultipleTableInfos != nil {
querys, err := ddl.SplitQueries(job.Query)
if err != nil {
return nil, errors.Trace(err)
}
multiTableInfos := job.BinlogInfo.MultipleTableInfos
for index, tableInfo := range multiTableInfos {
newTableInfo := model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, tableInfo)
job.Query = querys[index]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Modifying job.Query directly within this loop can lead to unexpected side effects if the original job object is expected to remain immutable or if job.Query is used elsewhere after this function call. It's generally safer to create a local copy of the job object for each iteration and modify the copy's Query field before passing it to event.FromJob.

For example:

// Create a copy of the job for each iteration
clonedJob := *job
clonedJob.Query = querys[index]
event.FromJob(&clonedJob, nil, newTableInfo)
clonedJob := *job
clonedJob.Query = querys[index]
event := new(model.DDLEvent)
event.FromJob(&clonedJob, nil, newTableInfo)

event := new(model.DDLEvent)
event.FromJob(job, nil, newTableInfo)
ddlEvents = append(ddlEvents, event)
}
} else {
return nil, errors.Errorf("there is no multiple table infos in the create tables job: %s", job)
}
default:
// parse preTableInfo
preSnap, err := s.GetSnapshot(ctx, job.BinlogInfo.FinishedTS-1)
if err != nil {
return nil, errors.Trace(err)
}
preTableInfo, err := preSnap.PreTableInfo(job)
if err != nil {
return nil, errors.Trace(err)
}

// parse tableInfo
var tableInfo *model.TableInfo
err = preSnap.FillSchemaName(job)
if err != nil {
log.Error("build DDL event fail", zap.Any("job", job), zap.Error(err))
return nil, errors.Trace(err)
}
// TODO: find a better way to refactor this. For example, drop table job should not
// have table info.
if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil {
tableInfo = model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, job.BinlogInfo.TableInfo)

// TODO: remove this after job is fixed by TiDB.
// ref: https://github.com/pingcap/tidb/issues/43819
if job.Type == timodel.ActionExchangeTablePartition {
oldTableInfo, ok := preSnap.PhysicalTableByID(job.BinlogInfo.TableInfo.ID)
if !ok {
return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID)
}
tableInfo.SchemaID = oldTableInfo.SchemaID
tableInfo.TableName = oldTableInfo.TableName
}
} else {
// Just retrieve the schema name for a DDL job that does not contain TableInfo.
// Currently supported by cdc are: ActionCreateSchema, ActionDropSchema,
// and ActionModifySchemaCharsetAndCollate.
tableInfo = &model.TableInfo{
TableName: model.TableName{Schema: job.SchemaName},
Version: job.BinlogInfo.FinishedTS,
}
}
event := new(model.DDLEvent)
event.FromJob(job, preTableInfo, tableInfo)
ddlEvents = append(ddlEvents, event)
}
return ddlEvents, nil
}

// GetNewJobWithArgs fills args into job, encodes the job, and returns a fresh
// Job decoded from those bytes.
//
// FillArgs is called on the job that was passed in, so the input is modified
// as well; the returned value is a separate *timodel.Job. Any encode or decode
// failure is returned wrapped with errors.Trace alongside a nil job.
func GetNewJobWithArgs(job *timodel.Job, args timodel.JobArgs) (*timodel.Job, error) {
	job.FillArgs(args)

	raw, encodeErr := job.Encode(true)
	if encodeErr != nil {
		return nil, errors.Trace(encodeErr)
	}

	decoded := new(timodel.Job)
	decodeErr := decoded.Decode(raw)
	if decodeErr != nil {
		return nil, errors.Trace(decodeErr)
	}
	return decoded, nil
}

// TODO: find a better way to refactor this function.
// buildRenameEvents gets a list of DDLEvent from a rename tables DDL job.
//
// One event is produced per entry of job.BinlogInfo.MultipleTableInfos, paired
// by index with the rename arguments decoded from the job. Schemas and the
// pre-rename table infos are looked up in the snapshot at
// job.BinlogInfo.FinishedTS-1. Each event's Seq is its index in the job, so
// the events keep the order of the original statement.
//
// It returns ErrInvalidDDLJob when the job has no BinlogInfo or when the
// number of table infos differs from the number of rename arguments,
// ErrSnapshotSchemaNotFound when a target schema is missing from the snapshot,
// and ErrSchemaStorageTableMiss when a renamed table is missing from it.
func (s *schemaStorage) buildRenameEvents(
	ctx context.Context, job *timodel.Job,
) ([]*model.DDLEvent, error) {
	var ddlEvents []*model.DDLEvent
	args, err := timodel.GetRenameTablesArgs(job)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Everything below reads job.BinlogInfo; fail cleanly instead of
	// panicking on a job that carries none.
	if job.BinlogInfo == nil {
		return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
	}

	multiTableInfos := job.BinlogInfo.MultipleTableInfos
	if len(multiTableInfos) != len(args.RenameTableInfos) {
		return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
	}

	preSnap, err := s.GetSnapshot(ctx, job.BinlogInfo.FinishedTS-1)
	if err != nil {
		return nil, errors.Trace(err)
	}

	for i, rawTableInfo := range multiTableInfos {
		info := args.RenameTableInfos[i]
		newSchema, ok := preSnap.SchemaByID(info.NewSchemaID)
		if !ok {
			return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
				info.NewSchemaID)
		}
		newSchemaName := newSchema.Name.O
		oldSchemaName := info.OldSchemaName.O

		preTableInfo, ok := preSnap.PhysicalTableByID(rawTableInfo.ID)
		if !ok {
			// Report the ID that was actually looked up, not job.TableID.
			return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(
				rawTableInfo.ID)
		}

		wrappedTableInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName,
			job.BinlogInfo.FinishedTS, rawTableInfo)
		event := new(model.DDLEvent)
		event.FromJobWithArgs(job, preTableInfo, wrappedTableInfo, oldSchemaName, newSchemaName)
		// Seq records the position within the multi-table DDL.
		event.Seq = uint64(i)
		ddlEvents = append(ddlEvents, event)
	}
	return ddlEvents, nil
}

>>>>>>> 3c7fd0a1fd (cdc(ddl): ensure strict ordering for multi-table DDLs after split (#12450))
// MockSchemaStorage is for tests.
type MockSchemaStorage struct {
Resolved uint64
Expand Down
12 changes: 12 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,18 @@ type DDLEvent struct {
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
<<<<<<< HEAD
=======
IsBootstrap bool `msg:"-"`
// BDRRole is the role of the TiDB cluster, it is used to determine whether
// the DDL is executed by the primary cluster.
BDRRole string `msg:"-"`
SQLMode mysql.SQLMode `msg:"-"`
// Seq orders the DDLs that share the same commit ts.
// It is only used by the split DDLEvents generated from a multi-table DDL,
// which must keep the order of the original multi-table DDL.
Seq uint64 `msg:"seq"`
>>>>>>> 3c7fd0a1fd (cdc(ddl): ensure strict ordering for multi-table DDLs after split (#12450))
}

// FromJob fills the values with DDLEvent from DDL job
Expand Down
Loading
Loading