Skip to content

Commit ce762fb

Browse files
move around
1 parent a1bd005 commit ce762fb

4 files changed

Lines changed: 99 additions & 74 deletions

File tree

consensus/src/marshal/config.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@ use std::num::{NonZeroU64, NonZeroUsize};
1717
/// supersedes the configured anchor.
1818
pub enum Start<S: Scheme, C: Digest, B> {
1919
/// Start from the height-zero genesis block.
20-
///
21-
/// Genesis is emitted to the application unless durable metadata shows it
22-
/// was already acknowledged.
2320
Genesis(B),
24-
/// Start from a finalized commitment, delivering the floor block to the
25-
/// application if it has not already been acknowledged.
21+
/// Start from a finalized commitment.
2622
Floor(Finalization<S, C>),
2723
}
2824

consensus/src/marshal/core/actor.rs

Lines changed: 15 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::{
44
delivery::PendingVerification,
55
floor::Floor,
66
mailbox::{CommitmentFallback, Mailbox, Message},
7+
stream::Stream,
78
subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
89
variant::OptionalBuffer,
910
Buffer, Variant,
@@ -39,23 +40,18 @@ use commonware_runtime::{
3940
};
4041
use commonware_storage::{
4142
archive::Identifier as ArchiveID,
42-
metadata::{self, Metadata},
4343
};
4444
use commonware_utils::{
4545
acknowledgement::Exact,
4646
channel::{fallible::OneshotExt, oneshot},
4747
futures::AbortablePool,
48-
sequence::U64,
4948
Acknowledgement, BoxedError,
5049
};
5150
use futures::{future::join_all, try_join};
5251
use rand_core::CryptoRngCore;
5352
use std::{collections::BTreeMap, future::Future, num::NonZeroUsize, sync::Arc};
5453
use tracing::{debug, warn};
5554

56-
/// The key used to store the last processed height in the metadata store.
57-
const LATEST_KEY: U64 = U64::new(0xFF);
58-
5955
// Resolver request keys are expressed in the variant commitment type, which
6056
// may differ from the block digest for coded variants.
6157
type ResolverRequestFor<V> = Key<<V as Variant>::Commitment>;
@@ -68,47 +64,6 @@ struct ResolverDelivery<V: Variant> {
6864
response: oneshot::Sender<bool>,
6965
}
7066

71-
/// Last block acknowledged by the application.
72-
#[derive(Clone, Copy)]
73-
enum ApplicationFloor {
74-
BeforeGenesis,
75-
Acknowledged(Height),
76-
}
77-
78-
impl ApplicationFloor {
79-
const fn from_metadata(height: Option<Height>) -> Self {
80-
match height {
81-
Some(height) => Self::Acknowledged(height),
82-
None => Self::BeforeGenesis,
83-
}
84-
}
85-
86-
const fn processed_height(self) -> Height {
87-
match self {
88-
Self::BeforeGenesis => Height::zero(),
89-
Self::Acknowledged(height) => height,
90-
}
91-
}
92-
93-
const fn round_restore_height(self) -> Height {
94-
match self {
95-
Self::BeforeGenesis => Height::zero(),
96-
Self::Acknowledged(height) => height.next(),
97-
}
98-
}
99-
100-
const fn next_dispatch_height(self) -> Height {
101-
match self {
102-
Self::BeforeGenesis => Height::zero(),
103-
Self::Acknowledged(height) => height.next(),
104-
}
105-
}
106-
107-
const fn acknowledge(&mut self, height: Height) {
108-
*self = Self::Acknowledged(height);
109-
}
110-
}
111-
11267
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
11368
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
11469
/// of blocks.
@@ -163,7 +118,7 @@ where
163118
// Current processed floor and any pending floor update
164119
floor: Floor<P::Scheme, V::Commitment>,
165120
// Application delivery cursor
166-
application_floor: ApplicationFloor,
121+
stream: Stream<E>,
167122
// Pending application acknowledgements
168123
pending_acks: PendingAcks<V, A>,
169124
// Highest known finalized height
@@ -174,8 +129,6 @@ where
174129
// ---------- Storage ----------
175130
// Prunable cache
176131
cache: cache::Manager<E, V, P::Scheme>,
177-
// Metadata tracking application progress
178-
application_metadata: Metadata<E, U64, Height>,
179132
// Finalizations stored by height
180133
finalizations_by_height: FC,
181134
// Finalized blocks stored by height
@@ -226,19 +179,14 @@ where
226179
)
227180
.await;
228181

229-
// Initialize metadata tracking application progress
230-
let application_metadata = Metadata::init(
182+
let stream = Stream::new(
231183
context.child("application_metadata"),
232-
metadata::Config {
233-
partition: format!("{}-application-metadata", config.partition_prefix),
234-
codec_config: (),
235-
},
184+
&config.partition_prefix,
236185
)
237-
.await
238-
.expect("failed to initialize application metadata");
239-
let application_floor =
240-
ApplicationFloor::from_metadata(application_metadata.get(&LATEST_KEY).copied());
241-
let last_processed_height = application_floor.processed_height();
186+
.await;
187+
let last_processed_height = stream
188+
.processed_height()
189+
.unwrap_or_else(Height::zero);
242190

243191
// Genesis is a local anchor. A floor finalization is verified and
244192
// resolved after `run` receives the resolver and buffer.
@@ -260,7 +208,7 @@ where
260208
// one height ahead restores the anchor's round across the crash window.
261209
let last_processed_round = Self::latest_processed_round(
262210
&finalizations_by_height,
263-
application_floor.round_restore_height(),
211+
stream.next_height(),
264212
)
265213
.await;
266214

@@ -289,12 +237,11 @@ where
289237
strategy: config.strategy,
290238
last_proposed_block: None,
291239
floor,
292-
application_floor,
240+
stream,
293241
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
294242
tip: Height::zero(),
295243
block_subscriptions: Subscriptions::new(),
296244
cache,
297-
application_metadata,
298245
finalizations_by_height,
299246
finalized_blocks,
300247
finalized_height,
@@ -532,7 +479,7 @@ where
532479
};
533480

