Skip to content
3 changes: 1 addition & 2 deletions arbnode/consensus_execution_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/headerreader"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (c *ConsensusExecutionSyncer) getFinalityData(
}
msgIdx := msgCount - 1
msgResult, err := c.txStreamer.ResultAtMessageIndex(msgIdx)
if errors.Is(err, gethexec.ResultNotFound) {
if errors.Is(err, execution.ErrResultNotFound) {
log.Debug("Message result not found, node out of sync", "msgIdx", msgIdx, "err", err)
return nil, nil
} else if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions changelog/pmikolajczyk-nit-4738.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Internal
- Replace string matching with JSON-RPC error codes in Consensus/Execution RPC clients
22 changes: 5 additions & 17 deletions consensus/consensusrpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package consensusrpcclient

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -40,53 +39,42 @@ func (c *ConsensusRPCClient) StopAndWait() {
c.StopWaiter.StopAndWait()
}

func convertError(err error) error {
if err == nil {
return nil
}
errStr := err.Error()
if strings.Contains(errStr, execution.ErrSequencerInsertLockTaken.Error()) {
return execution.ErrSequencerInsertLockTaken
}
return err
}

func (c *ConsensusRPCClient) GetL1Confirmations(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (uint64, error) {
var res uint64
err := c.client.CallContext(ctx, &res, consensus.RPCNamespace+"_getL1Confirmations", msgIdx)
return res, convertError(err)
return res, err
})
}

func (c *ConsensusRPCClient) FindBatchContainingMessage(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (uint64, error) {
var res uint64
err := c.client.CallContext(ctx, &res, consensus.RPCNamespace+"_findBatchContainingMessage", msgIdx)
return res, convertError(err)
return res, err
})
}

func (c *ConsensusRPCClient) BlockMetadataAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[common.BlockMetadata] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (common.BlockMetadata, error) {
var res common.BlockMetadata
err := c.client.CallContext(ctx, &res, consensus.RPCNamespace+"_blockMetadataAtMessageIndex", msgIdx)
return res, convertError(err)
return res, err
})
}

func (c *ConsensusRPCClient) WriteMessageFromSequencer(msgIdx arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (struct{}, error) {
var res struct{}
err := c.client.CallContext(ctx, &res, consensus.RPCNamespace+"_writeMessageFromSequencer", msgIdx, msgWithMeta, msgResult, blockMetadata)
return res, convertError(err)
return res, err
})
}

func (c *ConsensusRPCClient) ExpectChosenSequencer() containers.PromiseInterface[struct{}] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (struct{}, error) {
var res struct{}
err := c.client.CallContext(ctx, &res, consensus.RPCNamespace+"_expectChosenSequencer")
return res, convertError(err)
return res, err
})
}
169 changes: 169 additions & 0 deletions consensus/consensusrpcclient/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2026, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package consensusrpcclient

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/consensus"
"github.com/offchainlabs/nitro/consensus/consensusrpcserver"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/containers"
utilrpc "github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/testhelpers"
)

// mockConsensusService implements consensus.FullConsensusClient for testing.
type mockConsensusService struct {
err error
}

func (m *mockConsensusService) GetL1Confirmations(_ arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return containers.NewReadyPromise[uint64](0, m.err)
}

func (m *mockConsensusService) FindBatchContainingMessage(_ arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return containers.NewReadyPromise[uint64](0, m.err)
}

func (m *mockConsensusService) BlockMetadataAtMessageIndex(_ arbutil.MessageIndex) containers.PromiseInterface[common.BlockMetadata] {
return containers.NewReadyPromise[common.BlockMetadata](nil, m.err)
}

func (m *mockConsensusService) WriteMessageFromSequencer(_ arbutil.MessageIndex, _ arbostypes.MessageWithMetadata, _ execution.MessageResult, _ common.BlockMetadata) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise[struct{}](struct{}{}, m.err)
}

func (m *mockConsensusService) ExpectChosenSequencer() containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise[struct{}](struct{}{}, m.err)
}

func createMockConsensusNode(t *testing.T, errToReturn error) *node.Node {
t.Helper()
stackConf := node.DefaultConfig
stackConf.HTTPPort = 0
stackConf.DataDir = ""
stackConf.WSHost = "127.0.0.1"
stackConf.WSPort = 0
stackConf.WSModules = []string{consensus.RPCNamespace}
stackConf.P2P.NoDiscovery = true
stackConf.P2P.ListenAddr = ""

stack, err := node.New(&stackConf)
testhelpers.RequireImpl(t, err)

stack.RegisterAPIs([]rpc.API{{
Namespace: consensus.RPCNamespace,
Service: consensusrpcserver.NewConsensusRPCServer(&mockConsensusService{err: errToReturn}),
Public: true,
}})

testhelpers.RequireImpl(t, stack.Start())
t.Cleanup(func() { _ = stack.Close() })
return stack
}

