Skip to content

Let Non validators gossip QBFT messages #8562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- Support for block creation on networks running a pre-Byzantium fork is deprecated for removal in a future release, after that in order to update Besu on nodes that build blocks, your network needs to be upgraded at least to the Byzantium fork. The main reason is to simplify world state management during block creation, since before Byzantium for each selected transaction, the receipt must contain the root hash of the modified world state, and this does not play well with the new plugin features and future work on parallelism.

### Additions and Improvements
- Let Non validators gossip QBFT messages [#8562](https://github.com/hyperledger/besu/pull/8562)
- Add eth/69 protocol for optional use by using the `--Xeth-capability-max=69` flag (currently defaults to 68) [#8519](https://github.com/hyperledger/besu/pull/8519)
- `--Xplugin-rocksdb-blockchain-blob-garbage-collection-enabled` Adds ability to enabled BlobDB GC for BLOCKCHAIN column family [#8599](https://github.com/hyperledger/besu/pull/8599)
- `--Xplugin-rocksdb-blob-garbage-collection-age-cutoff`, `--Xplugin-rocksdb-blob-garbage-collection-force-threshold` BlobDB GC config options [#8599](https://github.com/hyperledger/besu/pull/8599)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftMiningCoordinator;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftProposerSelector;
import org.hyperledger.besu.consensus.common.bft.blockcreation.ProposerSelector;
import org.hyperledger.besu.consensus.common.bft.network.ValidatorPeers;
import org.hyperledger.besu.consensus.common.bft.network.Peers;
import org.hyperledger.besu.consensus.common.bft.protocol.BftProtocolManager;
import org.hyperledger.besu.consensus.common.bft.statemachine.BftEventHandler;
import org.hyperledger.besu.consensus.common.bft.statemachine.FutureMessageBuffer;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class QbftBesuControllerBuilder extends BesuControllerBuilder {
private BftEventQueue bftEventQueue;
private QbftConfigOptions qbftConfig;
private ForksSchedule<QbftConfigOptions> qbftForksSchedule;
private ValidatorPeers peers;
private Peers peers;
private TransactionValidatorProvider transactionValidatorProvider;
private BftConfigOptions bftConfigOptions;
private QbftExtraDataCodec qbftExtraDataCodec;
Expand Down Expand Up @@ -234,7 +234,8 @@ protected MiningCoordinator createMiningCoordinator(

// NOTE: peers should not be used for accessing the network as it does not enforce the
// "only send once" filter applied by the UniqueMessageMulticaster.
peers = new ValidatorPeers(validatorProvider, Istanbul100SubProtocol.NAME);
// Use Peers instead of ValidatorPeers to allow message relay by non-validators
peers = new Peers(Istanbul100SubProtocol.NAME);

final UniqueMessageMulticaster uniqueMessageMulticaster =
new UniqueMessageMulticaster(peers, qbftConfig.getGossipedHistoryLimit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.consensus.common.bft;

import org.hyperledger.besu.consensus.common.bft.network.ValidatorMulticaster;
import org.hyperledger.besu.consensus.common.bft.network.PeerMulticaster;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;

Expand All @@ -24,8 +24,8 @@
import com.google.common.annotations.VisibleForTesting;

/** The Unique message multicaster. */
public class UniqueMessageMulticaster implements ValidatorMulticaster {
private final ValidatorMulticaster multicaster;
public class UniqueMessageMulticaster implements PeerMulticaster {
private final PeerMulticaster multicaster;
private final MessageTracker gossipedMessageTracker;

/**
Expand All @@ -34,8 +34,7 @@ public class UniqueMessageMulticaster implements ValidatorMulticaster {
* @param multicaster Network connections to the remote validators
* @param gossipHistoryLimit Maximum messages to track as seen
*/
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final int gossipHistoryLimit) {
public UniqueMessageMulticaster(final PeerMulticaster multicaster, final int gossipHistoryLimit) {
this.multicaster = multicaster;
this.gossipedMessageTracker = new MessageTracker(gossipHistoryLimit);
}
Expand All @@ -48,7 +47,7 @@ public UniqueMessageMulticaster(
*/
@VisibleForTesting
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final MessageTracker gossipedMessageTracker) {
final PeerMulticaster multicaster, final MessageTracker gossipedMessageTracker) {
this.multicaster = multicaster;
this.gossipedMessageTracker = gossipedMessageTracker;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -19,21 +19,20 @@

import java.util.Collection;

/** The interface Validator multicaster. */
public interface ValidatorMulticaster {

/** Interface responsible for sending messages to network peers */
public interface PeerMulticaster {
/**
* Send.
* Send message to all peers
*
* @param message the message
* @param message The message to be sent
*/
void send(final MessageData message);

/**
* Send.
* Send message to all peers except those in denyList
*
* @param message the message
* @param denylist the denylist
* @param message The message to be sent
* @param denylist List of peers to exclude from message distribution
*/
void send(final MessageData message, final Collection<Address> denylist);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.common.bft.network;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Responsible for tracking all network peers which have a connection to this node, then
* multicasting packets to all connected peers.
*/
public class Peers implements PeerMulticaster, PeerConnectionTracker {
private static final Logger LOG = LoggerFactory.getLogger(Peers.class);

/**
* The connectionsByAddress map is a mapping of peer addresses to the set of PeerConnections
* associated with that address. This allows for multiple connections to the same peer address.
*/
protected final Map<Address, Set<PeerConnection>> connectionsByAddress =
new ConcurrentHashMap<>();

/**
* The protocolName is the name of the protocol being used for communication. This is used to
* identify the protocol when sending messages.
*/
protected final String protocolName;

/**
* Constructor for Peers.
*
* @param protocolName the name of the protocol
*/
public Peers(final String protocolName) {
this.protocolName = protocolName;
}

@Override
public void add(final PeerConnection newConnection) {
final Address peerAddress = newConnection.getPeerInfo().getAddress();
final Set<PeerConnection> connections =
connectionsByAddress.computeIfAbsent(
peerAddress, (k) -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
connections.add(newConnection);
}

@Override
public void remove(final PeerConnection removedConnection) {
final Address peerAddress = removedConnection.getPeerInfo().getAddress();
final Set<PeerConnection> connections = connectionsByAddress.get(peerAddress);
if (connections != null) {
connections.remove(removedConnection);
if (connections.isEmpty()) {
connectionsByAddress.remove(peerAddress);
}
}
}

@Override
public void send(final MessageData message) {
send(message, Collections.emptyList());
}

@Override
public void send(final MessageData message, final Collection<Address> denylist) {
final Collection<Address> allowList =
connectionsByAddress.keySet().stream()
.filter(address -> !denylist.contains(address))
.collect(Collectors.toSet());

sendMessageToSpecificAddresses(allowList, message);
}

/**
* Send a message to a specific set of addresses.
*
* @param recipients the recipients
* @param message the message
*/
protected void sendMessageToSpecificAddresses(
final Collection<Address> recipients, final MessageData message) {
LOG.trace(
"Sending message to peers messageCode={} recipients={} protocol={}",
message.getCode(),
recipients,
protocolName);
recipients.stream()
.map(connectionsByAddress::get)
.filter(Objects::nonNull)
.flatMap(Set::stream)
.forEach(
connection -> {
try {
connection.sendForProtocol(protocolName, message);
} catch (final PeerNotConnected peerNotConnected) {
LOG.trace(
"Lost connection to a validator. remoteAddress={} peerInfo={}",
connection.getRemoteAddress(),
connection.getPeerInfo());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,18 @@

import org.hyperledger.besu.consensus.common.validator.ValidatorProvider;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Responsible for tracking the network peers which have a connection to this node, then
* multicasting packets to ONLY the peers which have been identified as being validators.
*/
public class ValidatorPeers implements ValidatorMulticaster, PeerConnectionTracker {

private static final Logger LOG = LoggerFactory.getLogger(ValidatorPeers.class);
public class ValidatorPeers extends Peers {

// It's possible for multiple connections between peers to exist for brief periods, so map each
// address to a set of connections
private final Map<Address, Set<PeerConnection>> connectionsByAddress = new ConcurrentHashMap<>();
private final ValidatorProvider validatorProvider;
private final String protocolName;

/**
* Instantiates a new Validator peers.
Expand All @@ -52,32 +36,8 @@ public class ValidatorPeers implements ValidatorMulticaster, PeerConnectionTrack
* @param protocolName the protocol name
*/
public ValidatorPeers(final ValidatorProvider validatorProvider, final String protocolName) {
super(protocolName);
this.validatorProvider = validatorProvider;
this.protocolName = protocolName;
}

@Override
public void add(final PeerConnection newConnection) {
final Address peerAddress = newConnection.getPeerInfo().getAddress();
final Set<PeerConnection> connections =
connectionsByAddress.computeIfAbsent(
peerAddress, (k) -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
connections.add(newConnection);
}

@Override
public void remove(final PeerConnection removedConnection) {
final Address peerAddress = removedConnection.getPeerInfo().getAddress();
final Set<PeerConnection> connections = connectionsByAddress.get(peerAddress);
if (connections == null) {
return;
}
connections.remove(removedConnection);
}

@Override
public void send(final MessageData message) {
sendMessageToSpecificAddresses(getLatestValidators(), message);
}

@Override
Expand All @@ -89,27 +49,6 @@ public void send(final MessageData message, final Collection<Address> denylist)
sendMessageToSpecificAddresses(includedValidators, message);
}

private void sendMessageToSpecificAddresses(
final Collection<Address> recipients, final MessageData message) {
LOG.trace(
"Sending message to peers messageCode={} recipients={}", message.getCode(), recipients);
recipients.stream()
.map(connectionsByAddress::get)
.filter(Objects::nonNull)
.flatMap(Set::stream)
.forEach(
connection -> {
try {
connection.sendForProtocol(protocolName, message);
} catch (final PeerNotConnected peerNotConnected) {
LOG.trace(
"Lost connection to a validator. remoteAddress={} peerInfo={}",
connection.getRemoteAddress(),
connection.getPeerInfo());
}
});
}

private Collection<Address> getLatestValidators() {
return validatorProvider.getValidatorsAtHead();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.hyperledger.besu.consensus.common.bft.RoundTimer;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftBlockCreatorFactory;
import org.hyperledger.besu.consensus.common.bft.blockcreation.ProposerSelector;
import org.hyperledger.besu.consensus.common.bft.network.ValidatorMulticaster;
import org.hyperledger.besu.consensus.common.bft.network.PeerMulticaster;
import org.hyperledger.besu.consensus.common.validator.ValidatorProvider;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.datatypes.Address;
Expand All @@ -39,7 +39,7 @@ public class BftFinalState {
private final BlockTimer blockTimer;
private final BftBlockCreatorFactory<?> blockCreatorFactory;
private final Clock clock;
private final ValidatorMulticaster validatorMulticaster;
private final PeerMulticaster peerMulticaster;

/**
* Instantiates a new Bft final state.
Expand All @@ -48,7 +48,7 @@ public class BftFinalState {
* @param nodeKey the node key
* @param localAddress the local address
* @param proposerSelector the proposer selector
* @param validatorMulticaster the validator multicaster
* @param peerMulticaster the peer multicaster
* @param roundTimer the round timer
* @param blockTimer the block timer
* @param blockCreatorFactory the block creator factory
Expand All @@ -59,7 +59,7 @@ public BftFinalState(
final NodeKey nodeKey,
final Address localAddress,
final ProposerSelector proposerSelector,
final ValidatorMulticaster validatorMulticaster,
final PeerMulticaster peerMulticaster,
final RoundTimer roundTimer,
final BlockTimer blockTimer,
final BftBlockCreatorFactory<?> blockCreatorFactory,
Expand All @@ -72,7 +72,7 @@ public BftFinalState(
this.blockTimer = blockTimer;
this.blockCreatorFactory = blockCreatorFactory;
this.clock = clock;
this.validatorMulticaster = validatorMulticaster;
this.peerMulticaster = peerMulticaster;
}

/**
Expand Down Expand Up @@ -168,12 +168,12 @@ public Address getProposerForRound(final ConsensusRoundIdentifier roundIdentifie
}

/**
* Gets validator multicaster.
* Gets peer multicaster.
*
* @return the validator multicaster
* @return the peer multicaster
*/
public ValidatorMulticaster getValidatorMulticaster() {
return validatorMulticaster;
public PeerMulticaster getPeerMulticaster() {
return peerMulticaster;
}

/**
Expand Down
Loading
Loading