Skip to content

Commit 499de32

Browse files
Add Reliable Indexer (Attempt 2) (#184)
* spike * fmt * nit * fmt * add fail certs test * add more coverage * spike * progress * add metrics * metrics tests * spike * progress * cleanup * simplify * more work * add some comments * fmt * nits * more handling * remove token * nits * push * fmt * nits * nits * progress * flatten * nit * nit * record * nits * nits * comments * provide constructor * cleanup * still complex * progress * remove duplicate metric * dedup * nit * cleanup visibility * cleanup * nits * recorder * producer/consumer * naming cleanup * nits * nits * simplify * nits * nits * nits * queue entry * remove useless labels * nit * cleanup * nits * nit * nit * nit * naming * backfiller * slim down * remove useless sync * update dashboard * add retriable fetch * fmt * nits * nit * remove raw * add jitter * nit * add resolver * consensus.rs * nits * nit * flatten * add better retry behavior * simpler name * nit * nits * max active * nits * nit * nits * cleanup consumer * nit * nits * nit
1 parent d8b2c95 commit 499de32

27 files changed

Lines changed: 3006 additions & 910 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ futures-util = "0.3.31"
4444
tracing = "0.1.41"
4545
tracing-subscriber = "0.3.19"
4646
governor = "0.10.2"
47-
prometheus-client = "0.22.3"
47+
prometheus-client = "0.24.0"
4848
clap = "4.5.18"
4949
tokio = "1.41.0"
5050
axum = "0.8.8"

chain/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ tracing = { workspace = true }
3333
tracing-subscriber = { workspace = true, features = ["fmt", "json"] }
3434
governor = { workspace = true }
3535
serde = { version = "1.0.218", features = ["derive"] }
36+
prometheus-client = { workspace = true }

chain/src/application.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::indexer;
12
use alto_types::{Block, Context, Scheme, EPOCH};
23
use commonware_consensus::{
34
marshal::{
@@ -7,8 +8,8 @@ use commonware_consensus::{
78
types::{Height, Round, View},
89
Heightable, Reporter,
910
};
10-
use commonware_cryptography::{ed25519, sha256, Digest, Digestible, Hasher, Sha256, Signer};
11-
use commonware_runtime::{Clock, Metrics, Spawner};
11+
use commonware_cryptography::{ed25519, sha256, Digest as _, Digestible, Hasher, Sha256, Signer};
12+
use commonware_runtime::{Clock, Metrics, Spawner, Storage};
1213
use commonware_utils::{Acknowledgement, SystemTimeExt};
1314
use futures::StreamExt;
1415
use rand::Rng;
@@ -22,11 +23,12 @@ const GENESIS: &[u8] = b"commonware is neat";
2223
const SYNCHRONY_BOUND: u64 = 500;
2324

2425
#[derive(Clone)]
25-
pub struct Application {
26+
pub struct Application<E: Clock + Storage + Metrics> {
2627
genesis: Arc<Block>,
28+
backfiller: Option<indexer::Producer<E>>,
2729
}
2830

29-
impl Application {
31+
impl<E: Clock + Storage + Metrics> Application<E> {
3032
pub fn new() -> Self {
3133
let genesis_context = Context {
3234
round: Round::new(EPOCH, View::zero()),
@@ -36,19 +38,25 @@ impl Application {
3638
let genesis = Block::new(genesis_context, Sha256::hash(GENESIS), Height::zero(), 0);
3739
Self {
3840
genesis: Arc::new(genesis),
41+
backfiller: None,
3942
}
4043
}
44+
45+
pub(crate) fn with_backfiller(mut self, backfiller: indexer::Producer<E>) -> Self {
46+
self.backfiller = Some(backfiller);
47+
self
48+
}
4149
}
4250

43-
impl Default for Application {
51+
impl<E: Clock + Storage + Metrics> Default for Application<E> {
4452
fn default() -> Self {
4553
Self::new()
4654
}
4755
}
4856

49-
impl<E> commonware_consensus::Application<E> for Application
57+
impl<E: Clock + Storage + Metrics> commonware_consensus::Application<E> for Application<E>
5058
where
51-
E: Rng + Spawner + Metrics + Clock,
59+
E: Rng + Spawner + Metrics + Clock + Storage,
5260
{
5361
type SigningScheme = Scheme;
5462
type Context = Context;
@@ -65,7 +73,7 @@ where
6573
) -> Option<Self::Block> {
6674
let parent = ancestry.next().await?;
6775

68-
// Create a new block
76+
// Create a new block.
6977
let mut current = runtime_context.current().epoch_millis();
7078
if current <= parent.timestamp {
7179
current = parent.timestamp + 1;
@@ -80,9 +88,9 @@ where
8088
}
8189
}
8290

83-
impl<E> commonware_consensus::VerifyingApplication<E> for Application
91+
impl<E: Clock + Storage + Metrics> commonware_consensus::VerifyingApplication<E> for Application<E>
8492
where
85-
E: Rng + Spawner + Metrics + Clock,
93+
E: Rng + Spawner + Metrics + Clock + Storage,
8694
{
8795
async fn verify<A: BlockProvider<Block = Self::Block>>(
8896
&mut self,
@@ -96,7 +104,7 @@ where
96104
return false;
97105
};
98106

99-
// Verify the block
107+
// Verify the block.
100108
if block.timestamp <= parent.timestamp {
101109
return false;
102110
}
@@ -108,16 +116,22 @@ where
108116
// The height and digest invariants are enforced in `Marshaled`:
109117
// - The block height must be one greater than the parent's height.
110118
// - The block's parent digest must match the parent's digest.
111-
112119
true
113120
}
114121
}
115122

116-
impl Reporter for Application {
123+
impl<E: Clock + Storage + Metrics> Reporter for Application<E> {
117124
type Activity = Update<Block>;
118125

119126
async fn report(&mut self, activity: Self::Activity) {
120127
if let Update::Block(block, ack_rx) = activity {
128+
// Cache the finalized block in memory and enqueue its digest
129+
// before acking so the consumer can recover it across restarts.
130+
if let Some(backfiller) = &self.backfiller {
131+
backfiller.record(&block).await;
132+
}
133+
134+
// Acknowledge the block.
121135
info!(height = %block.height(), "finalized block");
122136
ack_rx.acknowledge();
123137
}

chain/src/engine.rs

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
application::Application,
3-
indexer::{self, Indexer},
3+
indexer::{self, Client},
44
};
55
use alto_types::{Activity, Block, Finalization, Scheme, EPOCH, EPOCH_LENGTH, NAMESPACE};
66
use commonware_broadcast::buffered;
@@ -28,7 +28,7 @@ use commonware_runtime::{
2828
buffer::paged::CacheRef, spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics,
2929
Spawner, Storage, ThreadPooler,
3030
};
31-
use commonware_storage::archive::immutable;
31+
use commonware_storage::{archive::immutable, queue};
3232
use commonware_utils::channel::mpsc;
3333
use commonware_utils::{ordered::Set, NZU16};
3434
use commonware_utils::{NZUsize, NZU64};
@@ -37,19 +37,20 @@ use governor::clock::Clock as GClock;
3737
use governor::Quota;
3838
use rand::{CryptoRng, Rng};
3939
use std::{
40-
num::NonZero,
40+
num::{NonZero, NonZeroUsize},
4141
time::{Duration, Instant},
4242
};
4343
use tracing::{error, info, warn};
4444

4545
/// Reporter type for [simplex::Engine].
46-
type Reporter<E, I> =
47-
Reporters<Activity, MarshalMailbox<Scheme, Standard<Block>>, Option<indexer::Pusher<E, I>>>;
46+
type Reporter<E, C> =
47+
Reporters<Activity, MarshalMailbox<Scheme, Standard<Block>>, Option<indexer::Pusher<E, C>>>;
4848

4949
/// To better support peers near tip during network instability, we multiply
5050
/// the consensus activity timeout by this factor.
5151
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
5252
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
53+
const QUEUE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(128);
5354
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
5455
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
5556
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
@@ -66,7 +67,7 @@ const MAX_PENDING_ACKS: NonZero<usize> = NZUsize!(16);
6667
pub struct Config<
6768
B: Blocker<PublicKey = PublicKey>,
6869
P: Provider<PublicKey = PublicKey>,
69-
I: Indexer,
70+
C: Client,
7071
S: Strategy,
7172
> {
7273
pub blocker: B,
@@ -91,23 +92,25 @@ pub struct Config<
9192
pub max_fetch_size: usize,
9293
pub fetch_concurrent: usize,
9394
pub fetch_rate_per_peer: Quota,
95+
pub backfiller_max_active: NonZeroUsize,
96+
pub backfiller_retry: Duration,
9497

9598
pub strategy: S,
9699

97-
pub indexer: Option<I>,
100+
pub indexer: Option<C>,
98101
}
99102

100-
type Marshaled<E> = Deferred<E, Scheme, Application, Block, FixedEpocher>;
103+
type Marshaled<E> = Deferred<E, Scheme, Application<E>, Block, FixedEpocher>;
101104

102105
/// The engine that drives the [Application].
103106
#[allow(clippy::type_complexity)]
104-
pub struct Engine<E, B, P, S, I>
107+
pub struct Engine<E, B, P, S, C>
105108
where
106109
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
107110
B: Blocker<PublicKey = PublicKey>,
108111
P: Provider<PublicKey = PublicKey>,
109112
S: Strategy,
110-
I: Indexer,
113+
C: Client,
111114
{
112115
context: ContextCell<E>,
113116

@@ -125,19 +128,21 @@ where
125128
marshaled: Marshaled<E>,
126129

127130
consensus:
128-
Consensus<E, Scheme, Random, B, Digest, Marshaled<E>, Marshaled<E>, Reporter<E, I>, S>,
131+
Consensus<E, Scheme, Random, B, Digest, Marshaled<E>, Marshaled<E>, Reporter<E, C>, S>,
132+
133+
consumer: Option<indexer::Consumer<E, C>>,
129134
}
130135

131-
impl<E, B, P, S, I> Engine<E, B, P, S, I>
136+
impl<E, B, P, S, C> Engine<E, B, P, S, C>
132137
where
133138
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + ThreadPooler + Storage + Metrics,
134139
B: Blocker<PublicKey = PublicKey>,
135140
P: Provider<PublicKey = PublicKey>,
136141
S: Strategy,
137-
I: Indexer,
142+
C: Client,
138143
{
139144
/// Create a new [Engine].
140-
pub async fn new(context: E, cfg: Config<B, P, I, S>) -> Self {
145+
pub async fn new(context: E, cfg: Config<B, P, C, S>) -> Self {
141146
// Create the buffer
142147
let (buffer, buffer_mailbox) = buffered::Engine::new(
143148
context.with_label("buffer"),
@@ -266,27 +271,49 @@ where
266271
)
267272
.await;
268273

274+
// Create the reporter and, when an indexer is configured, a backfill
275+
// queue of finalized digests so block uploads can resume after
276+
// restarts.
277+
let (app, pusher, consumer) = if let Some(indexer) = cfg.indexer {
278+
let queue = queue::shared::init(
279+
context.with_label("queue"),
280+
queue::Config {
281+
partition: format!("{}-finalized-queue", cfg.partition_prefix),
282+
items_per_section: QUEUE_ITEMS_PER_SECTION,
283+
compression: None,
284+
codec_config: (),
285+
page_cache: page_cache.clone(),
286+
write_buffer: WRITE_BUFFER,
287+
},
288+
)
289+
.await
290+
.expect("failed to initialize finalized queue");
291+
let indexer = indexer::Indexer::new(
292+
context.with_label("indexer"),
293+
indexer,
294+
marshal_mailbox.clone(),
295+
queue,
296+
cfg.backfiller_max_active,
297+
cfg.backfiller_retry,
298+
)
299+
.await;
300+
let (producer, pusher, consumer) = indexer.split();
301+
let app = Application::new().with_backfiller(producer);
302+
(app, Some(pusher), Some(consumer))
303+
} else {
304+
(Application::new(), None, None)
305+
};
306+
269307
// Create the application
270-
let app = Application::new();
271308
let marshaled = Marshaled::new(
272309
context.with_label("marshaled"),
273310
app,
274311
marshal_mailbox.clone(),
275312
epocher,
276313
);
277314

278-
// Create the reporter
279-
let reporter = (
280-
marshal_mailbox.clone(),
281-
cfg.indexer.map(|indexer| {
282-
indexer::Pusher::new(
283-
context.with_label("indexer"),
284-
indexer,
285-
marshal_mailbox.clone(),
286-
)
287-
}),
288-
)
289-
.into();
315+
// Create the reporter.
316+
let reporter = (marshal_mailbox.clone(), pusher).into();
290317

291318
// Create the consensus engine
292319
let consensus = Consensus::new(
@@ -324,6 +351,8 @@ where
324351
marshal,
325352
marshaled,
326353
consensus,
354+
355+
consumer,
327356
}
328357
}
329358

@@ -391,14 +420,22 @@ where
391420
.marshal
392421
.start(self.marshaled, self.buffer_mailbox, marshal);
393422

423+
// Start draining queued block uploads before consensus so recovered work
424+
// resumes immediately on startup.
425+
let consumer_handle = self.consumer.map(indexer::Consumer::start);
426+
394427
// Start consensus
395428
//
396429
// We start the application prior to consensus to ensure we can handle enqueued events from consensus (otherwise
397430
// restart could block).
398431
let consensus_handle = self.consensus.start(pending, recovered, resolver);
399432

400433
// Wait for any actor to finish
401-
if let Err(e) = try_join_all(vec![buffer_handle, marshal_handle, consensus_handle]).await {
434+
let mut handles: Vec<Handle<()>> = vec![buffer_handle, marshal_handle, consensus_handle];
435+
if let Some(h) = consumer_handle {
436+
handles.push(h);
437+
}
438+
if let Err(e) = try_join_all(handles).await {
402439
error!(?e, "engine failed");
403440
} else {
404441
warn!("engine stopped");

0 commit comments

Comments
 (0)