Skip to content

Commit a62a6d1

Browse files
authored
server, session: interrupt autocommit DML after disconnect (#68237)
close #68236
1 parent 296420e commit a62a6d1

6 files changed

Lines changed: 330 additions & 42 deletions

File tree

pkg/server/conn.go

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ import (
107107
"github.com/pingcap/tidb/pkg/util/hack"
108108
"github.com/pingcap/tidb/pkg/util/intest"
109109
"github.com/pingcap/tidb/pkg/util/logutil"
110+
"github.com/pingcap/tidb/pkg/util/sqlkiller"
110111
tlsutil "github.com/pingcap/tidb/pkg/util/tls"
111112
"github.com/pingcap/tidb/pkg/util/topsql"
112113
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
@@ -2089,6 +2090,92 @@ func setResourceGroupTaggerForMultiStmtPrefetch(snapshot kv.Snapshot, sqls strin
20892090
}
20902091
}
20912092

2093+
// setSQLKillerConnectionAlive installs a connection-liveness probe on the
2094+
// session SQLKiller and starts a background monitor for the current statement.
2095+
// The returned cleanup is idempotent and must be called when the statement is
2096+
// done to stop the monitor and clear the probe.
2097+
func (cc *clientConn) setSQLKillerConnectionAlive() func() {
2098+
fn := func() bool {
2099+
if cc.bufReadConn != nil {
2100+
// IsAlive returns 0 only when the connection is known dead. Treat
2101+
// unknown states as alive so we do not interrupt queries
2102+
// conservatively when the liveness check itself cannot run.
2103+
return cc.bufReadConn.IsAlive() != 0
2104+
}
2105+
return true
2106+
}
2107+
cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn)
2108+
stopMonitor := make(chan struct{})
2109+
doneMonitor := make(chan struct{})
2110+
go cc.monitorConnectionAlive(fn, stopMonitor, doneMonitor)
2111+
2112+
var clearOnce sync.Once
2113+
return func() {
2114+
clearOnce.Do(func() {
2115+
close(stopMonitor)
2116+
<-doneMonitor
2117+
cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil)
2118+
})
2119+
}
2120+
}
2121+
2122+
func (cc *clientConn) monitorConnectionAlive(isAlive func() bool, stop <-chan struct{}, done chan<- struct{}) {
2123+
defer close(done)
2124+
checkInterval := time.Second
2125+
failpoint.Inject("mockConnectionAliveMonitorInterval", func(val failpoint.Value) {
2126+
if interval, ok := val.(int); ok {
2127+
checkInterval = time.Duration(interval) * time.Millisecond
2128+
}
2129+
})
2130+
ticker := time.NewTicker(checkInterval)
2131+
defer ticker.Stop()
2132+
for {
2133+
select {
2134+
case <-ticker.C:
2135+
if !isAlive() {
2136+
select {
2137+
case <-stop:
2138+
return
2139+
default:
2140+
}
2141+
cc.ctx.GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
2142+
cc.cancelDispatch()
2143+
return
2144+
}
2145+
case <-stop:
2146+
return
2147+
}
2148+
}
2149+
}
2150+
2151+
func (cc *clientConn) cancelDispatch() {
2152+
cc.mu.RLock()
2153+
cancelFunc := cc.mu.cancelFunc
2154+
cc.mu.RUnlock()
2155+
if cancelFunc != nil {
2156+
cancelFunc()
2157+
}
2158+
}
2159+
2160+
func shouldMonitorConnectionAliveDuringExecute(stmt ast.StmtNode, sessVars *variable.SessionVars) bool {
2161+
if !sessVars.IsAutocommit() || sessVars.InTxn() {
2162+
return false
2163+
}
2164+
if executeStmt, ok := stmt.(*ast.ExecuteStmt); ok {
2165+
prepared, err := plannercore.GetPreparedStmt(executeStmt, sessVars)
2166+
if err != nil || prepared.PreparedAst == nil {
2167+
return false
2168+
}
2169+
stmt = prepared.PreparedAst.Stmt
2170+
}
2171+
switch stmt.(type) {
2172+
case *ast.InsertStmt, *ast.UpdateStmt, *ast.DeleteStmt:
2173+
return true
2174+
default:
2175+
return false
2176+
}
2177+
}
2178+
20922179
// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
20932180
// Currently, the first return value is used to fall back to TiKV when TiFlash is down.
20942181
func (cc *clientConn) handleStmt(
@@ -2111,7 +2198,16 @@ func (cc *clientConn) handleStmt(
21112198
}
21122199
}
21132200

