Skip to content

Commit 4833b9e

Browse files
committed
[glue/stateful] stream sync targets from finalized blocks
1 parent d28e265 commit 4833b9e

5 files changed

Lines changed: 229 additions & 325 deletions

File tree

glue/src/stateful/actor/bootstrap.rs

Lines changed: 66 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
//! - This reconciliation assumes databases were not manually rolled back or
2222
//! replaced out-of-band.
2323
//! - Any rewind failure is fatal and causes a panic.
24-
//! - Bootstrap then transitions to processing mode via
25-
//! [`ApplicationMailbox::sync_complete`] at marshal's processed anchor.
24+
//! - Bootstrap then hands the processed anchor and databases to the actor.
2625
//!
2726
//! The marshal only advances its processed height after it has durably stored
2827
//! the floor block, so reconciliation can read the processed block directly.
@@ -41,18 +40,16 @@
4140
//!
4241
//! 1. Extract the initial anchor and sync targets from the
4342
//! seed block.
44-
//! 2. Run [`StateSyncSet::sync`],
45-
//! which initializes and populates all databases via the provided
46-
//! resolvers. Tip updates stream in via the `target_updates` channel as
47-
//! new blocks finalize during the sync, so the final synced height is
48-
//! determined by the sync routine itself, not pre-determined.
49-
//! 3. Persist `sync_done = true` so subsequent boots skip state sync.
50-
//! 4. Raise the marshal floor to the synced height via
51-
//! [`MarshalMailbox::set_floor`], then assert that the marshal's processed
52-
//! height is at that floor.
53-
//! 5. Call [`ApplicationMailbox::sync_complete`] with the constructed databases
54-
//! and the synced digest, transitioning the actor into block-processing
55-
//! mode.
43+
//! 2. Raise marshal's floor to the initial target so finalized blocks stream
44+
//! contiguously while state sync runs.
45+
//! 3. Run [`StateSyncSet::sync`], which initializes and populates all databases
46+
//! via the provided resolvers. Finalized blocks acknowledged by the actor
47+
//! stream in via the `target_updates` channel during the sync, so the final
48+
//! synced height is determined by the sync routine itself, not pre-determined.
49+
//! 4. Persist `sync_done = true` so subsequent boots skip state sync.
50+
//! 5. Send the constructed databases and synced digest to the actor. The actor
51+
//! keeps acknowledging finalized blocks until marshal has delivered through
52+
//! the synced height, then transitions into block-processing mode.
5653
//!
5754
//! ## Crash during state sync
5855
//!
@@ -64,24 +61,27 @@
6461
6562
use crate::stateful::{
6663
db::{Anchor, DatabaseSet, StateSyncSet, SyncEngineConfig},
67-
Application, Mailbox as ApplicationMailbox,
64+
Application,
6865
};
6966
use commonware_consensus::{
7067
marshal::{
71-
core::{CommitmentFallback, Mailbox as MarshalMailbox, Variant as MarshalVariant},
68+
core::{Mailbox as MarshalMailbox, Variant as MarshalVariant},
7269
Identifier,
7370
},
7471
simplex::types::Finalization,
7572
types::{Height, Round},
7673
CertifiableBlock, Epochable, Heightable, Viewable,
7774
};
78-
use commonware_cryptography::{certificate::Scheme, Digest, Digestible};
75+
use commonware_cryptography::{certificate::Scheme, Digestible};
7976
use commonware_runtime::{
8077
telemetry::metrics::{MetricsExt, Registered},
8178
Clock, Metrics, Spawner, Storage,
8279
};
8380
use commonware_storage::metadata::{Config as MetadataConfig, Metadata};
84-
use commonware_utils::{channel::ring, sequence::U64};
81+
use commonware_utils::{
82+
channel::{fallible::OneshotExt, oneshot, ring},
83+
sequence::U64,
84+
};
8585
use prometheus_client::metrics::gauge::Gauge;
8686
use rand::Rng;
8787

@@ -92,22 +92,17 @@ type SyncTargets<A, E> = <<A as Application<E>>::Databases as DatabaseSet<E>>::S
9292
type BlockDigest<A, E> = <<A as Application<E>>::Block as Digestible>::Digest;
9393
type AnchoredUpdate<A, E> = (Anchor<BlockDigest<A, E>>, SyncTargets<A, E>);
9494

