diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go
index b0f5b871d6..b1e85fa53b 100644
--- a/arbnode/inbox_test.go
+++ b/arbnode/inbox_test.go
@@ -157,7 +157,7 @@ func NewTransactionStreamerForTest(t *testing.T, ctx context.Context, ownerAddre
 		Fail(t, err)
 	}
 	execSeq := &execClientWrapper{execEngine, t}
-	inbox, err := NewTransactionStreamer(ctx, arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
+	inbox, err := NewTransactionStreamer(ctx, arbDb, bc.Config(), execSeq, nil, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
 	if err != nil {
 		Fail(t, err)
 	}
diff --git a/arbnode/node.go b/arbnode/node.go
index 0f2ea75bf8..cc45aedcd7 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -56,6 +56,7 @@ import (
 	"github.com/offchainlabs/nitro/util/redisutil"
 	"github.com/offchainlabs/nitro/util/rpcclient"
 	"github.com/offchainlabs/nitro/util/signature"
+	"github.com/offchainlabs/nitro/util/testhelpers/env"
 	"github.com/offchainlabs/nitro/wsbroadcastserver"
 )

@@ -210,6 +211,10 @@ func ConfigDefaultL1NonSequencerTest() *Config {
 	config.Staker.Enable = false
 	config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
 	config.Bold.MinimumGapToParentAssertion = 0
+	// Keep broadcast during sync enabled for path-scheme tests, since disabling it causes test timeouts
+	if env.GetTestStateScheme() == rawdb.PathScheme {
+		config.TransactionStreamer.DisableBroadcastDuringSync = false
+	}

 	return &config
 }
@@ -228,6 +233,10 @@ func ConfigDefaultL2Test() *Config {
 	config.Staker.Enable = false
 	config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
 	config.TransactionStreamer = DefaultTransactionStreamerConfig
+	// Keep broadcast during sync enabled for path-scheme tests, since disabling it causes test timeouts
+	if env.GetTestStateScheme() == rawdb.PathScheme {
+		config.TransactionStreamer.DisableBroadcastDuringSync = false
+	}
 	config.Bold.MinimumGapToParentAssertion = 0

 	return &config
@@ -828,11 +837,12 @@ func getTransactionStreamer(
 	l2Config *params.ChainConfig,
 	exec execution.ExecutionClient,
 	broadcastServer *broadcaster.Broadcaster,
+	syncMonitor *SyncMonitor,
 	configFetcher ConfigFetcher,
 	fatalErrChan chan error,
 ) (*TransactionStreamer, error) {
 	transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
-	txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
+	txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, syncMonitor, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
 	if err != nil {
 		return nil, err
 	}
@@ -1083,7 +1093,7 @@ func createNodeImpl(
 		return nil, err
 	}

-	txStreamer, err := getTransactionStreamer(ctx, arbDb, l2Config, executionClient, broadcastServer, configFetcher, fatalErrChan)
+	txStreamer, err := getTransactionStreamer(ctx, arbDb, l2Config, executionClient, broadcastServer, syncMonitor, configFetcher, fatalErrChan)
 	if err != nil {
 		return nil, err
 	}
@@ -1460,9 +1470,9 @@ func (n *Node) Start(ctx context.Context) error {
 	if n.configFetcher != nil {
 		n.configFetcher.Start(ctx)
 	}
-	// Also make sure to call initialize on the sync monitor after the inbox reader, tx streamer, and block validator are started.
-	// Else sync might call inbox reader or tx streamer before they are started, and it will lead to panic.
-	n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator)
+	// Also make sure to call Initialize on the sync monitor after the inbox reader and block validator are started.
+	// Otherwise sync might call the inbox reader before it is started, which would lead to a panic.
+	n.SyncMonitor.Initialize(n.InboxReader, n.SeqCoordinator)
 	n.SyncMonitor.Start(ctx)
 	if n.ConsensusExecutionSyncer != nil {
 		n.ConsensusExecutionSyncer.Start(ctx)
diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go
index 625fc8cdef..7041ea57bc 100644
--- a/arbnode/sync_monitor.go
+++ b/arbnode/sync_monitor.go
@@ -3,6 +3,7 @@ package arbnode
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/spf13/pflag"
@@ -17,10 +18,13 @@ type SyncMonitor struct {
 	stopwaiter.StopWaiter
 	config      func() *SyncMonitorConfig
 	inboxReader *InboxReader
-	txStreamer  *TransactionStreamer
 	coordinator *SeqCoordinator
 	initialized bool

+	// Updates to these are pushed from the TransactionStreamer using UpdateMessageCount
+	currentMessageCount     atomic.Uint64
+	feedPendingMessageCount atomic.Uint64
+
 	syncTargetLock sync.Mutex
 	nextSyncTarget arbutil.MessageIndex
 	syncTarget     arbutil.MessageIndex
@@ -48,13 +52,18 @@ func SyncMonitorConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Duration(prefix+".msg-lag", DefaultSyncMonitorConfig.MsgLag, "allowed msg lag while still considered in sync")
 }

-func (s *SyncMonitor) Initialize(inboxReader *InboxReader, txStreamer *TransactionStreamer, coordinator *SeqCoordinator) {
+func (s *SyncMonitor) Initialize(inboxReader *InboxReader, coordinator *SeqCoordinator) {
 	s.inboxReader = inboxReader
-	s.txStreamer = txStreamer
 	s.coordinator = coordinator
 	s.initialized = true
 }

+// UpdateMessageCount updates the internal message count tracking
+func (s *SyncMonitor) UpdateMessageCount(committed, feedPending arbutil.MessageIndex) {
+	s.currentMessageCount.Store(uint64(committed))
+	s.feedPendingMessageCount.Store(uint64(feedPending))
+}
+
 func (s *SyncMonitor) updateSyncTarget(ctx context.Context) time.Duration {
 	nextSyncTarget, err := s.maxMessageCount()
 	s.syncTargetLock.Lock()
@@ -89,14 +98,10 @@ func (s *SyncMonitor) GetMaxMessageCount() (arbutil.MessageIndex, error) {
 }

 func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
-	msgCount, err := s.txStreamer.GetMessageCount()
-	if err != nil {
-		return 0, err
-	}
-
-	pending := s.txStreamer.FeedPendingMessageCount()
-	if pending > msgCount {
-		msgCount = pending
+	msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())
+	feedPending := arbutil.MessageIndex(s.feedPendingMessageCount.Load())
+	if feedPending > msgCount {
+		msgCount = feedPending
 	}

 	if s.inboxReader != nil {
@@ -149,14 +154,10 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
 	}
 	res["maxMessageCount"] = maxMsgCount

-	msgCount, err := s.txStreamer.GetMessageCount()
-	if err != nil {
-		res["msgCountError"] = err.Error()
-		return res
-	}
+	msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())
 	res["msgCount"] = msgCount

-	res["feedPendingMessageCount"] = s.txStreamer.FeedPendingMessageCount()
+	res["feedPendingMessageCount"] = arbutil.MessageIndex(s.feedPendingMessageCount.Load())

 	if s.inboxReader != nil {
 		batchSeen := s.inboxReader.GetLastSeenBatchCount()
@@ -224,10 +225,7 @@ func (s *SyncMonitor) Synced() bool {
 		return false
 	}

-	msgCount, err := s.txStreamer.GetMessageCount()
-	if err != nil {
-		return false
-	}
+	msgCount := arbutil.MessageIndex(s.currentMessageCount.Load())

 	if syncTarget > msgCount {
 		return false
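Reviewer note: the sync_monitor.go changes above replace pull-style calls into the TransactionStreamer with counts pushed over lock-free atomics. A minimal standalone sketch of that pattern follows; counterMonitor, update, and max are toy names for illustration, not identifiers from this diff.

package main

import (
	"fmt"
	"sync/atomic"
)

// counterMonitor is a toy stand-in for SyncMonitor: the producer pushes
// counts, and readers load them lock-free without calling back into it.
type counterMonitor struct {
	committed   atomic.Uint64
	feedPending atomic.Uint64
}

// update mirrors SyncMonitor.UpdateMessageCount: both counts are stored atomically.
func (m *counterMonitor) update(committed, feedPending uint64) {
	m.committed.Store(committed)
	m.feedPending.Store(feedPending)
}

// max mirrors the maxMessageCount logic: take the larger of the two counts.
func (m *counterMonitor) max() uint64 {
	c := m.committed.Load()
	if p := m.feedPending.Load(); p > c {
		c = p
	}
	return c
}

func main() {
	var m counterMonitor
	m.update(100, 120) // streamer has committed 100 messages, 120 pending on the feed
	fmt.Println(m.max()) // prints 120
}

The upshot of the design: Synced() and FullSyncProgressMap() can read the counts without taking a lock and without calling into a streamer that may not have started yet, which is why Initialize no longer needs the *TransactionStreamer at all.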
diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index 77a6b9f5f9..d760cc1345 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -68,35 +68,39 @@ type TransactionStreamer struct {
 	broadcastServer *broadcaster.Broadcaster
 	inboxReader     *InboxReader
 	delayedBridge   *DelayedBridge
+	syncMonitor     *SyncMonitor

 	trackBlockMetadataFrom arbutil.MessageIndex
 	syncTillMessage        arbutil.MessageIndex
 }

 type TransactionStreamerConfig struct {
-	MaxBroadcasterQueueSize int           `koanf:"max-broadcaster-queue-size"`
-	MaxReorgResequenceDepth int64         `koanf:"max-reorg-resequence-depth" reload:"hot"`
-	ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
-	SyncTillBlock           uint64        `koanf:"sync-till-block"`
-	TrackBlockMetadataFrom  uint64        `koanf:"track-block-metadata-from"`
+	MaxBroadcasterQueueSize    int           `koanf:"max-broadcaster-queue-size"`
+	MaxReorgResequenceDepth    int64         `koanf:"max-reorg-resequence-depth" reload:"hot"`
+	ExecuteMessageLoopDelay    time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
+	SyncTillBlock              uint64        `koanf:"sync-till-block"`
+	TrackBlockMetadataFrom     uint64        `koanf:"track-block-metadata-from"`
+	DisableBroadcastDuringSync bool          `koanf:"disable-broadcast-during-sync" reload:"hot"`
 }

 type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig

 var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
-	MaxBroadcasterQueueSize: 50_000,
-	MaxReorgResequenceDepth: 1024,
-	ExecuteMessageLoopDelay: time.Millisecond * 100,
-	SyncTillBlock:           0,
-	TrackBlockMetadataFrom:  0,
+	MaxBroadcasterQueueSize:    50_000,
+	MaxReorgResequenceDepth:    1024,
+	ExecuteMessageLoopDelay:    time.Millisecond * 100,
+	SyncTillBlock:              0,
+	TrackBlockMetadataFrom:     0,
+	DisableBroadcastDuringSync: true,
 }

 var TestTransactionStreamerConfig = TransactionStreamerConfig{
-	MaxBroadcasterQueueSize: 10_000,
-	MaxReorgResequenceDepth: 128 * 1024,
-	ExecuteMessageLoopDelay: time.Millisecond,
-	SyncTillBlock:           0,
-	TrackBlockMetadataFrom:  0,
+	MaxBroadcasterQueueSize:    10_000,
+	MaxReorgResequenceDepth:    128 * 1024,
+	ExecuteMessageLoopDelay:    time.Millisecond,
+	SyncTillBlock:              0,
+	TrackBlockMetadataFrom:     0,
+	DisableBroadcastDuringSync: true,
 }
@@ -105,6 +109,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
 	f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
 	f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number to start saving blockmetadata, 0 to disable")
+	f.Bool(prefix+".disable-broadcast-during-sync", DefaultTransactionStreamerConfig.DisableBroadcastDuringSync, "disable broadcasting historical messages during sync to prevent feed flooding")
 }

 func NewTransactionStreamer(
@@ -113,6 +118,7 @@
 	chainConfig *params.ChainConfig,
 	exec execution.ExecutionClient,
 	broadcastServer *broadcaster.Broadcaster,
+	syncMonitor *SyncMonitor,
 	fatalErrChan chan<- error,
 	config TransactionStreamerConfigFetcher,
 	snapSyncConfig *SnapSyncConfig,
@@ -123,6 +129,7 @@
 		db:                 db,
 		newMessageNotifier: make(chan struct{}, 1),
 		broadcastServer:    broadcastServer,
+		syncMonitor:        syncMonitor,
 		fatalErrChan:       fatalErrChan,
 		config:             config,
 		snapSyncConfig:     snapSyncConfig,
@@ -211,6 +218,15 @@ func (s *TransactionStreamer) ChainConfig() *params.ChainConfig {
 	return s.chainConfig
 }

+// updateSyncMonitor pushes the current message counts to the SyncMonitor
+func (s *TransactionStreamer) updateSyncMonitor() {
+	if s.syncMonitor != nil {
+		committed, _ := s.GetMessageCount()
+		feedPending := s.FeedPendingMessageCount()
+		s.syncMonitor.UpdateMessageCount(committed, feedPending)
+	}
+}
+
 func (s *TransactionStreamer) cleanupInconsistentState() error {
 	// If it doesn't exist yet, set the message count to 0
 	hasMessageCount, err := s.db.Has(messageCountKey)
@@ -222,6 +238,7 @@ func (s *TransactionStreamer) cleanupInconsistentState() error {
 		if err != nil {
 			return err
 		}
+		s.updateSyncMonitor()
 	}
 	// TODO remove trailing messageCountToMessage and messageCountToBlockPrefix entries
 	return nil
@@ -242,6 +259,7 @@ func (s *TransactionStreamer) ReorgAtAndEndBatch(batch ethdb.Batch, firstMsgIdxR
 	if err != nil {
 		return err
 	}
+	s.updateSyncMonitor()
 	return nil
 }
@@ -406,7 +424,7 @@ func (s *TransactionStreamer) addMessagesAndReorg(batch ethdb.Batch, msgIdxOfFir
 			BlockHash: &messagesResults[i].BlockHash,
 		})
 	}
-	s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd)
+	s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd, false)

 	if s.validator != nil {
 		err = s.validator.Reorg(s.GetContext(), msgIdxOfFirstMsgToAdd)
@@ -672,6 +690,9 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*message.Broad
 		}
 	}

+	// Update SyncMonitor with feed pending message count changes
+	s.updateSyncMonitor()
+
 	if s.broadcasterQueuedMessagesActiveReorg || len(s.broadcasterQueuedMessages) == 0 {
 		// Broadcaster never triggered reorg or no messages to add
 		return nil
@@ -1118,7 +1139,7 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
 	if s.trackBlockMetadataFrom == 0 || msgIdx < s.trackBlockMetadataFrom {
 		msgWithBlockInfo.BlockMetadata = nil
 	}
-	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx)
+	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx, true)

 	return nil
 }
@@ -1186,13 +1207,82 @@ func (s *TransactionStreamer) writeMessage(msgIdx arbutil.MessageIndex, msg arbo
 	return nil
 }

+// ShouldBroadcastDuringSync reports whether messages should be broadcast during sync, based on
+// sync state and message position. When the node is not synced, it only allows broadcasting
+// messages within the last 2 batches, to avoid flooding feed clients with historical data.
+// The core decision logic is kept in this standalone, dependency-free function so it can be
+// unit tested.
+func ShouldBroadcastDuringSync(
+	synced bool,
+	firstMsgIdx arbutil.MessageIndex,
+	msgCount int,
+	batchCount uint64,
+	batchThresholdMsgCount arbutil.MessageIndex, // Message count at (batchCount - 2)
+	haveBatchMetadata bool, // Whether we successfully got batch metadata
+) bool {
+	if msgCount == 0 {
+		return false
+	}
+
+	if synced {
+		return true
+	}
+
+	// We're not synced, so check if these messages are within the 2 batch backlog threshold.
+	// If we can't determine the threshold (no metadata or not enough batches), fail open and broadcast.
+	if !haveBatchMetadata || batchCount < 2 {
+		return true
+	}
+
+	// Check if the LAST message in this batch is within the backlog threshold
+	// #nosec G115
+	lastMsgIdx := firstMsgIdx + arbutil.MessageIndex(msgCount) - 1
+	return lastMsgIdx >= batchThresholdMsgCount
+}
+
+// shouldBroadcastDuringSync gathers dependency data and calls the pure decision function
+func (s *TransactionStreamer) shouldBroadcastDuringSync(firstMsgIdx arbutil.MessageIndex, msgCount int) bool {
+	// Gather sync status
+	synced := s.syncMonitor == nil || s.syncMonitor.Synced()
+
+	// Gather batch metadata
+	var batchCount uint64
+	var batchThresholdMsgCount arbutil.MessageIndex
+	haveBatchMetadata := false
+
+	if s.inboxReader != nil && s.inboxReader.tracker != nil {
+		count, err := s.inboxReader.tracker.GetBatchCount()
+		if err == nil {
+			batchCount = count
+			if batchCount >= 2 {
+				batchMeta, err := s.inboxReader.tracker.GetBatchMetadata(batchCount - 2)
+				if err == nil {
+					batchThresholdMsgCount = batchMeta.MessageCount
+					haveBatchMetadata = true
+				}
+			}
+		}
+	}
+
+	return ShouldBroadcastDuringSync(synced, firstMsgIdx, msgCount, batchCount, batchThresholdMsgCount, haveBatchMetadata)
+}
+
 func (s *TransactionStreamer) broadcastMessages(
 	msgs []arbostypes.MessageWithMetadataAndBlockInfo,
 	firstMsgIdx arbutil.MessageIndex,
+	force bool,
 ) {
 	if s.broadcastServer == nil {
 		return
 	}
+
+	// Check if we should broadcast during sync
+	if !force && s.config().DisableBroadcastDuringSync {
+		if !s.shouldBroadcastDuringSync(firstMsgIdx, len(msgs)) {
+			return
+		}
+	}
+
 	if err := s.broadcastServer.BroadcastMessages(msgs, firstMsgIdx); err != nil {
 		log.Error("failed broadcasting messages", "firstMsgIdx", firstMsgIdx, "err", err)
 	}
@@ -1224,6 +1314,8 @@ func (s *TransactionStreamer) writeMessages(firstMsgIdx arbutil.MessageIndex, me
 		return err
 	}

+	s.updateSyncMonitor()
+
 	select {
 	case s.newMessageNotifier <- struct{}{}:
 	default:
@@ -1405,7 +1497,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
 		BlockHash:     &msgResult.BlockHash,
 		BlockMetadata: msgAndBlockInfo.BlockMetadata,
 	}
-	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)
+	s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute, false)

 	return msgIdxToExecute+1 <= consensusHeadMsgIdx
 }
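Reviewer note: to make the two-batch backlog window concrete, here is a runnable sketch of the same threshold rule, with arbutil.MessageIndex simplified to uint64 so it compiles on its own; shouldBroadcast is a local stand-in, not the function from the diff.

package main

import "fmt"

// shouldBroadcast mirrors ShouldBroadcastDuringSync above, simplified types only.
func shouldBroadcast(synced bool, firstMsgIdx uint64, msgCount int, batchCount, threshold uint64, haveMeta bool) bool {
	if msgCount == 0 {
		return false // nothing to send
	}
	if synced {
		return true // a synced node always broadcasts
	}
	if !haveMeta || batchCount < 2 {
		return true // fail open when the backlog threshold is unknown
	}
	// Broadcast only if the span's last message reaches the threshold,
	// i.e. it falls within the last two batches.
	lastMsgIdx := firstMsgIdx + uint64(msgCount) - 1
	return lastMsgIdx >= threshold
}

func main() {
	// Suppose there are 10 batches and batch index 8 has MessageCount 199,
	// so the threshold is 199 while the node is still syncing.
	fmt.Println(shouldBroadcast(false, 100, 10, 10, 199, true)) // false: messages 100-109 are pure backlog
	fmt.Println(shouldBroadcast(false, 195, 10, 10, 199, true)) // true: messages 195-204 cross the threshold
}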
diff --git a/arbnode/transaction_streamer_test.go b/arbnode/transaction_streamer_test.go
new file mode 100644
index 0000000000..d406d491bd
--- /dev/null
+++ b/arbnode/transaction_streamer_test.go
@@ -0,0 +1,251 @@
+// Copyright 2021-2025, Offchain Labs, Inc.
+// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
+
+package arbnode
+
+import (
+	"testing"
+
+	"github.com/offchainlabs/nitro/arbutil"
+)
+
+func TestShouldBroadcastDuringSync(t *testing.T) {
+	tests := []struct {
+		name                   string
+		synced                 bool
+		firstMsgIdx            arbutil.MessageIndex
+		msgCount               int
+		batchCount             uint64
+		batchThresholdMsgCount arbutil.MessageIndex
+		haveBatchMetadata      bool
+		expectedBroadcast      bool
+		description            string
+	}{
+		{
+			name:              "empty batch - never broadcast",
+			msgCount:          0,
+			expectedBroadcast: false,
+			description:       "Empty message batches should never be broadcast",
+		},
+		{
+			name:              "synced state - always broadcast",
+			synced:            true,
+			msgCount:          5,
+			expectedBroadcast: true,
+			description:       "When node is synced, should always broadcast",
+		},
+		{
+			name:              "no batch metadata - fail open and broadcast",
+			synced:            false,
+			msgCount:          5,
+			haveBatchMetadata: false,
+			expectedBroadcast: true,
+			description:       "When batch metadata is unavailable, fail open and broadcast",
+		},
+		{
+			name:                   "batch info error with valid batch count - fail open and broadcast",
+			synced:                 false,
+			firstMsgIdx:            195,
+			msgCount:               5,
+			batchCount:             10,
+			batchThresholdMsgCount: 199,
+			haveBatchMetadata:      false, // This simulates the error getting metadata
+			expectedBroadcast:      true,
+			description:            "When batch metadata fetch fails (despite valid batch count), fail open and broadcast",
+		},
+		{
+			name:              "insufficient batches - fail open and broadcast",
+			synced:            false,
+			msgCount:          5,
+			batchCount:        1,
+			haveBatchMetadata: true,
+			expectedBroadcast: true,
+			description:       "When less than 2 batches exist, fail open and broadcast",
+		},
+		{
+			name:                   "not synced - messages before threshold - no broadcast",
+			synced:                 false,
+			firstMsgIdx:            100,
+			msgCount:               10,
+			batchCount:             10,
+			batchThresholdMsgCount: 200, // Messages 100-109 are all before threshold of 200
+			haveBatchMetadata:      true,
+			expectedBroadcast:      false,
+			description:            "When not synced and all messages are before backlog threshold, should not broadcast",
+		},
+		{
+			name:                   "not synced - last message at threshold - broadcast",
+			synced:                 false,
+			firstMsgIdx:            195,
+			msgCount:               5,
+			batchCount:             10,
+			batchThresholdMsgCount: 199, // Messages 195-199, last message (199) is at threshold
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "When not synced but last message reaches threshold, should broadcast",
+		},
+		{
+			name:                   "not synced - last message after threshold - broadcast",
+			synced:                 false,
+			firstMsgIdx:            200,
+			msgCount:               5,
+			batchCount:             10,
+			batchThresholdMsgCount: 199, // Messages 200-204, last message (204) is after threshold
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "When not synced but last message is after threshold, should broadcast",
+		},
+		{
+			name:                   "not synced - boundary case - first message before, last after threshold",
+			synced:                 false,
+			firstMsgIdx:            195,
+			msgCount:               10,
+			batchCount:             10,
+			batchThresholdMsgCount: 199, // Messages 195-204, crosses threshold at message 199
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "When batch spans threshold boundary, should broadcast entire batch",
+		},
+		{
+			name:                   "edge case - single message exactly at threshold",
+			synced:                 false,
+			firstMsgIdx:            100,
+			msgCount:               1,
+			batchCount:             5,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Single message exactly at threshold should broadcast",
+		},
+		{
+			name:                   "edge case - single message just before threshold",
+			synced:                 false,
+			firstMsgIdx:            99,
+			msgCount:               1,
+			batchCount:             5,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      false,
+			description:            "Single message just before threshold should not broadcast",
+		},
+		{
+			name:                   "edge case - single message just after threshold",
+			synced:                 false,
+			firstMsgIdx:            101,
+			msgCount:               1,
+			batchCount:             5,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Single message just after threshold should broadcast",
+		},
+		{
+			name:                   "batch count exactly 2 - should work normally",
+			synced:                 false,
+			firstMsgIdx:            50,
+			msgCount:               5,
+			batchCount:             2,
+			batchThresholdMsgCount: 45, // batch 0 ends at message 45
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "With exactly 2 batches, threshold calculation should work",
+		},
+		{
+			name:                   "large batch spanning multiple old batches",
+			synced:                 false,
+			firstMsgIdx:            1000,
+			msgCount:               500,
+			batchCount:             100,
+			batchThresholdMsgCount: 1200, // Messages 1000-1499, threshold at 1200
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Large batch that includes messages after threshold should broadcast",
+		},
+		{
+			name:                   "metadata available but batch count is 0",
+			synced:                 false,
+			firstMsgIdx:            10,
+			msgCount:               5,
+			batchCount:             0,
+			batchThresholdMsgCount: 0,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Zero batch count should fail open and broadcast",
+		},
+		// Threshold calculation test cases
+		{
+			name:                   "threshold calc - all messages before threshold",
+			synced:                 false,
+			firstMsgIdx:            90,
+			msgCount:               5, // messages 90-94
+			batchCount:             10,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      false,
+			description:            "All messages before threshold should not broadcast",
+		},
+		{
+			name:                   "threshold calc - first messages before, last at threshold",
+			synced:                 false,
+			firstMsgIdx:            96,
+			msgCount:               5, // messages 96-100
+			batchCount:             10,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Messages reaching threshold should broadcast",
+		},
+		{
+			name:                   "threshold calc - all messages after threshold",
+			synced:                 false,
+			firstMsgIdx:            101,
+			msgCount:               5, // messages 101-105
+			batchCount:             10,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "All messages after threshold should broadcast",
+		},
+		{
+			name:                   "threshold calc - single message at threshold boundary",
+			synced:                 false,
+			firstMsgIdx:            100,
+			msgCount:               1, // message 100
+			batchCount:             10,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Single message at threshold boundary should broadcast",
+		},
+		{
+			name:                   "threshold calc - large batch spanning threshold",
+			synced:                 false,
+			firstMsgIdx:            50,
+			msgCount:               100, // messages 50-149
+			batchCount:             10,
+			batchThresholdMsgCount: 100,
+			haveBatchMetadata:      true,
+			expectedBroadcast:      true,
+			description:            "Large batch spanning threshold should broadcast",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := ShouldBroadcastDuringSync(
+				tt.synced,
+				tt.firstMsgIdx,
+				tt.msgCount,
+				tt.batchCount,
+				tt.batchThresholdMsgCount,
+				tt.haveBatchMetadata,
+			)
+
+			if result != tt.expectedBroadcast {
+				t.Errorf("ShouldBroadcastDuringSync() = %v, expected %v. %s", result, tt.expectedBroadcast, tt.description)
+				t.Logf("Test parameters: synced=%v, firstMsgIdx=%d, msgCount=%d, batchCount=%d, threshold=%d, haveBatchMetadata=%v",
+					tt.synced, tt.firstMsgIdx, tt.msgCount, tt.batchCount, tt.batchThresholdMsgCount, tt.haveBatchMetadata)
+			}
+		})
+	}
+}
diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go
index a49036d649..eb836d0efb 100644
--- a/system_tests/seq_coordinator_test.go
+++ b/system_tests/seq_coordinator_test.go
@@ -362,7 +362,7 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) {
 	}

 	// check that nodeBOutputFeedReader also processed the transaction
-	_, err = WaitForTx(ctx, testClientNodeBOutputFeedReader.Client, tx.Hash(), time.Second*5)
+	_, err = WaitForTx(ctx, testClientNodeBOutputFeedReader.Client, tx.Hash(), time.Second*15)
 	Require(t, err)
 	l2balance, err = testClientNodeBOutputFeedReader.Client.BalanceAt(ctx, builder.L2Info.GetAddress("User2"), nil)
 	Require(t, err)
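Reviewer note: putting the pieces together, the new gate in broadcastMessages composes roughly as in the simplified, self-contained sketch below. streamerConfig and streamer are toy stand-ins, not the real types, and the sketch assumes (based on the reload:"hot" tag on the new option) that the config fetcher is consulted on each call so flag changes take effect without a restart.

package main

import "fmt"

type streamerConfig struct {
	DisableBroadcastDuringSync bool
}

type streamer struct {
	// Fetched on every call, mirroring the hot-reloadable config fetcher pattern.
	config func() *streamerConfig
}

// broadcast mirrors the gate: force bypasses the check (sequencer-produced
// messages), otherwise suppression applies only while the flag is set and the
// messages fall outside the two-batch backlog window.
func (s *streamer) broadcast(force, withinBacklogWindow bool) bool {
	if !force && s.config().DisableBroadcastDuringSync && !withinBacklogWindow {
		return false
	}
	return true
}

func main() {
	cfg := &streamerConfig{DisableBroadcastDuringSync: true}
	s := &streamer{config: func() *streamerConfig { return cfg }}
	fmt.Println(s.broadcast(true, false))  // true: forced, as in WriteMessageFromSequencer
	fmt.Println(s.broadcast(false, false)) // false: historical backlog suppressed while syncing
	cfg.DisableBroadcastDuringSync = false // e.g. the path-scheme test override in node.go
	fmt.Println(s.broadcast(false, false)) // true: suppression turned off
}

This also explains the seq_coordinator_test.go timeout bump above: with suppression enabled, a feed reader may only receive a transaction once the producing node considers it within the backlog window or is synced, so the test allows more time.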