Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refactor job args for set-default-value/add column ddl. #56337

Merged
merged 15 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,13 @@ func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Colu
if err != nil {
return nil, nil, nil, nil, false, errors.Trace(err)
}
col := &model.ColumnInfo{}
pos := &ast.ColumnPosition{}
offset := 0
ifNotExists := false
err = job.DecodeArgs(col, pos, &offset, &ifNotExists)

args, err := model.GetAddColumnArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, false, errors.Trace(err)
}
col, pos, ifNotExists := args.Col, args.Pos, args.IfNotExists

columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
Expand Down Expand Up @@ -292,13 +290,12 @@ func isDroppableColumn(tblInfo *model.TableInfo, colName pmodel.CIStr) error {
}

func onSetDefaultValue(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
args, err := model.GetSetDefaultValueArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

newCol := args.Col
return updateColumnDefaultValue(jobCtx, t, job, newCol, &newCol.Name)
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2261,18 +2261,24 @@ func (e *executor) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.Alt
}

job := &model.Job{
Version: model.JobVersion1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still use model.JobVersion1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi schema change job types will handle together after they are all refactored

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the addColumn ddl will be added into multiSchemaChange ddl subjob. The jobVersion of multiSchemaChange is V1, the jobVersion of addColumn ddl is from multiSchemaChange.

SchemaID: schema.ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: tbInfo.Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []any{col, spec.Position, 0, spec.IfNotExists},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AddColumnArgs{
Col: col.ColumnInfo,
Pos: spec.Position,
Offset: 0,
IfNotExists: spec.IfNotExists,
}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3503,18 +3509,21 @@ func (e *executor) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *as
}

job := &model.Job{
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionSetDefaultValue,
BinlogInfo: &model.HistoryInfo{},
Args: []any{col},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.SetDefaultValueArgs{
Col: col.ColumnInfo,
}
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error {
switch job.Type {
case model.ActionAddColumn:
col := job.Args[0].(*table.Column)
pos := job.Args[1].(*ast.ColumnPosition)
args := job.JobArgs.(*model.AddColumnArgs)
col, pos := args.Col, args.Pos
info.AddColumns = append(info.AddColumns, col.Name)
for colName := range col.Dependences {
info.RelativeColumns = append(info.RelativeColumns, pmodel.CIStr{L: colName, O: colName})
Expand Down Expand Up @@ -248,7 +248,8 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error {
info.PositionColumns = append(info.PositionColumns, pos.RelativeColumn.Name)
}
case model.ActionSetDefaultValue:
col := job.Args[0].(*table.Column)
args := job.JobArgs.(*model.SetDefaultValueArgs)
col := args.Col
info.ModifyColumns = append(info.ModifyColumns, col.Name)
case model.ActionAlterIndexVisibility:
idxName := job.JobArgs.(*model.AlterIndexVisibilityArgs).IndexName
Expand Down
66 changes: 66 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,3 +1094,69 @@ func GetPlacementPolicyArgs(job *Job) (*PlacementPolicyArgs, error) {

return getOrDecodeArgsV2[*PlacementPolicyArgs](job)
}

// SetDefaultValueArgs is the argument for setting default value ddl.
type SetDefaultValueArgs struct {
Col *ColumnInfo `json:"column_info,omitempty"`
}

func (a *SetDefaultValueArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.Col}
} else {
job.Args = []any{a}
}
}

// GetSetDefaultValueArgs get the args for setting default value ddl.
func GetSetDefaultValueArgs(job *Job) (*SetDefaultValueArgs, error) {
if job.Version == JobVersion1 {
col := &ColumnInfo{}
err := job.DecodeArgs(col)
if err != nil {
return nil, errors.Trace(err)
}
return &SetDefaultValueArgs{Col: col}, nil
}

return getOrDecodeArgsV2[*SetDefaultValueArgs](job)
}

// AddColumnArgs is the arguments for adding column ddl.
type AddColumnArgs struct {
joccau marked this conversation as resolved.
Show resolved Hide resolved
Col *ColumnInfo `json:"column_info,omitempty"`
Pos *ast.ColumnPosition `json:"position,omitempty"`
Offset int `json:"offset,omitempty"`
Copy link
Contributor

@joechenrh joechenrh Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like Offset is never used. Just remove it and put zero in fillJob?

IfNotExists bool `json:"if_not_exists,omitempty"`
}

func (a *AddColumnArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.Col, a.Pos, a.Offset, a.IfNotExists}
} else {
job.Args = []any{a}
}
}

// GetAddColumnArgs gets the args for adding column ddl.
func GetAddColumnArgs(job *Job) (*AddColumnArgs, error) {
if job.Version == JobVersion1 {
var (
col = &ColumnInfo{}
pos = &ast.ColumnPosition{}
offset = 0
ifNotExists = false
)
if err := job.DecodeArgs(col, pos, &offset, &ifNotExists); err != nil {
return nil, errors.Trace(err)
}
return &AddColumnArgs{
Col: col,
Pos: pos,
Offset: offset,
IfNotExists: ifNotExists,
}, nil
}

return getOrDecodeArgsV2[*AddColumnArgs](job)
}
37 changes: 37 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,40 @@ func TestPlacementPolicyArgs(t *testing.T) {
}
}
}

func TestGetSetDefaultValueArgs(t *testing.T) {
inArgs := &SetDefaultValueArgs{
Col: &ColumnInfo{
ID: 7527,
Name: model.NewCIStr("col_name"),
},
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionSetDefaultValue)))
args, err := GetSetDefaultValueArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}

func TestGetAddColumnArgs(t *testing.T) {
inArgs := &AddColumnArgs{
Col: &ColumnInfo{
ID: 7527,
},
Pos: &ast.ColumnPosition{
Tp: ast.ColumnPositionFirst,
},
Offset: 1001,
IfNotExists: true,
}

for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddColumn)))
args, err := GetAddColumnArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}