Skip to content

Commit c194308

Browse files
authored
[r3.3] rpc: fix txpool_content crash: unknown (#18120)
Cherry-pick #18111
1 parent a0c55b4 commit c194308

File tree

2 files changed

+239
-2
lines changed

2 files changed

+239
-2
lines changed

txnprovider/txpool/pool.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2694,13 +2694,17 @@ func (p *TxPool) logStats() {
26942694
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx) {
26952695
var txns []*metaTxn
26962696
var senders []common.Address
2697+
var subPoolTypes []SubPoolType
2698+
var rlpValues [][]byte
26972699

26982700
p.lock.Lock()
26992701

27002702
p.all.ascendAll(func(mt *metaTxn) bool {
27012703
if sender, found := p.senders.senderID2Addr[mt.TxnSlot.SenderID]; found {
27022704
txns = append(txns, mt)
27032705
senders = append(senders, sender)
2706+
subPoolTypes = append(subPoolTypes, mt.currentSubPool)
2707+
rlpValues = append(rlpValues, mt.TxnSlot.Rlp)
27042708
}
27052709

27062710
return true
@@ -2709,7 +2713,7 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
27092713
p.lock.Unlock()
27102714

27112715
for i := range txns {
2712-
slotRlp := txns[i].TxnSlot.Rlp
2716+
slotRlp := rlpValues[i]
27132717
if slotRlp == nil {
27142718
v, err := tx.GetOne(kv.PoolTransaction, txns[i].TxnSlot.IDHash[:])
27152719
if err != nil {
@@ -2723,7 +2727,7 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
27232727
slotRlp = v[20:]
27242728
}
27252729

2726-
f(slotRlp, senders[i], txns[i].currentSubPool)
2730+
f(slotRlp, senders[i], subPoolTypes[i])
27272731
}
27282732
}
27292733

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Copyright 2025 The Erigon Authors
2+
// This file is part of Erigon.
3+
//
4+
// Erigon is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// Erigon is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package txpool
18+
19+
import (
20+
"context"
21+
"sync"
22+
"sync/atomic"
23+
"testing"
24+
"time"
25+
26+
"github.com/holiman/uint256"
27+
28+
"github.com/erigontech/erigon/common"
29+
"github.com/erigontech/erigon/common/log/v3"
30+
"github.com/erigontech/erigon/db/datadir"
31+
"github.com/erigontech/erigon/db/kv/kvcache"
32+
"github.com/erigontech/erigon/db/kv/memdb"
33+
"github.com/erigontech/erigon/db/kv/temporal/temporaltest"
34+
"github.com/erigontech/erigon/execution/chain"
35+
accounts3 "github.com/erigontech/erigon/execution/types/accounts"
36+
"github.com/erigontech/erigon/node/gointerfaces"
37+
"github.com/erigontech/erigon/node/gointerfaces/remoteproto"
38+
"github.com/erigontech/erigon/node/gointerfaces/txpoolproto"
39+
"github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg"
40+
)
41+
42+
// TestQueryAllWithoutPanicUnknown tries to reproduce https://github.com/erigontech/erigon/issues/18076 relying on
43+
// the TOCTOU between the deprecatedForEach locking window and the conversion of currentSubPool in GrpcServer.All().
44+
// It runs 3 concurrent loops: one repeatedly calling GrpcServer.All(), the others repeatedly triggering public
45+
// operations that reset currentSubPool to zero (mined removal and replacement), aiming to hit the race window.
46+
// If the panic("unknown") is triggered in the observation window, the test fails.
47+
func TestQueryAllWithoutPanicUnknown(t *testing.T) {
48+
if testing.Short() {
49+
t.Skip()
50+
}
51+
52+
const ObservationWindow = 10 * time.Second
53+
54+
ctx, cancel := context.WithTimeout(context.Background(), ObservationWindow)
55+
defer cancel()
56+
57+
// Prepare tx pool and core+pool DBs
58+
newTxns := make(chan Announcements, 1)
59+
chainDB := temporaltest.NewTestDB(t, datadir.New(t.TempDir()))
60+
poolDB := memdb.NewTestPoolDB(t)
61+
cfg := txpoolcfg.DefaultConfig
62+
cache := kvcache.New(kvcache.DefaultCoherentConfig)
63+
pool, err := New(ctx, newTxns, poolDB, chainDB, cfg, cache, chain.TestChainConfig, nil, nil, func() {}, nil, nil, log.New())
64+
if err != nil {
65+
t.Fatalf("new pool: %v", err)
66+
}
67+
68+
// Seed minimal chain state so the pool accepts local txns
69+
var stateVersionID uint64 = 0
70+
pendingBaseFee := uint64(200000)
71+
h256 := gointerfaces.ConvertHashToH256(common.Hash{})
72+
change := &remoteproto.StateChangeBatch{
73+
StateVersionId: stateVersionID,
74+
PendingBlockBaseFee: pendingBaseFee,
75+
BlockGasLimit: 1_000_000,
76+
ChangeBatch: []*remoteproto.StateChange{{BlockHeight: 0, BlockHash: h256}},
77+
}
78+
var addr common.Address
79+
addr[0] = 0xAB
80+
acc := accounts3.Account{Nonce: 0, Balance: *uint256.NewInt(10 * common.Ether)}
81+
accBlob := accounts3.SerialiseV3(&acc)
82+
change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remoteproto.AccountChange{
83+
Action: remoteproto.Action_UPSERT,
84+
Address: gointerfaces.ConvertAddressToH160(addr),
85+
Data: accBlob,
86+
})
87+
88+
// Apply state change
89+
if err := pool.OnNewBlock(ctx, change, TxnSlots{}, TxnSlots{}, TxnSlots{}); err != nil {
90+
t.Fatalf("OnNewBlock: %v", err)
91+
}
92+
93+
// Prepare two alternating local transactions with the same nonce to exercise replacement
94+
mkSlot := func(id byte, tip uint64) *TxnSlot {
95+
s := &TxnSlot{
96+
Tip: *uint256.NewInt(tip),
97+
FeeCap: *uint256.NewInt(tip),
98+
Gas: 21000,
99+
Nonce: 0,
100+
Rlp: []byte{id}, // ensure All() doesn't need DB to fetch
101+
}
102+
s.IDHash[0] = id
103+
return s
104+
}
105+
slotA := mkSlot(0xA1, 300000)
106+
slotB := mkSlot(0xB2, 400000) // higher to ensure replacement
107+
108+
// Add initial txn (A)
109+
var slots TxnSlots
110+
slots.Append(slotA, addr[:], true)
111+
discards, err := pool.AddLocalTxns(ctx, slots)
112+
if err != nil {
113+
t.Fatalf("AddLocalTxns(A): %v", err)
114+
}
115+
if len(discards) != 1 || discards[0] != txpoolcfg.Success {
116+
t.Fatalf("unexpected add result A: %+v", discards)
117+
}
118+
119+
// Build gRPC server for TxPool
120+
chainID := *uint256.NewInt(1)
121+
s := NewGrpcServer(ctx, pool, poolDB, nil, chainID, log.New())
122+
123+
var panicObserved atomic.Bool
124+
panicCh := make(chan struct{}, 1)
125+
126+
var allTasks sync.WaitGroup
127+
128+
// Reader task: repeatedly call GrpcServer.All() and catch the panic("unknown")
129+
allTasks.Add(1)
130+
go func() {
131+
defer allTasks.Done()
132+
for !panicObserved.Load() {
133+
func() {
134+
defer func() {
135+
if r := recover(); r != nil {
136+
if r == "unknown" {
137+
panicObserved.Store(true)
138+
select {
139+
case panicCh <- struct{}{}:
140+
default:
141+
}
142+
}
143+
}
144+
}()
145+
_, _ = s.All(ctx, &txpoolproto.AllRequest{})
146+
}()
147+
148+
// Either exit if the observation window is done or sleep a bit
149+
select {
150+
case <-ctx.Done():
151+
return
152+
default:
153+
time.Sleep(50 * time.Microsecond)
154+
}
155+
}
156+
}()
157+
158+
// Mutator task: alternate between replacement and mined-removal cycles
159+
allTasks.Add(1)
160+
go func() {
161+
defer allTasks.Done()
162+
for !panicObserved.Load() {
163+
// Replacement path: add B to replace A (or vice versa)
164+
var r TxnSlots
165+
r.Append(slotB, addr[:], true)
166+
_, _ = pool.AddLocalTxns(ctx, r)
167+
168+
// Now mined-removal path for whichever is present (use B here)
169+
var mined TxnSlots
170+
mined.Append(slotB, addr[:], true)
171+
_ = pool.OnNewBlock(ctx, &remoteproto.StateChangeBatch{ // keep the same base fee
172+
StateVersionId: stateVersionID,
173+
PendingBlockBaseFee: pendingBaseFee,
174+
BlockGasLimit: 1_000_000,
175+
ChangeBatch: []*remoteproto.StateChange{{BlockHeight: 0, BlockHash: h256}},
176+
}, TxnSlots{}, TxnSlots{}, mined)
177+
178+
// Re-add A again to keep cycling
179+
var r2 TxnSlots
180+
r2.Append(slotA, addr[:], true)
181+
_, _ = pool.AddLocalTxns(ctx, r2)
182+
183+
// Either exit if the observation window is done or sleep a bit
184+
select {
185+
case <-ctx.Done():
186+
return
187+
default:
188+
time.Sleep(50 * time.Microsecond)
189+
}
190+
}
191+
}()
192+
193+
// BaseFee churn task: alternates base fee above/below thresholds to force demotions/promotions across sub-pools
194+
// while sender mapping remains.
195+
allTasks.Add(1)
196+
go func() {
197+
defer allTasks.Done()
198+
flip := false
199+
for !panicObserved.Load() {
200+
var bf uint64
201+
if flip {
202+
bf = pendingBaseFee * 20 // very high to push below fee cap
203+
} else {
204+
bf = pendingBaseFee / 20 // very low to allow promotions
205+
}
206+
flip = !flip
207+
_ = pool.OnNewBlock(ctx, &remoteproto.StateChangeBatch{
208+
StateVersionId: stateVersionID,
209+
PendingBlockBaseFee: bf,
210+
BlockGasLimit: 1_000_000,
211+
ChangeBatch: []*remoteproto.StateChange{{BlockHeight: 0, BlockHash: h256}},
212+
}, TxnSlots{}, TxnSlots{}, TxnSlots{})
213+
214+
// Either exit if the observation window is done or sleep a bit
215+
select {
216+
case <-ctx.Done():
217+
return
218+
default:
219+
time.Sleep(75 * time.Microsecond)
220+
}
221+
}
222+
}()
223+
224+
// Wait for all tasks to finish
225+
allTasks.Wait()
226+
227+
select {
228+
case <-panicCh:
229+
t.Fatalf("panic(\"unknown\") triggered")
230+
case <-ctx.Done():
231+
// Success
232+
}
233+
}

0 commit comments

Comments
 (0)