From 4c16a14e44ad1904520d24f5bd3bdc7d91e10179 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 23 Jan 2025 19:30:05 +0800 Subject: [PATCH] fix initKeysAndMutations Signed-off-by: ekexium --- internal/unionstore/art/art_iterator.go | 23 +++++++++++++++++++---- txnkv/transaction/2pc.go | 7 ++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/unionstore/art/art_iterator.go b/internal/unionstore/art/art_iterator.go index 05a23e62a4..ee825f9dd6 100644 --- a/internal/unionstore/art/art_iterator.go +++ b/internal/unionstore/art/art_iterator.go @@ -19,6 +19,9 @@ import ( "fmt" "sort" + "github.com/tikv/client-go/v2/internal/logutil" + "go.uber.org/zap" + "github.com/pkg/errors" "github.com/tikv/client-go/v2/internal/unionstore/arena" "github.com/tikv/client-go/v2/kv" @@ -84,7 +87,21 @@ type Iterator struct { ignoreSeqNo bool } -func (it *Iterator) Valid() bool { return it.valid && (it.seqNo == it.tree.SeqNo || it.ignoreSeqNo) } +func (it *Iterator) checkSeqNo() { + if it.seqNo != it.tree.SeqNo && !it.ignoreSeqNo { + logutil.BgLogger().Panic( + "seqNo mismatch", + zap.Int("it seqNo", it.seqNo), + zap.Int("art seqNo", it.tree.SeqNo), + zap.Stack("stack"), + ) + } +} + +func (it *Iterator) Valid() bool { + it.checkSeqNo() + return it.valid +} func (it *Iterator) Key() []byte { return it.currLeaf.GetKey() } func (it *Iterator) Flags() kv.KeyFlags { return it.currLeaf.GetKeyFlags() } func (it *Iterator) Value() []byte { @@ -108,9 +125,7 @@ func (it *Iterator) Next() error { // iterate is finished return errors.New("Art: iterator is finished") } - if !it.ignoreSeqNo && it.seqNo != it.tree.SeqNo { - return errors.New(fmt.Sprintf("seqNo mismatch: iter=%d, art=%d", it.seqNo, it.tree.SeqNo)) - } + it.checkSeqNo() if it.currAddr == it.endAddr { it.valid = false return nil diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 6a739b3616..14307bf38e 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -559,6 +559,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { var err error var assertionError error + toUpdatePrewriteOnly := make([][]byte, 0) for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() { _ = err key := it.Key() @@ -607,7 +608,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { // due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase. op = kvrpcpb.Op_CheckNotExists checkCnt++ - memBuf.UpdateFlags(key, kv.SetPrewriteOnly) + toUpdatePrewriteOnly = append(toUpdatePrewriteOnly, key) } else { if flags.HasNewlyInserted() { // The delete-your-write keys in pessimistic transactions, only lock needed keys and skip @@ -682,6 +683,10 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { } } + for _, key := range toUpdatePrewriteOnly { + memBuf.UpdateFlags(key, kv.SetPrewriteOnly) + } + if c.mutations.Len() == 0 { return nil }