95-
/// Bootstrap outcome before durable metadata is finalized.
96-
enum BootstrapState<D, G, F>
95+
/// Startup bootstrap completion delivered directly to the actor.
96+
pub(super) struct Completion<E, A>
9797
where
98-
G: Digest,
98+
E: Rng + Spawner + Metrics + Clock,
99+
A: Application<E>,
99100
{
100-
/// Databases are ready with no marshal floor update.
101-
Ready {
102-
databases: D,
103-
last_processed: Anchor<G>,
104-
},
105-
/// Databases were state-synced and require marshal floor update.
106-
Synced {
107-
databases: D,
108-
last_processed: Anchor<G>,
109-
floor: F,
110-
},
101+
/// Databases initialized by bootstrap.
102+
pub(super) databases: A::Databases,
103+
104+
/// Last block whose state is already represented by the databases.
105+
pub(super) last_processed: Anchor<BlockDigest<A, E>>,
111106
}
112107

113108
/// Startup inputs for bootstrap.
@@ -144,6 +139,9 @@ where
144139
/// Database configuration for the managed set.
145140
pub(super) db_config: <A::Databases as DatabaseSet<E>>::Config,
146141

142+
/// Application used to load genesis during bootstrap.
143+
pub(super) app: A,
144+
147145
/// Metadata partition that stores the durable "state sync done" bit.
148146
pub(super) metadata_partition: String,
149147

@@ -155,6 +153,9 @@ where
155153

156154
/// Startup mode and required inputs for that mode.
157155
pub(super) mode: Mode<E, A, F>,
156+
157+
/// Actor handoff for initialized databases.
158+
pub(super) completion: oneshot::Sender<Completion<E, A>>,
158159
}
159160

160161
/// Initialize databases and hand them to the actor, which then transitions into processing mode.
@@ -187,14 +188,13 @@ where
187188
/// databases to marshal's processed block targets. Rewind errors indicate
188189
/// unrecoverable local history loss/corruption (for example pruned rewind
189190
/// boundaries or invalid commit targets), so startup must stop.
190-
/// - Marshal unreachable after `set_floor`. After state sync the marshal
191-
/// floor must be raised so that the node does not attempt to re-process
192-
/// blocks below the synced height. If the marshal does not respond, or
193-
/// reports a processed height that does not equal the floor, the node
194-
/// cannot safely determine where to resume.
191+
/// - Marshal unreachable after `set_floor`. Before state sync the marshal
192+
/// floor must be raised to the initial target so the finalized block stream
193+
/// starts at the first block that may advance synced state. If the marshal
194+
/// does not respond, or reports a processed height below the initial floor,
195+
/// the node cannot safely determine where to resume.
195196
pub(super) async fn bootstrap<E, A, S, V, R>(
196197
marshal: MarshalMailbox<S, V>,
197-
application: ApplicationMailbox<E, A>,
198198
config: BootstrapConfig<E, A, R, Finalization<S, V::Commitment>>,
199199
) where
200200
E: Rng + Spawner + Metrics + Clock + Storage,
@@ -206,10 +206,12 @@ pub(super) async fn bootstrap<E, A, S, V, R>(
206206
let BootstrapConfig {
207207
context,
208208
db_config,
209+
mut app,
209210
metadata_partition,
210211
sync_config,
211212
resolvers,
212213
mode,
214+
completion,
213215
} = config;
214216

215217
let state_sync_done: Registered<Gauge> =
@@ -233,15 +235,9 @@ pub(super) async fn bootstrap<E, A, S, V, R>(
233235
"state sync bootstrap received a sync startup target after state sync was already marked complete",
234236
);
235237

236-
let genesis = application.genesis().await;
238+
let genesis = app.genesis().await;
237239
let databases = A::Databases::init(context.child("db_set"), db_config).await;
238240

239-
// After a crash following state sync, the block at the floor height
240-
// may not yet be in the marshal's archive: `set_floor` advanced
241-
// `processed_height`, but the local marshal had not finalized that
242-
// block through its own consensus flow before the crash. If the
243-
// block is missing, hint the marshal to fetch it from the network,
244-
// then poll until it arrives.
245241
let (processed_anchor, processed_targets) =
246242
processed_anchor_targets::<E, A, S, V>(&marshal, &genesis)
247243
.await
@@ -257,21 +253,27 @@ pub(super) async fn bootstrap<E, A, S, V, R>(
257253
);
258254
}
259255

260-
application.sync_complete(databases, processed_anchor);
256+
assert!(
257+
completion.send_lossy(Completion {
258+
databases,
259+
last_processed: processed_anchor,
260+
}),
261+
"stateful actor dropped during bootstrap completion",
262+
);
261263
return;
262264
}
263265

