Skip to content

Commit 013ed34

Browse files
[marshal] Use a Single Resolver (#1340)
1 parent 29f048c commit 013ed34

10 files changed

Lines changed: 651 additions & 247 deletions

File tree

consensus/src/marshal/actor.rs

Lines changed: 127 additions & 149 deletions
Large diffs are not rendered by default.

consensus/src/marshal/ingress/handler.rs

Lines changed: 412 additions & 13 deletions
Large diffs are not rendered by default.

consensus/src/marshal/mod.rs

Lines changed: 89 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ mod tests {
7676
};
7777
use crate::{
7878
threshold_simplex::types::{
79-
finalize_namespace, notarize_namespace, Activity, Finalization, Notarization, Proposal,
79+
finalize_namespace, notarize_namespace, seed_namespace, view_message, Activity,
80+
Finalization, Notarization, Proposal,
8081
},
8182
Block as _, Reporter,
8283
};
@@ -96,6 +97,7 @@ mod tests {
9697
sha256::{self, Digest as Sha256Digest},
9798
Digestible, PrivateKeyExt as _, Signer as _,
9899
};
100+
use commonware_macros::test_traced;
99101
use commonware_p2p::simulated::{self, Link, Network, Oracle};
100102
use commonware_resolver::p2p as resolver;
101103
use commonware_runtime::{deterministic, Clock, Metrics, Runner};
@@ -111,28 +113,47 @@ mod tests {
111113
type E = PrivateKey;
112114

113115
const NAMESPACE: &[u8] = b"test";
114-
const NETWORK_SPEED: Duration = Duration::from_millis(100);
115-
const SUCCESS_RATE: f64 = 1.0;
116-
const JITTER: f64 = NETWORK_SPEED.as_millis() as f64 / 2.0;
117116
const NUM_VALIDATORS: u32 = 4;
118117
const QUORUM: u32 = 3;
119118
const NUM_BLOCKS: u64 = 100;
120119

121-
#[test]
122-
fn test_basic_finalization() {
120+
#[test_traced("WARN")]
121+
fn test_finalize_good_links() {
122+
let link = Link {
123+
latency: 100.0,
124+
jitter: 1.0,
125+
success_rate: 1.0,
126+
};
123127
for seed in 0..5 {
124-
let result1 = basic_finalization(seed);
125-
let result2 = basic_finalization(seed);
128+
let result1 = finalize(seed, link.clone());
129+
let result2 = finalize(seed, link.clone());
130+
126131
// Ensure determinism
127132
assert_eq!(result1, result2);
128133
}
129134
}
130135

131-
fn basic_finalization(seed: u64) -> String {
136+
#[test_traced("WARN")]
137+
fn test_finalize_bad_links() {
138+
let link = Link {
139+
latency: 200.0,
140+
jitter: 50.0,
141+
success_rate: 0.7,
142+
};
143+
for seed in 0..5 {
144+
let result1 = finalize(seed, link.clone());
145+
let result2 = finalize(seed, link.clone());
146+
147+
// Ensure determinism
148+
assert_eq!(result1, result2);
149+
}
150+
}
151+
152+
fn finalize(seed: u64, link: Link) -> String {
132153
let runner = deterministic::Runner::new(
133154
deterministic::Config::new()
134155
.with_seed(seed)
135-
.with_timeout(Some(Duration::from_secs(60))),
156+
.with_timeout(Some(Duration::from_secs(300))),
136157
);
137158
runner.start(|mut context| async move {
138159
// Initialize network
@@ -183,11 +204,6 @@ mod tests {
183204
}
184205

185206
// Add links between all peers
186-
let link = Link {
187-
latency: NETWORK_SPEED.as_millis() as f64,
188-
jitter: JITTER,
189-
success_rate: SUCCESS_RATE,
190-
};
191207
for p1 in peers.iter() {
192208
for p2 in peers.iter() {
193209
if p2 == p1 {
@@ -224,7 +240,9 @@ mod tests {
224240

225241
// Wait for the block to be broadcast, but due to jitter, we may or may not receive
226242
// the block before continuing.
227-
context.sleep(NETWORK_SPEED).await;
243+
context
244+
.sleep(Duration::from_millis(link.latency as u64))
245+
.await;
228246

229247
// Notarize block by the validator that broadcasted it
230248
let proposal = Proposal {
@@ -247,18 +265,26 @@ mod tests {
247265
}
248266
}
249267

250-
// Wait for things to complete
251-
context.sleep(Duration::from_secs(30)).await;
252-
253268
// Check that all applications received all blocks.
254-
assert_eq!(applications.len(), NUM_VALIDATORS as usize);
255-
for (i, app) in &applications {
256-
assert!(
257-
app.blocks().len() == NUM_BLOCKS as usize,
258-
"validator {i} has {:?} blocks",
259-
app.blocks().keys().collect::<Vec<_>>(),
260-
);
269+
let mut finished = false;
270+
while !finished {
271+
// Avoid a busy loop
272+
context.sleep(Duration::from_secs(1)).await;
273+
274+
// If not all validators have finished, try again
275+
if applications.len() != NUM_VALIDATORS as usize {
276+
continue;
277+
}
278+
finished = true;
279+
for app in applications.values() {
280+
if app.blocks().len() != NUM_BLOCKS as usize {
281+
finished = false;
282+
break;
283+
}
284+
}
261285
}
286+
287+
// Return state
262288
context.auditor().state()
263289
})
264290
}
@@ -278,7 +304,7 @@ mod tests {
278304
identity,
279305
coordinator,
280306
mailbox_size: 100,
281-
backfill_quota: Quota::per_second(NonZeroU32::new(1).unwrap()),
307+
backfill_quota: Quota::per_second(NonZeroU32::new(5).unwrap()),
282308
namespace: NAMESPACE.to_vec(),
283309
view_retention_timeout: 10,
284310
max_repair: 10,
@@ -311,51 +337,69 @@ mod tests {
311337
broadcast_engine.start(network);
312338

313339
// Start the actor
314-
let backfill_by_digest = oracle.register(secret.public_key(), 2).await.unwrap();
315-
let backfill_by_height = oracle.register(secret.public_key(), 3).await.unwrap();
316-
let backfill_by_view = oracle.register(secret.public_key(), 4).await.unwrap();
317-
actor.start(
318-
application.clone(),
319-
buffer,
320-
backfill_by_digest,
321-
backfill_by_height,
322-
backfill_by_view,
323-
);
340+
let backfill = oracle.register(secret.public_key(), 2).await.unwrap();
341+
actor.start(application.clone(), buffer, backfill);
324342

325343
(application, mailbox)
326344
}
327345

328346
fn make_finalization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Finalization<V, D> {
329347
let proposal_msg = proposal.encode();
330-
let partials: Vec<_> = shares
348+
349+
// Generate proposal signature
350+
let proposal_partials: Vec<_> = shares
331351
.iter()
332352
.take(quorum as usize)
333353
.map(|s| {
334354
partial_sign_message::<V>(s, Some(&finalize_namespace(NAMESPACE)), &proposal_msg)
335355
})
336356
.collect();
337-
let signature = threshold_signature_recover::<V, _>(quorum, &partials).unwrap();
357+
let proposal_signature =
358+
threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
359+
360+
// Generate seed signature (for the view number)
361+
let seed_msg = view_message(proposal.view);
362+
let seed_partials: Vec<_> = shares
363+
.iter()
364+
.take(quorum as usize)
365+
.map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
366+
.collect();
367+
let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
368+
338369
Finalization {
339370
proposal,
340-
proposal_signature: signature,
341-
seed_signature: signature,
371+
proposal_signature,
372+
seed_signature,
342373
}
343374
}
344375

345376
fn make_notarization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Notarization<V, D> {
346377
let proposal_msg = proposal.encode();
347-
let partials: Vec<_> = shares
378+
379+
// Generate proposal signature
380+
let proposal_partials: Vec<_> = shares
348381
.iter()
349382
.take(quorum as usize)
350383
.map(|s| {
351384
partial_sign_message::<V>(s, Some(&notarize_namespace(NAMESPACE)), &proposal_msg)
352385
})
353386
.collect();
354-
let signature = threshold_signature_recover::<V, _>(quorum, &partials).unwrap();
387+
let proposal_signature =
388+
threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
389+
390+
// Generate seed signature (for the view number)
391+
let seed_msg = view_message(proposal.view);
392+
let seed_partials: Vec<_> = shares
393+
.iter()
394+
.take(quorum as usize)
395+
.map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
396+
.collect();
397+
let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
398+
355399
Notarization {
356400
proposal,
357-
proposal_signature: signature,
358-
seed_signature: signature,
401+
proposal_signature,
402+
seed_signature,
359403
}
360404
}
361405
}

resolver/src/p2p/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ use crate::{
55
use bytes::Bytes;
66
use commonware_cryptography::PublicKey;
77
use commonware_p2p::utils::requester;
8-
use commonware_utils::Array;
8+
use commonware_utils::Span;
99
use std::time::Duration;
1010

1111
/// Configuration for the peer actor.
1212
pub struct Config<
1313
P: PublicKey,
1414
D: Coordinator<PublicKey = P>,
15-
Key: Array,
15+
Key: Span,
1616
Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
1717
Pro: Producer<Key = Key>,
1818
> {

resolver/src/p2p/engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use commonware_runtime::{
1919
},
2020
Clock, Handle, Metrics, Spawner,
2121
};
22-
use commonware_utils::{futures::Pool as FuturesPool, Array};
22+
use commonware_utils::{futures::Pool as FuturesPool, Span};
2323
use futures::{
2424
channel::{mpsc, oneshot},
2525
future::{self, Either},
@@ -43,7 +43,7 @@ pub struct Engine<
4343
E: Clock + GClock + Spawner + Rng + Metrics,
4444
P: PublicKey,
4545
D: Coordinator<PublicKey = P>,
46-
Key: Array,
46+
Key: Span,
4747
Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
4848
Pro: Producer<Key = Key>,
4949
NetS: Sender<PublicKey = P>,
@@ -94,7 +94,7 @@ impl<
9494
E: Clock + GClock + Spawner + Rng + Metrics,
9595
P: PublicKey,
9696
D: Coordinator<PublicKey = P>,
97-
Key: Array,
97+
Key: Span,
9898
Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
9999
Pro: Producer<Key = Key>,
100100
NetS: Sender<PublicKey = P>,

resolver/src/p2p/fetcher.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use commonware_p2p::{
99
Recipients, Sender,
1010
};
1111
use commonware_runtime::{Clock, Metrics};
12-
use commonware_utils::{Array, PrioritySet};
12+
use commonware_utils::{PrioritySet, Span};
1313
use governor::clock::Clock as GClock;
1414
use rand::Rng;
1515
use std::{
@@ -39,7 +39,7 @@ enum SendError<S: Sender> {
3939
pub struct Fetcher<
4040
E: Clock + GClock + Rng + Metrics,
4141
P: PublicKey,
42-
Key: Array,
42+
Key: Span,
4343
NetS: Sender<PublicKey = P>,
4444
> {
4545
context: E,
@@ -64,7 +64,7 @@ pub struct Fetcher<
6464
_s: PhantomData<NetS>,
6565
}
6666

67-
impl<E: Clock + GClock + Rng + Metrics, P: PublicKey, Key: Array, NetS: Sender<PublicKey = P>>
67+
impl<E: Clock + GClock + Rng + Metrics, P: PublicKey, Key: Span, NetS: Sender<PublicKey = P>>
6868
Fetcher<E, P, Key, NetS>
6969
{
7070
/// Creates a new fetcher.

resolver/src/p2p/ingress.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::Resolver;
2-
use commonware_utils::Array;
2+
use commonware_utils::Span;
33
use futures::{channel::mpsc, SinkExt};
44

55
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
@@ -33,7 +33,7 @@ impl<K> Mailbox<K> {
3333
}
3434
}
3535

36-
impl<K: Array> Resolver for Mailbox<K> {
36+
impl<K: Span> Resolver for Mailbox<K> {
3737
type Key = K;
3838

3939
/// Send a fetch request to the peer actor.

resolver/src/p2p/mocks/key.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bytes::{Buf, BufMut};
22
use commonware_codec::{Error as CodecError, FixedSize, Read, ReadExt, Write};
3-
use commonware_utils::{Array, Span};
4-
use std::{fmt, ops::Deref};
3+
use commonware_utils::Span;
4+
use std::fmt;
55

66
/// A key that can be used for testing
77
#[derive(Clone, Default, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
@@ -13,19 +13,6 @@ impl fmt::Display for Key {
1313
}
1414
}
1515

16-
impl AsRef<[u8]> for Key {
17-
fn as_ref(&self) -> &[u8] {
18-
std::slice::from_ref(&self.0)
19-
}
20-
}
21-
22-
impl Deref for Key {
23-
type Target = [u8];
24-
fn deref(&self) -> &Self::Target {
25-
std::slice::from_ref(&self.0)
26-
}
27-
}
28-
2916
impl Write for Key {
3017
fn write(&self, buf: &mut impl BufMut) {
3118
self.0.write(buf);
@@ -45,5 +32,3 @@ impl FixedSize for Key {
4532
}
4633

4734
impl Span for Key {}
48-
49-
impl Array for Key {}

0 commit comments

Comments
 (0)