Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ linters:
- testableexamples
- testifylint
- thelper
- tparallel
- unconvert
- usestdlibvars
- unused
Expand Down
2 changes: 2 additions & 0 deletions sae/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
)

func TestRecoverFromDatabase(t *testing.T) {
t.Parallel()

sutOpt, vmTime := withVMTime(t, time.Unix(saeparams.TauSeconds, 0))

var srcDB database.Database
Expand Down
2 changes: 1 addition & 1 deletion sae/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (vm *VM) ethRPCServer() (*rpc.Server, error) {
{
"eth",
immediateReceipts{
vm.exec,
vm,
ethapi.NewTransactionAPI(b, new(ethapi.AddrLocker)),
},
},
Expand Down
59 changes: 41 additions & 18 deletions sae/rpc_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,54 @@ import (
"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/libevm/ethapi"

"github.com/ava-labs/strevm/blocks"
"github.com/ava-labs/strevm/saexec"
)

type immediateReceipts struct {
exec *saexec.Executor
vm *VM
*ethapi.TransactionAPI
}

func (ir immediateReceipts) GetTransactionReceipt(ctx context.Context, h common.Hash) (map[string]any, error) {
r, ok, err := ir.exec.RecentReceipt(ctx, h)
if err != nil {
return nil, err
}
if !ok {
// The transaction has either not been included yet, or it was cleared
// from the [saexec.Executor] cache but is on disk. The standard
// mechanism already differentiates between these scenarios.
return ir.TransactionAPI.GetTransactionReceipt(ctx, h)
var _ *saexec.Executor // protect the import for IDE comment resolution

// [saexec.Executor.RecentReceipt] will only return false if the transaction
// is yet to be included in an accepted block, or if it was executed so long
// ago that it is no longer in the cache. Assuming a tx "known" to the
// network, the former implies that it is in the mempool and the latter
// requires a fallback to regular receipt retrieval. Since
// [saexec.Executor.Enqueue] returning without error guarantees that
// RecentReceipt() will return known, still-cached receipts, we only have to
// handle the unknown ones.

// A buffer of 1 avoids a race between our call to RecentReceipt() and a
// concurrent call to Enqueue() by [VM.AcceptBlock], which only sends an
// `accepted` event after enqueuing.
ch := make(chan *blocks.Block, 1)
sub := ir.vm.exec.SubscribeEnqueueEvent(ch)
defer sub.Unsubscribe()

for ; ; <-ch {
Copy link
Copy Markdown
Contributor

@alarso16 alarso16 Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This for loop construction was super strange to me. It might just be because I've never seen it, but I had to whip out go.dev/play to double-check the behavior for this. I now think is equivalent of trying the inner loop once before waiting on the channel at all.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we reformat it like this:

for {
     // do all normal logic

    select {
    case <-ch:
        // wait for new block, or your song
    case <-ctx.Done():
        return ctx.Err()
    }
}

I think the behavior is clearer

// The mempool might have dropped the tx for reasons other than block
// inclusion, in which case we'll fall back on regular receipt retrival,
// which already handles unknown transactions.
known := ir.vm.mempool.Pool.Has(h)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been tracing code for a while, and I don't understand when transactions may be dropped from the mempool. Obviously if they're outpriced or something, but when are they removed explicitly?


r, ok, err := ir.vm.exec.RecentReceipt(ctx, h)
if err != nil {
return nil, err
}
if ok {
return r.MarshalForRPC(), nil
}
if !known {
return ir.TransactionAPI.GetTransactionReceipt(ctx, h)
}

// 'Cause if at first you don't succeed
// You can dust it off and try again
// Dust yourself off and try again, try again
// 𝄢 ♪♩♪♩♩|♪♩♪♩♩
}
return ethapi.MarshalReceipt(
r.Receipt,
r.BlockHash,
r.BlockNumber.Uint64(),
r.Signer,
r.Tx,
int(r.TransactionIndex), //nolint:gosec // Known to not overflow
), nil
}
31 changes: 30 additions & 1 deletion sae/rpc_receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package sae
import (
"math/big"
"testing"
"time"

"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/core/types"
"github.com/ava-labs/libevm/params"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestImmediateReceipts(t *testing.T) {
Expand All @@ -29,7 +31,7 @@ func TestImmediateReceipts(t *testing.T) {
}
notBlocked := txs[0]

b := sut.runConsensusLoop(t, txs[:]...)
b := sut.runConsensusLoop(t, txs...)
sut.testRPC(ctx, t, rpcTest{
method: "eth_getTransactionReceipt",
args: []any{notBlocked.Hash()},
Expand All @@ -46,3 +48,30 @@ func TestImmediateReceipts(t *testing.T) {
})
assert.Falsef(t, b.Executed(), "%T.Executed()", b)
}

func TestCallGetReceiptBeforeAcceptance(t *testing.T) {
t.Parallel()

ctx, sut := newSUT(t, 1)

tx := sut.wallet.SetNonceAndSign(t, 0, &types.LegacyTx{
To: &zeroAddr,
Gas: params.TxGas,
GasPrice: big.NewInt(1),
})
sut.mustSendTx(t, tx)
sut.requireInMempool(t, tx.Hash())

t.Run("start_fetching_receipt_before_inclusion", func(t *testing.T) {
t.Parallel()
got, err := sut.TransactionReceipt(ctx, tx.Hash())
require.NoErrorf(t, err, "%T.TransactionReceipt()", sut.Client)
require.Equalf(t, tx.Hash(), got.TxHash, "%T.TxHash", got)
})

t.Run("accept_block_while_waiting_on_receipt", func(t *testing.T) {
t.Parallel()
time.Sleep(500 * time.Millisecond) // <------------- Noteworthy
sut.runConsensusLoop(t)
})
}
1 change: 0 additions & 1 deletion sae/worstcase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func (g *guzzler) guzzle(env vm.PrecompileEnvironment, input []byte) ([]byte, er
return nil, nil
}

//nolint:tparallel // Why should we call t.Parallel at the top level by default?
func TestWorstCase(t *testing.T) {
flags := worstCaseFuzzFlags
t.Logf("Flags: %+v", flags)
Expand Down
4 changes: 4 additions & 0 deletions saexec/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ var errExecutorClosed = errors.New("saexec.Executor closed")
// Enqueue pushes a new block to the FIFO queue. If [Executor.Close] is called
// before [blocks.Block.Executed] returns true then there is no guarantee that
// the block will be executed.
//
// A successful return of Enqueue guarantees that [Executor.RecentReceipt] will
// block until the respective transaction has been executed.
func (e *Executor) Enqueue(ctx context.Context, block *blocks.Block) error {
e.createReceiptBuffers(block)
select {
case e.queue <- block:
e.enqueueEvents.Send(block)
if n := len(e.queue); n == cap(e.queue) {
// If this happens then increase the channel's buffer size.
e.log.Warn(
Expand Down
14 changes: 14 additions & 0 deletions saexec/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/core/types"
"github.com/ava-labs/libevm/libevm/ethapi"

"github.com/ava-labs/strevm/blocks"
)
Expand Down Expand Up @@ -45,6 +46,19 @@ type Receipt struct {
Tx *types.Transaction
}

// MarshalForRPC returns [ethapi.MarshalReceipt] with all arguments sourced from
// `r`.
func (r *Receipt) MarshalForRPC() map[string]any {
return ethapi.MarshalReceipt(
r.Receipt,
r.BlockHash,
r.BlockNumber.Uint64(),
r.Signer,
r.Tx,
int(r.TransactionIndex), //nolint:gosec // Known to not overflow
)
}

// RecentReceipt returns the receipt for the specified [types.Transaction] hash,
// as soon as it is ready for issuance, even if later transactions in the same
// block are still executing. It caches recent values for an indefinite period,
Expand Down
9 changes: 5 additions & 4 deletions saexec/saexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type Executor struct {
queue chan *blocks.Block
lastExecuted atomic.Pointer[blocks.Block]

headEvents event.FeedOf[core.ChainHeadEvent]
chainEvents event.FeedOf[core.ChainEvent]
logEvents event.FeedOf[[]*types.Log]
receipts *syncMap[common.Hash, chan *Receipt]
enqueueEvents event.FeedOf[*blocks.Block]
headEvents event.FeedOf[core.ChainHeadEvent]
chainEvents event.FeedOf[core.ChainEvent]
logEvents event.FeedOf[[]*types.Log]
receipts *syncMap[common.Hash, chan *Receipt]

chainContext *chainContext
chainConfig *params.ChainConfig
Expand Down
8 changes: 8 additions & 0 deletions saexec/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ import (
"github.com/ava-labs/libevm/core"
"github.com/ava-labs/libevm/core/types"
"github.com/ava-labs/libevm/event"

"github.com/ava-labs/strevm/blocks"
)

// SubscribeEnqueueEvent returns a new subscription for each block for which
// [Executor.Enqueue] returns successfully.
func (e *Executor) SubscribeEnqueueEvent(ch chan<- *blocks.Block) event.Subscription {
return e.enqueueEvents.Subscribe(ch)
}

func (e *Executor) sendPostExecutionEvents(b *types.Block, receipts types.Receipts) {
e.headEvents.Send(core.ChainHeadEvent{Block: b})

Expand Down