Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewTransactionStreamerForTest(t *testing.T, ctx context.Context, ownerAddre
Fail(t, err)
}
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(ctx, arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
inbox, err := NewTransactionStreamer(ctx, arbDb, bc.Config(), execSeq, nil, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
if err != nil {
Fail(t, err)
}
Expand Down
14 changes: 8 additions & 6 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.Bold.MinimumGapToParentAssertion = 0

config.TransactionStreamer.DisableBroadcastDuringSync = true
return &config
}

Expand All @@ -231,6 +231,7 @@ func ConfigDefaultL2Test() *Config {
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig
config.TransactionStreamer.DisableBroadcastDuringSync = false
config.Bold.MinimumGapToParentAssertion = 0

return &config
Expand Down Expand Up @@ -887,11 +888,12 @@ func getTransactionStreamer(
l2Config *params.ChainConfig,
exec execution.ExecutionClient,
broadcastServer *broadcaster.Broadcaster,
syncMonitor *SyncMonitor,
configFetcher ConfigFetcher,
fatalErrChan chan error,
) (*TransactionStreamer, error) {
transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, syncMonitor, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
Copy link
Contributor

@diegoximenes diegoximenes Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SyncMonitor depends on TransactionStreamer today, and this creates a circular dependency between them 😬.

This indicates to me that they could be better placed in a single component.
How about moving SyncMonitor logic to TransactionStreamer and then dropping SyncMonitor?

I created this linear task in order to move logic from SyncMonitor to ConsensusExecutionSyncer, but it is OK to move it TransactionStreamer 🙂.
ConsensusExecutionSyncer already depends on TransactionStreamer, and can call it for what it needs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a PR to remove the circular dependency: #3538. If you like the approach then I'll rebase this PR ono that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am re-reading your comment now and I think I got a bit carried away when I saw NIT-3649 and the work I did on #3538 isn't really related to the circular dependency you're talking about here, rather it's related to breaking the consensus<->execution circular dependency.

I've spent some time thinking about how to break the circular dependency between TransactionStreamer and arbnode.SyncMonitor, and TransactionStreamer and BroadcastSyncChecker and I can't see a good way to do it without moving all the logic into TransactionStreamer, which I don't really like because SyncMonitor already has a fairly clear responsibility. I am thinking about alternatives now...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this in 65c6435

Remove TransactionStreamer<->SyncMonitor circ dep
TransactionStreamer now pushes updates to the SyncMonitor of the
currentMessageCount and feedPendingMessageCount every time it updates
either of these in its own state. This means SyncMonitor doesn't need to
have a reference to TransactionStreamer.

The logic for checking if the TransactionStreamer should broadcast to
the feed has been moved into TransactionStreamer, with the core decision
logic split out for testing.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1142,7 +1144,7 @@ func createNodeImpl(
return nil, err
}

txStreamer, err := getTransactionStreamer(ctx, arbDb, l2Config, executionClient, broadcastServer, configFetcher, fatalErrChan)
txStreamer, err := getTransactionStreamer(ctx, arbDb, l2Config, executionClient, broadcastServer, syncMonitor, configFetcher, fatalErrChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1521,9 +1523,9 @@ func (n *Node) Start(ctx context.Context) error {
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
// Also make sure to call initialize on the sync monitor after the inbox reader, tx streamer, and block validator are started.
// Else sync might call inbox reader or tx streamer before they are started, and it will lead to panic.
n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator)
// Also make sure to call initialize on the sync monitor after the inbox reader, and block validator are started.
// Else sync might call inbox reader before it is started, and it will lead to panic.
n.SyncMonitor.Initialize(n.InboxReader, n.SeqCoordinator)
n.SyncMonitor.Start(ctx)
if n.ConsensusExecutionSyncer != nil {
n.ConsensusExecutionSyncer.Start(ctx)
Expand Down
40 changes: 19 additions & 21 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package arbnode
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/spf13/pflag"
Expand All @@ -17,10 +18,13 @@ type SyncMonitor struct {
stopwaiter.StopWaiter
config func() *SyncMonitorConfig
inboxReader *InboxReader
txStreamer *TransactionStreamer
coordinator *SeqCoordinator
initialized bool

// Updates to these are pushed from the TransactionStreamer using UpdateMessageCount
currentMessageCount atomic.Uint64
feedPendingMessageCount atomic.Uint64

syncTargetLock sync.Mutex
nextSyncTarget arbutil.MessageIndex
syncTarget arbutil.MessageIndex
Expand Down Expand Up @@ -48,13 +52,18 @@ func SyncMonitorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".msg-lag", DefaultSyncMonitorConfig.MsgLag, "allowed msg lag while still considered in sync")
}

func (s *SyncMonitor) Initialize(inboxReader *InboxReader, txStreamer *TransactionStreamer, coordinator *SeqCoordinator) {
func (s *SyncMonitor) Initialize(inboxReader *InboxReader, coordinator *SeqCoordinator) {
s.inboxReader = inboxReader
s.txStreamer = txStreamer
s.coordinator = coordinator
s.initialized = true
}

// UpdateMessageCount updates the internal message count tracking
func (s *SyncMonitor) UpdateMessageCount(committed, feedPending arbutil.MessageIndex) {
s.currentMessageCount.Store(uint64(committed))
s.feedPendingMessageCount.Store(uint64(feedPending))
}

func (s *SyncMonitor) updateSyncTarget(ctx context.Context) time.Duration {
nextSyncTarget, err := s.maxMessageCount()
s.syncTargetLock.Lock()
Expand Down Expand Up @@ -89,14 +98,10 @@ func (s *SyncMonitor) GetMaxMessageCount() (arbutil.MessageIndex, error) {
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
return 0, err
}

pending := s.txStreamer.FeedPendingMessageCount()
if pending > msgCount {
msgCount = pending
msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())
feedPending := arbutil.MessageIndex(s.feedPendingMessageCount.Load())
if feedPending > msgCount {
msgCount = feedPending
}

if s.inboxReader != nil {
Expand Down Expand Up @@ -149,14 +154,10 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
}
res["maxMessageCount"] = maxMsgCount

msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
res["msgCountError"] = err.Error()
return res
}
msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())
res["msgCount"] = msgCount

res["feedPendingMessageCount"] = s.txStreamer.FeedPendingMessageCount()
res["feedPendingMessageCount"] = arbutil.MessageIndex(s.feedPendingMessageCount.Load())

if s.inboxReader != nil {
batchSeen := s.inboxReader.GetLastSeenBatchCount()
Expand Down Expand Up @@ -224,10 +225,7 @@ func (s *SyncMonitor) Synced() bool {
return false
}

msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
return false
}
msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())

if syncTarget > msgCount {
return false
Expand Down
97 changes: 94 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge
syncMonitor *SyncMonitor

trackBlockMetadataFrom arbutil.MessageIndex
syncTillMessage arbutil.MessageIndex
Expand All @@ -84,6 +85,7 @@ type TransactionStreamerConfig struct {
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"`
DisableBroadcastDuringSync bool `koanf:"disable-broadcast-during-sync" reload:"hot"`
ShutdownOnBlockhashMismatch bool `koanf:"shutdown-on-blockhash-mismatch"`
}

Expand All @@ -95,6 +97,7 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
DisableBroadcastDuringSync: true,
ShutdownOnBlockhashMismatch: false,
}

Expand All @@ -104,6 +107,7 @@ var TestTransactionStreamerConfig = TransactionStreamerConfig{
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
DisableBroadcastDuringSync: true,
ShutdownOnBlockhashMismatch: false,
}

Expand All @@ -113,6 +117,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number to start saving blockmetadata, 0 to disable")
f.Bool(prefix+".disable-broadcast-during-sync", DefaultTransactionStreamerConfig.DisableBroadcastDuringSync, "disable broadcasting historical messages during sync to prevent feed flooding")
f.Bool(prefix+".shutdown-on-blockhash-mismatch", DefaultTransactionStreamerConfig.ShutdownOnBlockhashMismatch, "if set the node gracefully shuts down upon detecting mismatch in feed and locally computed blockhash. This is turned off by default")
}

Expand All @@ -122,6 +127,7 @@ func NewTransactionStreamer(
chainConfig *params.ChainConfig,
exec execution.ExecutionClient,
broadcastServer *broadcaster.Broadcaster,
syncMonitor *SyncMonitor,
fatalErrChan chan<- error,
config TransactionStreamerConfigFetcher,
snapSyncConfig *SnapSyncConfig,
Expand All @@ -132,6 +138,7 @@ func NewTransactionStreamer(
db: db,
newMessageNotifier: make(chan struct{}, 1),
broadcastServer: broadcastServer,
syncMonitor: syncMonitor,
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
Expand Down Expand Up @@ -220,6 +227,15 @@ func (s *TransactionStreamer) ChainConfig() *params.ChainConfig {
return s.chainConfig
}

// updateSyncMonitor pushes the current message counts to the SyncMonitor
func (s *TransactionStreamer) updateSyncMonitor() {
if s.syncMonitor != nil {
committed, _ := s.GetMessageCount()
feedPending := s.FeedPendingMessageCount()
s.syncMonitor.UpdateMessageCount(committed, feedPending)
}
}

func (s *TransactionStreamer) cleanupInconsistentState() error {
// If it doesn't exist yet, set the message count to 0
hasMessageCount, err := s.db.Has(messageCountKey)
Expand All @@ -231,6 +247,7 @@ func (s *TransactionStreamer) cleanupInconsistentState() error {
if err != nil {
return err
}
s.updateSyncMonitor()
}
// TODO remove trailing messageCountToMessage and messageCountToBlockPrefix entries
return nil
Expand All @@ -251,6 +268,7 @@ func (s *TransactionStreamer) ReorgAtAndEndBatch(batch ethdb.Batch, firstMsgIdxR
if err != nil {
return err
}
s.updateSyncMonitor()
return nil
}

Expand Down Expand Up @@ -416,7 +434,7 @@ func (s *TransactionStreamer) addMessagesAndReorg(batch ethdb.Batch, msgIdxOfFir
BlockMetadata: nil,
})
}
s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd)
s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd, false)

if s.validator != nil {
err = s.validator.Reorg(s.GetContext(), msgIdxOfFirstMsgToAdd)
Expand Down Expand Up @@ -707,6 +725,9 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*message.Broad
}
}

