Skip to content

Commit cdb5451

Browse files
PagedWriter: parallel compress (#19503)
1 parent 9505b4f commit cdb5451

File tree

12 files changed

+330
-31
lines changed

12 files changed

+330
-31
lines changed

db/seg/seg_paged_rw.go

Lines changed: 239 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@ package seg
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"encoding/binary"
2223
"fmt"
2324
"io"
25+
"sync"
2426

27+
"golang.org/x/sync/errgroup"
28+
29+
"github.com/erigontech/erigon/common/dbg"
2530
"github.com/erigontech/erigon/db/compress"
2631
)
2732

@@ -117,6 +122,16 @@ func WordsAmount2PagesAmount(wordsAmount int, pageSize int) (pagesAmount int) {
117122
return pagesAmount
118123
}
119124

125+
// pageWorkItem carries one uncompressed page from the writer to a
// compression worker. Instances are pooled; see getPageWorkItem/putPageWorkItem.
type pageWorkItem struct {
	seq              int    // sequence number used to restore original page order
	uncompressedData []byte // copy of uncompressed page; returned to pool after compression
}

// pageResult carries one compressed page from a worker to the reducer.
// Instances are pooled; see getPageResult/putPageResult.
type pageResult struct {
	seq  int    // sequence number copied from the originating pageWorkItem
	data []byte // compressed page; returned to pool after write
}
120135
type PagedReader struct {
121136
file ReaderI
122137
isCompressed bool
@@ -227,12 +242,22 @@ func (g *PagedReader) Skip() (uint64, int) {
227242
return offset, len(v)
228243
}
229244

230-
func NewPagedWriter(parent CompressorI, compressionEnabled bool) *PagedWriter {
231-
return &PagedWriter{
245+
var workers = dbg.EnvInt("PAGED_WRITER_WORKERS", 1)
246+
247+
func NewPagedWriter(ctx context.Context, parent CompressorI, compressionEnabled bool) *PagedWriter {
248+
pw := &PagedWriter{
232249
parent: parent,
233250
pageSize: parent.GetValuesOnCompressedPage(),
234251
compressionEnabled: compressionEnabled,
252+
ctx: ctx,
253+
numWorkers: workers, //TODO: accept it as a parameter in next PR
235254
}
255+
if compressionEnabled && pw.pageSize > 1 {
256+
if pw.numWorkers > 1 {
257+
pw.initWorkers()
258+
}
259+
}
260+
return pw
236261
}
237262

238263
type CompressorI interface {
@@ -254,9 +279,104 @@ type PagedWriter struct {
254279
compressionEnabled bool
255280

256281
pairs int
282+
283+
numWorkers int
284+
workCh chan *pageWorkItem
285+
resultCh chan *pageResult
286+
eg *errgroup.Group // tracks workers + reducer; cancels all on first error
287+
egCtx context.Context // cancelled on first worker/reducer error
288+
seqIn int // next seq to assign to work item
289+
seqOut int // next seq to write to parent
290+
workersShutdown bool // tracks if workers have been shut down
291+
pendingResults map[int]*pageResult // out-of-order results waiting for seqOut
292+
ctx context.Context // caller context for cancellation
293+
294+
// Metrics (optional, for diagnostics and testing)
295+
pagesCompressed int // number of pages processed through workers
296+
}
297+
298+
// initWorkers starts the parallel compression pipeline: numWorkers compression
// goroutines plus one reducer goroutine, all tracked by an errgroup so the
// first error cancels the others via egCtx.
func (c *PagedWriter) initWorkers() {
	// Buffer both channels so workers stay busy without unbounded memory use.
	queueDepth := c.numWorkers * 2
	c.workCh = make(chan *pageWorkItem, queueDepth)
	c.resultCh = make(chan *pageResult, queueDepth)
	c.pendingResults = make(map[int]*pageResult, queueDepth)
	c.eg, c.egCtx = errgroup.WithContext(c.ctx)

	// A separate WaitGroup tracks only the workers (the sole senders on
	// resultCh) so resultCh can be closed exactly once, after the last worker
	// exits — which in turn lets the reducer's range loop terminate.
	var workerWg sync.WaitGroup
	workerWg.Add(c.numWorkers)
	for range c.numWorkers {
		c.eg.Go(func() error {
			defer workerWg.Done()
			return c.compressionWorker(c.egCtx)
		})
	}
	// Closer goroutine terminates as soon as all workers have finished.
	go func() { workerWg.Wait(); close(c.resultCh) }()
	c.eg.Go(c.reducer)
}
316+
317+
// compressionWorker consumes pages from workCh, compresses each one, and sends
// the result to resultCh. It returns nil when workCh is closed (normal
// shutdown, triggered by Flush) and ctx.Err() when the errgroup context is
// cancelled first.
func (c *PagedWriter) compressionWorker(ctx context.Context) error {
	processItem := func(item *pageWorkItem) {
		// The input buffer goes back to its pool regardless of outcome.
		defer putPageWorkItem(item)

		result := getPageResult()
		result.seq = item.seq

		// Compress directly into result.data (no extra copy, each result owns its buffer)
		_, result.data = compress.EncodeZstdIfNeed(result.data[:0], item.uncompressedData, c.compressionEnabled)

		// Send result, respecting context cancellation
		select {
		case c.resultCh <- result:
		case <-ctx.Done():
			// Nobody will receive it; recycle instead of leaking the buffer.
			putPageResult(result)
		}
	}

	for {
		select {
		case item, ok := <-c.workCh:
			if !ok {
				return nil // channel closed
			}
			processItem(item)

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
258348

259-
func (c *PagedWriter) Empty() bool { return c.pairs == 0 }
349+
// reducer restores original page order: it collects possibly out-of-order
// results from the workers into pendingResults and writes them to the parent
// strictly by sequence number. It terminates when resultCh is closed (after
// all workers exit) or on the first write error, in which case it recycles
// outstanding results before returning; the errgroup then cancels egCtx,
// unblocking any workers still trying to send.
func (c *PagedWriter) reducer() error {
	for r := range c.resultCh {
		c.pendingResults[r.seq] = r
		if err := c.writeInOrder(); err != nil {
			// Best-effort cleanup so pooled buffers are not lost on the error path.
			drainResultCh(c.resultCh)
			drainPendingResults(c.pendingResults)
			return err
		}
	}
	return nil
}
360+
361+
func (c *PagedWriter) writeInOrder() error {
362+
for {
363+
r, ok := c.pendingResults[c.seqOut]
364+
if !ok {
365+
return nil
366+
}
367+
if _, err := c.parent.Write(r.data); err != nil {
368+
return err
369+
}
370+
delete(c.pendingResults, c.seqOut)
371+
putPageResult(r)
372+
c.seqOut++
373+
c.pagesCompressed++
374+
}
375+
}
376+
377+
// Empty reports whether no key/value pairs have been added yet.
func (c *PagedWriter) Empty() bool { return c.pairs == 0 }

// IsAsyncCompression reports whether the parallel compression pipeline is in use.
func (c *PagedWriter) IsAsyncCompression() bool { return c.numWorkers > 1 }

// PagesCompressed returns the number of pages written via the async pipeline (diagnostics/testing).
func (c *PagedWriter) PagesCompressed() int { return c.pagesCompressed }
260380
// Close closes the underlying compressor. It does not close workCh or wait
// for the async workers — NOTE(review): presumably Flush is expected to run
// before Close to shut the pipeline down; confirm with callers.
func (c *PagedWriter) Close() {
	c.parent.Close()
}
@@ -289,18 +409,42 @@ func (c *PagedWriter) writePage() error {
289409
return err
290410
}
291411

292-
uncompressedPage, ok := c.bytesUncompressed()
412+
// Synchronous path (single-threaded or disabled workers)
413+
if c.numWorkers <= 1 {
414+
uncompressedPage, ok := c.bytesUncompressed()
415+
c.resetPage()
416+
if !ok {
417+
return nil
418+
}
419+
420+
var compressedPage []byte
421+
c.compressionBuf, compressedPage = compress.EncodeZstdIfNeed(c.compressionBuf[:0], uncompressedPage, c.compressionEnabled)
422+
if _, err := c.parent.Write(compressedPage); err != nil {
423+
return err
424+
}
425+
return nil
426+
}
427+
428+
// Async path with parallel workers
429+
item := getPageWorkItem()
430+
var ok bool
431+
item.uncompressedData, ok = c.bytesUncompressedTo(item.uncompressedData)
293432
c.resetPage()
294433
if !ok {
434+
putPageWorkItem(item)
295435
return nil
296436
}
297437

298-
var compressedPage []byte
299-
c.compressionBuf, compressedPage = compress.EncodeZstdIfNeed(c.compressionBuf[:0], uncompressedPage, c.compressionEnabled)
300-
if _, err := c.parent.Write(compressedPage); err != nil {
301-
return err
438+
item.seq = c.seqIn
439+
c.seqIn++
440+
441+
select {
442+
case c.workCh <- item:
443+
return nil
444+
case <-c.egCtx.Done():
445+
putPageWorkItem(item)
446+
return c.egCtx.Err()
302447
}
303-
return nil
304448
}
305449

306450
func (c *PagedWriter) Add(k, v []byte) (err error) {
@@ -329,8 +473,24 @@ func (c *PagedWriter) Flush() error {
329473
if c.pageSize <= 1 {
330474
return nil
331475
}
332-
defer c.resetPage()
333-
return c.writePage()
476+
// Flush partial page
477+
if err := c.writePage(); err != nil {
478+
return err
479+
}
480+
if c.numWorkers <= 1 {
481+
c.resetPage()
482+
return nil
483+
}
484+
// Signal workers to stop; reducer drains resultCh and writes in order
485+
if !c.workersShutdown {
486+
close(c.workCh)
487+
c.workersShutdown = true
488+
}
489+
defer func() {
490+
c.eg.Wait() //nolint:errcheck
491+
c.resetPage()
492+
}()
493+
return c.eg.Wait()
334494
}
335495

336496
func (c *PagedWriter) bytesUncompressed() (wholePage []byte, notEmpty bool) {
@@ -359,6 +519,34 @@ func (c *PagedWriter) bytesUncompressed() (wholePage []byte, notEmpty bool) {
359519
return wholePage, true
360520
}
361521

522+
// bytesUncompressedTo encodes page into external buffer (no internal state modification)
523+
func (c *PagedWriter) bytesUncompressedTo(buf []byte) (wholePage []byte, notEmpty bool) {
524+
if len(c.kLengths) == 0 {
525+
return nil, false
526+
}
527+
528+
// Encode page metadata and keys/values into provided buffer
529+
neededSize := 1 + len(c.kLengths)*2*4 + len(c.keys) + len(c.vals)
530+
wholePage = growslice(buf[:0], neededSize)
531+
clear(wholePage)
532+
533+
wholePage[0] = uint8(len(c.kLengths)) // first byte is amount of vals
534+
lensBuf := wholePage[1:]
535+
for i, l := range c.kLengths {
536+
binary.BigEndian.PutUint32(lensBuf[i*4:(i+1)*4], uint32(l))
537+
}
538+
lensBuf = lensBuf[len(c.kLengths)*4:]
539+
for i, l := range c.vLengths {
540+
binary.BigEndian.PutUint32(lensBuf[i*4:(i+1)*4], uint32(l))
541+
}
542+
543+
// Append keys and values without modifying internal state
544+
wholePage = append(wholePage, c.keys...)
545+
wholePage = append(wholePage, c.vals...)
546+
547+
return wholePage, true
548+
}
549+
362550
func (c *PagedWriter) bytes() (wholePage []byte, notEmpty bool) {
363551
//TODO: alignment,compress+alignment
364552
wholePage, notEmpty = c.bytesUncompressed()
@@ -391,3 +579,43 @@ func growslice(b []byte, wantLength int) []byte {
391579
}
392580
return make([]byte, wantLength)
393581
}
582+
583+
// Global pools for page work items and results - optimized for GC
584+
var (
585+
pageWorkItemPool = sync.Pool{New: func() any { return &pageWorkItem{} }}
586+
pageResultPool = sync.Pool{New: func() any { return &pageResult{} }}
587+
)
588+
589+
func getPageWorkItem() *pageWorkItem { return pageWorkItemPool.Get().(*pageWorkItem) }
590+
func putPageWorkItem(item *pageWorkItem) {
591+
if item == nil {
592+
return
593+
}
594+
item.seq = 0
595+
item.uncompressedData = item.uncompressedData[:0]
596+
pageWorkItemPool.Put(item)
597+
}
598+
599+
func getPageResult() *pageResult { return pageResultPool.Get().(*pageResult) }
600+
func putPageResult(r *pageResult) {
601+
r.seq = 0
602+
r.data = r.data[:0]
603+
pageResultPool.Put(r)
604+
}
605+
606+
func drainResultCh(ch chan *pageResult) {
607+
for {
608+
select {
609+
case r := <-ch:
610+
putPageResult(r)
611+
default:
612+
return
613+
}
614+
}
615+
}
616+
617+
func drainPendingResults(m map[int]*pageResult) {
618+
for _, r := range m {
619+
putPageResult(r)
620+
}
621+
}

0 commit comments

Comments
 (0)