txn: fix resolver cache usage for async commit #1629


Open · wants to merge 6 commits into base: master
2 changes: 1 addition & 1 deletion integration_tests/async_commit_test.go
@@ -624,7 +624,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
s.Nil(err)
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
s.Nil(err)
if status.IsRolledBack() {
if s.store.GetOracle().IsExpired(lock.TxnID, status.TTL(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
break
}
time.Sleep(time.Millisecond * 30)
36 changes: 29 additions & 7 deletions integration_tests/lock_test.go
@@ -618,7 +618,7 @@ func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit() {
committer, err := txn.NewCommitter(1)
s.Nil(err)
s.Equal(committer.GetMutations().Len(), 2)
committer.SetLockTTL(0)
committer.SetLockTTL(1)
committer.SetUseAsyncCommit()
committer.SetCommitTS(committer.GetStartTS() + (100 << 18)) // 100ms

@@ -647,10 +647,20 @@ func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit() {
err = lr.CheckAllSecondaries(bo, lock, &status)
s.True(lr.IsNonAsyncCommitLock(err))

status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
s.Nil(err)
resolveStarted := time.Now()
for {
status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
s.Nil(err)
if status.Action() == kvrpcpb.Action_TTLExpireRollback {
break
}
if time.Since(resolveStarted) > 10*time.Second {
s.NoError(errors.Errorf("Resolve fallback async commit locks timeout"))
}
}
s.Equal(status.Action(), kvrpcpb.Action_TTLExpireRollback)
s.Equal(status.TTL(), uint64(0))
s.Equal(status.IsRolledBack(), true)
}

func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
@@ -659,9 +669,18 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
lock := s.mustGetLock([]byte("fb1"))
s.True(lock.UseAsyncCommit)
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
s.Nil(err)
s.Equal(expire, int64(0))

resolveStarted := time.Now()
for {
expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
s.Nil(err)
if expire == 0 {
break
}
if time.Since(resolveStarted) > 10*time.Second {
s.NoError(errors.Errorf("Resolve fallback async commit locks timeout"))
}
}

t3, err := s.store.Begin()
s.Nil(err)
@@ -943,7 +962,7 @@ func (s *testLockSuite) TestResolveLocksForRead() {
locks = append(locks, lock)

// can't be pushed but is expired
startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true)
startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 1, false, true)
committedLocks = append(committedLocks, startTS)
lock = s.mustGetLock([]byte("k5"))
locks = append(locks, lock)
@@ -977,6 +996,9 @@ func (s *testLockSuite) TestResolveLocksForRead() {
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
lr := s.store.NewLockResolver()
defer lr.Close()

// Sleep for a while to make sure the async commit lock "k5" expires, so it can be resolved and committed.
time.Sleep(500 * time.Millisecond)
msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false)
s.Nil(err)
s.Greater(msBeforeExpired, int64(0))
115 changes: 76 additions & 39 deletions txnkv/txnlock/lock_resolver.go
@@ -113,17 +113,27 @@ func (lr *LockResolver) Close() {

// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
// The ttl is set from the `CheckTxnStatus` kv response, it is read only and do not change it.
ttl uint64
commitTS uint64
action kvrpcpb.Action
primaryLock *kvrpcpb.LockInfo
}

// IsCommitted returns true if the txn's final status is Commit.
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }
func (s TxnStatus) IsCommitted() bool { return s.commitTS > 0 }

// IsRolledBack returns true if the txn's final status is rolled back.
func (s TxnStatus) IsRolledBack() bool { return s.ttl == 0 && s.commitTS == 0 }
func (s TxnStatus) IsRolledBack() bool {
return s.ttl == 0 && s.commitTS == 0 && (s.action == kvrpcpb.Action_NoAction ||
s.action == kvrpcpb.Action_LockNotExistRollback ||
s.action == kvrpcpb.Action_TTLExpireRollback)
}
Comment on lines +127 to +131
Contributor:
In my memory, whether the TxnStatus type stands for committed or rolled back is strictly defined by the fields ttl and commitTS. Is the previous definition wrong now? Could this change cause other unexpected side effects?

Contributor Author:
TxnStatus is an abstraction that introduces some complexity. In the current implementation, TxnStatus is determined by the KV response from CheckTxnStatus. Whether a transaction is rolled back depends on the TTL and the specified action, while whether it is committed depends on whether the commit timestamp is greater than 0.

To make this clear, two separate functions, IsCommitted and IsRolledBack, are abstracted according to the current CheckTxnStatus implementation:

TxnStatus::RolledBack => resp.set_action(Action::NoAction),
TxnStatus::TtlExpire => resp.set_action(Action::TtlExpireRollback),
TxnStatus::LockNotExist => resp.set_action(Action::LockNotExistRollback),
TxnStatus::Committed { commit_ts } => {
    resp.set_commit_version(commit_ts.into_inner())
}

Only from these four kinds of responses is the transaction status determined.
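
To make that mapping concrete, here is a minimal standalone sketch (the helper name is hypothetical and this is not part of the PR's code) that expresses the rule as a predicate over a CheckTxnStatus response:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// isDetermined is a hypothetical helper mirroring the rule described above:
// committed iff the commit version is non-zero; rolled back iff ttl == 0,
// the commit version is 0, and the action is one of the three rollback-style
// actions returned by CheckTxnStatus.
func isDetermined(commitTS, ttl uint64, action kvrpcpb.Action) (committed, rolledBack bool) {
	committed = commitTS > 0
	rolledBack = ttl == 0 && commitTS == 0 &&
		(action == kvrpcpb.Action_NoAction ||
			action == kvrpcpb.Action_LockNotExistRollback ||
			action == kvrpcpb.Action_TTLExpireRollback)
	return committed, rolledBack
}

func main() {
	// A TtlExpireRollback response with ttl == 0 is a determined rollback,
	// so it is safe to cache.
	committed, rolledBack := isDetermined(0, 0, kvrpcpb.Action_TTLExpireRollback)
	fmt.Println(committed, rolledBack) // false true
}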


// IsStatusDetermined returns true if the txn's final status is determined.
func (s TxnStatus) IsStatusDetermined() bool {
return s.IsRolledBack() || s.IsCommitted()
}

// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return s.commitTS }
@@ -137,26 +147,32 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }
// StatusCacheable checks whether the transaction status is certain. True will be
// returned if its status is certain:
//
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
// The `CheckTxnStatus` status logic is:
//
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should NOT be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
//
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
//
// The mapping from `CheckTxnStatus` kv result to the tidb status:
//
// TxnStatus::RolledBack => resp.set_action(Action::NoAction),
// TxnStatus::TtlExpire => resp.set_action(Action::TtlExpireRollback),
// TxnStatus::LockNotExist => resp.set_action(Action::LockNotExistRollback),
// TxnStatus::Committed { commit_ts } => {
// resp.set_commit_version(commit_ts.into_inner())
// }
//
// So the transaction is regarded as committed if the commit_ts is not 0, and rollback if the
// `action` equals `Action::NoAction` or `Action::LockNotExistRollback` or `Action::TtlExpireRollback`.
// Refer to the tikv `CheckTxnStatus` handling logic for more information.
func (s TxnStatus) StatusCacheable() bool {
if s.IsCommitted() {
if s.IsCommitted() || s.IsRolledBack() {
return true
}
if s.ttl == 0 {
if s.action == kvrpcpb.Action_NoAction ||
s.action == kvrpcpb.Action_LockNotExistRollback ||
s.action == kvrpcpb.Action_TTLExpireRollback {
return true
}
}
return false
}

@@ -204,6 +220,11 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock {
}

func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
if !status.IsStatusDetermined() {
logutil.BgLogger().Error("unexpected undetermined status saved to cache",
zap.Uint64("txnID", txnID), zap.Stringer("status", status), zap.Stack("stack"))
panic("unexpected undetermined status saved to cache")
}
lr.mu.Lock()
defer lr.mu.Unlock()

@@ -485,7 +506,9 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
} else if err != nil {
return TxnStatus{}, err
}
if status.ttl != 0 {
ttlExpired := (lr.store == nil) || (lr.store.GetOracle().IsExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}))
expiredAsyncCommitLocks := status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit && ttlExpired
if status.ttl != 0 && !expiredAsyncCommitLocks {
return status, nil
}

@@ -497,7 +520,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
cleanRegions = make(map[locate.RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
if expiredAsyncCommitLocks {
// resolveAsyncCommitLock will resolve all locks of the transaction, so we needn't resolve
// it again if it has been resolved once.
if exists {
@@ -815,11 +838,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
status.action = cmdResp.Action
status.primaryLock = cmdResp.LockInfo

if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
status.ttl = cmdResp.LockTtl
}
} else if cmdResp.LockTtl != 0 {
if cmdResp.LockTtl != 0 {

Contributor:
Can't remember the purpose of this code. Could you explain this change?

Contributor Author:
The

if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
    if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
        status.ttl = cmdResp.LockTtl
    }
}

logic left TxnStatus.ttl at zero for expired async-commit locks, so that

if status.ttl != 0 {
    return
}

if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
    resolveAsyncCommit
}

could reach resolveAsyncCommit in the resolve function.

I've moved the TTL expiration check back into the resolve function. This avoids modifying the TxnStatus state deep inside the call stack, especially since setting the TTL to 0 can be confusing. It also ensures that the ttl field remains read-only throughout the process.

Contributor Author:
In actual code, all lock TTLs should be greater than 0. Whether a lock has expired should ideally be determined by the upper layer, rather than by modifying the internal state of TxnStatus, IMO.
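
As a side note on how such an expiry check can be computed, a minimal sketch follows, assuming the local clock stands in for the oracle's notion of "now"; the real IsExpired implementation may account for clock drift differently:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// lockExpired is a simplified stand-in for the oracle expiry check: take the
// physical part of the lock's start timestamp, add the lock TTL in
// milliseconds, and compare against the current physical time.
func lockExpired(lockTS, ttlMS uint64) bool {
	startPhysical := oracle.ExtractPhysical(lockTS)
	nowPhysical := oracle.GetPhysical(time.Now())
	return nowPhysical >= startPhysical+int64(ttlMS)
}

func main() {
	// A lock started one second ago with a 100ms TTL is expired; with a 10s
	// TTL it is not.
	startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(-time.Second)), 0)
	fmt.Println(lockExpired(startTS, 100))   // true
	fmt.Println(lockExpired(startTS, 10000)) // false
}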

status.ttl = cmdResp.LockTtl
} else {
if cmdResp.CommitVersion == 0 {
@@ -873,7 +892,7 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s

// Check locks to see if any have been committed or rolled back.
if len(locks) < expected {
logutil.BgLogger().Debug("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
logutil.BgLogger().Info("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
// A lock is missing - the transaction must either have been rolled back or committed.
if !data.missingLock {
// commitTS == 0 => lock has been rolled back.
@@ -964,10 +983,10 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
}

// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, keys [][]byte) error {
util.EvalFailpoint("resolveAsyncResolveData")

keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
if err != nil {
return err
}
@@ -1003,31 +1022,49 @@ func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, st
func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, status TxnStatus, asyncResolveAll bool) (TxnStatus, error) {
metrics.LockResolverCountWithResolveAsync.Inc()

resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
return TxnStatus{}, err
}
resolveData.keys = append(resolveData.keys, l.Primary)
var toResolveKeys [][]byte
if status.IsStatusDetermined() {
toResolveKeys = make([][]byte, 0, len(status.primaryLock.Secondaries)+1)
toResolveKeys = append(toResolveKeys, status.primaryLock.Secondaries...)
toResolveKeys = append(toResolveKeys, l.Primary)
Comment on lines +1027 to +1029
Contributor:
This might include keys that have already been resolved, but it seems it's still faster than checking the secondaries again and then resolving them, since one RPC is faster than two. Have you already considered that? I suggest noting this in a comment.

Contributor Author:
Yes, I will add some comments. The performance impact should be minimal, as resolving async commit locks is not a common operation.
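
A condensed standalone sketch of that determined-status key selection (hypothetical helper, not the actual client-go function):

package main

import "fmt"

// keysForDeterminedStatus paraphrases the determined-status path above:
// resolve every secondary recorded on the primary lock plus the primary
// itself. Some of these keys may already have been resolved; re-resolving
// them is accepted because one resolve RPC is cheaper than re-checking all
// secondaries first, and resolving async commit locks is rare.
func keysForDeterminedStatus(primary []byte, secondaries [][]byte) [][]byte {
	keys := make([][]byte, 0, len(secondaries)+1)
	keys = append(keys, secondaries...)
	keys = append(keys, primary)
	return keys
}

func main() {
	keys := keysForDeterminedStatus([]byte("k1"), [][]byte{[]byte("k2"), []byte("k3")})
	fmt.Printf("%q\n", keys) // ["k2" "k3" "k1"]
}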

} else {
// Only do checkAllSecondaries if the transaction status is undetermined.
// The async commit transaction is regarded as committed if `resolveData.commitTS` is not 0,
// otherwise it is regarded as rolled back. The transaction status should be determined if the
// `checkAllSecondaries` finishes with no errors.
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
return TxnStatus{}, err
}
resolveData.keys = append(resolveData.keys, l.Primary)

status.commitTS = resolveData.commitTs
if status.StatusCacheable() {
lr.saveResolved(l.TxnID, status)
status.commitTS = resolveData.commitTs
if status.StatusCacheable() {
lr.saveResolved(l.TxnID, status)
}
toResolveKeys = resolveData.keys
}

logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
if _, err := util.EvalFailpoint("resolveAsyncCommitLockReturn"); err == nil {
return status, nil
}
logutil.BgLogger().Info("resolve async commit locks", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS), zap.Stringer("TxnStatus", status))
if asyncResolveAll {
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
go func() {
err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
err := lr.resolveAsyncResolveData(asyncBo, l, status, toResolveKeys)
if err != nil {
logutil.BgLogger().Info("failed to resolve async-commit locks asynchronously",
zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
}
}()
} else {
err = lr.resolveAsyncResolveData(bo, l, status, resolveData)
err := lr.resolveAsyncResolveData(bo, l, status, toResolveKeys)
if err != nil {
return TxnStatus{}, err
}
}
return status, err
return status, nil
}

// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final
50 changes: 50 additions & 0 deletions txnkv/txnlock/lock_resolver_test.go
@@ -0,0 +1,50 @@
package txnlock

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/util"
)

// TestLockResolverCache is used to cover the issue https://github.com/pingcap/tidb/issues/59494.
func TestLockResolverCache(t *testing.T) {
util.EnableFailpoints()
lockResolver := NewLockResolver(nil)
lock := func(key, primary string, startTS uint64, useAsyncCommit bool, secondaries [][]byte) *kvrpcpb.LockInfo {
return &kvrpcpb.LockInfo{
Key: []byte(key),
PrimaryLock: []byte(primary),
LockVersion: startTS,
UseAsyncCommit: useAsyncCommit,
MinCommitTs: startTS + 1,
Secondaries: secondaries,
}
}

resolvedTxnTS := uint64(1)
k1 := "k1"
k2 := "k2"
resolvedTxnStatus := TxnStatus{
ttl: 0,
commitTS: 10,
primaryLock: lock(k1, k1, resolvedTxnTS, true, [][]byte{[]byte(k2)}),
}
lockResolver.mu.resolved[resolvedTxnTS] = resolvedTxnStatus
toResolveLock := lock(k2, k1, resolvedTxnTS, true, [][]byte{})
backOff := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff)

// Save the async commit transaction resolved result to the resolver cache.
lockResolver.saveResolved(resolvedTxnTS, resolvedTxnStatus)

// With failpoint, the async commit transaction will be resolved and `CheckSecondaries` would not be called.
// Otherwise, the test would panic as the storage is nil.
require.Nil(t, failpoint.Enable("tikvclient/resolveAsyncCommitLockReturn", "return"))
_, err := lockResolver.ResolveLocks(backOff, 5, []*Lock{NewLock(toResolveLock)})
require.NoError(t, err)
require.Nil(t, failpoint.Disable("tikvclient/resolveAsyncCommitLockReturn"))
}
1 change: 1 addition & 0 deletions txnkv/txnlock/test_probe.go
@@ -27,6 +27,7 @@ type LockProbe struct{}
// NewLockStatus returns a txn state that has been locked.
func (l LockProbe) NewLockStatus(keys [][]byte, useAsyncCommit bool, minCommitTS uint64) TxnStatus {
return TxnStatus{
ttl: 1,
primaryLock: &kvrpcpb.LockInfo{
Secondaries: keys,
UseAsyncCommit: useAsyncCommit,