Skip to content

Commit f660b23

Browse files
committed
save
1 parent fb8ca00 commit f660b23

File tree

1 file changed

+24
-19
lines changed

1 file changed

+24
-19
lines changed

db/seg/seg_paged_rw.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"io"
2525
"sync"
2626

27+
"golang.org/x/sync/errgroup"
28+
2729
"github.com/erigontech/erigon/common/dbg"
2830
"github.com/erigontech/erigon/db/compress"
2931
)
@@ -43,8 +45,13 @@ func returnIfNotNil(item *pageWorkItem) {
4345
}
4446
}
4547

46-
func getPageResult() *pageResult { return pageResultPool.Get().(*pageResult) }
47-
func putPageResult(r *pageResult) { pageResultPool.Put(r) }
48+
func getPageResult() *pageResult { return pageResultPool.Get().(*pageResult) }
49+
func putPageResult(r *pageResult) {
50+
r.seq = 0
51+
r.data = r.data[:0]
52+
r.err = nil
53+
pageResultPool.Put(r)
54+
}
4855

4956
func GetFromPage(key, compressedPage []byte, compressionBuf []byte, compressionEnabled bool) (v []byte, compressionBufOut []byte) {
5057
var err error
@@ -298,12 +305,12 @@ type PagedWriter struct {
298305
numWorkers int
299306
workCh chan *pageWorkItem
300307
resultCh chan *pageResult
301-
wg sync.WaitGroup // tracks live worker goroutines
308+
eg *errgroup.Group // tracks live worker goroutines; cancels all on first error
302309
seqIn int // next seq to assign to work item
303310
seqOut int // next seq to write to parent
304311
workersShutdown bool // tracks if workers have been shut down
305312
pendingResults map[int]*pageResult // out-of-order results waiting for seqOut
306-
ctx context.Context // optional context for cancellation
313+
ctx context.Context // caller context for cancellation
307314

308315
// Metrics (optional, for diagnostics and testing)
309316
pagesCompressed int // number of pages processed through workers
@@ -314,43 +321,41 @@ func (c *PagedWriter) initWorkers() {
314321
c.workCh = make(chan *pageWorkItem, queueDepth)
315322
c.resultCh = make(chan *pageResult, queueDepth)
316323
c.pendingResults = make(map[int]*pageResult, queueDepth)
317-
c.wg.Add(c.numWorkers)
324+
var egCtx context.Context
325+
c.eg, egCtx = errgroup.WithContext(c.ctx)
318326
for range c.numWorkers {
319-
go c.compressionWorker()
327+
c.eg.Go(func() error { return c.compressionWorker(egCtx) })
320328
}
321329
}
322330

323-
func (c *PagedWriter) compressionWorker() {
324-
defer c.wg.Done()
325-
331+
func (c *PagedWriter) compressionWorker(ctx context.Context) error {
326332
processItem := func(item *pageWorkItem) {
327333
defer pageWorkItemPool.Put(item)
328334

329335
result := getPageResult()
330336
result.seq = item.seq
331-
result.err = nil
332337

333338
// Compress directly into result.data (no extra copy, each result owns its buffer)
334-
result.data, _ = compress.EncodeZstdIfNeed(result.data[:0], item.uncompressedData, c.compressionEnabled)
339+
_, result.data = compress.EncodeZstdIfNeed(result.data[:0], item.uncompressedData, c.compressionEnabled)
335340

336341
// Send result, respecting context cancellation
337342
select {
338343
case c.resultCh <- result:
339-
case <-c.ctx.Done():
340-
putPageResult(result) // return result to pool if context cancelled
344+
case <-ctx.Done():
345+
putPageResult(result)
341346
}
342347
}
343348

344349
for {
345350
select {
346351
case item, ok := <-c.workCh:
347352
if !ok {
348-
return // channel closed
353+
return nil // channel closed
349354
}
350355
processItem(item)
351356

352-
case <-c.ctx.Done():
353-
return // context cancelled
357+
case <-ctx.Done():
358+
return ctx.Err()
354359
}
355360
}
356361
}
@@ -508,7 +513,7 @@ func (c *PagedWriter) Flush() error {
508513
c.workersShutdown = true
509514
}
510515
defer func() {
511-
c.wg.Wait() // ensure all worker goroutines have exited, even on error
516+
c.eg.Wait() //nolint:errcheck // ensure all worker goroutines have exited, even on error
512517
c.resetPage()
513518
}()
514519

@@ -520,10 +525,10 @@ func (c *PagedWriter) Flush() error {
520525
return err
521526
}
522527
case <-c.ctx.Done():
523-
return c.ctx.Err()
528+
return c.eg.Wait()
524529
}
525530
}
526-
return nil
531+
return c.eg.Wait()
527532
}
528533

529534
func (c *PagedWriter) bytesUncompressed() (wholePage []byte, notEmpty bool) {

0 commit comments

Comments
 (0)