Skip to content

Commit a179785

Browse files
[follower] Improve Sync Performance (#178)
* test better sync * progress * progress * http2 * more configs * more changes * progress * keep aligned * fmt * add pruning * nits * update version * fmt * add comment * cleanup archive * cleanup naming * add more * nits * add inline * add more * add signature threads * fix lint * fmt * attempt best-effort prune * progress * review * fmt * nit * [follower] Even Faster Sync (#179) * test-sync improvements * progress * nits * deps * nit * handle 0 * fmt * cleanup file naming * move tests * mailbox * nits * add const * nits * fmt * nits * accept mailbox size
1 parent d6de85d commit a179785

15 files changed

Lines changed: 883 additions & 572 deletions

File tree

Cargo.lock

Lines changed: 266 additions & 380 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,20 @@ license = "MIT OR Apache-2.0"
2020
alto-chain = { version = "0.0.19", path = "chain" }
2121
alto-client = { version = "0.0.19", path = "client" }
2222
alto-types = { version = "0.0.19", path = "types" }
23-
commonware-broadcast = "2026.2.0"
24-
commonware-codec = "2026.2.0"
25-
commonware-consensus = "2026.2.0"
26-
commonware-cryptography = "2026.2.0"
27-
commonware-deployer = { version = "2026.2.0", default-features = false }
28-
commonware-macros = "2026.2.0"
29-
commonware-p2p = "2026.2.0"
30-
commonware-resolver = "2026.2.0"
31-
commonware-runtime = "2026.2.0"
32-
commonware-storage = "2026.2.0"
33-
commonware-stream = "2026.2.0"
34-
commonware-utils = "2026.2.0"
35-
commonware-math = "2026.2.0"
36-
commonware-parallel = "2026.2.0"
23+
commonware-broadcast = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
24+
commonware-codec = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
25+
commonware-consensus = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
26+
commonware-cryptography = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
27+
commonware-deployer = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e", default-features = false }
28+
commonware-macros = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
29+
commonware-p2p = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
30+
commonware-resolver = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
31+
commonware-runtime = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
32+
commonware-storage = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
33+
commonware-stream = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
34+
commonware-utils = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
35+
commonware-math = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
36+
commonware-parallel = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
3737
thiserror = "2.0.12"
3838
bytes = "1.7.1"
3939
rand = "0.8.5"
@@ -50,7 +50,7 @@ tokio = "1.41.0"
5050
axum = "0.8.8"
5151
tower = "0.5.2"
5252
tower-http = "0.6.2"
53-
reqwest = "0.12.12"
53+
reqwest = { version = "0.12.12", default-features = false }
5454
tokio-tungstenite = "0.28.0"
5555

5656
[profile.bench]

chain/src/engine.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use commonware_p2p::{Blocker, Receiver, Sender};
2121
use commonware_parallel::Strategy;
2222
use commonware_resolver::Resolver;
2323
use commonware_runtime::{
24-
buffer::paged::CacheRef, spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
25-
ThreadPooler,
24+
buffer::paged::CacheRef, spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics,
25+
Spawner, Storage, ThreadPooler,
2626
};
2727
use commonware_storage::archive::immutable;
2828
use commonware_utils::channel::mpsc;
@@ -56,6 +56,7 @@ const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
5656
const PAGE_CACHE_PAGE_SIZE: NonZero<u16> = NZU16!(4_096); // 4KB
5757
const PAGE_CACHE_CAPACITY: NonZero<usize> = NZUsize!(8_192); // 32MB
5858
const MAX_REPAIR: NonZero<usize> = NZUsize!(20);
59+
const MAX_PENDING_ACKS: NonZero<usize> = NZUsize!(16);
5960

6061
/// Configuration for the [Engine].
6162
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer, S: Strategy> {
@@ -90,12 +91,13 @@ type Marshaled<E> = ConsensusMarshaled<E, Scheme, Application, Block, FixedEpoch
9091

9192
/// The engine that drives the [Application].
9293
#[allow(clippy::type_complexity)]
93-
pub struct Engine<
94-
E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
94+
pub struct Engine<E, B, S, I>
95+
where
96+
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
9597
B: Blocker<PublicKey = PublicKey>,
9698
S: Strategy,
9799
I: Indexer,
98-
> {
100+
{
99101
context: ContextCell<E>,
100102

101103
buffer: buffered::Engine<E, PublicKey, Block>,
@@ -115,12 +117,12 @@ pub struct Engine<
115117
Consensus<E, Scheme, Random, B, Digest, Marshaled<E>, Marshaled<E>, Reporter<E, I>, S>,
116118
}
117119

118-
impl<
119-
E: Clock + GClock + Rng + CryptoRng + Spawner + ThreadPooler + Storage + Metrics,
120-
B: Blocker<PublicKey = PublicKey>,
121-
S: Strategy,
122-
I: Indexer,
123-
> Engine<E, B, S, I>
120+
impl<E, B, S, I> Engine<E, B, S, I>
121+
where
122+
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + ThreadPooler + Storage + Metrics,
123+
B: Blocker<PublicKey = PublicKey>,
124+
S: Strategy,
125+
I: Indexer,
124126
{
125127
/// Create a new [Engine].
126128
pub async fn new(context: E, cfg: Config<B, I, S>) -> Self {
@@ -137,7 +139,7 @@ impl<
137139
);
138140

139141
// Create the page cache
140-
let page_cache = CacheRef::new(PAGE_CACHE_PAGE_SIZE, PAGE_CACHE_CAPACITY);
142+
let page_cache = CacheRef::from_pooler(&context, PAGE_CACHE_PAGE_SIZE, PAGE_CACHE_CAPACITY);
141143

142144
// Initialize finalizations by height
143145
let start = Instant::now();
@@ -244,6 +246,7 @@ impl<
244246
value_write_buffer: WRITE_BUFFER,
245247
block_codec_config: (),
246248
max_repair: MAX_REPAIR,
249+
max_pending_acks: MAX_PENDING_ACKS,
247250
page_cache: page_cache.clone(),
248251
strategy: cfg.strategy.clone(),
249252
},

client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ bytes = { workspace = true }
2121
rand = { workspace = true }
2222
thiserror = { workspace = true }
2323
futures = { workspace = true }
24-
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
24+
reqwest = { workspace = true, features = ["rustls-tls-native-roots", "http2"] }
2525
rustls = { version = "0.23.23", default-features = false, features = ["std", "aws_lc_rs"] }
2626
rustls-native-certs = "0.8.2"
2727
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }

client/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,16 @@ impl<S: Strategy> ClientBuilder<S> {
110110
pub fn build(self) -> Client<S> {
111111
let certificate_verifier = Scheme::certificate_verifier(NAMESPACE, self.identity);
112112

113-
// Build HTTP client
114-
let mut http_builder = reqwest::Client::builder();
113+
// HTTP/2 multiplexes all requests over a single connection, so
114+
// DNS is only resolved once on the initial connect.
115+
let mut http_builder = reqwest::Client::builder()
116+
.tcp_nodelay(true)
117+
.connect_timeout(std::time::Duration::from_secs(5))
118+
.timeout(std::time::Duration::from_secs(10))
119+
.http2_adaptive_window(true)
120+
.http2_keep_alive_interval(std::time::Duration::from_secs(10))
121+
.http2_keep_alive_timeout(std::time::Duration::from_secs(5))
122+
.http2_keep_alive_while_idle(true);
115123
for cert_der in &self.tls_certs {
116124
let cert = reqwest::Certificate::from_der(cert_der).expect("invalid DER certificate");
117125
http_builder = http_builder.add_root_certificate(cert);

deploy/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,14 @@ After redeploying a cluster, update the identity (BLS12-381 threshold public key
113113

114114
```bash
115115
# Global cluster:
116-
OLD_KEY=$(grep '^identity:' follower/examples/global-config.yaml | sed 's/identity: "//;s/"//')
116+
OLD_KEY=$(grep '^identity:' follower/examples/global.yml | sed 's/identity: "//;s/"//')
117117
NEW_KEY="<new-key-hex>"
118-
sed -i '' "s/$OLD_KEY/$NEW_KEY/g" follower/examples/global-config.yaml inspector/src/main.rs
118+
sed -i '' "s/$OLD_KEY/$NEW_KEY/g" follower/examples/global.yml inspector/src/main.rs
119119

120120
# USA cluster:
121-
OLD_KEY=$(grep '^identity:' follower/examples/usa-config.yaml | sed 's/identity: "//;s/"//')
121+
OLD_KEY=$(grep '^identity:' follower/examples/usa.yml | sed 's/identity: "//;s/"//')
122122
NEW_KEY="<new-key-hex>"
123-
sed -i '' "s/$OLD_KEY/$NEW_KEY/g" follower/examples/usa-config.yaml
123+
sed -i '' "s/$OLD_KEY/$NEW_KEY/g" follower/examples/usa.yml
124124
```
125125

126126
#### Build Validator Binary

follower/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ _To deploy your own instance of `alto`, read the guide in [deploy](../deploy/REA
3939
| `identity` | Hex-encoded BLS12-381 threshold public key used to verify consensus signatures |
4040
| `directory` | Path to store finalized blocks and state |
4141
| `worker_threads` | Number of runtime worker threads |
42+
| `signature_threads` | Number of threads for signature verification |
4243
| `log_level` | Log verbosity (`trace`, `debug`, `info`, `warn`, `error`) |
4344
| `metrics_port` | Port for the Prometheus metrics endpoint |
4445
| `mailbox_size` | Capacity of internal actor mailboxes |
4546
| `max_repair` | Maximum concurrent block fetches during backfill |
4647
| `tip` | Start from the tip of the finalized chain instead of backfilling from genesis |
48+
| `pruning_depth` | Number of finalized blocks to retain before pruning (null to keep all) |
4749

4850
_See [examples/](./examples/) for sample configuration files._
4951

follower/examples/global.yml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ identity: "a5e8eb414379bf165cc6f717719c31cd87e062c4e3bd3647ad320dbb0b063b04f2882
1414
directory: "/tmp/alto-follower-global"
1515

1616
# Number of worker threads
17-
worker_threads: 4
17+
worker_threads: 2
18+
19+
# Number of threads for signature verification
20+
signature_threads: 4
1821

1922
# Log level (trace, debug, info, warn, error)
2023
log_level: "info"
@@ -23,11 +26,15 @@ log_level: "info"
2326
metrics_port: 9091
2427

2528
# Size of internal mailboxes
26-
mailbox_size: 1024
29+
mailbox_size: 4096
2730

2831
# Maximum number of blocks to fetch concurrently during backfill
29-
max_repair: 256
32+
max_repair: 512
3033

3134
# Start from the tip of the finalized chain on first run
3235
# Set to false to start from genesis (requires backfilling all historical blocks)
3336
tip: false
37+
38+
# Number of finalized blocks to retain before pruning
39+
# Omit or set to null to keep all blocks (no pruning)
40+
pruning_depth: null

follower/examples/usa.yml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ identity: "87a81a6cb44377736201073f10e98934d002374a2e2d77968e00b315e60eda60878b1
1616
directory: "/tmp/alto-follower-usa"
1717

1818
# Number of worker threads
19-
worker_threads: 4
19+
worker_threads: 2
20+
21+
# Number of threads for signature verification
22+
signature_threads: 4
2023

2124
# Log level (trace, debug, info, warn, error)
2225
log_level: "info"
@@ -25,11 +28,15 @@ log_level: "info"
2528
metrics_port: 9092
2629

2730
# Size of internal mailboxes
28-
mailbox_size: 1024
31+
mailbox_size: 4096
2932

3033
# Maximum number of blocks to fetch concurrently during backfill
31-
max_repair: 256
34+
max_repair: 512
3235

3336
# Start from the tip of the finalized chain on first run
3437
# Set to false to start from genesis (requires backfilling all historical blocks)
3538
tip: true
39+
40+
# Number of finalized blocks to retain before pruning
41+
# Omit or set to null to keep all blocks (no pruning)
42+
pruning_depth: 100000

follower/src/application.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use crate::throughput::Throughput;
2+
use alto_types::{Block, Scheme};
3+
use commonware_consensus::{
4+
marshal::{self, Update},
5+
types::Height,
6+
Reporter,
7+
};
8+
use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Spawner};
9+
use commonware_utils::Acknowledgement;
10+
use futures::{channel::mpsc, SinkExt, StreamExt};
11+
use tracing::info;
12+
13+
const THROUGHPUT_WINDOW: std::time::Duration = std::time::Duration::from_secs(30);
14+
const PRUNE_INTERVAL: u64 = 10_000;
15+
16+
/// Formats an estimated time of arrival (ETA) based on the remaining work and rate.
17+
fn format_eta(remaining: u64, rate: f64) -> String {
18+
if remaining == 0 {
19+
return "0s".to_string();
20+
}
21+
if !rate.is_finite() || rate <= 0.0 {
22+
return "unknown".to_string();
23+
}
24+
25+
let secs = (remaining as f64 / rate) as u64;
26+
let (h, m, s) = (secs / 3600, (secs % 3600) / 60, secs % 60);
27+
if h > 0 {
28+
format!("{h}h{m:02}m{s:02}s")
29+
} else if m > 0 {
30+
format!("{m}m{s:02}s")
31+
} else {
32+
format!("{s}s")
33+
}
34+
}
35+
36+
/// Formats ETA when remaining work may be unknown (e.g. tip not received yet).
37+
fn format_eta_maybe(remaining: Option<u64>, rate: f64) -> String {
38+
match remaining {
39+
Some(remaining) => format_eta(remaining, rate),
40+
None => "unknown".to_string(),
41+
}
42+
}
43+
44+
/// A forwarder of [Update] messages to the [Application].
45+
#[derive(Clone)]
46+
pub(crate) struct Mailbox {
47+
tx: mpsc::Sender<Update<Block>>,
48+
}
49+
50+
impl Reporter for Mailbox {
51+
type Activity = Update<Block>;
52+
53+
async fn report(&mut self, activity: Self::Activity) {
54+
let _ = self.tx.send(activity).await;
55+
}
56+
}
57+
58+
/// A simple application that just tracks the rate of block processing.
59+
pub(crate) struct Application<E: Clock + Spawner> {
60+
context: ContextCell<E>,
61+
rx: mpsc::Receiver<Update<Block>>,
62+
throughput: Throughput,
63+
tip: Option<Height>,
64+
mailbox: marshal::Mailbox<Scheme, Block>,
65+
pruning_depth: Option<u64>,
66+
}
67+
68+
impl<E: Clock + Spawner> Application<E> {
69+
pub(crate) fn new(
70+
context: E,
71+
mailbox: marshal::Mailbox<Scheme, Block>,
72+
mailbox_size: usize,
73+
pruning_depth: Option<u64>,
74+
) -> (Self, Mailbox) {
75+
let (tx, rx) = mpsc::channel(mailbox_size);
76+
let app = Self {
77+
context: ContextCell::new(context.clone()),
78+
rx,
79+
throughput: Throughput::new(THROUGHPUT_WINDOW),
80+
tip: None,
81+
mailbox,
82+
pruning_depth,
83+
};
84+
(app, Mailbox { tx })
85+
}
86+
87+
pub(crate) fn start(mut self) -> Handle<()> {
88+
spawn_cell!(self.context, self.run().await)
89+
}
90+
91+
async fn run(mut self) {
92+
while let Some(msg) = self.rx.next().await {
93+
match msg {
94+
Update::Tip(_, height, _) => {
95+
self.tip = Some(height);
96+
}
97+
Update::Block(block, ack) => {
98+
// This is where an application would process the
99+
// finalized block (e.g. update state, index transactions,
100+
// serve queries, etc.).
101+
let height = block.height.get();
102+
let bps = self.throughput.record(self.context.current());
103+
let remaining = self.tip.map(|t| t.get().saturating_sub(height));
104+
info!(
105+
height,
106+
tip = self.tip.map(|h| h.get()),
107+
bps = %format_args!("{bps:.2}"),
108+
eta = %format_args!("{}", format_eta_maybe(remaining, bps)),
109+
"processed block"
110+
);
111+
ack.acknowledge();
112+
113+
// Prune the archive if the height is a multiple of the prune interval.
114+
if let Some(depth) = self.pruning_depth.filter(|_| height % PRUNE_INTERVAL == 0)
115+
{
116+
let prune_to = height.saturating_sub(depth);
117+
if prune_to > 0 {
118+
self.mailbox.prune(Height::new(prune_to)).await;
119+
}
120+
}
121+
}
122+
}
123+
}
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::{format_eta, format_eta_maybe};
130+
131+
#[test]
132+
fn eta_is_unknown_when_rate_is_zero_and_remaining_non_zero() {
133+
assert_eq!(format_eta(42, 0.0), "unknown");
134+
}
135+
136+
#[test]
137+
fn eta_is_zero_when_no_remaining_work() {
138+
assert_eq!(format_eta(0, 0.0), "0s");
139+
}
140+
141+
#[test]
142+
fn eta_is_unknown_when_remaining_is_unknown() {
143+
assert_eq!(format_eta_maybe(None, 123.0), "unknown");
144+
}
145+
}

0 commit comments

Comments
 (0)