Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions execution/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@ type parallelExecutor struct {
rws *exec.ResultsQueue
workerCount int
blockExecutors map[uint64]*blockExecutor
accumulator *shards.Accumulator
}

func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u Unwinder,
startBlockNum uint64, offsetFromBlockBeginning uint64, maxBlockNum uint64, blockLimit uint64,
initialTxNum uint64, inputTxNum uint64, initialCycle bool, rwTx kv.TemporalRwTx,
stepsInDb float64, accumulator *shards.Accumulator, readAhead chan uint64, logEvery *time.Ticker) (*types.Header, kv.TemporalRwTx, error) {

pe.accumulator = accumulator

var asyncTxChan mdbx.TxApplyChan
var asyncTx kv.TemporalTx

Expand Down Expand Up @@ -656,7 +659,7 @@ func (pe *parallelExecutor) execLoop(ctx context.Context) (err error) {
blockExecutor.versionMap.FlushVersionedWrites(finalWrites, true, "")
}

stateWriter := state.NewBufferedWriter(pe.rs, nil)
stateWriter := state.NewBufferedWriter(pe.rs, pe.accumulator)
if err = ibs.MakeWriteSet(txTask.EvmBlockContext.Rules(txTask.Config), stateWriter); err != nil {
return state.StateUpdates{}, err
}
Expand Down Expand Up @@ -1520,7 +1523,7 @@ func (be *blockExecutor) nextResult(ctx context.Context, pe *parallelExecutor, r
}
}

stateWriter := state.NewBufferedWriter(pe.rs, nil)
stateWriter := state.NewBufferedWriter(pe.rs, pe.accumulator)

_, addReads, addWrites, err := txResult.finalize(prevReceipt, pe.cfg.engine, be.versionMap, stateReader, stateWriter)

Expand Down
11 changes: 10 additions & 1 deletion execution/stagedsync/exec3_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type serialExecutor struct {
blobGasUsed uint64
lastBlockResult *blockResult
worker *exec.Worker

// accumulator for the current block; set at StartChange and used by the
// block-end stateWriter so that AuRa system-call nonce changes are
// included in the txpool state-diff batch.
accumulator *shards.Accumulator
}

func warmTxsHashes(block *types.Block) {
Expand Down Expand Up @@ -124,6 +129,7 @@ func (se *serialExecutor) exec(ctx context.Context, execStage *StageState, u Unw
return se.getHeader(ctx, hash, number)
}), se.cfg.engine, se.cfg.author, se.cfg.chainConfig)

se.accumulator = accumulator // keep in sync for executeBlock's stateWriter
if accumulator != nil {
txs, err := se.cfg.blockReader.RawTransactions(context.Background(), se.applyTx, b.NumberU64(), b.NumberU64())
if err != nil {
Expand Down Expand Up @@ -402,7 +408,10 @@ func (se *serialExecutor) executeBlock(ctx context.Context, tasks []exec.Task, i
}
}

stateWriter := state.NewWriter(se.doms.AsPutDel(se.applyTx), nil, txTask.TxNum)
// Pass se.accumulator so that AuRa / system-call nonce changes
// are included in the txpool state-diff batch (fixes empty block
// production on Gnosis Chain caused by stale pending txns).
stateWriter := state.NewWriter(se.doms.AsPutDel(se.applyTx), se.accumulator, txTask.TxNum)

if err = ibs.MakeWriteSet(txTask.Rules(), stateWriter); err != nil {
panic(err)
Expand Down
1 change: 0 additions & 1 deletion txnprovider/txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remoteproto.State
if err != nil {
return err
}

defer coreTx.Rollback()

block := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight
Expand Down
119 changes: 119 additions & 0 deletions txnprovider/txpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,3 +1979,122 @@ func TestZombieQueuedEviction(t *testing.T) {
assert.Equal(count, pending2, "all consecutive txns should be pending (no gaps, sufficient balance)")
})
}

