
Conversation

Member

@Tristan-Wilson Tristan-Wilson commented Aug 21, 2025

Fixes NIT-3702

During node synchronization and catch-up, broadcasting all historical messages
can flood connected feed clients with redundant data. This change introduces
a check before broadcasting so that only messages newer than the network's
head minus the last two batches are broadcast; those last two batches are
kept for backlog maintenance.

The new behavior is to only broadcast messages within the last 2 batches
when not synced. New sequencer messages are always broadcast regardless
of sync state (this is the sequencer feed, so messages produced by the
sequencer must be in it).

This feature is enabled by default, so operators don't need to make any
config changes to start using it. It has a hot-reloadable config flag that
can be used to disable it:
```
--node.transaction-streamer.disable-broadcast-during-sync
```

Summary of new broadcasting behavior:
- New sequencer messages: always broadcast
- When synced: broadcast everything normally
- When syncing: only broadcast last 2 batches
- Fail-safe: any error condition defaults to broadcasting
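To illustrate, here is a minimal sketch of the decision rule described above. The function and parameter names (`shouldBroadcast`, `lastTwoBatchesStartIdx`, etc.) are hypothetical, not the actual implementation in this PR, and new sequencer messages bypass this check entirely since they are always broadcast:

```go
package main

import (
	"errors"
	"fmt"
)

// shouldBroadcast sketches the rule described above (illustrative only):
// synced nodes broadcast everything, a syncing node only broadcasts messages
// that fall within the last two batches, and any error fails open.
func shouldBroadcast(synced bool, firstMsgIdx, lastTwoBatchesStartIdx uint64, batchInfoErr error) bool {
	if batchInfoErr != nil {
		// Fail-safe: if batch boundaries can't be determined, broadcast anyway.
		return true
	}
	if synced {
		// Synced: broadcast everything normally.
		return true
	}
	// Syncing: suppress historical messages older than the start of the
	// second-to-last batch, which is kept so feed clients can maintain a backlog.
	return firstMsgIdx >= lastTwoBatchesStartIdx
}

func main() {
	fmt.Println(shouldBroadcast(false, 100, 250, nil))                               // false: historical message, skipped while syncing
	fmt.Println(shouldBroadcast(false, 300, 250, nil))                               // true: within the last two batches
	fmt.Println(shouldBroadcast(false, 100, 250, errors.New("batch lookup failed"))) // true: fail open on error
}
```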

codecov bot commented Aug 21, 2025

Codecov Report

❌ Patch coverage is 50.00000% with 18 lines in your changes missing coverage. Please review.
✅ Project coverage is 22.71%. Comparing base (a7ff1b3) to head (32d3a70).

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #3526   +/-   ##
=======================================
  Coverage   22.71%   22.71%           
=======================================
  Files         387      388    +1     
  Lines       58652    58683   +31     
=======================================
+ Hits        13322    13332   +10     
- Misses      43296    43312   +16     
- Partials     2034     2039    +5     

Contributor

@diegoximenes diegoximenes left a comment


Nice 🙂

) (*TransactionStreamer, error) {
	transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
-	txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
+	txStreamer, err := NewTransactionStreamer(ctx, arbDb, l2Config, exec, broadcastServer, syncMonitor, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
Contributor

@diegoximenes diegoximenes Aug 22, 2025


SyncMonitor depends on TransactionStreamer today, and this creates a circular dependency between them 😬.

This indicates to me that they could be better placed in a single component.
How about moving SyncMonitor logic to TransactionStreamer and then dropping SyncMonitor?

I created this Linear task in order to move logic from SyncMonitor to ConsensusExecutionSyncer, but it is OK to move it to TransactionStreamer 🙂.
ConsensusExecutionSyncer already depends on TransactionStreamer, and can call it for what it needs.

Member Author


I made a PR to remove the circular dependency: #3538. If you like the approach then I'll rebase this PR onto that.

Member Author


I am re-reading your comment now and I think I got a bit carried away when I saw NIT-3649. The work I did on #3538 isn't really related to the circular dependency you're talking about here; rather, it's about breaking the consensus<->execution circular dependency.

I've spent some time thinking about how to break the circular dependencies between TransactionStreamer and arbnode.SyncMonitor, and between TransactionStreamer and BroadcastSyncChecker, and I can't see a good way to do it without moving all the logic into TransactionStreamer, which I don't really like because SyncMonitor already has a fairly clear responsibility. I am thinking about alternatives now...

Member Author


I addressed this in 65c6435

Remove TransactionStreamer<->SyncMonitor circ dep
TransactionStreamer now pushes updates to the SyncMonitor of the
currentMessageCount and feedPendingMessageCount every time it updates
either of these in its own state. This means SyncMonitor doesn't need to
have a reference to TransactionStreamer.

The logic for checking if the TransactionStreamer should broadcast to
the feed has been moved into TransactionStreamer, with the core decision
logic split out for testing.
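As a rough illustration of the push-based shape this describes, the dependency now points only from the streamer to the monitor. The type and method names below are hypothetical, not the actual nitro API:

```go
package main

import "sync"

// syncMonitor no longer holds a reference to the streamer; it only stores
// the counts that the streamer pushes into it.
type syncMonitor struct {
	mu                      sync.Mutex
	currentMessageCount     uint64
	feedPendingMessageCount uint64
}

func (m *syncMonitor) SetCurrentMessageCount(c uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.currentMessageCount = c
}

func (m *syncMonitor) SetFeedPendingMessageCount(c uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.feedPendingMessageCount = c
}

// transactionStreamer pushes updates whenever it changes its own state,
// so the monitor never needs to call back into the streamer.
type transactionStreamer struct {
	syncMon *syncMonitor
}

func (t *transactionStreamer) setMessageCount(count uint64) {
	// ... update the streamer's own state first, then push the new count ...
	t.syncMon.SetCurrentMessageCount(count)
}

func main() {
	ts := &transactionStreamer{syncMon: &syncMonitor{}}
	ts.setMessageCount(42)
	ts.syncMon.SetFeedPendingMessageCount(7)
}
```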

	s.delayedBridge = delayedBridge

	if s.syncMonitor != nil && inboxReader != nil && inboxReader.tracker != nil {
		s.broadcastChecker = NewBroadcastSyncChecker(s.syncMonitor, inboxReader.tracker)
Contributor


This is also introducing a circular dependency:
TransactionStreamer -> BroadcastSyncChecker -> SyncMonitor -> TransactionStreamer

This also indicates to me that all of them are better placed in a single component.
How about moving BroadcastSyncChecker logic to TransactionStreamer too?

Member Author


Addressed above.

	s.inboxReader = inboxReader
	s.delayedBridge = delayedBridge

	if s.syncMonitor != nil && inboxReader != nil && inboxReader.tracker != nil {
Contributor


The s.inboxReader and s.inboxReader.tracker nil checks here are redundant; they will never be nil here, right?

Member Author


Removed the separate BroadcastSyncChecker.

}

// ShouldBroadcast determines if messages should be broadcast based on sync state and message position
func (b *BroadcastSyncChecker) ShouldBroadcast(firstMsgIdx arbutil.MessageIndex, msgCount int) bool {
Contributor


How about adding more information, through comments, about what motivated ShouldBroadcast's strategy, e.g., writing a comment here based on the PR description?
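Based on the PR description, such a comment could read along these lines (illustrative wording only, not the comment that was actually added):

```go
// ShouldBroadcast reports whether messages starting at firstMsgIdx should be
// broadcast to the sequencer feed.
//
// During node synchronization and catch-up, broadcasting all historical
// messages can flood connected feed clients with redundant data. While not
// synced, only messages within the last two batches are broadcast (kept so
// feed clients can maintain a backlog); when synced, everything is broadcast
// normally; and on any error the check fails open and broadcasts.
```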

Member Author


Added more comments.

Comment on lines 69 to 76
		{
			name:           "batch info error - always broadcast",
			synced:         false,
			msgCount:       5,
			batchInfoError: true,
			expected:       true,
			description:    "When batch info provider has errors, fail open and broadcast",
		},
Contributor

@diegoximenes diegoximenes Aug 22, 2025


ShouldBroadcast could be returning true due to batchCount being equal to zero too.
How about using the same parameters used by the "not synced - last message at threshold - broadcast" case, but setting batchInfoError to true instead?

Member Author


Added a test for this.

}
}

func TestBroadcastSyncCheckerEdgeCases(t *testing.T) {
Contributor


nitpick: why not define those threshold tests in the TestBroadcastSyncChecker test cases?

Member Author


Moved them into the main test.

@Tristan-Wilson
Member Author

Sorry for the assignee spam, was looking at the wrong PR

@Tristan-Wilson Tristan-Wilson changed the base branch from master to consensus-pushes-sync-info September 29, 2025 11:45
@Tristan-Wilson Tristan-Wilson marked this pull request as draft September 29, 2025 11:47
@Tristan-Wilson Tristan-Wilson force-pushed the fix-historical-feed-flood branch from 47b32a4 to 9baa2ad September 29, 2025 13:28
@Tristan-Wilson Tristan-Wilson changed the base branch from consensus-pushes-sync-info to master September 29, 2025 13:29
Tristan-Wilson and others added 2 commits September 29, 2025 18:30
TransactionStreamer now pushes updates to the SyncMonitor of the
currentMessageCount and feedPendingMessageCount every time it updates
either of these in its own state. This means SyncMonitor doesn't need to
have a reference to TransactionStreamer.

The logic for checking if the TransactionStreamer should broadcast to
the feed has been moved into TransactionStreamer, with the core decision
logic split out for testing.
@Tristan-Wilson Tristan-Wilson marked this pull request as ready for review September 29, 2025 16:35

github-actions bot commented Oct 3, 2025

❌ 24 Tests Failed:

Tests completed  Failed  Passed  Skipped
1996             24      1972    0
View the top 3 failed tests by shortest run time
TestRedisProduceComplex/two_producers,_some_consumers_killed,_others_should_take_over_their_work,_unequal_number_of_requests_from_producers
Stack Traces | 2.140s run time
... [CONTENT TRUNCATED: Keeping last 20 lines]
DEBUG[10-07|02:05:01.600] redis producer: check responses starting
DEBUG[10-07|02:05:01.600] redis producer: check responses starting
DEBUG[10-07|02:05:01.600] checkResponses                           responded=0   errored=0 checked=0
DEBUG[10-07|02:05:01.600] checkResponses                           responded=0   errored=0 checked=1
DEBUG[10-07|02:05:01.605] redis producer: check responses starting
DEBUG[10-07|02:05:01.605] redis producer: check responses starting
DEBUG[10-07|02:05:01.605] checkResponses                           responded=0   errored=0 checked=0
DEBUG[10-07|02:05:01.605] checkResponses                           responded=0   errored=0 checked=1
DEBUG[10-07|02:05:01.610] redis producer: check responses starting
DEBUG[10-07|02:05:01.611] redis producer: check responses starting
DEBUG[10-07|02:05:01.611] checkResponses                           responded=0   errored=0 checked=0
DEBUG[10-07|02:05:01.611] checkResponses                           responded=0   errored=0 checked=1
DEBUG[10-07|02:05:01.616] redis producer: check responses starting
DEBUG[10-07|02:05:01.616] redis producer: check responses starting
DEBUG[10-07|02:05:01.616] checkResponses                           responded=0   errored=0 checked=0
DEBUG[10-07|02:05:01.616] request timed out waiting for response   msgId=1759802699614-12 allowedOldestId=1759802699616-0
DEBUG[10-07|02:05:01.616] checkResponses                           responded=0   errored=1 checked=1
    pubsub_test.go:320: Error awaiting responses from promises 1: [134]
DEBUG[10-07|02:05:01.718] Error destroying a stream group          error="dial tcp 127.0.0.1:42357: connect: connection refused"
--- FAIL: TestRedisProduceComplex/two_producers,_some_consumers_killed,_others_should_take_over_their_work,_unequal_number_of_requests_from_producers (2.14s)
TestSequencerFeed
Stack Traces | 5.240s run time
... [CONTENT TRUNCATED: Keeping last 20 lines]
TRACE[10-07|02:11:57.967] P2P networking is spinning down
DEBUG[10-07|02:11:57.968] RPC server shutting down
DEBUG[10-07|02:11:57.968] RPC connection read error                err=EOF
DEBUG[10-07|02:11:57.968] Journalled generator progress            progress=done
DEBUG[10-07|02:11:57.968] Journalled disk layer                    root=9dfbf2..cb68b4
DEBUG[10-07|02:11:57.968] Journalled diff layer                    root=253d18..b1ced8 parent=9dfbf2..cb68b4
DEBUG[10-07|02:11:57.968] Journalled diff layer                    root=a146d8..04977b parent=253d18..b1ced8
INFO [10-07|02:11:57.968] Writing cached state to disk             block=2 hash=893d45..5e2a47 root=a146d8..04977b
INFO [10-07|02:11:57.968] Persisted trie from memory database      nodes=27 flushnodes=0 size=4.13KiB  flushsize=0.00B time="95.498µs"  flushtime=0s gcnodes=0 gcsize=0.00B gctime="5.811µs" livenodes=17 livesize=2.87KiB
INFO [10-07|02:11:57.969] Writing cached state to disk             block=1 hash=717349..4cf2ec root=253d18..b1ced8
INFO [10-07|02:11:57.969] Persisted trie from memory database      nodes=17 flushnodes=0 size=2.87KiB  flushsize=0.00B time="49.562µs"  flushtime=0s gcnodes=0 gcsize=0.00B gctime=0s        livenodes=0  livesize=0.00B
INFO [10-07|02:11:57.969] Writing cached state to disk             block=1 hash=717349..4cf2ec root=253d18..b1ced8
INFO [10-07|02:11:57.969] Persisted trie from memory database      nodes=0  flushnodes=0 size=0.00B    flushsize=0.00B time=461ns       flushtime=0s gcnodes=0 gcsize=0.00B gctime=0s        livenodes=0  livesize=0.00B
INFO [10-07|02:11:57.969] Writing snapshot state to disk           root=9dfbf2..cb68b4
INFO [10-07|02:11:57.969] Persisted trie from memory database      nodes=0  flushnodes=0 size=0.00B    flushsize=0.00B time=331ns       flushtime=0s gcnodes=0 gcsize=0.00B gctime=0s        livenodes=0  livesize=0.00B
DEBUG[10-07|02:11:57.969] Dereferenced trie from memory database   nodes=0  size=0.00B    time=531ns       gcnodes=0 gcsize=0.00B gctime=220ns     livenodes=0  livesize=0.00B
DEBUG[10-07|02:11:57.969] Dereferenced trie from memory database   nodes=0  size=0.00B    time=100ns       gcnodes=0 gcsize=0.00B gctime=260ns     livenodes=0  livesize=0.00B
INFO [10-07|02:11:57.969] Blockchain stopped
TRACE[10-07|02:11:57.971] P2P networking is spinning down
--- FAIL: TestSequencerFeed (5.24s)
TestSequencerRejection
Stack Traces | 5.260s run time
... [CONTENT TRUNCATED: Keeping last 20 lines]
ERROR[10-07|02:24:53.124] Server connection timed out without receiving data url=ws://localhost:37317/feed err="read tcp4 127.0.0.1:50132->127.0.0.1:37317: i/o timeout"
ERROR[10-07|02:24:53.124] no connected feed
    seq_reject_test.go:69: goroutine 54 [running]:
        runtime/debug.Stack()
        	/opt/hostedtoolcache/go/1.25.1/x64/src/runtime/debug/stack.go:26 +0x5e
        github.com/offchainlabs/nitro/util/testhelpers.RequireImpl({0x4076d30, 0xc000422c40}, {0x4034c80, 0xc0024acba0}, {0x0, 0x0, 0x0})
        	/home/runner/work/nitro/nitro/util/testhelpers/testhelpers.go:29 +0x55
        github.com/offchainlabs/nitro/system_tests.Require(0xc000422c40, {0x4034c80, 0xc0024acba0}, {0x0, 0x0, 0x0})
        	/home/runner/work/nitro/nitro/system_tests/common_test.go:1722 +0x5d
        github.com/offchainlabs/nitro/system_tests.TestSequencerRejection(0xc000422c40)
        	/home/runner/work/nitro/nitro/system_tests/seq_reject_test.go:69 +0xad2
        testing.tRunner(0xc000422c40, 0x3cbff68)
        	/opt/hostedtoolcache/go/1.25.1/x64/src/testing/testing.go:1934 +0xea
        created by testing.(*T).Run in goroutine 1
        	/opt/hostedtoolcache/go/1.25.1/x64/src/testing/testing.go:1997 +0x465
        
    seq_reject_test.go:69: [] waitForTx (tx=0xbcccb6153e6592092eece469ecb65cbab31a4f23dab5d450939414e79ad0fd21) got: context deadline exceeded
ERROR[10-07|02:24:53.449] Server connection timed out without receiving data url=ws://localhost:37317/feed err="read tcp4 127.0.0.1:53048->127.0.0.1:37317: i/o timeout"
ERROR[10-07|02:24:53.449] no connected feed
--- FAIL: TestSequencerRejection (5.26s)


