Skip to content

Commit ff3bb45

Browse files
committed
fix resolver cache usage for async commit
Signed-off-by: cfzjywxk <[email protected]>
1 parent 2b8c6a7 commit ff3bb45

File tree

3 files changed

+122
-40
lines changed

3 files changed

+122
-40
lines changed

integration_tests/async_commit_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
624624
s.Nil(err)
625625
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
626626
s.Nil(err)
627-
if status.IsRolledBack() {
627+
if s.store.GetOracle().IsExpired(lock.TxnID, status.TTL(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
628628
break
629629
}
630630
time.Sleep(time.Millisecond * 30)

txnkv/txnlock/lock_resolver.go

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -113,17 +113,27 @@ func (lr *LockResolver) Close() {
113113

114114
// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
115115
type TxnStatus struct {
116+
// The ttl is set from the `CheckTxnStatus` kv response, it is read only and do not change it.
116117
ttl uint64
117118
commitTS uint64
118119
action kvrpcpb.Action
119120
primaryLock *kvrpcpb.LockInfo
120121
}
121122

122123
// IsCommitted returns true if the txn's final status is Commit.
123-
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }
124+
func (s TxnStatus) IsCommitted() bool { return s.commitTS > 0 }
124125

125126
// IsRolledBack returns true if the txn's final status is rolled back.
126-
func (s TxnStatus) IsRolledBack() bool { return s.ttl == 0 && s.commitTS == 0 }
127+
func (s TxnStatus) IsRolledBack() bool {
128+
return s.ttl == 0 && s.commitTS == 0 && (s.action == kvrpcpb.Action_NoAction ||
129+
s.action == kvrpcpb.Action_LockNotExistRollback ||
130+
s.action == kvrpcpb.Action_TTLExpireRollback)
131+
}
132+
133+
// IsStatusDetermined returns true if the txn's final status is determined.
134+
func (s TxnStatus) IsStatusDetermined() bool {
135+
return s.IsRolledBack() || s.IsCommitted()
136+
}
127137

128138
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
129139
func (s TxnStatus) CommitTS() uint64 { return s.commitTS }
@@ -137,26 +147,32 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }
137147
// StatusCacheable checks whether the transaction status is certain.True will be
138148
// returned if its status is certain:
139149
//
140-
// If transaction is already committed, the result could be cached.
141-
// Otherwise:
142-
// If l.LockType is pessimistic lock type:
143-
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
144-
// - if its primary lock is prewrite lock type, the check txn status could be cached.
145-
// If l.lockType is prewrite lock type:
146-
// - always cache the check txn status result.
150+
// The `CheckTxnStatus` status logic is:
151+
//
152+
// If l.LockType is pessimistic lock type:
153+
// - if its primary lock is pessimistic too, the check txn status result should NOT be cached.
154+
// - if its primary lock is prewrite lock type, the check txn status could be cached.
155+
// If l.lockType is prewrite lock type:
156+
// - always cache the check txn status result.
147157
//
148158
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
159+
//
160+
// The mapping from `CheckTxnStatus` kv result to the tidb status:
161+
//
162+
// TxnStatus::RolledBack => resp.set_action(Action::NoAction),
163+
// TxnStatus::TtlExpire => resp.set_action(Action::TtlExpireRollback),
164+
// TxnStatus::LockNotExist => resp.set_action(Action::LockNotExistRollback),
165+
// TxnStatus::Committed { commit_ts } => {
166+
// resp.set_commit_version(commit_ts.into_inner())
167+
// }
168+
//
169+
// So the transaction is regarded as committed if the commit_ts is not 0, and rollback if the
170+
// `action` equals `Action::NoAction` or `Action::LockNotExistRollback` or `Action::TtlExpireRollback`.
171+
// Refer to the tikv `CheckTxnStatus` handling logic for more information.
149172
func (s TxnStatus) StatusCacheable() bool {
150-
if s.IsCommitted() {
173+
if s.IsCommitted() || s.IsRolledBack() {
151174
return true
152175
}
153-
if s.ttl == 0 {
154-
if s.action == kvrpcpb.Action_NoAction ||
155-
s.action == kvrpcpb.Action_LockNotExistRollback ||
156-
s.action == kvrpcpb.Action_TTLExpireRollback {
157-
return true
158-
}
159-
}
160176
return false
161177
}
162178

@@ -485,7 +501,9 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
485501
} else if err != nil {
486502
return TxnStatus{}, err
487503
}
488-
if status.ttl != 0 {
504+
expiredAsyncCommitLocks := status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit &&
505+
lr.store.GetOracle().IsExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
506+
if status.ttl != 0 && !expiredAsyncCommitLocks {
489507
return status, nil
490508
}
491509

@@ -497,7 +515,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
497515
cleanRegions = make(map[locate.RegionVerID]struct{})
498516
cleanTxns[l.TxnID] = cleanRegions
499517
}
500-
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
518+
if expiredAsyncCommitLocks {
501519
// resolveAsyncCommitLock will resolve all locks of the transaction, so we needn't resolve
502520
// it again if it has been resolved once.
503521
if exists {
@@ -815,11 +833,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
815833
status.action = cmdResp.Action
816834
status.primaryLock = cmdResp.LockInfo
817835

818-
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
819-
if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
820-
status.ttl = cmdResp.LockTtl
821-
}
822-
} else if cmdResp.LockTtl != 0 {
836+
if cmdResp.LockTtl != 0 {
823837
status.ttl = cmdResp.LockTtl
824838
} else {
825839
if cmdResp.CommitVersion == 0 {
@@ -873,7 +887,7 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s
873887

874888
// Check locks to see if any have been committed or rolled back.
875889
if len(locks) < expected {
876-
logutil.BgLogger().Debug("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
890+
logutil.BgLogger().Info("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
877891
// A lock is missing - the transaction must either have been rolled back or committed.
878892
if !data.missingLock {
879893
// commitTS == 0 => lock has been rolled back.
@@ -964,10 +978,10 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
964978
}
965979

966980
// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
967-
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
981+
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, keys [][]byte) error {
968982
util.EvalFailpoint("resolveAsyncResolveData")
969983

970-
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
984+
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
971985
if err != nil {
972986
return err
973987
}
@@ -1003,31 +1017,49 @@ func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, st
10031017
func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, status TxnStatus, asyncResolveAll bool) (TxnStatus, error) {
10041018
metrics.LockResolverCountWithResolveAsync.Inc()
10051019

1006-
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
1007-
if err != nil {
1008-
return TxnStatus{}, err
1009-
}
1010-
resolveData.keys = append(resolveData.keys, l.Primary)
1020+
var toResolveKeys [][]byte
1021+
if status.IsStatusDetermined() {
1022+
toResolveKeys = make([][]byte, 0, len(status.primaryLock.Secondaries)+1)
1023+
toResolveKeys = append(toResolveKeys, status.primaryLock.Secondaries...)
1024+
toResolveKeys = append(toResolveKeys, l.Primary)
1025+
} else {
1026+
// Only do checkAllSecondaries if the transaction status is undetermined.
1027+
// The async commit transaction is regarded as committed if `resolveData.commitTS` is not 0,
1028+
// otherwise it is regarded as rolled back. The transaction status should be determined if the
1029+
// `checkAllSecondaries` finishes with no errors.
1030+
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
1031+
if err != nil {
1032+
return TxnStatus{}, err
1033+
}
1034+
resolveData.keys = append(resolveData.keys, l.Primary)
10111035

1012-
status.commitTS = resolveData.commitTs
1013-
if status.StatusCacheable() {
1014-
lr.saveResolved(l.TxnID, status)
1036+
status.commitTS = resolveData.commitTs
1037+
if status.StatusCacheable() {
1038+
lr.saveResolved(l.TxnID, status)
1039+
}
1040+
toResolveKeys = resolveData.keys
10151041
}
10161042

1017-
logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
1043+
if _, err := util.EvalFailpoint("resolveAsyncCommitLockReturn"); err == nil {
1044+
return status, nil
1045+
}
1046+
logutil.BgLogger().Info("resolve async commit locks", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS), zap.Stringer("TxnStatus", status))
10181047
if asyncResolveAll {
10191048
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
10201049
go func() {
1021-
err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
1050+
err := lr.resolveAsyncResolveData(asyncBo, l, status, toResolveKeys)
10221051
if err != nil {
10231052
logutil.BgLogger().Info("failed to resolve async-commit locks asynchronously",
10241053
zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
10251054
}
10261055
}()
10271056
} else {
1028-
err = lr.resolveAsyncResolveData(bo, l, status, resolveData)
1057+
err := lr.resolveAsyncResolveData(bo, l, status, toResolveKeys)
1058+
if err != nil {
1059+
return TxnStatus{}, err
1060+
}
10291061
}
1030-
return status, err
1062+
return status, nil
10311063
}
10321064

10331065
// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final

txnkv/txnlock/lock_resolver_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package txnlock
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/pingcap/failpoint"
8+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
9+
"github.com/stretchr/testify/require"
10+
"github.com/tikv/client-go/v2/config/retry"
11+
"github.com/tikv/client-go/v2/util"
12+
)
13+
14+
// TestLockResolverCache is used to cover the issue https://github.com/pingcap/tidb/issues/59494.
15+
func TestLockResolverCache(t *testing.T) {
16+
util.EnableFailpoints()
17+
lockResolver := NewLockResolver(nil)
18+
lock := func(key, primary string, startTS uint64, useAsyncCommit bool, secondaries [][]byte) *kvrpcpb.LockInfo {
19+
return &kvrpcpb.LockInfo{
20+
Key: []byte(key),
21+
PrimaryLock: []byte(primary),
22+
LockVersion: startTS,
23+
UseAsyncCommit: useAsyncCommit,
24+
MinCommitTs: startTS + 1,
25+
Secondaries: secondaries,
26+
}
27+
}
28+
29+
resolvedTxnTS := uint64(1)
30+
k1 := "k1"
31+
k2 := "k2"
32+
resolvedTxnStatus := TxnStatus{
33+
ttl: 0,
34+
commitTS: 10,
35+
primaryLock: lock(k1, k1, resolvedTxnTS, true, [][]byte{[]byte(k2)}),
36+
}
37+
lockResolver.mu.resolved[resolvedTxnTS] = resolvedTxnStatus
38+
toResolveLock := lock(k2, k1, resolvedTxnTS, true, [][]byte{})
39+
backOff := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff)
40+
41+
// Save the async commit transaction resolved result to the resolver cache.
42+
lockResolver.saveResolved(resolvedTxnTS, resolvedTxnStatus)
43+
44+
// With failpoint, the async commit transaction will be resolved and `CheckSecondaries` would not be called.
45+
// Otherwise, the test would panic as the storage is nil.
46+
require.Nil(t, failpoint.Enable("tikvclient/resolveAsyncCommitLockReturn", "return"))
47+
_, err := lockResolver.ResolveLocks(backOff, 5, []*Lock{NewLock(toResolveLock)})
48+
require.NoError(t, err)
49+
require.Nil(t, failpoint.Disable("tikvclient/resolveAsyncCommitLockReturn"))
50+
}

0 commit comments

Comments
 (0)