// TestStalePendingEvictionViaMineNonce verifies that when the execution layer
// correctly emits a state-diff UPSERT for an AuRa/Gnosis system-transaction
// sender (fixed in exec3_serial.go by passing the accumulator to the block-end
// stateWriter), the txpool evicts the now-stale pending transactions.
//
// Scenario:
// 1. addr1 has two pending txns: T1 (nonce=0) and T2 (nonce=1).
// 2. The block mines T1 from the pool AND an AuRa system tx at nonce=1,
// advancing the on-chain nonce to 2. Only T1 appears in minedTxns.
// 3. The EL (after the exec3_serial.go fix) emits addr1 with nonce=2 in
// stateChanges. The pool receives this via OnNewBlock.
// 4. removeMined removes T1; onSenderStateChange(nonce=2) evicts T2 (nonce=1).
func TestStalePendingEvictionViaMineNonce(t *testing.T) {
asrt := assert.New(t)
req := require.New(t)
logger := log.New()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ch := make(chan Announcements, 100)
coreDB := temporaltest.NewTestDB(t, datadir.New(t.TempDir()))
cfg := txpoolcfg.DefaultConfig

// DummyCache reads directly from the DB — avoids coherence-version coupling.
pool, err := New(ctx, ch, nil, coreDB, cfg, kvcache.NewDummy(), chain.TestChainConfig, nil, nil, func() {}, nil, nil, logger, WithFeeCalculator(nil))
req.NoError(err)
req.NotNil(pool)

var addr1 [20]byte
addr1[0] = 1
h0 := gointerfaces.ConvertHashToH256([32]byte{})

// writeAccount writes addr1 to coreDB at the given nonce so that senders.info
// (which reads from the DB when using DummyCache) returns the expected value.
writeAccount := func(nonce, txNum uint64) {
tx, werr := coreDB.BeginTemporalRw(ctx)
req.NoError(werr)
defer tx.Rollback()
sd, werr := execctx.NewSharedDomains(ctx, tx, logger)
req.NoError(werr)
a := accounts3.Account{
Nonce: nonce, Balance: *uint256.NewInt(1 * common.Ether),
CodeHash: accounts.EmptyCodeHash, Incarnation: 1,
}
req.NoError(sd.DomainPut(kv.AccountsDomain, tx, addr1[:], accounts3.SerialiseV3(&a), txNum, nil))
req.NoError(sd.Flush(ctx, tx))
sd.Close()
req.NoError(tx.Commit())
}

serialiseAcc := func(nonce uint64) []byte {
a := accounts3.Account{
Nonce: nonce, Balance: *uint256.NewInt(1 * common.Ether),
CodeHash: accounts.EmptyCodeHash, Incarnation: 1,
}
return accounts3.SerialiseV3(&a)
}

// ── Step 1: write addr1 nonce=0 to DB and bootstrap pool ─────────────────
writeAccount(0, 0)
initChange := &remoteproto.StateChangeBatch{
StateVersionId: 0, PendingBlockBaseFee: 200_000, BlockGasLimit: 1_000_000,
ChangeBatch: []*remoteproto.StateChange{{BlockHeight: 0, BlockHash: h0}},
}
initChange.ChangeBatch[0].Changes = append(initChange.ChangeBatch[0].Changes, &remoteproto.AccountChange{
Action: remoteproto.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(addr1),
Data: serialiseAcc(0),
})
req.NoError(pool.OnNewBlock(ctx, initChange, TxnSlots{}, TxnSlots{}, TxnSlots{}))

// ── Step 2: add T1 (nonce=0) and T2 (nonce=1) to pending ────────────────
T1 := &TxnSlot{Tip: *uint256.NewInt(300_000), FeeCap: *uint256.NewInt(300_000), Gas: 100_000, Nonce: 0}
T1.IDHash[0] = 1
T2 := &TxnSlot{Tip: *uint256.NewInt(300_000), FeeCap: *uint256.NewInt(300_000), Gas: 100_000, Nonce: 1}
T2.IDHash[0] = 2
var slots TxnSlots
slots.Append(T1, addr1[:], true)
slots.Append(T2, addr1[:], true)
reasons, err := pool.AddLocalTxns(ctx, slots)
req.NoError(err)
for _, r := range reasons {
asrt.Equal(txpoolcfg.Success, r, r.String())
}
pending, _, _ := pool.CountContent()
asrt.Equal(2, pending, "both T1 and T2 should be in pending")

// ── Step 3: advance DB nonce to 2 (T1 mined + AuRa system tx at nonce=1) ─
writeAccount(2, 1)

// ── Step 4: OnNewBlock with the correct stateChanges from the fixed EL ───
// The exec3_serial.go fix ensures addr1 appears in stateChanges with
// nonce=2 because the block-end stateWriter now carries the accumulator.
h1 := gointerfaces.ConvertHashToH256([32]byte{1})
blockChange := &remoteproto.StateChangeBatch{
StateVersionId: 1, PendingBlockBaseFee: 200_000, BlockGasLimit: 1_000_000,
ChangeBatch: []*remoteproto.StateChange{{BlockHeight: 1, BlockHash: h1}},
}
blockChange.ChangeBatch[0].Changes = append(blockChange.ChangeBatch[0].Changes, &remoteproto.AccountChange{
Action: remoteproto.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(addr1),
Data: serialiseAcc(2), // EL now correctly emits nonce=2
})

minedT1 := &TxnSlot{Tip: *uint256.NewInt(300_000), FeeCap: *uint256.NewInt(300_000), Gas: 100_000, Nonce: 0}
minedT1.IDHash[0] = 1
var minedSlots TxnSlots
minedSlots.Append(minedT1, addr1[:], true)

req.NoError(pool.OnNewBlock(ctx, blockChange, TxnSlots{}, TxnSlots{}, minedSlots))

// ── Step 5: T1 removed by removeMined; T2 evicted by onSenderStateChange ─
// senderNonce=2 (from DB) > T2.Nonce=1 → NonceTooLow.
pending, _, queued := pool.CountContent()
asrt.Equal(0, pending, "T2 must be evicted: on-chain nonce=2 > T2.nonce=1")
asrt.Equal(0, queued, "no queued txns expected")
}
Loading