Skip to content

Commit b36ba66

Browse files
patrick-ogradyBrendanChoudanlaineroberto-bayardo
authored
[storage] Introduce Codec for journal::variable and archive (#811)
Co-authored-by: Brendan Chou <3680392+BrendanChou@users.noreply.github.com> Co-authored-by: Dan Laine <dan@commonware.xyz> Co-authored-by: Roberto Bayardo <roberto@commonware.xyz>
1 parent bdc740c commit b36ba66

22 files changed

Lines changed: 711 additions & 1056 deletions

File tree

consensus/src/ordered_broadcast/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,7 @@ pub struct Config<
7575

7676
/// Upon replaying a journal, the number of entries to replay concurrently.
7777
pub journal_replay_concurrency: usize,
78+
79+
/// Compression level for the journal.
80+
pub journal_compression: Option<u8>,
7881
}

consensus/src/ordered_broadcast/engine.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,11 @@ pub struct Engine<
142142
// The rest of the name is the hex-encoded public keys of the relevant sequencer.
143143
journal_name_prefix: String,
144144

145+
// Compression level for the journal.
146+
journal_compression: Option<u8>,
147+
145148
// A map of sequencer public keys to their journals.
146-
journals: BTreeMap<C::PublicKey, Journal<E>>,
149+
journals: BTreeMap<C::PublicKey, Journal<E, (), Node<C, D>>>,
147150

148151
////////////////////////////////////////
149152
// State
@@ -227,6 +230,7 @@ impl<
227230
journal_heights_per_section: cfg.journal_heights_per_section,
228231
journal_replay_concurrency: cfg.journal_replay_concurrency,
229232
journal_name_prefix: cfg.journal_name_prefix,
233+
journal_compression: cfg.journal_compression,
230234
journals: BTreeMap::new(),
231235
tip_manager: TipManager::<C, D>::new(),
232236
ack_manager: AckManager::<C::PublicKey, D>::new(),
@@ -603,7 +607,7 @@ impl<
603607
// Append to journal if the `Node` is new, making sure to sync the journal
604608
// to prevent sending two conflicting chunks to the automaton, even if
605609
// the node crashes and restarts.
606-
self.journal_append(node).await;
610+
self.journal_append(node.clone()).await;
607611
self.journal_sync(&node.chunk.sequencer, node.chunk.height)
608612
.await;
609613
}
@@ -952,18 +956,21 @@ impl<
952956
// Initialize journal
953957
let cfg = journal::variable::Config {
954958
partition: format!("{}{}", &self.journal_name_prefix, sequencer),
959+
compression: self.journal_compression,
960+
codec_config: (),
955961
};
956-
let mut journal = Journal::init(self.context.clone(), cfg)
957-
.await
958-
.expect("unable to init journal");
962+
let mut journal =
963+
Journal::<_, _, Node<C, D>>::init(self.context.with_label("journal"), cfg)
964+
.await
965+
.expect("unable to init journal");
959966