2201+
clearConnectionAlive := func() {}
2202+
monitoringConnectionAlive := shouldMonitorConnectionAliveDuringExecute(stmt, cc.ctx.GetSessionVars())
2203+
if monitoringConnectionAlive {
2204+
clearConnectionAlive = cc.setSQLKillerConnectionAlive()
2205+
defer clearConnectionAlive()
2206+
}
21142207
rs, err := cc.ctx.ExecuteStmt(ctx, stmt)
2208+
if rs == nil || err != nil {
2209+
clearConnectionAlive()
2210+
}
21152211
reg.End()
21162212
// - If rs is not nil, the statement tracker detachment from session tracker
21172213
// is done in the `rs.Close` in most cases.
@@ -2142,22 +2238,18 @@ func (cc *clientConn) handleStmt(
21422238
if cc.getStatus() == connStatusShutdown {
21432239
return false, exeerrors.ErrQueryInterrupted
21442240
}
2241+
if !monitoringConnectionAlive {
2242+
clearConnectionAlive = cc.setSQLKillerConnectionAlive()
2243+
defer clearConnectionAlive()
2244+
}
21452245
cc.ctx.GetSessionVars().SQLKiller.SetFinishFunc(
21462246
func() {
21472247
//nolint: errcheck
21482248
rs.Finish()
21492249
})
2150-
fn := func() bool {
2151-
if cc.bufReadConn != nil {
2152-
return cc.bufReadConn.IsAlive() != 0
2153-
}
2154-
return true
2155-
}
2156-
cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn)
21572250
cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(true)
21582251
defer cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(false)
21592252
defer cc.ctx.GetSessionVars().SQLKiller.ClearFinishFunc()
2160-
defer cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil)
21612253
if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil {
21622254
return retryable, err
21632255
}

pkg/server/conn_stmt.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,15 +233,6 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
233233
func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args []param.BinaryParam, useCursor bool) (err error) {
234234
ctx = execdetails.ContextWithInitializedExecDetails(ctx)
235235

236-
fn := func() bool {
237-
if cc.bufReadConn != nil {
238-
return cc.bufReadConn.IsAlive() != 0
239-
}
240-
return true
241-
}
242-
cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn)
243-
defer cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil)
244-
245236
//nolint:forcetypeassert
246237
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor)
247238
if err != nil {
@@ -319,9 +310,25 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
319310
sql = planCacheStmt.StmtText
320311
}
321312
execStmt.SetText(charset.EncodingUTF8Impl, sql)
313+
clearConnectionAlive := func() {}
314+
monitoringConnectionAlive := false
315+
if planCacheStmt != nil && planCacheStmt.PreparedAst != nil {
316+
monitoringConnectionAlive = shouldMonitorConnectionAliveDuringExecute(planCacheStmt.PreparedAst.Stmt, vars)
317+
if monitoringConnectionAlive {
318+
clearConnectionAlive = cc.setSQLKillerConnectionAlive()
319+
defer clearConnectionAlive()
320+
}
321+
}
322322
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
323+
if rs == nil || err != nil {
324+
clearConnectionAlive()
325+
}
323326
var lazy bool
324327
if rs != nil {
328+
if !monitoringConnectionAlive {
329+
clearConnectionAlive = cc.setSQLKillerConnectionAlive()
330+
defer clearConnectionAlive()
331+
}
325332
defer func() {
326333
if !lazy {
327334
rs.Close()

pkg/server/server.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,13 +1017,7 @@ func killQuery(conn *clientConn, maxExecutionTime, runaway bool) {
10171017
} else {
10181018
sessVars.SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
10191019
}
1020-
conn.mu.RLock()
1021-
cancelFunc := conn.mu.cancelFunc
1022-
conn.mu.RUnlock()
1023-
1024-
if cancelFunc != nil {
1025-
cancelFunc()
1026-
}
1020+
conn.cancelDispatch()
10271021
sessVars.SQLKiller.FinishResultSet()
10281022
}
10291023

0 commit comments

Comments
 (0)