Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct Config {

pub disable_upnp: bool,

/// Disable transaction relay (node won't request or broadcast transactions)
pub disable_relay_tx: bool,

/// A scale factor to apply to memory allocation bounds
pub ram_scale: f64,

Expand Down Expand Up @@ -97,6 +100,7 @@ impl Config {
#[cfg(feature = "devnet-prealloc")]
initial_utxo_set: Default::default(),
disable_upnp: false,
disable_relay_tx: false,
ram_scale: 1.0,
retention_period_days: None,
}
Expand Down
5 changes: 5 additions & 0 deletions kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct Args {
pub disable_dns_seeding: bool,
#[serde(rename = "nogrpc")]
pub disable_grpc: bool,
pub disable_relay_tx: bool,
pub ram_scale: f64,
pub retention_period_days: Option<f64>,

Expand Down Expand Up @@ -146,6 +147,7 @@ impl Default for Args {
disable_upnp: false,
disable_dns_seeding: false,
disable_grpc: false,
disable_relay_tx: false,
ram_scale: 1.0,
retention_period_days: None,
override_params_file: None,
Expand All @@ -160,6 +162,7 @@ impl Args {
pub fn apply_to_config(&self, config: &mut Config) {
config.utxoindex = self.utxoindex;
config.disable_upnp = self.disable_upnp;
config.disable_relay_tx = self.disable_relay_tx;
config.unsafe_rpc = self.unsafe_rpc;
config.enable_unsynced_mining = self.enable_unsynced_mining;
config.enable_mainnet_mining = self.enable_mainnet_mining;
Expand Down Expand Up @@ -391,6 +394,7 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0
.arg(arg!(--"disable-upnp" "Disable upnp").env("KASPAD_DISABLE_UPNP"))
.arg(arg!(--"nodnsseed" "Disable DNS seeding for peers").env("KASPAD_NODNSSEED"))
.arg(arg!(--"nogrpc" "Disable gRPC server").env("KASPAD_NOGRPC"))
.arg(arg!(--"disable-relay-tx" "Disable transaction relay. Node will not request or broadcast transactions to/from peers.").env("KASPAD_DISABLE_RELAY_TX"))
.arg(
Arg::new("ram-scale")
.long("ram-scale")
Expand Down Expand Up @@ -519,6 +523,7 @@ impl Args {
disable_upnp: arg_match_unwrap_or::<bool>(&m, "disable-upnp", defaults.disable_upnp),
disable_dns_seeding: arg_match_unwrap_or::<bool>(&m, "nodnsseed", defaults.disable_dns_seeding),
disable_grpc: arg_match_unwrap_or::<bool>(&m, "nogrpc", defaults.disable_grpc),
disable_relay_tx: arg_match_unwrap_or::<bool>(&m, "disable-relay-tx", defaults.disable_relay_tx),
ram_scale: arg_match_unwrap_or::<f64>(&m, "ram-scale", defaults.ram_scale),
retention_period_days: m.get_one::<f64>("retention-period-days").cloned().or(defaults.retention_period_days),

Expand Down
2 changes: 1 addition & 1 deletion protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ impl ConnectionInitializer for FlowContext {
let mut self_version_message = Version::new(local_address, self.node_id, network_name.clone(), None, PROTOCOL_VERSION);
self_version_message.add_user_agent(name(), version(), &self.config.user_agent_comments);
// TODO: get number of live services
// TODO: disable_relay_tx from config/cmd
self_version_message.disable_relay_tx = self.config.disable_relay_tx;

// Perform the handshake
let peer_version_message = handshake.handshake(self_version_message.into()).await?;
Expand Down
8 changes: 5 additions & 3 deletions protocol/flows/src/flowcontext/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use itertools::Itertools;
use kaspa_consensus_core::tx::TransactionId;
use kaspa_core::debug;
use kaspa_p2p_lib::{
Hub, make_message,
Hub, Router, make_message,
pb::{InvTransactionsMessage, KaspadMessage, kaspad_message::Payload},
};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -94,11 +94,13 @@ impl TransactionsSpread {
}

async fn broadcast(&self, msg: KaspadMessage, should_throttle: bool) {
// Skip peers that have set disable_relay_tx in their version handshake
let accepts_relay = |router: &std::sync::Arc<Router>| !router.properties().disable_relay_tx;
if should_throttle {
// TODO: Figure out a better number
self.hub.broadcast_to_some_peers(msg, 8).await
self.hub.broadcast_to_some_peers_filtered(msg, 8, accepts_relay).await
} else {
self.hub.broadcast(msg, None).await
self.hub.broadcast_filtered(msg, accepts_relay).await
}
}
}
7 changes: 7 additions & 0 deletions protocol/flows/src/v7/txrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ impl RelayTransactionsFlow {
}

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
// If our node has disabled transaction relay, drain inv messages without acting on them
if self.ctx.config.disable_relay_tx {
loop {
let _ = dequeue!(self.invs_route, Payload::InvTransactions)?;
}
}

// trace!("Starting relay transactions flow with {}", self.router.identity());
let mut throttling_state = ThrottlingState {
should_throttle: false,
Expand Down
23 changes: 23 additions & 0 deletions protocol/p2p/src/core/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,29 @@ impl Hub {
}
}

/// Broadcast a message to all peers matching a predicate
pub async fn broadcast_filtered(&self, msg: KaspadMessage, predicate: impl Fn(&Arc<Router>) -> bool) {
let peers = self.peers.read().values().filter(|r| predicate(r)).cloned().collect::<Vec<_>>();
for router in peers {
let _ = router.enqueue(msg.clone()).await;
}
}

/// Broadcast a message to some number of peers matching a predicate
pub async fn broadcast_to_some_peers_filtered(
&self,
msg: KaspadMessage,
num_peers: usize,
predicate: impl Fn(&Arc<Router>) -> bool,
) {
assert!(num_peers > 0);
let peers: Vec<_> = self.peers.read().values().filter(|r| predicate(r)).cloned().collect();
let selected = peers.into_iter().choose_multiple(&mut rand::thread_rng(), num_peers);
for router in selected {
let _ = router.enqueue(msg.clone()).await;
}
}

/// Broadcast a vector of messages to all peers (except an optional filtered peer)
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>, filter_peer: Option<PeerKey>) {
if msgs.is_empty() {
Expand Down
Loading