Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions iavl/branch_layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ func init() {
}

const (
SizeBranch = 72
SizeBranch = 80
)

type BranchLayout struct {
Id NodeID
Left NodeRef
Right NodeRef
Left NodeID
Right NodeID
LeftOffset uint32 // absolute offset
RightOffset uint32 // absolute offset
KeyOffset uint32
Height uint8
Size uint32 // TODO 5 bytes?
Expand Down
38 changes: 25 additions & 13 deletions iavl/branch_persisted.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package iavlx

import "bytes"
import (
"bytes"
"sync/atomic"
)

type BranchPersisted struct {
store *Changeset
selfIdx uint32
layout BranchLayout
leftPtr, rightPtr *NodePointer
store *Changeset
layout BranchLayout
}

// Left builds a NodePointer for the left child from the persisted layout.
//
// A new pointer is allocated on every call. It carries the child's NodeID
// (layout.Left), the child's recorded offset (layout.LeftOffset), this node's
// store, and an empty in-memory slot.
func (node *BranchPersisted) Left() *NodePointer {
	child := &NodePointer{
		id:      node.layout.Left,
		fileIdx: node.layout.LeftOffset,
		store:   node.store,
		mem:     atomic.Pointer[MemNode]{}, // nothing cached in memory yet
	}
	return child
}

// Right builds a NodePointer for the right child from the persisted layout.
//
// A new pointer is allocated on every call. It carries the child's NodeID
// (layout.Right), the child's recorded offset (layout.RightOffset), this node's
// store, and an empty in-memory slot.
func (node *BranchPersisted) Right() *NodePointer {
	child := &NodePointer{
		id:      node.layout.Right,
		fileIdx: node.layout.RightOffset,
		store:   node.store,
		mem:     atomic.Pointer[MemNode]{}, // nothing cached in memory yet
	}
	return child
}

func (node *BranchPersisted) ID() NodeID {
Expand Down Expand Up @@ -37,14 +56,6 @@ func (node *BranchPersisted) Value() ([]byte, error) {
return nil, nil
}

// Left returns the left-child pointer held in node.leftPtr.
// No lookup or allocation happens here; the same pointer is returned on every call.
func (node *BranchPersisted) Left() *NodePointer {
return node.leftPtr
}

// Right returns the right-child pointer held in node.rightPtr.
// No lookup or allocation happens here; the same pointer is returned on every call.
func (node *BranchPersisted) Right() *NodePointer {
return node.rightPtr
}

// Hash returns the hash stored in the node's layout as a byte slice.
// NOTE(review): the [:] slice expression does not copy, so if layout.Hash is an
// array field (its declaration is not visible here) the result aliases
// node.layout and must not be modified by callers — confirm.
func (node *BranchPersisted) Hash() []byte {
return node.layout.Hash[:]
}
Expand All @@ -58,6 +69,7 @@ func (node *BranchPersisted) MutateBranch(version uint32) (*MemNode, error) {
if err != nil {
return nil, err
}

memNode := &MemNode{
height: node.Height(),
size: node.Size(),
Expand Down
102 changes: 43 additions & 59 deletions iavl/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,77 +170,61 @@ func (cr *Changeset) resolveBranchWithIdx(nodeId NodeID, fileIdx uint32) (Branch
}
}

// resolveNodeRef converts a child reference read from a branch layout into a
// NodePointer.
//
// A NodeID reference produces a pointer bound to whatever changeset
// treeStore.getChangesetForVersion returns for the ID's version; no file index
// is set. A relative pointer is resolved inside cr itself:
//   - leaf: the offset is used directly as a 1-based index into leavesData;
//   - branch: the offset is added to selfIdx to obtain a 1-based index into
//     branchesData.
//
// It panics when the computed index is below 1 or past the number of stored
// items.
func (cr *Changeset) resolveNodeRef(nodeRef NodeRef, selfIdx uint32) *NodePointer {
	if nodeRef.IsNodeID() {
		nodeID := nodeRef.AsNodeID()
		owner := cr.treeStore.getChangesetForVersion(uint32(nodeID.Version()))
		return &NodePointer{id: nodeID, store: owner}
	}

	offset := nodeRef.AsRelativePointer().Offset()

	if nodeRef.IsLeaf() {
		if offset < 1 {
			panic(fmt.Sprintf("invalid leaf offset: %d", offset))
		}
		// Offsets are 1-based; the data slice is 0-based.
		pos := uint32(offset - 1)
		if pos >= uint32(cr.leavesData.Count()) {
			panic(fmt.Sprintf("leaf offset %d out of bounds (have %d leaves)", offset, cr.leavesData.Count()))
		}
		leaf := cr.leavesData.UnsafeItem(pos)
		return &NodePointer{
			id:      leaf.Id,
			store:   cr,
			fileIdx: uint32(offset),
		}
	}

	// Branch offsets are relative to the referencing branch's own index.
	branchIdx := int64(selfIdx) + offset
	if branchIdx < 1 {
		panic(fmt.Sprintf("invalid branch index: %d (selfIdx=%d, offset=%d)", branchIdx, selfIdx, offset))
	}
	pos := uint32(branchIdx - 1)
	if pos >= uint32(cr.branchesData.Count()) {
		panic(fmt.Sprintf("branch index %d out of bounds (have %d branches)", branchIdx, cr.branchesData.Count()))
	}
	branch := cr.branchesData.UnsafeItem(pos)
	return &NodePointer{
		id:      branch.Id,
		store:   cr,
		fileIdx: uint32(branchIdx),
	}
}

func (cr *Changeset) Resolve(nodeId NodeID, fileIdx uint32) (Node, error) {
if cr.evicted.Load() {
return cr.treeStore.Resolve(nodeId, fileIdx)
}
cr.Pin()
defer cr.Unpin()

if nodeId.IsLeaf() {
layout, err := cr.ResolveLeaf(nodeId, fileIdx)
// we don't have a fileIdx, so it's probably not in this changeset.
if fileIdx == 0 {
// load up the changeset for this node
cs := cr.treeStore.getChangesetForVersion(uint32(nodeId.Version()))
cs.Pin()
defer cs.Unpin()

// get version data
version := uint32(nodeId.Version())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at the top of the func, no need to call Version() twice

vi, err := cs.getVersionInfo(version)
if err != nil {
return nil, err
}
return &LeafPersisted{layout: layout, store: cr}, nil
if nodeId.IsLeaf() {
leaf, err := cs.leavesData.FindByID(nodeId, &vi.Leaves)
if err != nil {
return nil, err
}
return &LeafPersisted{
store: cs,
selfIdx: 0,
layout: *leaf,
}, nil
} else {
branch, err := cs.branchesData.FindByID(nodeId, &vi.Branches)
if err != nil {
return nil, err
}
return &BranchPersisted{
store: cs,
layout: *branch,
}, nil
}
} else {
layout, actualIdx, err := cr.resolveBranchWithIdx(nodeId, fileIdx)
if err != nil {
return nil, err
// since we have the fileIdx, we know it's in this changeset.
// we can just directly index in this changeset's leaf/branch data.
if nodeId.IsLeaf() {
itemIdx := fileIdx - 1
leafLayout := *cr.leavesData.UnsafeItem(itemIdx)
return &LeafPersisted{layout: leafLayout, store: cr}, nil
} else {
itemIdx := fileIdx - 1
branchLayout := *cr.branchesData.UnsafeItem(itemIdx)
return &BranchPersisted{
layout: branchLayout,
store: cr,
}, nil
}

leftPtr := cr.resolveNodeRef(layout.Left, actualIdx)
rightPtr := cr.resolveNodeRef(layout.Right, actualIdx)

return &BranchPersisted{
layout: layout,
store: cr,
selfIdx: actualIdx,
leftPtr: leftPtr,
rightPtr: rightPtr,
}, nil
}
}

Expand Down
24 changes: 18 additions & 6 deletions iavl/changeset_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,27 @@ func (cs *ChangesetWriter) writeBranch(np *NodePointer, node *MemNode) error {
}
}

// now write parent
parentIdx := int64(cs.branchesData.Count() + 1) // +1 to account for the node being written
leftRef := cs.createNodeRef(parentIdx, node.left)
rightRef := cs.createNodeRef(parentIdx, node.right)
leftVersion := node.left.id.Version()
rightVersion := node.right.id.Version()

var leftOffset uint32
var rightOffset uint32

// If the child node is in the same changeset, store its 1-based file offset.
// fileIdx is already 1-based (set to Count() after append), and 0 means no offset.
if leftVersion >= uint64(cs.StartVersion()) {
leftOffset = node.left.fileIdx
}
if rightVersion >= uint64(cs.StartVersion()) {
rightOffset = node.right.fileIdx
}

layout := BranchLayout{
Id: np.id,
Left: leftRef,
Right: rightRef,
Left: node.left.id,
Right: node.right.id,
LeftOffset: leftOffset,
RightOffset: rightOffset,
KeyOffset: keyOffset,
Height: node.height,
Size: uint32(node.size), // TODO check overflow
Expand Down
84 changes: 23 additions & 61 deletions iavl/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ type Compactor struct {
versionsWriter *StructWriter[VersionInfo]
kvlogWriter *KVLogWriter

leafOffsetRemappings map[uint32]uint32
keyCache map[string]uint32
keyCache map[string]uint32
// offsetCache maps each node written by the compactor to its new 1-based offset
// in the compacted output. Branches written later look up their children here
// to refresh BranchLayout's LeftOffset and RightOffset.
offsetCache map[NodeID]uint32

// Running totals across all processed changesets
leafOrphanCount uint32
Expand Down Expand Up @@ -71,18 +73,18 @@ func NewCompacter(logger *slog.Logger, reader *Changeset, opts CompactOptions, s
}

c := &Compactor{
logger: logger,
criteria: opts.RetainCriteria,
compactWAL: opts.CompactWAL,
treeStore: store,
files: newFiles,
originalKvLogPath: reader.files.KVLogPath(),
kvlogWriter: kvlogWriter,
leavesWriter: NewStructWriter[LeafLayout](newFiles.leavesFile),
branchesWriter: NewStructWriter[BranchLayout](newFiles.branchesFile),
versionsWriter: NewStructWriter[VersionInfo](newFiles.versionsFile),
keyCache: make(map[string]uint32),
leafOffsetRemappings: make(map[uint32]uint32),
logger: logger,
criteria: opts.RetainCriteria,
compactWAL: opts.CompactWAL,
treeStore: store,
files: newFiles,
originalKvLogPath: reader.files.KVLogPath(),
kvlogWriter: kvlogWriter,
leavesWriter: NewStructWriter[LeafLayout](newFiles.leavesFile),
branchesWriter: NewStructWriter[BranchLayout](newFiles.branchesFile),
versionsWriter: NewStructWriter[VersionInfo](newFiles.versionsFile),
keyCache: make(map[string]uint32),
offsetCache: make(map[NodeID]uint32),
}

// Process first changeset immediately
Expand Down Expand Up @@ -161,8 +163,7 @@ func (c *Compactor) processChangeset(reader *Changeset) error {
return fmt.Errorf("failed to append leaf %s: %w", id, err)
}

oldLeafFileIdx := leafStartOffset + j
c.leafOffsetRemappings[oldLeafFileIdx] = uint32(c.leavesWriter.Count()) - 1
c.offsetCache[id] = uint32(c.leavesWriter.Count())
}

newBranchStartIdx := uint32(0)
Expand Down Expand Up @@ -191,24 +192,11 @@ func (c *Compactor) processChangeset(reader *Changeset) error {
newBranchEndIdx = id.Index()
newBranchCount++

var err error
left := branch.Left
branch.Left, err = c.updateNodeRef(reader, left, skippedBranches)
if err != nil {
c.logger.Error("failed to update left ref",
"branchId", id,
"branchOrphanVersion", branch.OrphanVersion,
"leftRef", left)
return fmt.Errorf("failed to update left ref for branch %s: %w", id, err)
if newLeftOffset, ok := c.offsetCache[branch.Left]; ok {
branch.LeftOffset = newLeftOffset
}
right := branch.Right
branch.Right, err = c.updateNodeRef(reader, right, skippedBranches)
if err != nil {
c.logger.Error("failed to update right ref",
"branchId", id,
"branchOrphanVersion", branch.OrphanVersion,
"rightRef", right)
return fmt.Errorf("failed to update right ref for branch %s: %w", id, err)
if newRightOffset, ok := c.offsetCache[branch.Right]; ok {
branch.RightOffset = newRightOffset
}

if c.compactWAL {
Expand All @@ -229,10 +217,11 @@ func (c *Compactor) processChangeset(reader *Changeset) error {
branch.KeyOffset += kvOffsetDelta
}

err = c.branchesWriter.Append(&branch)
err := c.branchesWriter.Append(&branch)
if err != nil {
return fmt.Errorf("failed to append branch %s: %w", id, err)
}
c.offsetCache[id] = uint32(c.branchesWriter.Count())
}

verInfo = VersionInfo{
Expand Down Expand Up @@ -307,33 +296,6 @@ func (c *Compactor) Seal() (*Changeset, error) {
return cs, nil
}

// updateNodeRef rewrites a branch's child reference so it stays valid in the
// compacted output.
//
//   - NodeID references are returned unchanged.
//   - Relative branch pointers are reduced by skipped, the number of branch
//     nodes that were dropped.
//   - Relative leaf pointers are translated through c.leafOffsetRemappings.
//
// If a leaf offset has no remapping, the failure is logged and an error is
// returned. The log includes the old leaf's ID and orphan version only when the
// offset is a valid 1-based index into reader.leavesData; an offset of 0 or one
// past the leaf count would otherwise wrap or overrun the unchecked UnsafeItem
// lookup and turn the reported error into an out-of-range access.
func (c *Compactor) updateNodeRef(reader *Changeset, ref NodeRef, skipped int) (NodeRef, error) {
	if ref.IsNodeID() {
		return ref, nil
	}

	relPtr := ref.AsRelativePointer()
	oldOffset := relPtr.Offset()

	if !relPtr.IsLeaf() {
		// branch nodes we reduce by the number of skipped nodes
		return NodeRef(NewNodeRelativePointer(false, oldOffset-int64(skipped))), nil
	}

	if newOffset, ok := c.leafOffsetRemappings[uint32(oldOffset)]; ok {
		return NodeRef(NewNodeRelativePointer(true, int64(newOffset))), nil
	}

	// Debug aid: describe the orphaned leaf, but only dereference it when the
	// offset is in range.
	attrs := []any{"leafOffset", oldOffset}
	if oldOffset >= 1 && oldOffset <= int64(reader.leavesData.Count()) {
		oldLeaf := reader.leavesData.UnsafeItem(uint32(oldOffset) - 1)
		attrs = append(attrs,
			"leafId", oldLeaf.Id,
			"leafOrphanVersion", oldLeaf.OrphanVersion)
	}
	attrs = append(attrs, "remappings", c.leafOffsetRemappings)
	c.logger.Error("leaf remapping failed - orphaned leaf still referenced", attrs...)

	return 0, fmt.Errorf("failed to find remapping for leaf offset %d", oldOffset)
}

func (c *Compactor) Abort() error {
err := c.files.Close()
if err != nil {
Expand Down
Loading