534481
// Persist buffered progress updates once after draining all ready acks.
535-
self.application_metadata
482+
self.stream
536483
.sync()
537484
.await
538485
.expect("failed to sync application progress");
@@ -1196,7 +1143,7 @@ where
11961143
self.update_processed_height(dispatch_floor, resolver);
11971144
self.update_processed_round_floor(dispatch_floor, round, resolver)
11981145
.await;
1199-
self.application_metadata
1146+
self.stream
12001147
.sync()
12011148
.await
12021149
.expect("failed to sync floor metadata");
@@ -1617,7 +1564,7 @@ where
16171564
///
16181565
/// Iteration M (ack handler, M > N):
16191566
/// ack handler -> update_processed_height -> metadata buffered
1620-
/// application_metadata.sync -> metadata durable
1567+
/// stream.sync -> metadata durable
16211568
/// ```
16221569
async fn try_dispatch_blocks(
16231570
&mut self,
@@ -1631,7 +1578,7 @@ where
16311578
while self.pending_acks.has_capacity() {
16321579
let next_height = self
16331580
.pending_acks
1634-
.next_dispatch_height(self.application_floor.next_dispatch_height());
1581+
.next_dispatch_height(self.stream.next_height());
16351582
let Some(block) = self.get_finalized_block(next_height).await else {
16361583
return;
16371584
};
@@ -2087,8 +2034,7 @@ where
20872034
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
20882035
>,
20892036
) {
2090-
self.application_floor.acknowledge(height);
2091-
self.application_metadata.put(LATEST_KEY, height);
2037+
self.stream.acknowledge(height);
20922038
self.floor.set_processed_height(height);
20932039
let _ = self
20942040
.processed_height

consensus/src/marshal/core/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ mod acks;
4848
pub(crate) mod cache;
4949
mod delivery;
5050
mod floor;
51+
mod stream;
5152

5253
mod mailbox;
5354
pub use mailbox::{CommitmentFallback, DigestFallback, Mailbox};
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use crate::types::Height;
2+
use commonware_storage::{
3+
metadata::{self, Metadata},
4+
Context,
5+
};
6+
use commonware_utils::sequence::U64;
7+
8+
/// The key used to store the last processed height in the metadata store.
9+
const LATEST_KEY: U64 = U64::new(0xFF);
10+
11+
/// Last block acknowledged by the application.
12+
#[derive(Clone, Copy)]
13+
enum State {
14+
Unprocessed,
15+
Processed(Height),
16+
}
17+
18+
impl State {
19+
const fn new(processed_height: Option<Height>) -> Self {
20+
match processed_height {
21+
Some(height) => Self::Processed(height),
22+
None => Self::Unprocessed,
23+
}
24+
}
25+
26+
const fn processed_height(self) -> Option<Height> {
27+
match self {
28+
Self::Unprocessed => None,
29+
Self::Processed(height) => Some(height),
30+
}
31+
}
32+
33+
const fn next_height(self) -> Height {
34+
match self {
35+
Self::Unprocessed => Height::zero(),
36+
Self::Processed(height) => height.next(),
37+
}
38+
}
39+
40+
const fn acknowledge(&mut self, height: Height) {
41+
*self = Self::Processed(height);
42+
}
43+
}
44+
45+
/// Application delivery stream progress and durable metadata.
46+
pub(super) struct Stream<E: Context> {
47+
metadata: Metadata<E, U64, Height>,
48+
state: State,
49+
}
50+
51+
impl<E: Context> Stream<E> {
52+
pub(super) async fn new(context: E, partition_prefix: &str) -> Self {
53+
let metadata = Metadata::init(
54+
context,
55+
metadata::Config {
56+
partition: format!("{partition_prefix}-application-metadata"),
57+
codec_config: (),
58+
},
59+
)
60+
.await
61+
.expect("failed to initialize application metadata");
62+
let state = State::new(metadata.get(&LATEST_KEY).copied());
63+
Self { metadata, state }
64+
}
65+
66+
pub(super) const fn processed_height(&self) -> Option<Height> {
67+
self.state.processed_height()
68+
}
69+
70+
pub(super) const fn next_height(&self) -> Height {
71+
self.state.next_height()
72+
}
73+
74+
pub(super) fn acknowledge(&mut self, height: Height) {
75+
self.state.acknowledge(height);
76+
self.metadata.put(LATEST_KEY, height);
77+
}
78+
79+
pub(super) async fn sync(&self) -> Result<(), metadata::Error> {
80+
self.metadata.sync().await
81+
}
82+
}

0 commit comments

Comments
 (0)