Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 116 additions & 45 deletions db/etl/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
"path/filepath"
"sync/atomic"

"github.com/erigontech/erigon/common/dir"

Check failure on line 28 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (macos-15)

other declaration of dir

Check failure on line 28 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-windows (windows-2025)

other declaration of dir

Check failure on line 28 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (ubuntu-24.04)

other declaration of dir

"golang.org/x/sync/errgroup"

"github.com/erigontech/erigon/common/dir"

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (macos-15)

"github.com/erigontech/erigon/common/dir" imported and not used

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (macos-15)

dir redeclared in this block

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-windows (windows-2025)

"github.com/erigontech/erigon/common/dir" imported and not used

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-windows (windows-2025)

dir redeclared in this block

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (ubuntu-24.04)

"github.com/erigontech/erigon/common/dir" imported and not used

Check failure on line 32 in db/etl/dataprovider.go

View workflow job for this annotation

GitHub Actions / tests-mac-linux (ubuntu-24.04)

dir redeclared in this block
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/common/mmap"
)

type dataProvider interface {
Expand All @@ -40,10 +42,17 @@
}

type fileDataProvider struct {
file *os.File
reader io.Reader
byteReader io.ByteReader // Different interface to the same object as reader
wg *errgroup.Group
file *os.File
mmapReader *mmapBytesReader // zero-copy reader over mmap'd data
mmapData []byte // mmap'd file content
mmapHandle2 *[mmap.MaxMapSize]byte // pointer handle for cleanup
wg *errgroup.Group
}

// mmapBytesReader tracks position for reading from mmap'd data
type mmapBytesReader struct {
data []byte // mmap'd file content
pos int // current read position
}

// FlushToDiskAsync - `doFsync` is true only for 'critical' collectors (which should not loose).
Expand All @@ -55,7 +64,7 @@
return nil, nil
}

provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}}
provider := &fileDataProvider{wg: &errgroup.Group{}}
provider.wg.Go(func() (err error) {
defer func() {
if allocator != nil {
Expand All @@ -82,7 +91,7 @@
}

var err error
provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}}
provider := &fileDataProvider{wg: &errgroup.Group{}}
provider.file, err = sortAndFlush(b, tmpdir)
if err != nil {
return nil, err
Expand Down Expand Up @@ -118,77 +127,139 @@
}

func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) {
if p.reader == nil {
_, err := p.file.Seek(0, 0)
if p.mmapReader == nil {
// Get file size by seeking to end
size, err := p.file.Seek(0, io.SeekEnd)
if err != nil {
return nil, nil, err
}
r := bufio.NewReaderSize(p.file, BufIOSize)
p.reader = r
p.byteReader = r
if size == 0 {
return nil, nil, io.EOF
}

// Memory-map the file
p.mmapData, p.mmapHandle2, err = mmap.Mmap(p.file, int(size))
if err != nil {
return nil, nil, fmt.Errorf("mmap failed: %w", err)
}

// Set sequential read pattern for better performance
if err := mmap.MadviseSequential(p.mmapData); err != nil {
_ = mmap.Munmap(p.mmapData, p.mmapHandle2)
return nil, nil, fmt.Errorf("madvise sequential failed: %w", err)
}

// Create zero-copy reader over mmap'd data
p.mmapReader = &mmapBytesReader{data: p.mmapData, pos: 0}
}
return readElementFromDiskZeroCopy(p.mmapReader)
}

// ReadVarint decodes a signed varint directly from mmap data
func (m *mmapBytesReader) ReadVarint() (int64, error) {
v, n := binary.Varint(m.data[m.pos:])
if n <= 0 {
if n == 0 {
return 0, io.EOF
}
return 0, fmt.Errorf("varint overflow")
}
m.pos += n
return v, nil
}

// ReadAt returns a slice directly from mmap data (zero-copy) at given length
// The returned slice points directly into the mmap'd memory
func (m *mmapBytesReader) ReadAt(length int) ([]byte, error) {
if m.pos+length > len(m.data) {
return nil, io.ErrUnexpectedEOF
}
return readElementFromDisk(p.reader, p.byteReader, keyBuf, valBuf)
result := m.data[m.pos : m.pos+length]
m.pos += length
return result, nil
}

func (p *fileDataProvider) Wait() error { return p.wg.Wait() }
func (p *fileDataProvider) Dispose() {
if p.file != nil { //invariant: safe to call multiple time
p.Wait()
file := p.file
p.file = nil

go func() {
filePath := file.Name()
file.Close()
_ = dir.RemoveFile(filePath)
}()
if p.file == nil {
return
}

p.Wait()

filePath := p.file.Name()
p.file.Close()
p.file = nil
_ = dir.RemoveFile(filePath)

// Note: We intentionally do NOT munmap here. The mmap'd memory remains mapped
// and valid for zero-copy slices returned to callers. The OS will unmap when
// the process exits or memory pressure requires it. This is safe for the ETL
// use case where data is consumed immediately before Close() is called.
}

func (p *fileDataProvider) String() string {
return fmt.Sprintf("%T(file: %s)", p, p.file.Name())
}

// readElementFromDisk reads key-value pairs from an io.Reader for testing
func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ([]byte, []byte, error) {
n, err := binary.ReadVarint(br)
if err != nil {
return nil, nil, err
}

var key []byte
if n >= 0 {
// Reallocate the slice or extend it if there is enough capacity
if keyBuf == nil || len(keyBuf)+int(n) > cap(keyBuf) {
newKeyBuf := make([]byte, len(keyBuf)+int(n))
copy(newKeyBuf, keyBuf)
keyBuf = newKeyBuf
} else {
keyBuf = keyBuf[:len(keyBuf)+int(n)]
}
if _, err = io.ReadFull(r, keyBuf[len(keyBuf)-int(n):]); err != nil {
key = make([]byte, n)
if _, err = io.ReadFull(r, key); err != nil {
return nil, nil, err
}
} else {
keyBuf = nil
}
if n, err = binary.ReadVarint(br); err != nil {

n, err = binary.ReadVarint(br)
if err != nil {
return nil, nil, err
}

var val []byte
if n >= 0 {
// Reallocate the slice or extend it if there is enough capacity
if valBuf == nil || len(valBuf)+int(n) > cap(valBuf) {
newValBuf := make([]byte, len(valBuf)+int(n))
copy(newValBuf, valBuf)
valBuf = newValBuf
} else {
valBuf = valBuf[:len(valBuf)+int(n)]
val = make([]byte, n)
if _, err = io.ReadFull(r, val); err != nil {
return nil, nil, err
}
if _, err = io.ReadFull(r, valBuf[len(valBuf)-int(n):]); err != nil {
}

return key, val, nil
}

// readElementFromDiskZeroCopy reads key-value pairs directly from mmap'd data
func readElementFromDiskZeroCopy(m *mmapBytesReader) ([]byte, []byte, error) {
keyLen, err := m.ReadVarint()
if err != nil {
return nil, nil, err
}

var keyBuf []byte
if keyLen >= 0 {
if keyBuf, err = m.ReadAt(int(keyLen)); err != nil {
return nil, nil, err
}
}

valLen, err := m.ReadVarint()
if err != nil {
return nil, nil, err
}

var valBuf []byte
if valLen >= 0 {
if valBuf, err = m.ReadAt(int(valLen)); err != nil {
return nil, nil, err
}
} else {
valBuf = nil
}
return keyBuf, valBuf, err

return keyBuf, valBuf, nil
}

type memoryDataProvider struct {
Expand Down
Loading