// Update SyncMonitor with feed pending message count changes
s.updateSyncMonitor()

if s.broadcasterQueuedMessagesActiveReorg || len(s.broadcasterQueuedMessages) == 0 {
// Broadcaster never triggered reorg or no messages to add
return nil
Expand Down Expand Up @@ -1155,7 +1176,7 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
if s.trackBlockMetadataFrom == 0 || msgIdx < s.trackBlockMetadataFrom {
msgWithBlockInfo.BlockMetadata = nil
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx, true)

return nil
}
Expand Down Expand Up @@ -1223,14 +1244,82 @@ func (s *TransactionStreamer) writeMessage(msgIdx arbutil.MessageIndex, msg arbo
return nil
}

// ShouldBroadcastDuringSync determines if messages should be broadcast during sync based on sync
// state and message position. When not synced, it only allows broadcasting of messages within the
// last 2 batches to prevent flooding feed clients with historical data during sync.
// This is split out as a separate function that contains the core decision logic without extra
// dependencies for testing.
func ShouldBroadcastDuringSync(
synced bool,
firstMsgIdx arbutil.MessageIndex,
msgCount int,
batchCount uint64,
batchThresholdMsgCount arbutil.MessageIndex, // Message count at (batchCount - 2)
haveBatchMetadata bool, // Whether we successfully got batch metadata
) bool {
if msgCount == 0 {
return false
}

if synced {
return true
}

// We're not synced, so check if these messages are within the 2 batch backlog threshold.
// If we can't determine the threshold (no metadata or not enough batches), fail open and broadcast.
if !haveBatchMetadata || batchCount < 2 {
return true
}

// Check if the LAST message in this batch is within the backlog threshold
// #nosec G115
lastMsgIdx := firstMsgIdx + arbutil.MessageIndex(msgCount) - 1
return lastMsgIdx >= batchThresholdMsgCount
}

