Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions db/etl/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,198 @@ func (b *sortableBuffer) Sort() {
slices.SortFunc(b.entries, cmp)
}

// SortByKeyAndValue orders b.entries by key bytes, then by value bytes, and
// finally by insertionOrder, so entries with an identical (key, value) pair end
// up in a deterministic relative order.
//
// A negative keyLen or valLen is clamped to zero, i.e. such an entry compares
// as having an empty key or value. When the entries are already in the target
// order the method returns after one linear scan without sorting.
func (b *sortableBuffer) SortByKeyAndValue() {
	data := b.data
	// Parameters are named x/y so they do not shadow the receiver b.
	byKeyValueOrder := func(x, y entryLoc) int {
		xKeyEnd := x.offset + max(x.keyLen, 0)
		yKeyEnd := y.offset + max(y.keyLen, 0)
		if c := bytes.Compare(data[x.offset:xKeyEnd], data[y.offset:yKeyEnd]); c != 0 {
			return c
		}
		// The value bytes start right where the key bytes end.
		xVal := data[xKeyEnd : xKeyEnd+max(x.valLen, 0)]
		yVal := data[yKeyEnd : yKeyEnd+max(y.valLen, 0)]
		if c := bytes.Compare(xVal, yVal); c != 0 {
			return c
		}
		// Compare explicitly rather than subtracting: a subtraction wraps around
		// for unsigned or narrow counters and would then report the wrong sign,
		// which breaks the strict-weak-ordering contract of slices.SortFunc.
		switch {
		case x.insertionOrder < y.insertionOrder:
			return -1
		case x.insertionOrder > y.insertionOrder:
			return 1
		}
		return 0
	}
	if slices.IsSortedFunc(b.entries, byKeyValueOrder) {
		return
	}
	slices.SortFunc(b.entries, byKeyValueOrder)
}

// SortByKeyAndValueGrouped sorts b.entries by (key, value, insertionOrder). It
// is tuned for input whose keys are already in non-decreasing order, where only
// the values inside each run of equal keys still need sorting.
//
// Algorithm:
//  1. Walk adjacent entries and compare only their keys. While keys never
//     decrease, every run of equal keys forms a group; as soon as a group is
//     complete it is sorted by (value, insertionOrder) via sortGroupByVal.
//  2. If a key is ever smaller than its predecessor, the input is not
//     key-sorted and the whole slice is handed to SortByKeyAndValue. Groups
//     that were already value-sorted need no undoing, because the full sort is
//     correct for any starting permutation.
//
// Negative keyLen values are treated as empty keys. Buffers with fewer than two
// entries are returned untouched.
func (b *sortableBuffer) SortByKeyAndValueGrouped() {
	data := b.data
	entries := b.entries
	if len(entries) <= 1 {
		return
	}

	start := 0 // index of the first entry of the group currently being extended
	for i := 1; i < len(entries); i++ {
		cur, prev := entries[i], entries[i-1]

		c := 0
		if cur.keyLen == 8 && prev.keyLen == 8 {
			// Two 8-byte keys order the same way as their big-endian uint64
			// values, so compare them as integers instead of calling
			// bytes.Compare.
			ck := binary.BigEndian.Uint64(data[cur.offset:])
			pk := binary.BigEndian.Uint64(data[prev.offset:])
			switch {
			case ck < pk:
				c = -1
			case ck > pk:
				c = 1
			}
		} else {
			c = bytes.Compare(
				data[cur.offset:cur.offset+max(cur.keyLen, 0)],
				data[prev.offset:prev.offset+max(prev.keyLen, 0)],
			)
		}

		switch {
		case c < 0:
			// Key went backwards: the grouped approach does not apply. Reuse the
			// general sort so the (key, value, insertionOrder) comparator lives
			// in one place. Its sortedness pre-check stops at the first
			// inversion, which lies at or before index i.
			b.SortByKeyAndValue()
			return
		case c > 0:
			// Key advanced: the group [start, i) is complete.
			if i-start > 1 {
				sortGroupByVal(entries[start:i], data, int(max(entries[start].keyLen, 0)))
			}
			start = i
		}
		// c == 0: same key, keep extending the current group.
	}

	// Flush the final group, which runs to the end of the slice.
	if len(entries)-start > 1 {
		sortGroupByVal(entries[start:], data, int(max(entries[start].keyLen, 0)))
	}
}

// insertionSortThreshold is the largest group size that sortGroupByVal handles
// with its inline insertion sort; larger groups go through slices.SortFunc.
const insertionSortThreshold = 24