264-
let state = match mode {
266+
let completion_message = match mode {
265267
Mode::MarshalSync => {
266268
let databases = A::Databases::init(context.child("db_set"), db_config).await;
267-
let genesis = application.genesis().await;
269+
let genesis = app.genesis().await;
268270
let genesis_context = genesis.context();
269271
let last_processed = Anchor {
270272
height: Height::zero(),
271273
round: Round::new(genesis_context.epoch(), genesis_context.view()),
272274
digest: genesis.digest(),
273275
};
274-
BootstrapState::Ready {
276+
Completion {
275277
databases,
276278
last_processed,
277279
}
@@ -288,6 +290,11 @@ pub(super) async fn bootstrap<E, A, S, V, R>(
288290
digest: block.digest(),
289291
};
290292
let initial_targets = A::sync_targets(&block);
293+
// Move marshal to the initial target before state sync starts so
294+
// the actor sees a contiguous finalized stream after target
295+
// selection. Every later finalized block can then become both a
296+
// sync target update and an acknowledgement toward handoff.
297+
marshal.set_floor(finalization.clone());
291298
let (databases, last_processed) = A::Databases::sync(
292299
context.child("state_sync"),
293300
db_config,
@@ -299,56 +306,23 @@ pub(super) async fn bootstrap<E, A, S, V, R>(
299306
)
300307
.await
301308
.unwrap_or_else(|err| panic!("state sync failed: {err:?}"));
302-
let floor = if last_processed.height == initial_anchor.height {
303-
finalization
304-
} else {
305-
marshal
306-
.get_finalization(last_processed.height)
307-
.await
308-
.expect("marshal must respond with finalization after state sync")
309-
};
310-
BootstrapState::Synced {
309+
Completion {
311310
databases,
312311
last_processed,
313-
floor,
314312
}
315313
}
316314
};
317315

318-
let (databases, last_processed) = match state {
319-
BootstrapState::Ready {
320-
databases,
321-
last_processed,
322-
} => {
323-
metadata
324-
.put_sync(SYNC_DONE_KEY, true)
325-
.await
326-
.expect("must persist state sync completion metadata");
327-
state_sync_done.set(1);
328-
(databases, last_processed)
329-
}
330-
BootstrapState::Synced {
331-
databases,
332-
last_processed,
333-
floor: finalization,
334-
} => {
335-
let floor = last_processed.height;
336-
metadata
337-
.put_sync(SYNC_DONE_KEY, true)
338-
.await
339-
.expect("must persist state sync completion metadata");
340-
state_sync_done.set(1);
341-
// Marshal fetches and stores the floor block before advancing its
342-
// processed height, which also clears pending acknowledgements
343-
// below that floor.
344-
let floor_commitment = finalization.proposal.payload;
345-
marshal.set_floor(finalization);
346-
wait_for_floor(&marshal, floor, floor_commitment).await;
347-
(databases, last_processed)
348-
}
349-
};
316+
metadata
317+
.put_sync(SYNC_DONE_KEY, true)
318+
.await
319+
.expect("must persist state sync completion metadata");
320+
state_sync_done.set(1);
350321

351-
application.sync_complete(databases, last_processed);
322+
assert!(
323+
completion.send_lossy(completion_message),
324+
"stateful actor dropped during bootstrap completion",
325+
);
352326
}
353327

354328
/// Load marshal's current processed anchor and derived sync targets.
@@ -401,54 +375,3 @@ where
401375
A::sync_targets(&block),
402376
))
403377
}
404-
405-
async fn wait_for_floor<S, V>(
406-
marshal: &MarshalMailbox<S, V>,
407-
floor: Height,
408-
commitment: V::Commitment,
409-
) where
410-
S: Scheme,
411-
V: MarshalVariant,
412-
{
413-
let block = marshal
414-
.subscribe_by_commitment(commitment, CommitmentFallback::Wait)
415-
.await
416-
.expect("marshal floor block subscription cancelled");
417-
assert_eq!(
418-
block.height(),
419-
floor,
420-
"marshal returned unexpected state sync floor block height",
421-
);
422-
423-
let processed_height = marshal
424-
.get_processed_height()
425-
.await
426-
.expect("marshal must respond with processed height after set_floor");
427-
assert_eq!(
428-
processed_height, floor,
429-
"marshal processed height must match updated floor after state sync",
430-
);
431-
}
432-
433-
#[cfg(test)]
434-
mod tests {
435-
#[test]
436-
fn synced_bootstrap_persists_sync_done_before_advancing_floor() {
437-
let source = include_str!("bootstrap.rs");
438-
let synced_arm = source
439-
.split("BootstrapState::Synced")
440-
.nth(2)
441-
.expect("synced arm should exist");
442-
let set_floor = synced_arm
443-
.find("marshal.set_floor")
444-
.expect("synced bootstrap should advance marshal floor");
445-
let put_sync = synced_arm
446-
.find("put_sync(SYNC_DONE_KEY, true)")
447-
.expect("synced bootstrap should persist sync_done");
448-
449-
assert!(
450-
put_sync < set_floor,
451-
"sync_done must be durable before the marshal floor advances",
452-
);
453-
}
454-
}

0 commit comments

Comments
 (0)