Skip to content

Commit 3f203d4

Browse files
MatusKyselchee-chyuan
authored andcommitted
feat(eth): send big transactions by announce/retrieve only (#112)
* eth: send big transactions by announce/retrieve only * style: fix tx propagation test formatting
1 parent e6cfba2 commit 3f203d4

2 files changed

Lines changed: 111 additions & 2 deletions

File tree

crates/net/network/src/transactions/constants.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@ pub const SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE:
1414
/// Default is 128 KiB.
1515
pub const DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE: usize = 128 * 1024;
1616

17+
/// Max size of a transaction that will be broadcast in full to peers.
18+
///
19+
/// Transactions larger than this threshold will only be announced via
20+
/// [`NewPooledTransactionHashes`](reth_eth_wire::NewPooledTransactionHashes) and must be
21+
/// explicitly fetched by interested peers. This aligns with the BSC client behavior to reduce
22+
/// bandwidth usage for large transactions (e.g. contract deployments).
23+
///
24+
/// Default is 4096 bytes.
25+
pub const TX_MAX_BROADCAST_SIZE: usize = 4096;
26+
1727
/* ================ REQUEST-RESPONSE ================ */
1828

1929
/// Recommended soft limit for the number of hashes in a

crates/net/network/src/transactions/mod.rs

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use policy::NetworkPolicies;
2525

2626
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
2727

28-
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28+
use self::constants::{
29+
tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE,
30+
TX_MAX_BROADCAST_SIZE,
31+
};
2932
use crate::{
3033
budget::{
3134
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
@@ -1091,7 +1094,14 @@ where
10911094
// Only proceed if the transaction is not in the peer's list of seen
10921095
// transactions
10931096
if !peer.seen_transactions.contains(tx.tx_hash()) {
1094-
builder.push(tx);
1097+
// Large transactions are only announced as hashes in the normal
1098+
// pool fanout and must be fetched explicitly by peers. This matches
1099+
// BSC's txMaxBroadcastSize behavior.
1100+
if tx.size > TX_MAX_BROADCAST_SIZE {
1101+
builder.push_pooled(tx);
1102+
} else {
1103+
builder.push(tx);
1104+
}
10951105
}
10961106
}
10971107
}
@@ -1927,6 +1937,15 @@ impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
19271937
Self::Full(builder) => builder.push(transaction),
19281938
}
19291939
}
1940+
1941+
/// Appends a transaction as a hash-only announcement, regardless of whether this builder
1942+
/// would otherwise send full transactions.
1943+
fn push_pooled(&mut self, transaction: &PropagateTransaction<T>) {
1944+
match self {
1945+
Self::Pooled(builder) => builder.push(transaction),
1946+
Self::Full(builder) => builder.pooled.push(transaction),
1947+
}
1948+
}
19301949
}
19311950

19321951
/// Represents how the transactions should be sent to a peer if any.
@@ -3108,6 +3127,86 @@ mod tests {
31083127
assert_eq!(txs.len(), 1);
31093128
}
31103129

3130+
#[tokio::test]
3131+
async fn test_large_tx_broadcast_threshold() {
3132+
reth_tracing::init_test_tracing();
3133+
3134+
let (mut tx_manager, network) = new_tx_manager().await;
3135+
3136+
network.handle().update_sync_state(SyncState::Idle);
3137+
3138+
// Register two peers so we can test Basic on one and Forced on the other
3139+
let peer_id_1 = PeerId::random();
3140+
let (tx1, _rx1) = mpsc::channel::<PeerRequest>(1);
3141+
let session_info_1 = SessionInfo {
3142+
peer_id: peer_id_1,
3143+
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3144+
client_version: Arc::from(""),
3145+
capabilities: Arc::new(vec![].into()),
3146+
status: Arc::new(Default::default()),
3147+
version: EthVersion::Eth68,
3148+
peer_kind: PeerKind::Basic,
3149+
};
3150+
let messages_1: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id_1, tx1);
3151+
tx_manager.on_network_event(NetworkEvent::ActivePeerSession {
3152+
info: session_info_1,
3153+
messages: messages_1,
3154+
});
3155+
3156+
let mut factory = MockTransactionFactory::default();
3157+
3158+
// A small transaction (within TX_MAX_BROADCAST_SIZE) should be sent in full via Basic mode
3159+
let small_tx = Arc::new(factory.create_eip1559());
3160+
let small_propagate = vec![PropagateTransaction::pool_tx(small_tx.clone())];
3161+
let propagated = tx_manager.propagate_transactions(small_propagate, PropagationMode::Basic);
3162+
let prop_txs = propagated.0.get(small_tx.transaction.hash()).unwrap();
3163+
assert_eq!(prop_txs.len(), 1);
3164+
assert!(prop_txs[0].is_full(), "small tx should be broadcast in full");
3165+
3166+
// A large transaction (exceeding TX_MAX_BROADCAST_SIZE) should be hash-only in Basic mode
3167+
let mut large_valid_tx = factory.create_eip1559();
3168+
large_valid_tx.transaction.set_size(TX_MAX_BROADCAST_SIZE + 1);
3169+
let large_tx = Arc::new(large_valid_tx);
3170+
let large_propagate = vec![PropagateTransaction::pool_tx(large_tx.clone())];
3171+
let propagated = tx_manager.propagate_transactions(large_propagate, PropagationMode::Basic);
3172+
let prop_txs = propagated.0.get(large_tx.transaction.hash()).unwrap();
3173+
assert_eq!(prop_txs.len(), 1);
3174+
assert!(prop_txs[0].is_hash(), "large tx should be hash-only in Basic mode");
3175+
3176+
// Register a second peer to test Forced mode with a fresh seen set
3177+
let peer_id_2 = PeerId::random();
3178+
let (tx2, _rx2) = mpsc::channel::<PeerRequest>(1);
3179+
let session_info_2 = SessionInfo {
3180+
peer_id: peer_id_2,
3181+
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1),
3182+
client_version: Arc::from(""),
3183+
capabilities: Arc::new(vec![].into()),
3184+
status: Arc::new(Default::default()),
3185+
version: EthVersion::Eth68,
3186+
peer_kind: PeerKind::Basic,
3187+
};
3188+
let messages_2: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id_2, tx2);
3189+
tx_manager.on_network_event(NetworkEvent::ActivePeerSession {
3190+
info: session_info_2,
3191+
messages: messages_2,
3192+
});
3193+
3194+
// The same large transaction should be sent in full via Forced mode (e.g.
3195+
// broadcast_transactions before pool insertion)
3196+
let mut large_valid_tx_2 = factory.create_eip1559();
3197+
large_valid_tx_2.transaction.set_size(TX_MAX_BROADCAST_SIZE + 1);
3198+
let large_tx_2 = Arc::new(large_valid_tx_2);
3199+
let large_propagate_2 = vec![PropagateTransaction::pool_tx(large_tx_2.clone())];
3200+
let propagated =
3201+
tx_manager.propagate_transactions(large_propagate_2, PropagationMode::Forced);
3202+
let prop_txs = propagated.0.get(large_tx_2.transaction.hash()).unwrap();
3203+
// Forced mode should deliver to both peers in full
3204+
assert!(
3205+
prop_txs.iter().all(|p| p.is_full()),
3206+
"large tx should be broadcast in full in Forced mode"
3207+
);
3208+
}
3209+
31113210
#[tokio::test]
31123211
async fn test_propagate_full() {
31133212
reth_tracing::init_test_tracing();

0 commit comments

Comments
 (0)