Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions arbnode/mel/runner/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ func (m *MessageExtractor) initialize(ctx context.Context, current *fsm.CurrentS
// Start from the latest MEL state we have in the database
melState, err := m.melDB.GetHeadMelState(ctx)
if err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
// Initialize delayedMessageBacklog and add it to the melState
delayedMessageBacklog, err := mel.NewDelayedMessageBacklog(m.GetContext(), m.config.DelayedMessageBacklogCapacity, m.GetFinalizedDelayedMessagesRead)
if err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
if err = InitializeDelayedMessageBacklog(ctx, delayedMessageBacklog, m.melDB, melState, m.GetFinalizedDelayedMessagesRead); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
delayedMessageBacklog.CommitDirties()
melState.SetDelayedMessageBacklog(delayedMessageBacklog)
// Start mel state is now ready. Check if the state's parent chain block hash exists in the parent chain
startBlock, err := m.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(melState.ParentChainBlockNumber))
if err != nil {
return m.retryInterval, fmt.Errorf("failed to get start parent chain block: %d corresponding to head mel state from parent chain: %w", melState.ParentChainBlockNumber, err)
return m.config.RetryInterval, fmt.Errorf("failed to get start parent chain block: %d corresponding to head mel state from parent chain: %w", melState.ParentChainBlockNumber, err)
}
// We check if our head mel state's parentChainBlockHash matches the one on-chain, if it doesnt then we detected a reorg
if melState.ParentChainBlockHash != startBlock.Hash() {
Expand Down
41 changes: 27 additions & 14 deletions arbnode/mel/runner/mel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/bold/containers/fsm"
Expand All @@ -24,6 +25,10 @@ import (
"github.com/offchainlabs/nitro/util/stopwaiter"
)

var (
stuckFSMIndicatingGauge = metrics.NewRegisteredGauge("arb/mel/stuck", nil) // 1-stuck, 0-not_stuck
)

// The default retry interval for the message extractor FSM. After each tick of the FSM,
// the extractor service stop waiter will wait for this duration before trying to act again.
const defaultRetryInterval = time.Second
Expand All @@ -32,8 +37,9 @@ type MessageExtractionConfig struct {
Enable bool `koanf:"enable"`
RetryInterval time.Duration `koanf:"retry-interval"`
DelayedMessageBacklogCapacity int `koanf:"delayed-message-backlog-capacity"`
BlocksToPrefetch uint64 `koanf:"blocks-to-prefetch" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
BlocksToPrefetch uint64 `koanf:"blocks-to-prefetch"`
ReadMode string `koanf:"read-mode"`
StallTolerance uint64 `koanf:"stall-tolerance"`
}

func (c *MessageExtractionConfig) Validate() error {
Expand All @@ -50,6 +56,7 @@ var DefaultMessageExtractionConfig = MessageExtractionConfig{
DelayedMessageBacklogCapacity: 100, // TODO: right default? setting to a lower value means more calls to l1reader
BlocksToPrefetch: 499, // 500 is the eth_getLogs block range limit
ReadMode: "latest",
StallTolerance: 10,
}

func MessageExtractionConfigAddOptions(prefix string, f *pflag.FlagSet) {
Expand All @@ -58,6 +65,7 @@ func MessageExtractionConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int(prefix+".delayed-message-backlog-capacity", DefaultMessageExtractionConfig.DelayedMessageBacklogCapacity, "target capacity of the delayed message backlog")
f.Uint64(prefix+".blocks-to-prefetch", DefaultMessageExtractionConfig.BlocksToPrefetch, "the number of blocks to prefetch relevant logs from")
f.String(prefix+".read-mode", DefaultMessageExtractionConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
f.Uint64(prefix+".stall-tolerance", DefaultMessageExtractionConfig.StallTolerance, "max times the MEL fsm is allowed to be stuck without logging error")
}

// TODO (ganesh): cleanup unused methods from this interface after checking with wasm mode
Expand All @@ -76,47 +84,43 @@ type ParentChainReader interface {
// blocks one by one to transform them into messages for the execution layer.
type MessageExtractor struct {
stopwaiter.StopWaiter
config *MessageExtractionConfig
config MessageExtractionConfig
parentChainReader ParentChainReader
logsPreFetcher *logsFetcher
addrs *chaininfo.RollupAddresses
melDB *Database
msgConsumer mel.MessageConsumer
dataProviders []daprovider.Reader
fsm *fsm.Fsm[action, FSMState]
retryInterval time.Duration
caughtUp bool
caughtUpChan chan struct{}
lastBlockToRead atomic.Uint64
stuckCount uint64
}

// Creates a message extractor instance with the specified parameters,
// including a parent chain reader, rollup addresses, and data providers
// to be used when extracting messages from the parent chain.
func NewMessageExtractor(
config MessageExtractionConfig,
parentChainReader ParentChainReader,
rollupAddrs *chaininfo.RollupAddresses,
melDB *Database,
msgConsumer mel.MessageConsumer,
dataProviders []daprovider.Reader,
retryInterval time.Duration,
) (*MessageExtractor, error) {
if retryInterval == 0 {
retryInterval = defaultRetryInterval
}
fsm, err := newFSM(Start)
if err != nil {
return nil, err
}
return &MessageExtractor{
config: config,
parentChainReader: parentChainReader,
addrs: rollupAddrs,
melDB: melDB,
msgConsumer: msgConsumer,
dataProviders: dataProviders,
fsm: fsm,
retryInterval: retryInterval,
config: &DefaultMessageExtractionConfig, //TODO: remove retryInterval as a struct instead use config
caughtUpChan: make(chan struct{}),
}, nil
}
Expand All @@ -135,10 +139,19 @@ func (m *MessageExtractor) Start(ctxIn context.Context) error {
}
return stopwaiter.CallIterativelyWith(
&m.StopWaiterSafe,
func(ctx context.Context, ignored struct{}) time.Duration {
func(ctx context.Context, _ struct{}) time.Duration {
actAgainInterval, err := m.Act(ctx)
if err != nil {
log.Error("Error in message extractor", "err", err)
m.stuckCount++ // an error implies no change in the fsm state
} else {
m.stuckCount = 0
}
if m.stuckCount > m.config.StallTolerance {
stuckFSMIndicatingGauge.Update(1)
log.Error("Message extractor has been stuck at the same fsm state past the stall-tolerance number of times", "state", m.fsm.Current().State.String(), "stuckCount", m.stuckCount, "err", err)
} else {
stuckFSMIndicatingGauge.Update(0)
}
return actAgainInterval
},
Expand All @@ -157,10 +170,10 @@ func (m *MessageExtractor) updateLastBlockToRead(ctx context.Context) time.Durat
}
if err != nil {
log.Error("Error fetching header to update last block to read in MEL", "err", err)
return m.retryInterval
return m.config.RetryInterval
}
m.lastBlockToRead.Store(header.Number.Uint64())
return m.retryInterval
return m.config.RetryInterval
}

func (m *MessageExtractor) CurrentFSMState() FSMState {
Expand Down Expand Up @@ -349,6 +362,6 @@ func (m *MessageExtractor) Act(ctx context.Context) (time.Duration, error) {
case Reorging:
return m.reorg(ctx, current)
default:
return m.retryInterval, fmt.Errorf("invalid state: %s", current.State)
return m.config.RetryInterval, fmt.Errorf("invalid state: %s", current.State)
}
}
26 changes: 25 additions & 1 deletion arbnode/mel/runner/mel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -23,6 +24,29 @@ import (

var _ ParentChainReader = (*mockParentChainReader)(nil)

func TestMessageExtractorStallTriggersMetric(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := DefaultMessageExtractionConfig
cfg.StallTolerance = 2
cfg.RetryInterval = 100 * time.Millisecond
extractor, err := NewMessageExtractor(
cfg,
&mockParentChainReader{},
&chaininfo.RollupAddresses{},
NewDatabase(rawdb.NewMemoryDatabase()),
&mockMessageConsumer{},
[]daprovider.Reader{},
)
require.NoError(t, err)
require.True(t, stuckFSMIndicatingGauge.Snapshot().Value() == 0)
require.NoError(t, extractor.Start(ctx))
// MEL will be stuck at the 'Start' state as HeadMelState is not yet stored in the db
// so after RetryInterval*StallTolerance amount of time the metric should have been set to 1
time.Sleep(cfg.RetryInterval*time.Duration(cfg.StallTolerance) + 50*time.Millisecond) // #nosec G115
require.True(t, stuckFSMIndicatingGauge.Snapshot().Value() == 1)
}

func TestMessageExtractor(t *testing.T) {
ctx := context.Background()
emptyblk0 := types.NewBlock(&types.Header{Number: common.Big1}, nil, nil, nil)
Expand All @@ -45,12 +69,12 @@ func TestMessageExtractor(t *testing.T) {
melDb := NewDatabase(arbDb)
messageConsumer := &mockMessageConsumer{}
extractor, err := NewMessageExtractor(
DefaultMessageExtractionConfig,
parentChainReader,
&chaininfo.RollupAddresses{},
melDb,
messageConsumer,
[]daprovider.Reader{},
0,
)
extractor.StopWaiter.Start(ctx, extractor)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions arbnode/mel/runner/process_next_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
// Process the next block in the parent chain and extracts messages.
processAction, ok := current.SourceEvent.(processNextBlock)
if !ok {
return m.retryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
return m.config.RetryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
}
preState := processAction.melState
if preState.GetDelayedMessageBacklog() == nil { // Safety check since its relevant for native mode
return m.retryInterval, errors.New("detected nil DelayedMessageBacklog of melState, shouldnt be possible")
return m.config.RetryInterval, errors.New("detected nil DelayedMessageBacklog of melState, shouldnt be possible")
}
// If the current parent chain block is not safe/finalized we wait till it becomes safe/finalized as determined by the ReadMode
if m.config.ReadMode != "latest" && preState.ParentChainBlockNumber+1 > m.lastBlockToRead.Load() {
return m.retryInterval, nil
return m.config.RetryInterval, nil
}
parentChainBlock, err := m.parentChainReader.HeaderByNumber(
ctx,
Expand All @@ -60,9 +60,9 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
close(m.caughtUpChan)
}
}
return m.retryInterval, nil
return m.config.RetryInterval, nil
} else {
return m.retryInterval, err
return m.config.RetryInterval, err
}
}
if parentChainBlock.ParentHash != preState.ParentChainBlockHash {
Expand All @@ -73,7 +73,7 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
}
// Conditionally prefetch logs for upcoming block/s
if err = m.logsPreFetcher.fetch(ctx, preState); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
postState, msgs, delayedMsgs, batchMetas, err := melextraction.ExtractMessages(
ctx,
Expand All @@ -85,7 +85,7 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
m.logsPreFetcher,
)
if err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
// Begin the next FSM state immediately.
return 0, m.fsm.Do(saveMessages{
Expand Down
8 changes: 4 additions & 4 deletions arbnode/mel/runner/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import (
func (m *MessageExtractor) reorg(ctx context.Context, current *fsm.CurrentState[action, FSMState]) (time.Duration, error) {
reorgAction, ok := current.SourceEvent.(reorgToOldBlock)
if !ok {
return m.retryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
return m.config.RetryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
}
currentDirtyState := reorgAction.melState
if currentDirtyState.ParentChainBlockNumber == 0 {
return m.retryInterval, errors.New("invalid reorging stage, ParentChainBlockNumber of current mel state has reached 0")
return m.config.RetryInterval, errors.New("invalid reorging stage, ParentChainBlockNumber of current mel state has reached 0")
}
previousState, err := m.melDB.State(ctx, currentDirtyState.ParentChainBlockNumber-1)
if err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
// This adjusts delayedMessageBacklog
if err := currentDirtyState.ReorgTo(previousState); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
m.logsPreFetcher.reset()
return 0, m.fsm.Do(processNextBlock{
Expand Down
10 changes: 5 additions & 5 deletions arbnode/mel/runner/save_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ func (m *MessageExtractor) saveMessages(ctx context.Context, current *fsm.Curren
// Persists messages and a processed MEL state to the database.
saveAction, ok := current.SourceEvent.(saveMessages)
if !ok {
return m.retryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
return m.config.RetryInterval, fmt.Errorf("invalid action: %T", current.SourceEvent)
}
saveAction.postState.GetDelayedMessageBacklog().CommitDirties()
if err := m.melDB.SaveBatchMetas(ctx, saveAction.postState, saveAction.batchMetas); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
if err := m.melDB.SaveDelayedMessages(ctx, saveAction.postState, saveAction.delayedMessages); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
if err := m.msgConsumer.PushMessages(ctx, saveAction.preStateMsgCount, saveAction.messages); err != nil {
return m.retryInterval, err
return m.config.RetryInterval, err
}
if err := m.melDB.SaveState(ctx, saveAction.postState); err != nil {
log.Error("Error saving messages from MessageExtractor to MessageConsumer", "err", err)
return m.retryInterval, err
return m.config.RetryInterval, err
}
return 0, m.fsm.Do(processNextBlock{
melState: saveAction.postState,
Expand Down
2 changes: 1 addition & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,12 @@ func getMessageExtractor(
}
}
msgExtractor, err := melrunner.NewMessageExtractor(
config.MessageExtraction,
l1client,
deployInfo,
melDB,
txStreamer,
dapReaders,
config.MessageExtraction.RetryInterval,
)
if err != nil {
return nil, err
Expand Down
Loading
Loading