Skip to content

Query buffering, terminating blocking transactions for INSTANT DDL and other "special plans" #17945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,10 @@ func testScheduler(t *testing.T) {
ALTER TABLE t2_test ENGINE=InnoDB;
`
instantAlterT1Statement = `
ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0;
ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0
`
instantUndoAlterT1Statement = `
ALTER TABLE t1_test DROP COLUMN i0
`
dropT1Statement = `
DROP TABLE IF EXISTS t1_test
Expand All @@ -405,7 +408,7 @@ func testScheduler(t *testing.T) {
ALTER TABLE nonexistent FORCE
`
populateT1Statement = `
insert into t1_test values (1, 'new_row')
insert ignore into t1_test values (1, 'new_row')
`
)

Expand Down Expand Up @@ -798,6 +801,64 @@ func testScheduler(t *testing.T) {
})
})
}

if forceCutoverCapable {
t.Run("force_cutover_instant", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5)
defer cancel()

t.Run("populate t1_test", func(t *testing.T) {
onlineddl.VtgateExecQuery(t, &vtParams, populateT1Statement, "")
})

commitTransactionChan := make(chan any)
transactionErrorChan := make(chan error)
t.Run("locking table rows", func(t *testing.T) {
go runInTransaction(t, ctx, primaryTablet, "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
})

t.Run("execute migration", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(instantAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait
})
t.Run("expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("check special_plan", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
specialPlan := row.AsString("special_plan", "")
assert.Contains(t, specialPlan, "instant-ddl")
}
})
t.Run("expect transaction failure", func(t *testing.T) {
select {
case commitTransactionChan <- true: // good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
// Transaction will now attempt to commit. But we expect our "force_cutover" to have terminated
// the transaction's connection.
select {
case err := <-transactionErrorChan:
assert.ErrorContains(t, err, "broken pipe")
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
})
t.Run("cleanup: undo migration", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(instantUndoAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait
})
t.Run("cleanup: expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
})
}

t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
Expand Down
63 changes: 61 additions & 2 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,65 @@ func (e *Executor) executeAlterViewOnline(ctx context.Context, onlineDDL *schema
return nil
}

// executeSpecialAlterDirectDDLActionMigration runs a special-plan migration by submitting its
// ALTER TABLE statement directly. Buffering is toggled on for the migrated table before the
// statement is issued and toggled off again when the function returns. When --force-cut-over-after
// is nonzero, lock holders and accessors of the table are killed before the statement is issued.
// Returns the first error encountered, or nil on success.
func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) {
	forceCutOverAfter, err := onlineDDL.StrategySetting().ForceCutOverAfter()
	if err != nil {
		return err
	}

	// A dedicated child context for buffering, so that it can be cancelled independently of ctx.
	bufCtx, cancelBuffering := context.WithCancel(ctx)
	defer cancelBuffering()

	// The ALTER TABLE is assumed to be quick (ALGORITHM=INSTANT, or a quick partition operation),
	// so queries are buffered for as long as it takes to issue it.
	setBuffering := func(enable bool) {
		log.Infof("toggling buffering: %t in migration %v", enable, onlineDDL.UUID)
		e.toggleBufferTableFunc(bufCtx, onlineDDL.Table, onlineDDL.CutOverThreshold+qrBufferExtraTimeout, enable)
		if !enable {
			// Cancelling the buffering context unbuffers existing queries.
			cancelBuffering()
		}
		log.Infof("toggled buffering: %t in migration %v", enable, onlineDDL.UUID)
	}
	// Registered before enabling, so that buffering is always switched off on the way out.
	defer setBuffering(false)
	setBuffering(true)

	// A query may already be in the query executor, past the ACLs and _about to_ execute.
	// Pausing for a fraction of a second is nicer to such queries: they get to complete
	// before the ALTER.
	e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering")
	time.Sleep(100 * time.Millisecond)

	if forceCutOverAfter > 0 {
		// Any nonzero --force-cut-over-after value, whatever its magnitude, means we now terminate
		// connections and transactions on the migrated table.
		// The flag was designed for `vitess` migrations, which may attempt to cut-over multiple
		// times, and it caps the overall duration of those attempts (for example, 1 hour).
		// That cap is meaningless for INSTANT DDL or other quick operations: once the operation
		// begins there is no going back. It is submitted to MySQL and takes however long it takes.
		// In this particular function we expect a *very quick* operation, so the flag is taken
		// as a hint that connections and transactions should be forcefully terminated.
		//
		// A forceful cut-over only proceeds if there is no pending atomic transaction for the
		// table, which helps keep the atomicity guarantee of a prepared transaction.
		if poolErr := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); poolErr != nil {
			return vterrors.Wrapf(poolErr, "checking prepared pool for table")
		}
		if killErr := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); killErr != nil {
			return vterrors.Wrapf(killErr, "failed killing table lock holders and accessors")
		}
	}

	_, err = e.executeDirectly(ctx, onlineDDL)
	return err
}

// executeSpecialAlterDDLActionMigrationIfApplicable checks whether the given migration can be executed via a special execution path that isn't a full-blown online schema change process.
func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context.Context, onlineDDL *schema.OnlineDDL) (specialMigrationExecuted bool, err error) {
// Before we jump on to strategies... Some ALTERs can be optimized without having to run through
Expand All @@ -2505,11 +2564,11 @@ func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context
case instantDDLSpecialOperation:
schemadiff.AddInstantAlgorithm(specialPlan.alterTable)
onlineDDL.SQL = sqlparser.CanonicalString(specialPlan.alterTable)
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil {
return false, err
}
case rangePartitionSpecialOperation:
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil {
return false, err
}
default:
Expand Down
Loading