// sortGroupByVal sorts group in place by (value, insertionOrder).
//
// All entries in group are expected to share one key; kl is that key's length,
// so an entry's value occupies data[offset+kl : offset+kl+valLen]. A negative
// valLen is treated as an empty value.
//
// Groups of at most insertionSortThreshold entries are sorted with an insertion
// sort whose comparison is written out inline. Larger groups use
// slices.SortFunc with the same ordering. In both paths, when both values are
// at least 8 bytes long their first 8 bytes are compared as big-endian uint64
// first, and bytes.Compare is only consulted when those prefixes are equal or
// a value is shorter than 8 bytes.
func sortGroupByVal(group []entryLoc, data []byte, kl int) {
	if len(group) > insertionSortThreshold {
		slices.SortFunc(group, func(x, y entryLoc) int {
			xOff := int(x.offset) + kl
			yOff := int(y.offset) + kl
			xVal := data[xOff : xOff+int(max(x.valLen, 0))]
			yVal := data[yOff : yOff+int(max(y.valLen, 0))]
			if len(xVal) >= 8 && len(yVal) >= 8 {
				xp := binary.BigEndian.Uint64(xVal)
				yp := binary.BigEndian.Uint64(yVal)
				if xp < yp {
					return -1
				}
				if xp > yp {
					return 1
				}
			}
			if c := bytes.Compare(xVal, yVal); c != 0 {
				return c
			}
			// Explicit three-way comparison: subtracting the counters would
			// wrap for unsigned or narrow types and report the wrong sign.
			switch {
			case x.insertionOrder < y.insertionOrder:
				return -1
			case x.insertionOrder > y.insertionOrder:
				return 1
			}
			return 0
		})
		return
	}

	for i := 1; i < len(group); i++ {
		e := group[i]
		eOff := int(e.offset) + kl
		eVal := data[eOff : eOff+int(max(e.valLen, 0))]
		// Load the 8-byte prefix of the element being inserted once, outside
		// the inner loop.
		eHas8 := len(eVal) >= 8
		var ep uint64
		if eHas8 {
			ep = binary.BigEndian.Uint64(eVal)
		}

		j := i - 1
		for ; j >= 0; j-- {
			f := group[j]
			fOff := int(f.offset) + kl
			fVal := data[fOff : fOff+int(max(f.valLen, 0))]

			// c orders f relative to e by value.
			c := 0
			if eHas8 && len(fVal) >= 8 {
				if fp := binary.BigEndian.Uint64(fVal); fp < ep {
					c = -1
				} else if fp > ep {
					c = 1
				}
			}
			if c == 0 {
				c = bytes.Compare(fVal, eVal)
			}
			// Stop once f sorts at or before e: smaller value, or equal value
			// with an insertion order that is not later than e's.
			if c < 0 || (c == 0 && f.insertionOrder <= e.insertionOrder) {
				break
			}
			group[j+1] = f
		}
		group[j+1] = e
	}
}

// CheckFlushSize reports whether the buffer's current Size() has reached or
// exceeded its configured optimalSize.
func (b *sortableBuffer) CheckFlushSize() bool {
return b.Size() >= b.optimalSize
}
Expand Down
45 changes: 38 additions & 7 deletions db/etl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type Collector struct {
sortAndFlushInBackground bool
sortAndFlushInBackgroundActive atomic.Bool // allow only 1 bg sort per Collector

// sortValues causes flush to use SortByKeyAndValue() instead of Sort() for sortableBuffer.
// This enables AppendDup for DupSort tables where values need to be sorted (e.g. ii/history keys).
sortValues bool

allocator *Allocator
}

Expand All @@ -91,6 +95,23 @@ func (c *Collector) SortAndFlushInBackground(v bool) *Collector {
return c
}

// SortValues sets the collector's sortValues flag to v and returns c so the
// call can be chained. When the flag is set, a *sortableBuffer is sorted with
// SortByKeyAndValue() instead of Sort().
func (c *Collector) SortValues(v bool) *Collector {
c.sortValues = v
return c
}

// sortBuf sorts c.buf in place. With sortValues enabled and a *sortableBuffer
// as the underlying buffer it orders entries via SortByKeyAndValue; in every
// other case it uses the buffer's own Sort().
func (c *Collector) sortBuf() {
	if !c.sortValues {
		c.buf.Sort()
		return
	}
	switch buf := c.buf.(type) {
	case *sortableBuffer:
		buf.SortByKeyAndValue()
	default:
		// Other buffer implementations offer only Sort().
		c.buf.Sort()
	}
}

