Skip to content
6 changes: 3 additions & 3 deletions db/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func NewAsyncTx(tx kv.Tx, queueSize int) *asyncTx {
}

func (a *asyncTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
rc := make(chan error)
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
a.requests <- &applyTx{rc, a.Tx, f}
select {
case err := <-rc:
Expand All @@ -843,7 +843,7 @@ func NewAsyncRwTx(tx kv.RwTx, queueSize int) *asyncRwTx {
}

func (a *asyncRwTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
rc := make(chan error)
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
a.requests <- &applyTx{rc, a.RwTx, f}
select {
case err := <-rc:
Expand All @@ -854,7 +854,7 @@ func (a *asyncRwTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
}

func (a *asyncRwTx) ApplyRw(ctx context.Context, f func(kv.RwTx) error) error {
rc := make(chan error)
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
a.requests <- &applyRwTx{rc, a.RwTx, f}
select {
case err := <-rc:
Expand Down
10 changes: 9 additions & 1 deletion db/state/dirty_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,16 @@ func (i *FilesItem) closeFilesAndRemove() {
}
}

var filterDirtyFilesReCache sync.Map // pattern string → *regexp.Regexp

func filterDirtyFiles(fileNames []string, stepSize, stepsInFrozenFile uint64, filenameBase, ext string, logger log.Logger) (res []*FilesItem) {
re := regexp.MustCompile(`^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`)
pattern := `^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`
reVal, ok := filterDirtyFilesReCache.Load(pattern)
if !ok {
re := regexp.MustCompile(pattern)
reVal, _ = filterDirtyFilesReCache.LoadOrStore(pattern, re)
}
re := reVal.(*regexp.Regexp)
var err error

for _, name := range fileNames {
Expand Down
37 changes: 34 additions & 3 deletions execution/exec/txtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,41 @@ type QueueWithRetry struct {
capacity int
}

var queuePool sync.Pool

func NewQueueWithRetry(capacity int) *QueueWithRetry {
if v := queuePool.Get(); v != nil {
q := v.(*QueueWithRetry)
if q.capacity == capacity && q.newTasks != nil {
return q
}
// If capacity is wrong or channel is nil, we don't put it back here;
// another Get() might return a valid one, or we'll allocate a new one.
}
return &QueueWithRetry{newTasks: make(chan Task, capacity), capacity: capacity}
}

// Release drains the queue and returns it to the pool for reuse.
// The channel is preserved (not closed), avoiding reallocation of the
// 100K-element buffer on the next NewQueueWithRetry call.
// Must be called only after all producers and consumers have stopped.
func (q *QueueWithRetry) Release() {
q.lock.Lock()
if q.newTasks == nil {
q.lock.Unlock()
return
}
// Drain channel.
for len(q.newTasks) > 0 {
<-q.newTasks
}
// Clear retry heap, keep backing array.
q.retires = q.retires[:0]
q.closed = false
q.lock.Unlock()
queuePool.Put(q)
}

func (q *QueueWithRetry) NewTasksLen() int {
q.lock.Lock()
defer q.lock.Unlock()
Expand Down Expand Up @@ -733,7 +764,7 @@ func (q *QueueWithRetry) Add(ctx context.Context, t Task) {
newTasks := q.newTasks
q.lock.Unlock()

if !closed {
if !closed && newTasks != nil {
select {
case <-ctx.Done():
return
Expand All @@ -747,12 +778,12 @@ func (q *QueueWithRetry) Add(ctx context.Context, t Task) {
// No limit on amount of txs added by this method.
func (q *QueueWithRetry) ReTry(t Task) {
q.lock.Lock()
if q.closed {
newTasks := q.newTasks
if q.closed || newTasks == nil {
q.lock.Unlock()
return
}
heap.Push(&q.retires, t)
newTasks := q.newTasks
q.lock.Unlock()
select {
case newTasks <- nil:
Expand Down
42 changes: 40 additions & 2 deletions execution/execmodule/execmoduletester/exec_module_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math/big"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -148,9 +149,26 @@ type ExecModuleTester struct {

func (emt *ExecModuleTester) Close() {
emt.cancel()
if err := emt.bgComponentsEg.Wait(); err != nil {
require.Equal(emt.tb, context.Canceled, err) // upon waiting for clean exit we should get ctx cancelled

done := make(chan error, 1)
go func() {
done <- emt.bgComponentsEg.Wait()
}()

select {
case err := <-done:
if err != nil {
require.Equal(emt.tb, context.Canceled, err) // upon waiting for clean exit we should get ctx cancelled
}
case <-time.After(30 * time.Second):
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, true)
fmt.Fprintf(os.Stderr, "\n=== ExecModuleTester.Close() hung for 30s ===\nAll goroutines (%d bytes):\n%s\n=== END goroutine dump ===\n", n, buf[:n])
if err := <-done; err != nil {
require.Equal(emt.tb, context.Canceled, err)
}
}

if emt.Engine != nil {
emt.Engine.Close()
}
Expand Down Expand Up @@ -452,6 +470,26 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
// Wait for all the background snapshot retirements launched by any stages2.StageLoopIteration to finish
mock.retirementWg.Wait()
})

// Cancel mock.Ctx before the test binary deadline so that background
// goroutines (sentry pump loops, exec workers) exit on their own even
// if the test function is stuck. This keeps the timeout goroutine dump
// clean: only the truly deadlocked goroutines remain.
if t, ok := tb.(*testing.T); ok {
if deadline, ok := t.Deadline(); ok {
if remaining := time.Until(deadline) - 5*time.Second; remaining > 0 {
go func() {
timer := time.NewTimer(remaining)
defer timer.Stop()
select {
case <-timer.C:
ctxCancel()
case <-ctx.Done():
}
}()
}
}
}
}

// Committed genesis will be shared between download and mock sentry
Expand Down
132 changes: 129 additions & 3 deletions execution/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,9 @@ func (pe *parallelExecutor) execLoop(ctx context.Context) (err error) {
applyTx := pe.applyTx
pe.RUnlock()

stallTimer := time.NewTimer(30 * time.Second)
defer stallTimer.Stop()

for {
err := func() error {
pe.Lock()
Expand Down Expand Up @@ -594,20 +597,26 @@ func (pe *parallelExecutor) execLoop(ctx context.Context) (err error) {
if err := pe.processRequest(ctx, exec); err != nil {
return err
}
stallTimer.Reset(30 * time.Second)
continue
case <-ctx.Done():
return ctx.Err()
case nextResult, ok := <-pe.rws.ResultCh():
if !ok {
return nil
}
stallTimer.Reset(30 * time.Second)
closed, err := pe.rws.Drain(ctx, nextResult)
if err != nil {
return err
}
if closed {
return nil
}
case <-stallTimer.C:
pe.dumpStallState()
stallTimer.Reset(30 * time.Second)
continue
}

blockResult, err := pe.processResults(ctx, applyTx)
Expand Down Expand Up @@ -905,15 +914,17 @@ func (pe *parallelExecutor) run(ctx context.Context) (context.Context, context.C

pe.execLoopGroup.Go(func() error {
defer pe.rws.Close()
defer pe.in.Close()
defer execLoopCtxCancel() // cancel workers' context when exec loop exits
pe.resetWorkers(execLoopCtx, pe.rs, nil)
return pe.execLoop(execLoopCtx)
})

return execLoopCtx, func() {
execLoopCtxCancel()
defer pe.stopWorkers()
defer pe.in.Close()
defer func() {
pe.stopWorkers()
pe.in.Release()
}()

if err := pe.wait(ctx); err != nil {
pe.logger.Debug("exec loop cancel failed", "err", err)
Expand Down Expand Up @@ -946,6 +957,70 @@ func (pe *parallelExecutor) wait(ctx context.Context) error {
}
}

func (pe *parallelExecutor) dumpStallState() {
pe.RLock()
defer pe.RUnlock()

if len(pe.blockExecutors) == 0 {
pe.logger.Warn("[parallel-exec] execLoop stalled for 30s, no active blockExecutors",
"workerQueueLen", pe.in.Len(),
"resultsQueueLen", pe.rws.Len(),
)
return
}

for blockNum, be := range pe.blockExecutors {
maxValidated := be.validateTasks.maxComplete()
maxExecComplete := be.execTasks.maxComplete()

pe.logger.Warn("[parallel-exec] execLoop stalled for 30s — blockExecutor state",
"block", blockNum,
"totalTasks", len(be.tasks),
"maxValidated", maxValidated,
"maxExecComplete", maxExecComplete,
"execPending", fmt.Sprint(be.execTasks.pending),
"execInProgress", fmt.Sprint(be.execTasks.inProgress),
"execComplete", len(be.execTasks.complete),
"valPending", len(be.validateTasks.pending),
"valInProgress", len(be.validateTasks.inProgress),
"valComplete", len(be.validateTasks.complete),
"pubPending", len(be.publishTasks.pending),
"pubComplete", len(be.publishTasks.complete),
"workerQueueLen", pe.in.Len(),
"resultsQueueLen", pe.rws.Len(),
)

// Dump details for each pending exec task
for _, tx := range be.execTasks.pending {
if tx >= 0 && tx < len(be.tasks) {
blockers := be.execTasks.blocker[tx]
pe.logger.Warn("[parallel-exec] pending task detail",
"block", blockNum,
"taskIdx", tx,
"incarnation", be.txIncarnations[tx],
"aborted", be.execAborted[tx],
"failed", be.execFailed[tx],
"hasReads", be.blockIO.HasReads(be.tasks[tx].Version().TxIndex),
"blockedBy", fmt.Sprint(blockers),
)
}
}

// Also dump in-progress tasks that might be stuck
for _, tx := range be.execTasks.inProgress {
if tx >= 0 && tx < len(be.tasks) {
pe.logger.Warn("[parallel-exec] in-progress task detail",
"block", blockNum,
"taskIdx", tx,
"incarnation", be.txIncarnations[tx],
"aborted", be.execAborted[tx],
"failed", be.execFailed[tx],
)
}
}
}
}

type applyResult any

type blockResult struct {
Expand Down Expand Up @@ -1746,6 +1821,7 @@ func (be *blockExecutor) scheduleExecution(ctx context.Context, pe *parallelExec
}

maxValidated := be.validateTasks.maxComplete()
var scheduled, skipped int
for i := 0; i < len(toExecute); i++ {
nextTx := toExecute[i]
execTask := be.tasks[nextTx]
Expand All @@ -1763,7 +1839,13 @@ func (be *blockExecutor) scheduleExecution(ctx context.Context, pe *parallelExec
}
return state.VersionInvalid
}, false, "") != state.VersionValid) {
// Undo the takeNextPending → inProgress move: the task was
// not actually dispatched to a worker, so it must not remain
// in inProgress (phantom state) where it would block
// removeDependency from re-pushing it to pending later.
be.execTasks.clearInProgress(nextTx)
be.execTasks.pushPending(nextTx)
skipped++
continue
}
be.cntSpecExec++
Expand All @@ -1774,6 +1856,7 @@ func (be *blockExecutor) scheduleExecution(ctx context.Context, pe *parallelExec
}

be.cntExec++
scheduled++

if incarnation := be.txIncarnations[nextTx]; incarnation == 0 {
pe.in.Add(ctx, &taskVersion{
Expand All @@ -1795,6 +1878,49 @@ func (be *blockExecutor) scheduleExecution(ctx context.Context, pe *parallelExec
statsMutex: &be.Mutex})
}
}

// Stall prevention: if all pending tasks were skipped by the speculative
// check and no tasks were sent to workers, force-schedule the first
// pending task to prevent deadlock. Re-executing a task is always correct
// — the spec check is purely an optimization to avoid wasted work.
// A wasted re-execution is infinitely better than a deadlock.
if scheduled == 0 && skipped > 0 {
maxExecComplete := be.execTasks.maxComplete()

pe.logger.Warn("[parallel-exec] scheduleExecution skipped all pending tasks, force-scheduling",
"block", be.blockNum,
"totalTasks", len(be.tasks),
"maxValidated", maxValidated,
"maxExecComplete", maxExecComplete,
"skippedCount", skipped,
"execPending", fmt.Sprint(be.execTasks.pending),
"execInProgress", fmt.Sprint(be.execTasks.inProgress),
"execComplete", len(be.execTasks.complete),
"valComplete", len(be.validateTasks.complete),
"workerQueueLen", pe.in.Len(),
)

// Force-schedule the first pending task.
nextTx := be.execTasks.takeNextPending()
if nextTx >= 0 {
execTask := be.tasks[nextTx]
// Do NOT set skipCheck here — the task is not at the validation
// frontier (maxValidated+1), so its result must go through normal
// validation. Setting skipCheck would accept stale-state results
// and produce wrong trie roots.
be.cntExec++

version := execTask.Version()
version.Incarnation = be.txIncarnations[nextTx]
pe.in.ReTry(&taskVersion{
execTask: execTask,
version: version,
versionMap: be.versionMap,
profile: be.profile,
stats: be.stats,
statsMutex: &be.Mutex})
}
}
}

func MergeReadSets(a state.ReadSet, b state.ReadSet) state.ReadSet {
Expand Down
Loading