Skip to content

Commit

Permalink
fix: return error if the first fillBatch fails
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Feb 10, 2025
1 parent e672ef4 commit 7faf0e1
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 17 deletions.
6 changes: 2 additions & 4 deletions internal/unionstore/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ package arena

import (
"encoding/binary"

Check failure on line 38 in internal/unionstore/arena/arena.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed (goimports)
"math"

"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"

"github.com/tikv/client-go/v2/kv"
"go.uber.org/atomic"
"go.uber.org/zap"
"math"
)

const (
Expand Down
13 changes: 7 additions & 6 deletions internal/unionstore/memdb_art.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package unionstore
import (
"context"
"fmt"
"go.uber.org/zap"
"sync"

"github.com/tikv/client-go/v2/internal/logutil"
Expand Down Expand Up @@ -204,6 +203,7 @@ type snapshotBatchedIter struct {
lower []byte
upper []byte
reverse bool
err error

// current batch
keys [][]byte
Expand All @@ -230,10 +230,7 @@ func (db *artDBWithContext) BatchedSnapshotIter(lower, upper []byte, reverse boo
}

iter.snapshot = db.GetSnapshot()
err := iter.fillBatch()
if err != nil {
logutil.BgLogger().Error("failed to fill batch for snapshotBatchedIter", zap.Error(err))
}
iter.err = iter.fillBatch()
return iter
}

Expand Down Expand Up @@ -325,10 +322,14 @@ func (it *snapshotBatchedIter) fillBatch() error {

func (it *snapshotBatchedIter) Valid() bool {
return it.snapshotTruncateSeqNo == it.db.SnapshotSeqNo &&
it.pos < len(it.keys)
it.pos < len(it.keys) &&
it.err == nil
}

func (it *snapshotBatchedIter) Next() error {
if it.err != nil {
return it.err
}
if it.snapshotTruncateSeqNo != it.db.SnapshotSeqNo {
return errors.New(
fmt.Sprintf(
Expand Down
5 changes: 0 additions & 5 deletions internal/unionstore/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,6 @@ func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
us.memBuffer.SetEntrySizeLimit(entryLimit, bufferLimit)
}

type KvPair struct {
Key []byte
Value []byte
}

// MemBuffer is an interface that stores mutations that written during transaction execution.
// It now unifies MemDB and PipelinedMemDB.
// The implementations should follow the transaction guarantees:
Expand Down
2 changes: 0 additions & 2 deletions tikv/unionstore_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,3 @@ type MemDBCheckpoint = unionstore.MemDBCheckpoint

// Metrics is the metrics of unionstore.
type Metrics = unionstore.Metrics

type KvPair = unionstore.KvPair

0 comments on commit 7faf0e1

Please sign in to comment.