Skip to content

Commit a4e6e9a

Browse files
macfarla, claude, pinges
authored
Dispatch snap server request processing off Netty event loop (#10083)
* Dispatch snap server request processing off the Netty event loop

  SnapProtocolManager.processMessage() was calling snapMessages.dispatch() synchronously on the Netty event loop thread for incoming GET_* snap requests. These handlers do heavy work (trie traversal, proof generation, DB reads) that can take several seconds, blocking the event loop and preventing the server from responding to concurrent ETH protocol messages such as GET_BLOCK_HEADERS. This caused the client to accumulate 5+ (eth,3) timeouts and disconnect the server with TIMEOUT.

  Fix: for even-coded snap request messages, dispatch the heavy snapMessages.dispatch() work and the subsequent send() to EthScheduler's service thread pool. The Netty event loop now returns immediately after dispatching, staying free to handle ETH messages. Response messages (odd codes) are already handled by dispatchMessage() and need no changes.

  - Add REQUEST_CODES to SnapV2 so GET_BLOCK_ACCESS_LISTS (0x08) requests from snap/2 peers are scheduled off the Netty event loop

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>

* Catch FramingException inside scheduled snap request task

---------

Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Stefan Pingel <16143240+pinges@users.noreply.github.com>
1 parent 88df77c commit a4e6e9a

File tree

6 files changed

+91
-32
lines changed

6 files changed

+91
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
symbols in JVM space. [besu-native #308](https://github.com/besu-eth/besu-native/pull/308)
88

99
### Additions and Improvements
10+
- Dispatch snap server request processing (GET_ACCOUNT_RANGE, GET_STORAGE_RANGE, GET_BYTECODES, GET_TRIE_NODES, GET_BLOCK_ACCESS_LISTS) off the Netty event loop to prevent heavy trie/DB work from blocking ETH protocol message handling [#10083](https://github.com/besu-eth/besu/pull/10083)
1011
- Add DiscV5 discovery metrics (`discv5_live_nodes_current`, `discv5_total_nodes_current`) to track node counts in the routing table [#9692](https://github.com/besu-eth/besu/issues/9692)
1112

1213
## 26.3.0

app/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,7 @@ public BesuController build() {
852852
worldStateStorageCoordinator,
853853
ethPeers,
854854
snapMessages,
855+
scheduler,
855856
synchronizer);
856857

857858
final MiningCoordinator miningCoordinator =
@@ -1268,13 +1269,15 @@ private Optional<SnapProtocolManager> createSnapProtocolManager(
12681269
final WorldStateStorageCoordinator worldStateStorageCoordinator,
12691270
final EthPeers ethPeers,
12701271
final EthMessages snapMessages,
1272+
final EthScheduler ethScheduler,
12711273
final Synchronizer synchronizer) {
12721274
return Optional.of(
12731275
new SnapProtocolManager(
12741276
worldStateStorageCoordinator,
12751277
syncConfig.getSnapSyncConfiguration(),
12761278
ethPeers,
12771279
snapMessages,
1280+
ethScheduler,
12781281
protocolSchedule,
12791282
protocolContext,
12801283
synchronizer));

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
2222
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
2323
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
24+
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
25+
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
26+
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV2;
2427
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
2528
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
2629
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
@@ -39,6 +42,7 @@
3942
import java.util.List;
4043
import java.util.Map;
4144
import java.util.Optional;
45+
import java.util.concurrent.CancellationException;
4246

4347
import com.google.common.collect.ImmutableList;
4448
import org.slf4j.Logger;
@@ -50,17 +54,20 @@ public class SnapProtocolManager implements ProtocolManager {
5054
private final List<Capability> supportedCapabilities;
5155
private final EthPeers ethPeers;
5256
private final EthMessages snapMessages;
57+
private final EthScheduler ethScheduler;
5358

5459
public SnapProtocolManager(
5560
final WorldStateStorageCoordinator worldStateStorageCoordinator,
5661
final SnapSyncConfiguration snapConfig,
5762
final EthPeers ethPeers,
5863
final EthMessages snapMessages,
64+
final EthScheduler ethScheduler,
5965
final ProtocolSchedule protocolSchedule,
6066
final ProtocolContext protocolContext,
6167
final Synchronizer synchronizer) {
6268
this.ethPeers = ethPeers;
6369
this.snapMessages = snapMessages;
70+
this.ethScheduler = ethScheduler;
6471
this.supportedCapabilities = calculateCapabilities(protocolSchedule);
6572
new SnapServer(
6673
snapConfig, snapMessages, worldStateStorageCoordinator, protocolContext, synchronizer);
@@ -116,21 +123,10 @@ public void processMessage(final Capability cap, final Message message) {
116123
return;
117124
}
118125

119-
Optional<MessageData> maybeResponseData = Optional.empty();
126+
// Decode the snap message. FramingException (decompression failure) is a protocol violation.
127+
final MessageData messageData;
120128
try {
121-
final MessageData messageData = AbstractSnapMessageData.create(message);
122-
final EthMessage decodedEthMessage = new EthMessage(ethPeer, messageData);
123-
124-
// This will handle responses
125-
ethPeers.dispatchMessage(ethPeer, decodedEthMessage, getSupportedProtocol());
126-
127-
// This will handle requests
128-
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
129-
decodedEthMessage.getData().unwrapMessageData();
130-
maybeResponseData =
131-
snapMessages
132-
.dispatch(new EthMessage(ethPeer, requestIdAndEthMessage.getValue()), cap)
133-
.map(responseData -> responseData.wrapMessageData(requestIdAndEthMessage.getKey()));
129+
messageData = AbstractSnapMessageData.create(message);
134130
} catch (final FramingException e) {
135131
LOG.atDebug()
136132
.setMessage("Disconnecting peer {} due to decompression failure for message code {}")
@@ -139,25 +135,70 @@ public void processMessage(final Capability cap, final Message message) {
139135
.setCause(e)
140136
.log();
141137
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
142-
} catch (final RLPException e) {
143-
LOG.debug(
144-
"Received malformed message code={} (BREACH_OF_PROTOCOL), disconnecting: {}",
145-
code,
146-
ethPeer,
147-
e);
148-
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
138+
return;
139+
}
140+
final EthMessage decodedEthMessage = new EthMessage(ethPeer, messageData);
141+
142+
// Dispatch to pending response handlers (no-op for inbound requests).
143+
ethPeers.dispatchMessage(ethPeer, decodedEthMessage, getSupportedProtocol());
144+
145+
// GET_* requests are handled off the Netty event loop to avoid blocking ETH protocol traffic.
146+
if (SnapV1.REQUEST_CODES.contains(code) || SnapV2.REQUEST_CODES.contains(code)) {
147+
scheduleSnapRequest(ethPeer, decodedEthMessage, cap, code);
148+
}
149+
}
150+
151+
private void scheduleSnapRequest(
152+
final EthPeer ethPeer,
153+
final EthMessage decodedEthMessage,
154+
final Capability cap,
155+
final int code) {
156+
ethScheduler
157+
.scheduleServiceTask(
158+
() -> {
159+
Optional<MessageData> maybeResponseData = Optional.empty();
160+
try {
161+
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
162+
decodedEthMessage.getData().unwrapMessageData();
163+
maybeResponseData =
164+
snapMessages
165+
.dispatch(new EthMessage(ethPeer, requestIdAndEthMessage.getValue()), cap)
166+
.map(
167+
responseData ->
168+
responseData.wrapMessageData(requestIdAndEthMessage.getKey()));
169+
} catch (final FramingException | RLPException e) {
170+
LOG.debug(
171+
"Received malformed snap message code={} (BREACH_OF_PROTOCOL), disconnecting: {}",
172+
code,
173+
ethPeer,
174+
e);
175+
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
176+
}
177+
maybeResponseData.ifPresent(responseData -> sendSnapResponse(ethPeer, responseData));
178+
})
179+
.exceptionally(
180+
e -> {
181+
if (!(e instanceof CancellationException)) {
182+
LOG.atWarn()
183+
.setMessage("Unexpected error handling snap request code={} from peer {}")
184+
.addArgument(code)
185+
.addArgument(ethPeer::getLoggableId)
186+
.setCause(e)
187+
.log();
188+
}
189+
return null;
190+
});
191+
}
192+
193+
private void sendSnapResponse(final EthPeer ethPeer, final MessageData responseData) {
194+
try {
195+
ethPeer.send(responseData, getSupportedProtocol());
196+
} catch (final PeerConnection.PeerNotConnected e) {
197+
LOG.atTrace()
198+
.setMessage("Peer disconnected before we could respond - nothing to do {}")
199+
.addArgument(e.getMessage())
200+
.log();
149201
}
150-
maybeResponseData.ifPresent(
151-
responseData -> {
152-
try {
153-
ethPeer.send(responseData, getSupportedProtocol());
154-
} catch (final PeerConnection.PeerNotConnected error) {
155-
LOG.atTrace()
156-
.setMessage("Peer disconnected before we could respond - nothing to do {}")
157-
.addArgument(error.getMessage())
158-
.log();
159-
}
160-
});
161202
}
162203

163204
@Override

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/SnapV1.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package org.hyperledger.besu.ethereum.eth.messages.snap;
1616

17+
import java.util.Set;
18+
1719
public final class SnapV1 {
1820

1921
public static final int GET_ACCOUNT_RANGE = 0x00;
@@ -25,6 +27,10 @@ public final class SnapV1 {
2527
public static final int GET_TRIE_NODES = 0x06;
2628
public static final int TRIE_NODES = 0x07;
2729

30+
/** The set of inbound request message codes that the snap server must handle. */
31+
public static final Set<Integer> REQUEST_CODES =
32+
Set.of(GET_ACCOUNT_RANGE, GET_STORAGE_RANGE, GET_BYTECODES, GET_TRIE_NODES);
33+
2834
private SnapV1() {
2935
// Holder for constants only
3036
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/SnapV2.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package org.hyperledger.besu.ethereum.eth.messages.snap;
1616

17+
import java.util.Set;
18+
1719
public final class SnapV2 {
1820

1921
public static final int GET_ACCOUNT_RANGE = 0x00;
@@ -25,6 +27,9 @@ public final class SnapV2 {
2527
public static final int GET_BLOCK_ACCESS_LISTS = 0x08;
2628
public static final int BLOCK_ACCESS_LISTS = 0x09;
2729

30+
/** The set of snap/2-only inbound request message codes that the snap server must handle. */
31+
public static final Set<Integer> REQUEST_CODES = Set.of(GET_BLOCK_ACCESS_LISTS);
32+
2833
private SnapV2() {
2934
// Holder for constants only
3035
}

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManagerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
2525
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
2626
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
27+
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
2728
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
2829
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
2930
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@@ -54,6 +55,7 @@ class SnapProtocolManagerTest {
5455
@Mock private ProtocolSchedule protocolSchedule;
5556
@Mock private ProtocolContext protocolContext;
5657
@Mock private Synchronizer synchronizer;
58+
@Mock private EthScheduler ethScheduler;
5759
@Mock private EthPeer ethPeer;
5860

5961
private SnapProtocolManager snapProtocolManager;
@@ -67,6 +69,7 @@ void setUp() {
6769
snapConfig,
6870
ethPeers,
6971
snapMessages,
72+
ethScheduler,
7073
protocolSchedule,
7174
protocolContext,
7275
synchronizer);

0 commit comments

Comments
 (0)