Skip to content

Commit 036fa8c

Browse files
Crescendo-related RAM optimizations & miscellaneous (#650)
* move genesis log to debug * disable p2p server if connect is active * log inbound/outbound counts * use local cache for pruning proof build for saving ram (the proof is held in memory) * do the same (local cache) for pruning proof received via p2p * prettify * logs (and 2x cache capacity) * fix rare edge case * fix comment * bump version to 0.17.1
1 parent 6e84ac6 commit 036fa8c

File tree

10 files changed

+181
-138
lines changed

10 files changed

+181
-138
lines changed

Cargo.lock

Lines changed: 58 additions & 58 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 56 additions & 56 deletions
Original file line number · Diff line number · Diff line change
@@ -63,7 +63,7 @@ members = [
6363

6464
[workspace.package]
6565
rust-version = "1.82.0"
66-
version = "0.17.0"
66+
version = "0.17.1"
6767
authors = ["Kaspa developers"]
6868
license = "ISC"
6969
repository = "https://github.com/kaspanet/rusty-kaspa"
@@ -80,61 +80,61 @@ include = [
8080
]
8181

8282
[workspace.dependencies]
83-
# kaspa-testing-integration = { version = "0.17.0", path = "testing/integration" }
84-
kaspa-addresses = { version = "0.17.0", path = "crypto/addresses" }
85-
kaspa-addressmanager = { version = "0.17.0", path = "components/addressmanager" }
86-
kaspa-bip32 = { version = "0.17.0", path = "wallet/bip32" }
87-
kaspa-cli = { version = "0.17.0", path = "cli" }
88-
kaspa-connectionmanager = { version = "0.17.0", path = "components/connectionmanager" }
89-
kaspa-consensus = { version = "0.17.0", path = "consensus" }
90-
kaspa-consensus-core = { version = "0.17.0", path = "consensus/core" }
91-
kaspa-consensus-client = { version = "0.17.0", path = "consensus/client" }
92-
kaspa-consensus-notify = { version = "0.17.0", path = "consensus/notify" }
93-
kaspa-consensus-wasm = { version = "0.17.0", path = "consensus/wasm" }
94-
kaspa-consensusmanager = { version = "0.17.0", path = "components/consensusmanager" }
95-
kaspa-core = { version = "0.17.0", path = "core" }
96-
kaspa-daemon = { version = "0.17.0", path = "daemon" }
97-
kaspa-database = { version = "0.17.0", path = "database" }
98-
kaspa-grpc-client = { version = "0.17.0", path = "rpc/grpc/client" }
99-
kaspa-grpc-core = { version = "0.17.0", path = "rpc/grpc/core" }
100-
kaspa-grpc-server = { version = "0.17.0", path = "rpc/grpc/server" }
101-
kaspa-hashes = { version = "0.17.0", path = "crypto/hashes" }
102-
kaspa-index-core = { version = "0.17.0", path = "indexes/core" }
103-
kaspa-index-processor = { version = "0.17.0", path = "indexes/processor" }
104-
kaspa-math = { version = "0.17.0", path = "math" }
105-
kaspa-merkle = { version = "0.17.0", path = "crypto/merkle" }
106-
kaspa-metrics-core = { version = "0.17.0", path = "metrics/core" }
107-
kaspa-mining = { version = "0.17.0", path = "mining" }
108-
kaspa-mining-errors = { version = "0.17.0", path = "mining/errors" }
109-
kaspa-muhash = { version = "0.17.0", path = "crypto/muhash" }
110-
kaspa-notify = { version = "0.17.0", path = "notify" }
111-
kaspa-p2p-flows = { version = "0.17.0", path = "protocol/flows" }
112-
kaspa-p2p-lib = { version = "0.17.0", path = "protocol/p2p" }
113-
kaspa-perf-monitor = { version = "0.17.0", path = "metrics/perf_monitor" }
114-
kaspa-pow = { version = "0.17.0", path = "consensus/pow" }
115-
kaspa-rpc-core = { version = "0.17.0", path = "rpc/core" }
116-
kaspa-rpc-macros = { version = "0.17.0", path = "rpc/macros" }
117-
kaspa-rpc-service = { version = "0.17.0", path = "rpc/service" }
118-
kaspa-txscript = { version = "0.17.0", path = "crypto/txscript" }
119-
kaspa-txscript-errors = { version = "0.17.0", path = "crypto/txscript/errors" }
120-
kaspa-utils = { version = "0.17.0", path = "utils" }
121-
kaspa-utils-tower = { version = "0.17.0", path = "utils/tower" }
122-
kaspa-utxoindex = { version = "0.17.0", path = "indexes/utxoindex" }
123-
kaspa-wallet = { version = "0.17.0", path = "wallet/native" }
124-
kaspa-wallet-cli-wasm = { version = "0.17.0", path = "wallet/wasm" }
125-
kaspa-wallet-keys = { version = "0.17.0", path = "wallet/keys" }
126-
kaspa-wallet-pskt = { version = "0.17.0", path = "wallet/pskt" }
127-
kaspa-wallet-core = { version = "0.17.0", path = "wallet/core" }
128-
kaspa-wallet-macros = { version = "0.17.0", path = "wallet/macros" }
129-
kaspa-wasm = { version = "0.17.0", path = "wasm" }
130-
kaspa-wasm-core = { version = "0.17.0", path = "wasm/core" }
131-
kaspa-wrpc-client = { version = "0.17.0", path = "rpc/wrpc/client" }
132-
kaspa-wrpc-proxy = { version = "0.17.0", path = "rpc/wrpc/proxy" }
133-
kaspa-wrpc-server = { version = "0.17.0", path = "rpc/wrpc/server" }
134-
kaspa-wrpc-wasm = { version = "0.17.0", path = "rpc/wrpc/wasm" }
135-
kaspa-wrpc-example-subscriber = { version = "0.17.0", path = "rpc/wrpc/examples/subscriber" }
136-
kaspad = { version = "0.17.0", path = "kaspad" }
137-
kaspa-alloc = { version = "0.17.0", path = "utils/alloc" }
83+
# kaspa-testing-integration = { version = "0.17.1", path = "testing/integration" }
84+
kaspa-addresses = { version = "0.17.1", path = "crypto/addresses" }
85+
kaspa-addressmanager = { version = "0.17.1", path = "components/addressmanager" }
86+
kaspa-bip32 = { version = "0.17.1", path = "wallet/bip32" }
87+
kaspa-cli = { version = "0.17.1", path = "cli" }
88+
kaspa-connectionmanager = { version = "0.17.1", path = "components/connectionmanager" }
89+
kaspa-consensus = { version = "0.17.1", path = "consensus" }
90+
kaspa-consensus-core = { version = "0.17.1", path = "consensus/core" }
91+
kaspa-consensus-client = { version = "0.17.1", path = "consensus/client" }
92+
kaspa-consensus-notify = { version = "0.17.1", path = "consensus/notify" }
93+
kaspa-consensus-wasm = { version = "0.17.1", path = "consensus/wasm" }
94+
kaspa-consensusmanager = { version = "0.17.1", path = "components/consensusmanager" }
95+
kaspa-core = { version = "0.17.1", path = "core" }
96+
kaspa-daemon = { version = "0.17.1", path = "daemon" }
97+
kaspa-database = { version = "0.17.1", path = "database" }
98+
kaspa-grpc-client = { version = "0.17.1", path = "rpc/grpc/client" }
99+
kaspa-grpc-core = { version = "0.17.1", path = "rpc/grpc/core" }
100+
kaspa-grpc-server = { version = "0.17.1", path = "rpc/grpc/server" }
101+
kaspa-hashes = { version = "0.17.1", path = "crypto/hashes" }
102+
kaspa-index-core = { version = "0.17.1", path = "indexes/core" }
103+
kaspa-index-processor = { version = "0.17.1", path = "indexes/processor" }
104+
kaspa-math = { version = "0.17.1", path = "math" }
105+
kaspa-merkle = { version = "0.17.1", path = "crypto/merkle" }
106+
kaspa-metrics-core = { version = "0.17.1", path = "metrics/core" }
107+
kaspa-mining = { version = "0.17.1", path = "mining" }
108+
kaspa-mining-errors = { version = "0.17.1", path = "mining/errors" }
109+
kaspa-muhash = { version = "0.17.1", path = "crypto/muhash" }
110+
kaspa-notify = { version = "0.17.1", path = "notify" }
111+
kaspa-p2p-flows = { version = "0.17.1", path = "protocol/flows" }
112+
kaspa-p2p-lib = { version = "0.17.1", path = "protocol/p2p" }
113+
kaspa-perf-monitor = { version = "0.17.1", path = "metrics/perf_monitor" }
114+
kaspa-pow = { version = "0.17.1", path = "consensus/pow" }
115+
kaspa-rpc-core = { version = "0.17.1", path = "rpc/core" }
116+
kaspa-rpc-macros = { version = "0.17.1", path = "rpc/macros" }
117+
kaspa-rpc-service = { version = "0.17.1", path = "rpc/service" }
118+
kaspa-txscript = { version = "0.17.1", path = "crypto/txscript" }
119+
kaspa-txscript-errors = { version = "0.17.1", path = "crypto/txscript/errors" }
120+
kaspa-utils = { version = "0.17.1", path = "utils" }
121+
kaspa-utils-tower = { version = "0.17.1", path = "utils/tower" }
122+
kaspa-utxoindex = { version = "0.17.1", path = "indexes/utxoindex" }
123+
kaspa-wallet = { version = "0.17.1", path = "wallet/native" }
124+
kaspa-wallet-cli-wasm = { version = "0.17.1", path = "wallet/wasm" }
125+
kaspa-wallet-keys = { version = "0.17.1", path = "wallet/keys" }
126+
kaspa-wallet-pskt = { version = "0.17.1", path = "wallet/pskt" }
127+
kaspa-wallet-core = { version = "0.17.1", path = "wallet/core" }
128+
kaspa-wallet-macros = { version = "0.17.1", path = "wallet/macros" }
129+
kaspa-wasm = { version = "0.17.1", path = "wasm" }
130+
kaspa-wasm-core = { version = "0.17.1", path = "wasm/core" }
131+
kaspa-wrpc-client = { version = "0.17.1", path = "rpc/wrpc/client" }
132+
kaspa-wrpc-proxy = { version = "0.17.1", path = "rpc/wrpc/proxy" }
133+
kaspa-wrpc-server = { version = "0.17.1", path = "rpc/wrpc/server" }
134+
kaspa-wrpc-wasm = { version = "0.17.1", path = "rpc/wrpc/wasm" }
135+
kaspa-wrpc-example-subscriber = { version = "0.17.1", path = "rpc/wrpc/examples/subscriber" }
136+
kaspad = { version = "0.17.1", path = "kaspad" }
137+
kaspa-alloc = { version = "0.17.1", path = "utils/alloc" }
138138

139139
# external
140140
aes = "0.8.3"

consensus/src/processes/pruning_proof/build.rs

Lines changed: 12 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -5,7 +5,7 @@ use kaspa_consensus_core::{
55
blockhash::{BlockHashExtensions, BlockHashes},
66
header::Header,
77
pruning::PruningPointProof,
8-
BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
8+
BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
99
};
1010
use kaspa_core::debug;
1111
use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions, DB};
@@ -71,6 +71,11 @@ impl PruningProofManager {
7171
let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap();
7272
let (ghostdag_stores, selected_tip_by_level, roots_by_level) = self.calc_gd_for_all_levels(&pp_header, temp_db);
7373

74+
// The pruning proof can contain many duplicate headers (across levels), so we use a local cache in order
75+
// to make sure we hold a single Arc per header
76+
let mut cache: BlockHashMap<Arc<Header>> = BlockHashMap::with_capacity(4 * self.pruning_proof_m as usize);
77+
let mut get_header = |hash| cache.entry(hash).or_insert_with_key(|&hash| self.headers_store.get_header(hash).unwrap()).clone();
78+
7479
(0..=self.max_block_level)
7580
.map(|level| {
7681
let level = level as usize;
@@ -114,7 +119,7 @@ impl PruningProofManager {
114119
let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize);
115120
let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new();
116121
let mut visited = BlockHashSet::new();
117-
queue.push(Reverse(SortableBlock::new(root, self.headers_store.get_header(root).unwrap().blue_work)));
122+
queue.push(Reverse(SortableBlock::new(root, get_header(root).blue_work)));
118123
while let Some(current) = queue.pop() {
119124
let current = current.0.hash;
120125
if !visited.insert(current) {
@@ -130,9 +135,9 @@ impl PruningProofManager {
130135
continue;
131136
}
132137

133-
headers.push(self.headers_store.get_header(current).unwrap());
138+
headers.push(get_header(current));
134139
for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
135-
queue.push(Reverse(SortableBlock::new(child, self.headers_store.get_header(child).unwrap().blue_work)));
140+
queue.push(Reverse(SortableBlock::new(child, get_header(child).blue_work)));
136141
}
137142
}
138143

@@ -232,14 +237,14 @@ impl PruningProofManager {
232237

233238
// We only have the headers store (which has level 0 blue_scores) to assemble the proof data from.
234239
// We need to look deeper at higher levels (2x deeper every level) to find 2M (plus margin) blocks at that level
235-
// TODO: uncomment when the full fix to minimize proof sizes come.
240+
// TODO: uncomment when the full fix to minimize proof sizes comes.
236241
// let mut required_base_level_depth = self.estimated_blue_depth_at_level_0(
237242
// level,
238243
// required_level_depth + 100, // We take a safety margin
239244
// current_dag_level,
240245
// );
241-
// NOTE: Starting from required_level_depth (a much lower starting point than normal) will typically require N iterations
242-
// for every N level higher than current dag level. This is fine since the steps per iteration are still exponential
246+
// NOTE: Starting from required_level_depth (a much lower starting point than normal) will typically require O(N) iterations
247+
// for level L + N where L is the current dag level. This is fine since the steps per iteration are still exponential
243248
// and so we will complete each level in not much more than N iterations per level.
244249
// We start here anyway so we can try to minimize the proof size when the current dag level goes down significantly.
245250
let mut required_base_level_depth = required_level_depth + 100;

consensus/src/processes/pruning_proof/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -383,6 +383,11 @@ impl PruningProofManager {
383383
}
384384
}
385385
let proof = Arc::new(self.build_pruning_point_proof(pp));
386+
info!(
387+
"Built headers proof with overall {} headers ({} unique)",
388+
proof.iter().map(|l| l.len()).sum::<usize>(),
389+
proof.iter().flatten().unique_by(|h| h.hash).count()
390+
);
386391
cache_lock.replace(CachedPruningPointData { pruning_point: pp, data: proof.clone() });
387392
proof
388393
}

consensus/src/processes/pruning_proof/validate.rs

Lines changed: 8 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -110,12 +110,15 @@ impl PruningProofManager {
110110
) {
111111
let proof_level_blue_work_diff = proof_selected_tip_gd.blue_work.saturating_sub(proof_common_ancestor_gd.blue_work);
112112
for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() {
113-
let parent_blue_work = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap();
114-
let parent_blue_work_diff = parent_blue_work.saturating_sub(common_ancestor_gd.blue_work);
115-
if parent_blue_work_diff.saturating_add(pruning_period_work)
116-
>= proof_level_blue_work_diff.saturating_add(prover_claimed_pruning_period_work)
113+
// Not all parents by level are guaranteed to be GD populated, but at least one of them will (the proof level selected tip)
114+
if let Some(parent_blue_work) = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap_option()
117115
{
118-
return Err(PruningImportError::PruningProofInsufficientBlueWork);
116+
let parent_blue_work_diff = parent_blue_work.saturating_sub(common_ancestor_gd.blue_work);
117+
if parent_blue_work_diff.saturating_add(pruning_period_work)
118+
>= proof_level_blue_work_diff.saturating_add(prover_claimed_pruning_period_work)
119+
{
120+
return Err(PruningImportError::PruningProofInsufficientBlueWork);
121+
}
119122
}
120123
}
121124

kaspad/src/daemon.rs

Lines changed: 6 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -300,16 +300,16 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
300300
let headers_store = DbHeadersStore::new(consensus_db, CachePolicy::Empty, CachePolicy::Empty);
301301

302302
if headers_store.has(config.genesis.hash).unwrap() {
303-
info!("Genesis is found in active consensus DB. No action needed.");
303+
debug!("Genesis is found in active consensus DB. No action needed.");
304304
} else {
305-
let msg = "Genesis not found in active consensus DB. This happens when Testnet 11 is restarted and your database needs to be fully deleted. Do you confirm the delete? (y/n)";
305+
let msg = "Genesis not found in active consensus DB. This happens when Testnets are restarted and your database needs to be fully deleted. Do you confirm the delete? (y/n)";
306306
get_user_approval_or_exit(msg, args.yes);
307307

308308
is_db_reset_needed = true;
309309
}
310310
}
311311
None => {
312-
info!("Consensus not initialized yet. Skipping genesis check.");
312+
debug!("Consensus not initialized yet. Skipping genesis check.");
313313
}
314314
}
315315
}
@@ -454,8 +454,9 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
454454
let connect_peers = args.connect_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect::<Vec<_>>();
455455
let add_peers = args.add_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect();
456456
let p2p_server_addr = args.listen.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_p2p_port());
457-
// connect_peers means no DNS seeding and no outbound peers
457+
// connect_peers means no DNS seeding and no outbound/inbound peers
458458
let outbound_target = if connect_peers.is_empty() { args.outbound_target } else { 0 };
459+
let inbound_limit = if connect_peers.is_empty() { args.inbound_limit } else { 0 };
459460
let dns_seeders = if connect_peers.is_empty() && !args.disable_dns_seeding { config.dns_seeders } else { &[] };
460461

