Skip to content

txn: also update the @@tidb_last_txn_info for readyonly or rollback txns #61057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
exec.Recreated(pointGetPlan, a.Ctx)
a.PsStmt.PointGet.Executor = exec
executor = exec
// If reuses the executor, the executor build phase is skipped, and the txn will not be activated that
// caused `TxnCtx.StartTS` to be 0.
// So we should set the `TxnCtx.StartTS` manually here to make sure it is not 0
// to provide the right value for `@@tidb_last_txn_info` or other variables.
a.Ctx.GetSessionVars().TxnCtx.StartTS = startTs
}
}

Expand Down
94 changes: 77 additions & 17 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"archive/zip"
"context"
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -531,51 +532,110 @@ func TestTiDBLastTxnInfo(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key)")
tk.MustQuery("select @@tidb_last_txn_info").Check(testkit.Rows(""))
// prepare point get
pointGetStmtID, _, _, err := tk.Session().PrepareStmt("select a from t where a = 1024")
require.NoError(t, err)
// first execute short path point-get
rs, err := tk.Session().ExecutePreparedStmt(context.Background(), pointGetStmtID, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(nil)
tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts')").Check(
testkit.Rows(strconv.FormatUint(math.MaxUint64, 10)),
)

// autocommit txn
tk.MustExec("insert into t values (1)")
rows1 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
require.Greater(t, rows1[0][0].(string), "0")
require.Less(t, rows1[0][0].(string), rows1[0][1].(string))
startTS1, err := strconv.ParseUint(rows1[0][0].(string), 10, 64)
require.NoError(t, err)
commitTS1 := tk.Session().GetSessionVars().LastCommitTS
require.Less(t, startTS1, commitTS1)
require.Equal(t, strconv.FormatUint(commitTS1, 10), rows1[0][1])

// readonly txn should also update @@tidb_last_txn_info
tk.MustExec("begin")
tk.MustQuery("select a from t where a = 1").Check(testkit.Rows("1"))
startTS2 := tk.Session().GetSessionVars().TxnCtx.StartTS
require.Less(t, commitTS1, startTS2)
rows2 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows()
tk.MustExec("commit")
rows3 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
require.Equal(t, rows1[0][0], rows2[0][0])
require.Equal(t, rows1[0][1], rows2[0][1])
require.Equal(t, rows1[0][0], rows3[0][0])
require.Equal(t, rows1[0][1], rows3[0][1])
require.Less(t, rows2[0][1], rows2[0][2])
require.Equal(t, strconv.FormatUint(startTS2, 10), rows2[0][2])
tk.MustExec("commit")
rows3 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
require.Equal(t, strconv.FormatUint(startTS2, 10), rows3[0][0])
require.Equal(t, "0", rows3[0][1])

// txn explicitly started with begin
tk.MustExec("begin")
startTS3 := tk.Session().GetSessionVars().TxnCtx.StartTS
require.Less(t, startTS2, startTS3)
tk.MustExec("update t set a = a + 1 where a = 1")
rows4 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows()
require.Equal(t, rows3[0][0], rows4[0][0])
require.Equal(t, rows3[0][1], rows4[0][1])
require.Equal(t, strconv.FormatUint(startTS3, 10), rows4[0][2])
tk.MustExec("commit")
commitTS3 := tk.Session().GetSessionVars().LastCommitTS
require.Less(t, startTS3, commitTS3)
rows5 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
require.Equal(t, rows1[0][0], rows4[0][0])
require.Equal(t, rows1[0][1], rows4[0][1])
require.Equal(t, rows5[0][0], rows4[0][2])
require.Less(t, rows4[0][1], rows4[0][2])
require.Less(t, rows4[0][2], rows5[0][1])
require.Equal(t, strconv.FormatUint(startTS3, 10), rows5[0][0])
require.Equal(t, strconv.FormatUint(commitTS3, 10), rows5[0][1])

// rollback txn
tk.MustExec("begin")
startTS4 := tk.Session().GetSessionVars().TxnCtx.StartTS
require.Less(t, commitTS3, startTS4)
tk.MustExec("update t set a = a + 1 where a = 2")
tk.MustExec("rollback")
rows6 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
require.Equal(t, rows5[0][0], rows6[0][0])
require.Equal(t, rows5[0][1], rows6[0][1])
require.Equal(t, strconv.FormatUint(startTS4, 10), rows6[0][0])
require.Equal(t, "0", rows6[0][1])

// optimistic txn commit failed
tk.MustExec("begin optimistic")
startTS5 := tk.Session().GetSessionVars().TxnCtx.StartTS
require.Less(t, startTS4, startTS5)
tk.MustExec("insert into t values (2)")
err := tk.ExecToErr("commit")
err = tk.ExecToErr("commit")
require.Error(t, err)
rows7 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), json_extract(@@tidb_last_txn_info, '$.error')").Rows()
require.Greater(t, rows7[0][0], rows5[0][0])
require.Equal(t, strconv.FormatUint(startTS5, 10), rows7[0][0])
require.Equal(t, "0", rows7[0][1])
require.Contains(t, err.Error(), rows7[0][1])