func (c *Collector) extractNextFunc(originalK, k []byte, v []byte) error {
if c.buf == nil && c.allocator != nil {
c.buf = c.allocator.Get()
Expand Down Expand Up @@ -122,7 +143,7 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
}

if canStoreInRam && len(c.dataProviders) == 0 {
c.buf.Sort()
c.sortBuf()
provider := KeepInRAM(c.buf)
c.allFlushed = true
c.dataProviders = append(c.dataProviders, provider)
Expand All @@ -132,7 +153,7 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
// go bg - but without server overloading
doInBackground := c.sortAndFlushInBackground && c.sortAndFlushInBackgroundActive.CompareAndSwap(false, true)
if !doInBackground {
provider, err := FlushToDisk(c.logPrefix, c.buf, c.tmpdir, c.logLvl)
provider, err := FlushToDisk(c.logPrefix, c.buf, c.tmpdir, c.logLvl, c.sortValues)
if err != nil {
return err
}
Expand All @@ -149,7 +170,7 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
c.buf = getBufferByType(c.bufType, datasize.ByteSize(fullBuf.SizeLimit()))
c.buf.Prealloc(prevLen/8, prevSize/8)
}
provider, err := FlushToDiskAsync(c.logPrefix, fullBuf, c.tmpdir, c.logLvl, c.allocator, &c.sortAndFlushInBackgroundActive)
provider, err := FlushToDiskAsync(c.logPrefix, fullBuf, c.tmpdir, c.logLvl, c.allocator, &c.sortAndFlushInBackgroundActive, c.sortValues)
if err != nil {
return err
}
Expand Down Expand Up @@ -204,6 +225,9 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
var canUseAppend bool
isDupSort := kv.ChaindataTablesCfg[bucket].Flags&kv.DupSort != 0

// prevLoadK/prevLoadV track the previous (key, value) to skip duplicates for DupSort AppendDup.
// Put handles duplicates idempotently, but AppendDup rejects them with MDBX_EKEYMISMATCH.
var prevLoadK, prevLoadV []byte
i := 0
loadNextFunc := func(_, k, v []byte) error {
if i == 0 {
Expand All @@ -229,6 +253,13 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
}
if canUseAppend {
if isDupSort {
// Skip duplicate (key, value) pairs — data is sorted by (key, value),
// so duplicates are always adjacent.
if bytes.Equal(k, prevLoadK) && bytes.Equal(v, prevLoadV) {
return nil
}
prevLoadK = append(prevLoadK[:0], k...)
prevLoadV = append(prevLoadV[:0], v...)
if err := cursor.(kv.RwCursorDupSort).AppendDup(k, v); err != nil {
return fmt.Errorf("%s: bucket: %s, appendDup: k=%x, %w", c.logPrefix, bucket, k, err)
}
Expand All @@ -254,10 +285,10 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
simpleLoad := func(k, v []byte) error {
return loadFunc(k, v, currentTable, loadNextFunc)
}
if err := mergeSortFiles(c.logPrefix, c.dataProviders, simpleLoad, args, c.buf); err != nil {
heapSortValues := haveSortingGuaranties && isDupSort && c.sortValues
if err := mergeSortFiles(c.logPrefix, c.dataProviders, simpleLoad, args, c.buf, heapSortValues); err != nil {
return fmt.Errorf("loadIntoTable %s: %w", toBucket, err)
}
//logger.Trace(fmt.Sprintf("[%s] ETL Load done", c.logPrefix), "bucket", bucket, "records", i)
return nil
}

Expand Down Expand Up @@ -286,14 +317,14 @@ func (c *Collector) Close() {
// for the next item, which is then added back to the heap.
// The subsequent iterations pop the heap again and load up the provider associated with it to get the next element after processing LoadFunc.
// this continues until all providers have reached their EOF.
func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs, buf Buffer) (err error) {
func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs, buf Buffer, sortValues ...bool) (err error) {
for _, provider := range providers {
if err := provider.Wait(); err != nil {
return err
}
}

h := &Heap{}
h := &Heap{sortValues: len(sortValues) > 0 && sortValues[0]}
heapInit(h)
for i, provider := range providers {
if key, value, err := provider.Next(); err == nil {
Expand Down
20 changes: 14 additions & 6 deletions db/etl/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type mmapBytesReader struct {
}

// FlushToDiskAsync - `doFsync` is true only for 'critical' collectors (those that must not lose data).
func FlushToDiskAsync(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, allocator *Allocator, inProgress *atomic.Bool) (dataProvider, error) {
func FlushToDiskAsync(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, allocator *Allocator, inProgress *atomic.Bool, sortValues bool) (dataProvider, error) {
if b.Len() == 0 {
if allocator != nil {
allocator.Put(b)
Expand All @@ -70,7 +70,7 @@ func FlushToDiskAsync(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, al
}
inProgress.Store(false)
}()
provider.file, err = sortAndFlush(b, tmpdir)
provider.file, err = sortAndFlush(b, tmpdir, sortValues)
if err != nil {
return err
}
Expand All @@ -83,14 +83,14 @@ func FlushToDiskAsync(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, al
}

// FlushToDisk - `doFsync` is true only for 'critical' collectors (those that must not lose data).
func FlushToDisk(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl) (dataProvider, error) {
func FlushToDisk(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, sortValues bool) (dataProvider, error) {
if b.Len() == 0 {
return nil, nil
}

var err error
provider := &fileDataProvider{wg: &errgroup.Group{}}
provider.file, err = sortAndFlush(b, tmpdir)
provider.file, err = sortAndFlush(b, tmpdir, sortValues)
if err != nil {
return nil, err
}
Expand All @@ -99,8 +99,16 @@ func FlushToDisk(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl) (dataPr
return provider, nil
}

func sortAndFlush(b Buffer, tmpdir string) (*os.File, error) {
b.Sort()
func sortAndFlush(b Buffer, tmpdir string, sortValues bool) (*os.File, error) {
if sortValues {
if sb, ok := b.(*sortableBuffer); ok {
sb.SortByKeyAndValue()
} else {
b.Sort()
}
} else {
b.Sort()
}

// if we are going to create files in the system temp dir, we don't need any
// subfolders.
Expand Down
Loading
Loading