461462
let grpc_server_addr = args.rpclisten.unwrap_or(ContextualNetAddress::loopback()).normalize(config.default_rpc_port());
@@ -556,7 +557,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
556557
add_peers,
557558
p2p_server_addr,
558559
outbound_target,
559-
args.inbound_limit,
560+
inbound_limit,
560561
dns_seeders,
561562
config.default_p2p_port(),
562563
p2p_tower_counters.clone(),

protocol/flows/src/service.rs

Lines changed: 5 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -65,9 +65,12 @@ impl AsyncService for P2pService {
6565
// Prepare a shutdown signal receiver
6666
let shutdown_signal = self.shutdown.listener.clone();
6767

68-
let p2p_adaptor =
68+
let p2p_adaptor = if self.inbound_limit == 0 {
69+
Adaptor::client_only(self.flow_context.hub().clone(), self.flow_context.clone(), self.counters.clone())
70+
} else {
6971
Adaptor::bidirectional(self.listen, self.flow_context.hub().clone(), self.flow_context.clone(), self.counters.clone())
70-
.unwrap();
72+
.unwrap()
73+
};
7174
let connection_manager = ConnectionManager::new(
7275
p2p_adaptor.clone(),
7376
self.outbound_target,

protocol/flows/src/v5/ibd/flow.rs

Lines changed: 6 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -6,6 +6,7 @@ use crate::{
66
},
77
};
88
use futures::future::{join_all, select, try_join_all, Either};
9+
use itertools::Itertools;
910
use kaspa_consensus_core::{
1011
api::BlockValidationFuture,
1112
block::Block,
@@ -245,7 +246,11 @@ impl IbdFlow {
245246
// Pruning proof generation and communication might take several minutes, so we allow a long 10 minute timeout
246247
let msg = dequeue_with_timeout!(self.incoming_route, Payload::PruningPointProof, Duration::from_secs(600))?;
247248
let proof: PruningPointProof = msg.try_into()?;
248-
debug!("received proof with overall {} headers", proof.iter().map(|l| l.len()).sum::<usize>());
249+
info!(
250+
"Received headers proof with overall {} headers ({} unique)",
251+
proof.iter().map(|l| l.len()).sum::<usize>(),
252+
proof.iter().flatten().unique_by(|h| h.hash).count()
253+
);
249254

250255
let proof_metadata = PruningProofMetadata::new(relay_block.header.blue_work);
251256

protocol/p2p/src/convert/messages.rs

Lines changed: 18 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -15,7 +15,7 @@ use kaspa_consensus_core::{
1515
use kaspa_hashes::Hash;
1616
use kaspa_utils::networking::{IpAddress, PeerId};
1717

18-
use std::sync::Arc;
18+
use std::{collections::HashMap, sync::Arc};
1919

2020
// ----------------------------------------------------------------------------
2121
// consensus_core to protowire
@@ -85,7 +85,23 @@ impl TryFrom<protowire::RequestIbdChainBlockLocatorMessage> for (Option<Hash>, O
8585
impl TryFrom<protowire::PruningPointProofMessage> for PruningPointProof {
8686
type Error = ConversionError;
8787
fn try_from(msg: protowire::PruningPointProofMessage) -> Result<Self, Self::Error> {
88-
msg.headers.into_iter().map(|v| v.try_into()).collect()
88+
// The pruning proof can contain many duplicate headers (across levels), so we use a local cache in order
89+
// to make sure we hold a single Arc per header
90+
let mut cache: HashMap<Hash, Arc<Header>> = HashMap::with_capacity(4000);
91+
msg.headers
92+
.into_iter()
93+
.map(|level| {
94+
level
95+
.headers
96+
.into_iter()
97+
.map(|x| {
98+
let header: Header = x.try_into()?;
99+
// Clone the existing Arc if found
100+
Ok(cache.entry(header.hash).or_insert_with(|| Arc::new(header)).clone())
101+
})
102+
.collect()
103+
})
104+
.collect()
89105
}
90106
}
91107

0 commit comments

Comments (0)