Skip to content

Commit a921495

Browse files
Alexey Sharp and claude
authored and committed
Bisect: original PR commit + gemini fix + mdbx fix only
Contains only the original changes (Pool QueueWithRetry, cache filterDirtyFiles regexp, Close→Release lifecycle) plus the mdbx buffered channel fix. No diagnostic commits, no clearInProgress fix, no force-schedule fallback, no test harness changes. Purpose: isolate whether the "Wrong trie root" devnet failures are caused by the original lifecycle change or by the fix commits. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f6fd796 commit a921495

File tree

4 files changed

+51
-10
lines changed

4 files changed

+51
-10
lines changed

db/kv/mdbx/kv_mdbx.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -819,7 +819,7 @@ func NewAsyncTx(tx kv.Tx, queueSize int) *asyncTx {
819819
}
820820

821821
func (a *asyncTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
822-
rc := make(chan error)
822+
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
823823
a.requests <- &applyTx{rc, a.Tx, f}
824824
select {
825825
case err := <-rc:
@@ -843,7 +843,7 @@ func NewAsyncRwTx(tx kv.RwTx, queueSize int) *asyncRwTx {
843843
}
844844

845845
func (a *asyncRwTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
846-
rc := make(chan error)
846+
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
847847
a.requests <- &applyTx{rc, a.RwTx, f}
848848
select {
849849
case err := <-rc:
@@ -854,7 +854,7 @@ func (a *asyncRwTx) Apply(ctx context.Context, f func(kv.Tx) error) error {
854854
}
855855

856856
func (a *asyncRwTx) ApplyRw(ctx context.Context, f func(kv.RwTx) error) error {
857-
rc := make(chan error)
857+
rc := make(chan error, 1) // buffered: if caller abandons via ctx.Done(), the mdbx thread must not block
858858
a.requests <- &applyRwTx{rc, a.RwTx, f}
859859
select {
860860
case err := <-rc:

db/state/dirty_files.go

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -258,8 +258,16 @@ func (i *FilesItem) closeFilesAndRemove() {
258258
}
259259
}
260260

261+
var filterDirtyFilesReCache sync.Map // pattern string → *regexp.Regexp
262+
261263
func filterDirtyFiles(fileNames []string, stepSize, stepsInFrozenFile uint64, filenameBase, ext string, logger log.Logger) (res []*FilesItem) {
262-
re := regexp.MustCompile(`^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`)
264+
pattern := `^v(\d+(?:\.\d+)?)-` + filenameBase + `\.(\d+)-(\d+)\.` + ext + `$`
265+
reVal, ok := filterDirtyFilesReCache.Load(pattern)
266+
if !ok {
267+
re := regexp.MustCompile(pattern)
268+
reVal, _ = filterDirtyFilesReCache.LoadOrStore(pattern, re)
269+
}
270+
re := reVal.(*regexp.Regexp)
263271
var err error
264272

265273
for _, name := range fileNames {

execution/exec/txtask.go

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -699,10 +699,41 @@ type QueueWithRetry struct {
699699
capacity int
700700
}
701701

702+
var queuePool sync.Pool
703+
702704
func NewQueueWithRetry(capacity int) *QueueWithRetry {
705+
if v := queuePool.Get(); v != nil {
706+
q := v.(*QueueWithRetry)
707+
if q.capacity == capacity && q.newTasks != nil {
708+
return q
709+
}
710+
// If capacity is wrong or channel is nil, we don't put it back here;
711+
// another Get() might return a valid one, or we'll allocate a new one.
712+
}
703713
return &QueueWithRetry{newTasks: make(chan Task, capacity), capacity: capacity}
704714
}
705715

716+
// Release drains the queue and returns it to the pool for reuse.
717+
// The channel is preserved (not closed), avoiding reallocation of the
718+
// 100K-element buffer on the next NewQueueWithRetry call.
719+
// Must be called only after all producers and consumers have stopped.
720+
func (q *QueueWithRetry) Release() {
721+
q.lock.Lock()
722+
if q.newTasks == nil {
723+
q.lock.Unlock()
724+
return
725+
}
726+
// Drain channel.
727+
for len(q.newTasks) > 0 {
728+
<-q.newTasks
729+
}
730+
// Clear retry heap, keep backing array.
731+
q.retires = q.retires[:0]
732+
q.closed = false
733+
q.lock.Unlock()
734+
queuePool.Put(q)
735+
}
736+
706737
func (q *QueueWithRetry) NewTasksLen() int {
707738
q.lock.Lock()
708739
defer q.lock.Unlock()
@@ -733,7 +764,7 @@ func (q *QueueWithRetry) Add(ctx context.Context, t Task) {
733764
newTasks := q.newTasks
734765
q.lock.Unlock()
735766

736-
if !closed {
767+
if !closed && newTasks != nil {
737768
select {
738769
case <-ctx.Done():
739770
return
@@ -747,12 +778,12 @@ func (q *QueueWithRetry) Add(ctx context.Context, t Task) {
747778
// No limit on amount of txs added by this method.
748779
func (q *QueueWithRetry) ReTry(t Task) {
749780
q.lock.Lock()
750-
if q.closed {
781+
newTasks := q.newTasks
782+
if q.closed || newTasks == nil {
751783
q.lock.Unlock()
752784
return
753785
}
754786
heap.Push(&q.retires, t)
755-
newTasks := q.newTasks
756787
q.lock.Unlock()
757788
select {
758789
case newTasks <- nil:

execution/stagedsync/exec3_parallel.go

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -905,15 +905,17 @@ func (pe *parallelExecutor) run(ctx context.Context) (context.Context, context.C
905905

906906
pe.execLoopGroup.Go(func() error {
907907
defer pe.rws.Close()
908-
defer pe.in.Close()
908+
defer execLoopCtxCancel() // cancel workers' context when exec loop exits
909909
pe.resetWorkers(execLoopCtx, pe.rs, nil)
910910
return pe.execLoop(execLoopCtx)
911911
})
912912

913913
return execLoopCtx, func() {
914914
execLoopCtxCancel()
915-
defer pe.stopWorkers()
916-
defer pe.in.Close()
915+
defer func() {
916+
pe.stopWorkers()
917+
pe.in.Release()
918+
}()
917919

918920
if err := pe.wait(ctx); err != nil {
919921
pe.logger.Debug("exec loop cancel failed", "err", err)

0 commit comments

Comments (0)