func TestConsensusClientErrorHandling(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

tests := []struct {
name string
serverErr error
expectedErr error
}{
{
name: "ErrSequencerInsertLockTaken",
serverErr: execution.ErrSequencerInsertLockTaken,
expectedErr: execution.ErrSequencerInsertLockTaken,
},
{
name: "ErrSequencerInsertLockTaken wrapped",
serverErr: fmt.Errorf("consensus context: %w", execution.ErrSequencerInsertLockTaken),
expectedErr: execution.ErrSequencerInsertLockTaken,
},
{
name: "ErrRetrySequencer",
serverErr: execution.ErrRetrySequencer,
expectedErr: execution.ErrRetrySequencer,
},
{
name: "ErrRetrySequencer wrapped",
serverErr: fmt.Errorf("consensus context: %w", execution.ErrRetrySequencer),
expectedErr: execution.ErrRetrySequencer,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stack := createMockConsensusNode(t, tc.serverErr)

config := &utilrpc.ClientConfig{
URL: "self",
Timeout: 5 * time.Second,
}
testhelpers.RequireImpl(t, config.Validate())

client := NewConsensusRPCClient(func() *utilrpc.ClientConfig { return config }, stack)
testhelpers.RequireImpl(t, client.Start(ctx))
defer client.StopAndWait()

_, err := client.ExpectChosenSequencer().Await(ctx)

if err == nil {
t.Fatal("expected an error from server, got nil")
}
if !errors.Is(err, tc.expectedErr) {
t.Errorf("expected %v, got %v", tc.expectedErr, err)
}
})
}
}

// TestConsensusClientErrorNoFalsePositives verifies that a plain server error
// (arriving with the default JSON-RPC code -32000) does not match any sentinel.
func TestConsensusClientErrorNoFalsePositives(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

stack := createMockConsensusNode(t, errors.New("some unrelated failure"))

config := &utilrpc.ClientConfig{
URL: "self",
Timeout: 5 * time.Second,
}
testhelpers.RequireImpl(t, config.Validate())

client := NewConsensusRPCClient(func() *utilrpc.ClientConfig { return config }, stack)
testhelpers.RequireImpl(t, client.Start(ctx))
defer client.StopAndWait()

_, err := client.ExpectChosenSequencer().Await(ctx)
if err == nil {
t.Fatal("expected an error from server, got nil")
}

allSentinels := []error{
execution.ErrResultNotFound,
execution.ErrRetrySequencer,
execution.ErrSequencerInsertLockTaken,
}
for _, sentinel := range allSentinels {
if errors.Is(err, sentinel) {
t.Errorf("plain error should not match sentinel %v", sentinel)
}
}
}
3 changes: 1 addition & 2 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ var (

var (
ExecutionEngineBlockCreationStopped = errors.New("block creation stopped in execution engine")
ResultNotFound = errors.New("result not found")
BlockNumBeforeGenesis = errors.New("block number is before genesis")
)

Expand Down Expand Up @@ -1024,7 +1023,7 @@ func (s *ExecutionEngine) appendBlock(block *types.Block, statedb *state.StateDB

func (s *ExecutionEngine) resultFromHeader(header *types.Header) (*execution.MessageResult, error) {
if header == nil {
return nil, ResultNotFound
return nil, execution.ErrResultNotFound
}
info := types.DeserializeHeaderExtraInformation(header)
return &execution.MessageResult{
Expand Down
4 changes: 0 additions & 4 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package execution

import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -42,9 +41,6 @@ type ConsensusSyncData struct {
UpdatedAt time.Time
}

var ErrRetrySequencer = errors.New("please retry transaction")
var ErrSequencerInsertLockTaken = errors.New("insert lock taken")

// always needed
type ExecutionClient interface {
ArbOSVersionGetter
Expand Down
17 changes: 1 addition & 16 deletions execution/rpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ package rpcclient

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/node"

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -41,24 +39,11 @@ func (c *Client) StopAndWait() {
c.StopWaiter.StopAndWait()
}

func convertError(err error) error {
if err == nil {
return nil
}
errStr := err.Error()
if strings.Contains(errStr, execution.ErrRetrySequencer.Error()) {
return execution.ErrRetrySequencer
} else if strings.Contains(errStr, gethexec.ResultNotFound.Error()) {
return gethexec.ResultNotFound
}
return err
}

func sendRequest[T any](c *Client, method string, args ...any) containers.PromiseInterface[T] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (T, error) {
var res T
err := c.client.CallContext(ctx, &res, execution.RPCNamespace+method, args...)
return res, convertError(err)
return res, err
})
}

Expand Down
Loading
Loading