960967
// Replay journal
961968
{
962969
debug!(?sequencer, "journal replay begin");
963970

964971
// Prepare the stream
965972
let stream = journal
966-
.replay(self.journal_replay_concurrency, None)
973+
.replay(self.journal_replay_concurrency)
967974
.await
968975
.expect("unable to replay journal");
969976
pin_mut!(stream);
@@ -973,9 +980,8 @@ impl<
973980
let mut tip: Option<Node<C, D>> = None;
974981
let mut num_items = 0;
975982
while let Some(msg) = stream.next().await {
983+
let (_, _, _, node) = msg.expect("unable to read from journal");
976984
num_items += 1;
977-
let (_, _, _, msg) = msg.expect("unable to decode journal message");
978-
let node = Node::decode(msg).expect("journal message is unexpected format");
979985
let height = node.chunk.height;
980986
match tip {
981987
None => {
@@ -1007,12 +1013,12 @@ impl<
10071013
///
10081014
/// To prevent ever writing two conflicting `Chunk`s at the same height,
10091015
/// the journal must already be open and replayed.
1010-
async fn journal_append(&mut self, node: &Node<C, D>) {
1016+
async fn journal_append(&mut self, node: Node<C, D>) {
10111017
let section = self.get_journal_section(node.chunk.height);
10121018
self.journals
10131019
.get_mut(&node.chunk.sequencer)
10141020
.expect("journal does not exist")
1015-
.append(section, node.encode().into())
1021+
.append(section, node)
10161022
.await
10171023
.expect("unable to append to journal");
10181024
}

consensus/src/ordered_broadcast/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ mod tests {
230230
journal_heights_per_section: 10,
231231
journal_replay_concurrency: 1,
232232
journal_name_prefix: format!("ordered-broadcast-seq/{}/", validator),
233+
journal_compression: Some(3),
233234
},
234235
);
235236

consensus/src/simplex/actors/voter/actor.rs

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use commonware_cryptography::{Digest, Scheme};
1616
use commonware_macros::select;
1717
use commonware_p2p::{Receiver, Recipients, Sender};
1818
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
19-
use commonware_storage::journal::variable::Journal;
19+
use commonware_storage::journal::variable::{Config as JConfig, Journal};
2020
use commonware_utils::quorum;
2121
use futures::{
2222
channel::{mpsc, oneshot},
@@ -322,8 +322,10 @@ pub struct Actor<
322322
reporter: F,
323323
supervisor: S,
324324

325+
partition: String,
326+
compression: Option<u8>,
325327
replay_concurrency: usize,
326-
journal: Option<Journal<E>>,
328+
journal: Option<Journal<E, usize, Voter<C::Signature, D>>>,
327329

328330
genesis: Option<D>,
329331

@@ -361,11 +363,7 @@ impl<
361363
S: Supervisor<Index = View, PublicKey = C::PublicKey>,
362364
> Actor<E, C, D, A, R, F, S>
363365
{
364-
pub fn new(
365-
context: E,
366-
journal: Journal<E>,
367-
cfg: Config<C, D, A, R, F, S>,
368-
) -> (Self, Mailbox<C::Signature, D>) {
366+
pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> (Self, Mailbox<C::Signature, D>) {
369367
// Assert correctness of timeouts
370368
if cfg.leader_timeout > cfg.notarization_timeout {
371369
panic!("leader timeout must be less than or equal to notarization timeout");
@@ -415,8 +413,10 @@ impl<
415413
reporter: cfg.reporter,
416414
supervisor: cfg.supervisor,
417415

416+
partition: cfg.partition,
417+
compression: cfg.compression,
418418
replay_concurrency: cfg.replay_concurrency,
419-
journal: Some(journal),
419+
journal: None,
420420

421421
genesis: None,
422422

@@ -708,9 +708,7 @@ impl<
708708
});
709709

710710
// Handle nullify
711-
let msg = Voter::Nullify::<C::Signature, D>(nullify.clone())
712-
.encode()
713-
.into();
711+
let msg = Voter::Nullify(nullify.clone());
714712
if round.add_verified_nullify(nullify).await && self.journal.is_some() {
715713
self.journal
716714
.as_mut()
@@ -1021,9 +1019,7 @@ impl<
10211019
});
10221020

10231021
// Handle notarize
1024-
let msg = Voter::Notarize::<C::Signature, D>(notarize.clone())
1025-
.encode()
1026-
.into();
1022+
let msg = Voter::Notarize(notarize.clone());
10271023
if round.add_verified_notarize(notarize).await && self.journal.is_some() {
10281024
self.journal
10291025
.as_mut()
@@ -1075,9 +1071,7 @@ impl<
10751071
});
10761072
for signature in &notarization.signatures {
10771073
let notarize = Notarize::new(notarization.proposal.clone(), signature.clone());
1078-
let msg = Voter::Notarize::<C::Signature, D>(notarize.clone())
1079-
.encode()
1080-
.into();
1074+
let msg = Voter::Notarize::<C::Signature, D>(notarize.clone());
10811075
if round.add_verified_notarize(notarize).await && self.journal.is_some() {
10821076
self.journal
10831077
.as_mut()
@@ -1146,9 +1140,7 @@ impl<
11461140
});
11471141
for signature in &nullification.signatures {
11481142
let nullify = Nullify::new(view, signature.clone());
1149-
let msg = Voter::Nullify::<C::Signature, D>(nullify.clone())
1150-
.encode()
1151-
.into();
1143+
let msg = Voter::Nullify(nullify.clone());
11521144
if round.add_verified_nullify(nullify).await && self.journal.is_some() {
11531145
self.journal
11541146
.as_mut()
@@ -1205,9 +1197,7 @@ impl<
12051197
});
12061198

12071199
// Handle finalize
1208-
let msg = Voter::Finalize::<C::Signature, D>(finalize.clone())
1209-
.encode()
1210-
.into();
1200+
let msg = Voter::Finalize(finalize.clone());
12111201
if round.add_verified_finalize(finalize).await && self.journal.is_some() {
12121202
self.journal
12131203
.as_mut()
@@ -1259,9 +1249,7 @@ impl<
12591249
});
12601250
for signature in &finalization.signatures {
12611251
let finalize = Finalize::new(finalization.proposal.clone(), signature.clone());
1262-
let msg = Voter::Finalize::<C::Signature, D>(finalize.clone())
1263-
.encode()
1264-
.into();
1252+
let msg = Voter::Finalize(finalize.clone());
12651253
if round.add_verified_finalize(finalize).await && self.journal.is_some() {
12661254
self.journal
12671255
.as_mut()
@@ -1685,21 +1673,28 @@ impl<
16851673
// We start on view 1 because the genesis container occupies view 0/height 0.
16861674
self.enter_view(1);
16871675

1676+
// Initialize journal
1677+
let mut journal = Journal::<_, _, Voter<C::Signature, D>>::init(
1678+
self.context.with_label("journal"),
1679+
JConfig {
1680+
partition: self.partition.clone(),
1681+
compression: self.compression,
1682+
codec_config: usize::MAX, // anything we read from journal is already verified
1683+
},
1684+
)
1685+
.await
1686+
.expect("unable to open journal");
1687+
16881688
// Rebuild from journal
16891689
let mut observed_view = 1;
1690-
let mut journal = self.journal.take().expect("missing journal");
16911690
{
16921691
let stream = journal
1693-
.replay(self.replay_concurrency, None)
1692+
.replay(self.replay_concurrency)
16941693
.await
16951694
.expect("unable to replay journal");
16961695
pin_mut!(stream);
16971696
while let Some(msg) = stream.next().await {
16981697
let (_, _, _, msg) = msg.expect("unable to decode journal message");
1699-
// We must wrap the message in Voter so we decode the right type of message (otherwise,
1700-
// we can parse a finalize as a notarize)
1701-
let msg = Voter::decode_cfg(msg, &self.max_participants)
1702-
.expect("journal message is unexpected format");
17031698
let view = msg.view();
17041699
let public_key_index = self
17051700
.supervisor

consensus/src/simplex/actors/voter/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub struct Config<
2323
pub reporter: F,
2424
pub supervisor: S,
2525

26+
pub partition: String,
27+
pub compression: Option<u8>,
2628
pub namespace: Vec<u8>,
2729
pub mailbox_size: usize,
2830
pub leader_timeout: Duration,
@@ -50,7 +52,6 @@ mod tests {
5052
Receiver, Recipients, Sender,
5153
};
5254
use commonware_runtime::{deterministic, Metrics, Runner, Spawner};
53-
use commonware_storage::journal::variable::{Config as JConfig, Journal};
5455
use commonware_utils::quorum;
5556
use futures::{channel::mpsc, StreamExt};
5657
use std::time::Duration;
@@ -106,19 +107,14 @@ mod tests {
106107
application_cfg,
107108
);
108109
actor.start();
109-
let cfg = JConfig {
110-
partition: "test".to_string(),
111-
};
112-
let journal = Journal::init(context.with_label("journal"), cfg)
113-
.await
114-
.expect("unable to create journal");
115-
116110
let cfg = Config {
117111
crypto: scheme,
118112
automaton: application.clone(),
119113
relay: application.clone(),
120114
reporter: supervisor.clone(),
121115
supervisor,
116+
partition: "test".to_string(),
117+
compression: Some(3),
122118
namespace: namespace.clone(),
123119
mailbox_size: 10,
124120
leader_timeout: Duration::from_secs(5),
@@ -129,7 +125,7 @@ mod tests {
129125
skip_timeout: 10,
130126
replay_concurrency: 1,
131127
};
132-
let (actor, mut mailbox) = Actor::new(context.clone(), journal, cfg);
128+
let (actor, mut mailbox) = Actor::new(context.clone(), cfg);
133129

134130
// Create a dummy backfiller mailbox
135131
let (backfiller_sender, mut backfiller_receiver) = mpsc::channel(1);

consensus/src/simplex/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1+
use super::types::{Activity, Context, View};
12
use crate::{Automaton, Relay, Reporter, Supervisor};
23
use commonware_cryptography::{Digest, Scheme};
34
use governor::Quota;
45
use std::time::Duration;
56

6-
use super::types::{Activity, Context, View};
7-
87
/// Configuration for the consensus engine.
98
pub struct Config<
109
C: Scheme,
@@ -29,6 +28,12 @@ pub struct Config<
2928
/// Supervisor for the consensus engine.
3029
pub supervisor: S,
3130

31+
/// Partition for consensus engine storage.
32+
pub partition: String,
33+
34+
/// Compression level for consensus engine storage.
35+
pub compression: Option<u8>,
36+
3237
/// Maximum number of messages to buffer on channels inside the consensus
3338
/// engine before blocking.
3439
pub mailbox_size: usize,

consensus/src/simplex/engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use commonware_cryptography::{Digest, Scheme};
88
use commonware_macros::select;
99
use commonware_p2p::{Receiver, Sender};
1010
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
11-
use commonware_storage::journal::variable::Journal;
1211
use governor::clock::Clock as GClock;
1312
use rand::{CryptoRng, Rng};
1413
use tracing::debug;
@@ -42,20 +41,21 @@ impl<
4241
> Engine<E, C, D, A, R, F, S>
4342
{
4443
/// Create a new `simplex` consensus engine.
45-
pub fn new(context: E, journal: Journal<E>, cfg: Config<C, D, A, R, F, S>) -> Self {
44+
pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> Self {
4645
// Ensure configuration is valid
4746
cfg.assert();
4847

4948
// Create voter
5049
let (voter, voter_mailbox) = voter::Actor::new(
5150
context.with_label("voter"),
52-
journal,
5351
voter::Config {
5452
crypto: cfg.crypto.clone(),
5553
automaton: cfg.automaton,
5654
relay: cfg.relay,
5755
reporter: cfg.reporter,
5856
supervisor: cfg.supervisor.clone(),
57+
partition: cfg.partition,
58+
compression: cfg.compression,
5959
mailbox_size: cfg.mailbox_size,
6060
namespace: cfg.namespace.clone(),
6161
max_participants: cfg.max_participants,

0 commit comments

Comments
 (0)