Skip to content

Commit

Permalink
minor improvements from comments
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Feb 10, 2025
1 parent e906cd5 commit e672ef4
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 16 deletions.
1 change: 1 addition & 0 deletions internal/unionstore/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (cp *MemDBCheckpoint) IsSamePosition(other *MemDBCheckpoint) bool {
return cp.blocks == other.blocks && cp.offsetInBlock == other.offsetInBlock
}

// LessThan compares two checkpoints.
func (cp *MemDBCheckpoint) LessThan(cp2 *MemDBCheckpoint) bool {
if cp == nil || cp2 == nil {
logutil.BgLogger().Panic("unexpected nil checkpoint", zap.Any("cp", cp), zap.Any("cp2", cp2))
Expand Down
14 changes: 7 additions & 7 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type ART struct {
missCount atomic.Uint64

// The counter of every write operation, used to invalidate iterators that were created before the write operation.
SeqNo int
// increased by 1 when an operation that may affect the content returned by "snapshot iter" (i.e. stage[0]) happens.
WriteSeqNo int
// Increased by 1 when an operation that may affect the content returned by "snapshot iter" (i.e. stage[0]) happens.
// It's used to invalidate snapshot iterators.
// invariant: no concurrent access to it
SnapshotSeqNo int
Expand Down Expand Up @@ -122,7 +122,7 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
}
}

t.SeqNo++
t.WriteSeqNo++
if len(t.stages) == 0 {
t.dirty = true
}
Expand Down Expand Up @@ -487,7 +487,7 @@ func (t *ART) RevertToCheckpoint(cp *arena.MemDBCheckpoint) {
t.allocator.vlogAllocator.RevertToCheckpoint(t, cp)
t.allocator.vlogAllocator.Truncate(cp)
t.allocator.vlogAllocator.OnMemChange()
t.SeqNo++
t.WriteSeqNo++
if len(t.stages) == 0 || t.stages[0].LessThan(cp) {
t.SnapshotSeqNo++
}
Expand All @@ -510,7 +510,7 @@ func (t *ART) Release(h int) {
if h != len(t.stages) {
panic("cannot release staging buffer")
}
t.SeqNo++
t.WriteSeqNo++
if h == 1 {
t.SnapshotSeqNo++
tail := t.checkpoint()
Expand All @@ -533,7 +533,7 @@ func (t *ART) Cleanup(h int) {
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(tree.stages)=%v", h, len(t.stages)))
}

t.SeqNo++
t.WriteSeqNo++
if h == 1 {
t.SnapshotSeqNo++
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func (t *ART) Reset() {
t.allocator.vlogAllocator.Reset()
t.lastTraversedNode.Store(arena.NullU64Addr)
t.SnapshotSeqNo++
t.SeqNo++
t.WriteSeqNo++
}

// DiscardValues releases the memory used by all values.
Expand Down
6 changes: 3 additions & 3 deletions internal/unionstore/art/art_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (t *ART) iter(lowerBound, upperBound []byte, reverse, includeFlags bool) (*
// this avoids the initial value of currAddr equals to endAddr.
currAddr: arena.BadAddr,
endAddr: arena.NullAddr,
seqNo: t.SeqNo,
seqNo: t.WriteSeqNo,
}
it.init(lowerBound, upperBound)
if !it.valid {
Expand Down Expand Up @@ -88,11 +88,11 @@ type Iterator struct {
}

func (it *Iterator) checkSeqNo() {
if it.seqNo != it.tree.SeqNo && !it.ignoreSeqNo {
if it.seqNo != it.tree.WriteSeqNo && !it.ignoreSeqNo {
logutil.BgLogger().Panic(
"seqNo mismatch",
zap.Int("it seqNo", it.seqNo),
zap.Int("art seqNo", it.tree.SeqNo),
zap.Int("art seqNo", it.tree.WriteSeqNo),
zap.Stack("stack"),
)
}
Expand Down
8 changes: 5 additions & 3 deletions internal/unionstore/art/art_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)

func (t *ART) getSnapshot() arena.MemDBCheckpoint {
// GetSnapshot returns the "snapshot" for snapshotGetter or snapshotIterator, which is usually the snapshot
// of stage[0]
func (t *ART) GetSnapshot() arena.MemDBCheckpoint {
if len(t.stages) > 0 {
return t.stages[0]
}
Expand All @@ -32,7 +34,7 @@ func (t *ART) getSnapshot() arena.MemDBCheckpoint {
func (t *ART) SnapshotGetter() *SnapGetter {
return &SnapGetter{
tree: t,
cp: t.getSnapshot(),
cp: t.GetSnapshot(),
}
}

Expand All @@ -52,7 +54,7 @@ func (t *ART) newSnapshotIterator(start, end []byte, desc bool) *SnapIter {
inner.ignoreSeqNo = true
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
cp: t.GetSnapshot(),
}
it.tree.allocator.snapshotInc()
for !it.setValue() && it.Valid() {
Expand Down
25 changes: 22 additions & 3 deletions internal/unionstore/memdb_art.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package unionstore
import (
"context"
"fmt"
"go.uber.org/zap"

Check failure on line 20 in internal/unionstore/memdb_art.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed (goimports)
"sync"

"github.com/tikv/client-go/v2/internal/logutil"
Expand Down Expand Up @@ -210,6 +211,9 @@ type snapshotBatchedIter struct {
pos int
batchSize int
nextKey []byte

// only used to check if the snapshot ever changes between batches. It is not supposed to change.
snapshot MemDBCheckpoint
}

func (db *artDBWithContext) BatchedSnapshotIter(lower, upper []byte, reverse bool) Iterator {
Expand All @@ -225,14 +229,29 @@ func (db *artDBWithContext) BatchedSnapshotIter(lower, upper []byte, reverse boo
batchSize: 32,
}

iter.fillBatch()
iter.snapshot = db.GetSnapshot()
err := iter.fillBatch()
if err != nil {
logutil.BgLogger().Error("failed to fill batch for snapshotBatchedIter", zap.Error(err))
}
return iter
}

func (it *snapshotBatchedIter) fillBatch() error {
if it.snapshotTruncateSeqNo != it.db.SnapshotSeqNo {
return errors.New(fmt.Sprintf("invalid iter: truncation happened, iter's=%d, db's=%d",
it.snapshotTruncateSeqNo, it.db.SnapshotSeqNo))
return errors.Errorf(
"invalid iter: truncation happened, iter's=%d, db's=%d",
it.snapshotTruncateSeqNo,
it.db.SnapshotSeqNo,
)
}

if it.db.GetSnapshot() != it.snapshot {
return errors.Errorf(
"snapshot changed between batches, expected=%v, actual=%v",
it.snapshot,
it.db.GetSnapshot(),
)
}

it.db.RLock()
Expand Down

0 comments on commit e672ef4

Please sign in to comment.