// shouldBroadcastDuringSync gathers dependency data and calls the pure decision function
func (s *TransactionStreamer) shouldBroadcastDuringSync(firstMsgIdx arbutil.MessageIndex, msgCount int) bool {
// Gather sync status
synced := s.syncMonitor == nil || s.syncMonitor.Synced()

// Gather batch metadata
var batchCount uint64
var batchThresholdMsgCount arbutil.MessageIndex
haveBatchMetadata := false

if s.inboxReader != nil && s.inboxReader.tracker != nil {
count, err := s.inboxReader.tracker.GetBatchCount()
if err == nil {
batchCount = count
if batchCount >= 2 {
batchMeta, err := s.inboxReader.tracker.GetBatchMetadata(batchCount - 2)
if err == nil {
batchThresholdMsgCount = batchMeta.MessageCount
haveBatchMetadata = true
}
}
}
}

return ShouldBroadcastDuringSync(synced, firstMsgIdx, msgCount, batchCount, batchThresholdMsgCount, haveBatchMetadata)
}

func (s *TransactionStreamer) broadcastMessages(
msgs []arbostypes.MessageWithMetadataAndBlockInfo,
firstMsgIdx arbutil.MessageIndex,
force bool,
) {
if s.broadcastServer == nil {
return
}

// Check if we should broadcast during sync
if !force && s.config().DisableBroadcastDuringSync {
if !s.shouldBroadcastDuringSync(firstMsgIdx, len(msgs)) {
return
}
}

feedMsgs := make([]*message.BroadcastFeedMessage, 0, len(msgs))
for i, msg := range msgs {
idx := firstMsgIdx + arbutil.MessageIndex(i) // #nosec G115
Expand Down Expand Up @@ -1273,6 +1362,8 @@ func (s *TransactionStreamer) writeMessages(firstMsgIdx arbutil.MessageIndex, me
return err
}

s.updateSyncMonitor()

select {
case s.newMessageNotifier <- struct{}{}:
default:
Expand Down Expand Up @@ -1458,7 +1549,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
BlockHash: &msgResult.BlockHash,
BlockMetadata: msgAndBlockInfo.BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute, false)

messageTimer.Update(time.Since(start).Nanoseconds())

Expand Down
Loading
Loading