// autocommit=0
tk.MustExec("set @@autocommit=0")
tk.MustExec("update t set a = a + 1 where a = 1")
startTS6 := tk.Session().GetSessionVars().TxnCtx.StartTS
require.Less(t, startTS5, startTS6)
tk.MustExec("commit")
rows8 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
commitTS6 := tk.Session().GetSessionVars().LastCommitTS
require.Less(t, startTS6, commitTS6)
require.Equal(t, strconv.FormatUint(startTS6, 10), rows8[0][0])
require.Equal(t, strconv.FormatUint(commitTS6, 10), rows8[0][1])
tk.MustExec("set @@autocommit=1")

// not active txn
tk.MustQuery("select 1")
require.Equal(t, rows8, tk.MustQuery(
"select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows(),
)

// select @@tidb_last_txn_info should not update @@tidb_last_txn_info
require.Equal(t, rows8, tk.MustQuery(
"select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows(),
)

// execute short path point-get again to test the cached case
rs, err = tk.Session().ExecutePreparedStmt(context.Background(), pointGetStmtID, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(nil)
tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts')").Check(
testkit.Rows(strconv.FormatUint(math.MaxUint64, 10)),
)

err = tk.ExecToErr("set @@tidb_last_txn_info = '{}'")
require.True(t, terror.ErrorEqual(err, variable.ErrIncorrectScope), fmt.Sprintf("err: %v", err))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ go_library(
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/transaction",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
Expand Down
24 changes: 24 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tracing"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/transaction"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -919,6 +920,7 @@ func (s *session) CommitTxn(ctx context.Context) error {
r, ctx := tracing.StartRegionEx(ctx, "session.CommitTxn")
defer r.End()

s.setLastTxnInfoBeforeTxnEnd()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about putting it in onTxnEnd?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems onTxnEnd,the txn info has been cleared

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this line, in doCommitWithRetry -> doCommit, it seems it still have possibility to finally enter a rolled-back state or encounter error. Perhaps it's better to consider put it in a later place where the commit/rollback result is finally determined.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems if a txn comitted or fails to commit, the CommitHook will update it:

txn.SetOption(kv.CommitHook, func(info string, _ error) { sessVars.LastTxnInfo = info })

setLastTxnInfoBeforeTxnEnd in CommitTxn will be overridden at this time. So setLastTxnInfoBeforeTxnEnd is only affects some readonly txns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add comments to explain why it is put here.

var commitDetail *tikvutil.CommitDetails
ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail)
err := s.doCommitWithRetry(ctx)
Expand Down Expand Up @@ -957,6 +959,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
r, ctx := tracing.StartRegionEx(ctx, "session.RollbackTxn")
defer r.End()

s.setLastTxnInfoBeforeTxnEnd()
if s.txn.Valid() {
terror.Log(s.txn.Rollback())
}
Expand All @@ -970,6 +973,26 @@ func (s *session) RollbackTxn(ctx context.Context) {
sessiontxn.GetTxnManager(s).OnTxnEnd()
}

// setLastTxnInfoBeforeTxnEnd sets the @@last_txn_info variable before commit/rollback the transaction.
// The `LastTxnInfo` updated with a JSON string that contains start_ts, for_update_ts, etc.
// The `LastTxnInfo` is updated without the `commit_ts` fields because it is unknown
// until the commit is done (or do not need to commit for readonly or a rollback transaction).
// The non-readonly transaction will overwrite the `LastTxnInfo` again after commit to update the `commit_ts` field.
func (s *session) setLastTxnInfoBeforeTxnEnd() {
txnCtx := s.GetSessionVars().TxnCtx
if txnCtx.StartTS == 0 {
// If the txn is not active, for example, executing "SELECT 1", skip setting the last txn info.
return
}

lastTxnInfo, err := json.Marshal(transaction.TxnInfo{
TxnScope: txnCtx.TxnScope,
StartTS: txnCtx.StartTS,
})
terror.Log(err)
s.GetSessionVars().LastTxnInfo = string(lastTxnInfo)
}

func (s *session) GetClient() kv.Client {
return s.client
}
Expand Down Expand Up @@ -2162,6 +2185,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
var recordSet sqlexec.RecordSet
if stmt.PsStmt != nil { // point plan short path
recordSet, err = stmt.PointGet(ctx)
s.setLastTxnInfoBeforeTxnEnd()
s.txn.changeToInvalid()
} else {
recordSet, err = runStmt(ctx, s, stmt)
Expand Down