diff --git a/db/etl/dataprovider.go b/db/etl/dataprovider.go index 0f366a93587..0673a5c2c21 100644 --- a/db/etl/dataprovider.go +++ b/db/etl/dataprovider.go @@ -29,7 +29,9 @@ import ( "golang.org/x/sync/errgroup" + "github.com/erigontech/erigon/common/dir" "github.com/erigontech/erigon/common/log/v3" + "github.com/erigontech/erigon/common/mmap" ) type dataProvider interface { @@ -40,10 +42,17 @@ type dataProvider interface { } type fileDataProvider struct { - file *os.File - reader io.Reader - byteReader io.ByteReader // Different interface to the same object as reader - wg *errgroup.Group + file *os.File + mmapReader *mmapBytesReader // zero-copy reader over mmap'd data + mmapData []byte // mmap'd file content + mmapHandle2 *[mmap.MaxMapSize]byte // pointer handle for cleanup + wg *errgroup.Group +} + +// mmapBytesReader tracks position for reading from mmap'd data +type mmapBytesReader struct { + data []byte // mmap'd file content + pos int // current read position } // FlushToDiskAsync - `doFsync` is true only for 'critical' collectors (which should not loose). 
@@ -55,7 +64,7 @@ func FlushToDiskAsync(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl, al return nil, nil } - provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}} + provider := &fileDataProvider{wg: &errgroup.Group{}} provider.wg.Go(func() (err error) { defer func() { if allocator != nil { @@ -82,7 +91,7 @@ func FlushToDisk(logPrefix string, b Buffer, tmpdir string, lvl log.Lvl) (dataPr } var err error - provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}} + provider := &fileDataProvider{wg: &errgroup.Group{}} provider.file, err = sortAndFlush(b, tmpdir) if err != nil { return nil, err @@ -118,77 +127,139 @@ func sortAndFlush(b Buffer, tmpdir string) (*os.File, error) { } func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) { - if p.reader == nil { - _, err := p.file.Seek(0, 0) + if p.mmapReader == nil { + // Get file size by seeking to end + size, err := p.file.Seek(0, io.SeekEnd) if err != nil { return nil, nil, err } - r := bufio.NewReaderSize(p.file, BufIOSize) - p.reader = r - p.byteReader = r + if size == 0 { + return nil, nil, io.EOF + } + + // Memory-map the file + p.mmapData, p.mmapHandle2, err = mmap.Mmap(p.file, int(size)) + if err != nil { + return nil, nil, fmt.Errorf("mmap failed: %w", err) + } + + // Set sequential read pattern for better performance + if err := mmap.MadviseSequential(p.mmapData); err != nil { + _ = mmap.Munmap(p.mmapData, p.mmapHandle2) + return nil, nil, fmt.Errorf("madvise sequential failed: %w", err) + } + + // Create zero-copy reader over mmap'd data + p.mmapReader = &mmapBytesReader{data: p.mmapData, pos: 0} + } + return readElementFromDiskZeroCopy(p.mmapReader) +} + +// ReadVarint decodes a signed varint directly from mmap data +func (m *mmapBytesReader) ReadVarint() (int64, error) { + v, n := binary.Varint(m.data[m.pos:]) + if n <= 0 { + if n == 0 { + return 0, io.EOF + } + return 0, fmt.Errorf("varint overflow") + } + m.pos += n + return v, nil +} +// ReadAt 
returns the next length bytes of the mmap data as a zero-copy slice and advances the read position +// The returned slice points directly into the mmap'd memory +func (m *mmapBytesReader) ReadAt(length int) ([]byte, error) { + if m.pos+length > len(m.data) { + return nil, io.ErrUnexpectedEOF } - return readElementFromDisk(p.reader, p.byteReader, keyBuf, valBuf) + result := m.data[m.pos : m.pos+length] + m.pos += length + return result, nil } func (p *fileDataProvider) Wait() error { return p.wg.Wait() } func (p *fileDataProvider) Dispose() { - if p.file != nil { //invariant: safe to call multiple time - p.Wait() - file := p.file - p.file = nil - - go func() { - filePath := file.Name() - file.Close() - _ = dir.RemoveFile(filePath) - }() + if p.file == nil { + return } + + p.Wait() + + filePath := p.file.Name() + p.file.Close() + p.file = nil + _ = dir.RemoveFile(filePath) + + // Note: We intentionally do NOT munmap here. The mmap'd memory remains mapped + // and valid for zero-copy slices returned to callers. The mapping is released + // only at process exit; memory pressure can evict its pages but never unmaps it. This is safe for the ETL + // use case where data is consumed immediately before Close() is called. 
} func (p *fileDataProvider) String() string { return fmt.Sprintf("%T(file: %s)", p, p.file.Name()) } +// readElementFromDisk reads key-value pairs from an io.Reader for testing func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ([]byte, []byte, error) { n, err := binary.ReadVarint(br) if err != nil { return nil, nil, err } + + var key []byte if n >= 0 { - // Reallocate the slice or extend it if there is enough capacity - if keyBuf == nil || len(keyBuf)+int(n) > cap(keyBuf) { - newKeyBuf := make([]byte, len(keyBuf)+int(n)) - copy(newKeyBuf, keyBuf) - keyBuf = newKeyBuf - } else { - keyBuf = keyBuf[:len(keyBuf)+int(n)] - } - if _, err = io.ReadFull(r, keyBuf[len(keyBuf)-int(n):]); err != nil { + key = make([]byte, n) + if _, err = io.ReadFull(r, key); err != nil { return nil, nil, err } - } else { - keyBuf = nil } - if n, err = binary.ReadVarint(br); err != nil { + + n, err = binary.ReadVarint(br) + if err != nil { return nil, nil, err } + + var val []byte if n >= 0 { - // Reallocate the slice or extend it if there is enough capacity - if valBuf == nil || len(valBuf)+int(n) > cap(valBuf) { - newValBuf := make([]byte, len(valBuf)+int(n)) - copy(newValBuf, valBuf) - valBuf = newValBuf - } else { - valBuf = valBuf[:len(valBuf)+int(n)] + val = make([]byte, n) + if _, err = io.ReadFull(r, val); err != nil { + return nil, nil, err } - if _, err = io.ReadFull(r, valBuf[len(valBuf)-int(n):]); err != nil { + } + + return key, val, nil +} + +// readElementFromDiskZeroCopy reads key-value pairs directly from mmap'd data +func readElementFromDiskZeroCopy(m *mmapBytesReader) ([]byte, []byte, error) { + keyLen, err := m.ReadVarint() + if err != nil { + return nil, nil, err + } + + var keyBuf []byte + if keyLen >= 0 { + if keyBuf, err = m.ReadAt(int(keyLen)); err != nil { + return nil, nil, err + } + } + + valLen, err := m.ReadVarint() + if err != nil { + return nil, nil, err + } + + var valBuf []byte + if valLen >= 0 { + if valBuf, err = 
m.ReadAt(int(valLen)); err != nil { return nil, nil, err } - } else { - valBuf = nil } - return keyBuf, valBuf, err + + return keyBuf, valBuf, nil } type memoryDataProvider struct {