From 941d94b94335b9b0f67feda42590e9d3444bfd87 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 10 Feb 2025 14:54:53 +0800 Subject: [PATCH] fix: return error if the first fillBatch fails Signed-off-by: ekexium --- internal/unionstore/arena/arena.go | 3 +-- internal/unionstore/memdb_art.go | 13 +++++++------ internal/unionstore/union_store.go | 5 ----- tikv/unionstore_export.go | 2 -- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/internal/unionstore/arena/arena.go b/internal/unionstore/arena/arena.go index f5e7b83855..6ae3e890d9 100644 --- a/internal/unionstore/arena/arena.go +++ b/internal/unionstore/arena/arena.go @@ -39,10 +39,9 @@ import ( "math" "github.com/tikv/client-go/v2/internal/logutil" - "go.uber.org/zap" - "github.com/tikv/client-go/v2/kv" "go.uber.org/atomic" + "go.uber.org/zap" ) const ( diff --git a/internal/unionstore/memdb_art.go b/internal/unionstore/memdb_art.go index 144c75b8f4..0f0f17338b 100644 --- a/internal/unionstore/memdb_art.go +++ b/internal/unionstore/memdb_art.go @@ -17,7 +17,6 @@ package unionstore import ( "context" "fmt" - "go.uber.org/zap" "sync" "github.com/tikv/client-go/v2/internal/logutil" @@ -204,6 +203,7 @@ type snapshotBatchedIter struct { lower []byte upper []byte reverse bool + err error // current batch keys [][]byte @@ -230,10 +230,7 @@ func (db *artDBWithContext) BatchedSnapshotIter(lower, upper []byte, reverse boo } iter.snapshot = db.GetSnapshot() - err := iter.fillBatch() - if err != nil { - logutil.BgLogger().Error("failed to fill batch for snapshotBatchedIter", zap.Error(err)) - } + iter.err = iter.fillBatch() return iter } @@ -325,10 +322,14 @@ func (it *snapshotBatchedIter) fillBatch() error { func (it *snapshotBatchedIter) Valid() bool { return it.snapshotTruncateSeqNo == it.db.SnapshotSeqNo && - it.pos < len(it.keys) + it.pos < len(it.keys) && + it.err == nil } func (it *snapshotBatchedIter) Next() error { + if it.err != nil { + return it.err + } if it.snapshotTruncateSeqNo != it.db.SnapshotSeqNo { return errors.New( fmt.Sprintf( diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index 6127101863..c6d091f592 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -162,11 +162,6 @@ func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { us.memBuffer.SetEntrySizeLimit(entryLimit, bufferLimit) } -type KvPair struct { - Key []byte - Value []byte -} - // MemBuffer is an interface that stores mutations that written during transaction execution. // It now unifies MemDB and PipelinedMemDB. // The implementations should follow the transaction guarantees: diff --git a/tikv/unionstore_export.go b/tikv/unionstore_export.go index efbaf93f2c..80ee88f42a 100644 --- a/tikv/unionstore_export.go +++ b/tikv/unionstore_export.go @@ -60,5 +60,3 @@ type MemDBCheckpoint = unionstore.MemDBCheckpoint // Metrics is the metrics of unionstore. type Metrics = unionstore.Metrics - -type KvPair = unionstore.KvPair