Skip to content

Commit e5215d9

Browse files
[consensus] Remove Coordinator (#1984)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent e15c616 commit e5215d9

43 files changed

Lines changed: 1254 additions & 596 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

broadcast/fuzz/fuzz_targets/broadcast_engine_operations.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ fn fuzz(input: FuzzInput) {
157157
commonware_p2p::simulated::Config {
158158
max_size: 1024 * 1024,
159159
disconnect_on_block: false,
160+
tracked_peer_sets: None,
160161
},
161162
);
162163
network.start();
@@ -171,7 +172,11 @@ fn fuzz(input: FuzzInput) {
171172
peers.push(public_key.clone());
172173

173174
// Create channel
174-
let (sender, receiver) = oracle.register(public_key.clone(), 0).await.unwrap();
175+
let (sender, receiver) = oracle
176+
.control(public_key.clone())
177+
.register(0)
178+
.await
179+
.unwrap();
175180

176181
// Create mailbox
177182
let config = Config {

broadcast/src/buffered/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ mod tests {
7373
commonware_p2p::simulated::Config {
7474
max_size: 1024 * 1024,
7575
disconnect_on_block: true,
76+
tracked_peer_sets: None,
7677
},
7778
);
7879
network.start();
@@ -85,7 +86,7 @@ mod tests {
8586

8687
let mut registrations: Registrations = BTreeMap::new();
8788
for peer in peers.iter() {
88-
let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap();
89+
let (sender, receiver) = oracle.control(peer.clone()).register(0).await.unwrap();
8990
registrations.insert(peer.clone(), (sender, receiver));
9091
}
9192

collector/src/p2p/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,12 @@ mod tests {
8888
(Sender<PublicKey>, Receiver<PublicKey>),
8989
)>,
9090
) {
91-
let (network, mut oracle) = Network::new(
91+
let (network, oracle) = Network::new(
9292
context.with_label("network"),
9393
commonware_p2p::simulated::Config {
9494
max_size: 1024 * 1024,
9595
disconnect_on_block: true,
96+
tracked_peer_sets: None,
9697
},
9798
);
9899
network.start();
@@ -105,8 +106,9 @@ mod tests {
105106

106107
let mut connections = Vec::new();
107108
for peer in &peers {
108-
let (sender1, receiver1) = oracle.register(peer.clone(), 0).await.unwrap();
109-
let (sender2, receiver2) = oracle.register(peer.clone(), 1).await.unwrap();
109+
let mut control = oracle.control(peer.clone());
110+
let (sender1, receiver1) = control.register(0).await.unwrap();
111+
let (sender2, receiver2) = control.register(1).await.unwrap();
110112
connections.push(((sender1, receiver1), (sender2, receiver2)));
111113
}
112114

consensus/src/aggregation/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,11 @@ mod tests {
119119
) -> Registrations<PublicKey> {
120120
let mut registrations = BTreeMap::new();
121121
for participant in participants.iter() {
122-
let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
122+
let (sender, receiver) = oracle
123+
.control(participant.clone())
124+
.register(0)
125+
.await
126+
.unwrap();
123127
registrations.insert(participant.clone(), (sender, receiver));
124128
}
125129
registrations
@@ -161,6 +165,7 @@ mod tests {
161165
commonware_p2p::simulated::Config {
162166
max_size: 1024 * 1024,
163167
disconnect_on_block: true,
168+
tracked_peer_sets: None,
164169
},
165170
);
166171
network.start();

consensus/src/marshal/mod.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ mod tests {
109109
use commonware_p2p::{
110110
simulated::{self, Link, Network, Oracle},
111111
utils::requester,
112+
Manager,
112113
};
113-
use commonware_resolver::p2p;
114114
use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner};
115115
use commonware_utils::{NZUsize, NZU64};
116116
use governor::Quota;
@@ -166,7 +166,6 @@ mod tests {
166166
async fn setup_validator(
167167
context: deterministic::Context,
168168
oracle: &mut Oracle<K>,
169-
coordinator: p2p::mocks::Coordinator<K>,
170169
validator: K,
171170
scheme_provider: P,
172171
) -> (
@@ -196,10 +195,11 @@ mod tests {
196195
};
197196

198197
// Create the resolver
199-
let backfill = oracle.register(validator.clone(), 1).await.unwrap();
198+
let mut control = oracle.control(validator.clone());
199+
let backfill = control.register(1).await.unwrap();
200200
let resolver_cfg = resolver::Config {
201201
public_key: validator.clone(),
202-
coordinator,
202+
manager: oracle.clone(),
203203
mailbox_size: config.mailbox_size,
204204
requester_config: requester::Config {
205205
me: Some(validator.clone()),
@@ -222,7 +222,7 @@ mod tests {
222222
codec_config: (),
223223
};
224224
let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
225-
let network = oracle.register(validator, 2).await.unwrap();
225+
let network = control.register(2).await.unwrap();
226226
broadcast_engine.start(network);
227227

228228
let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
@@ -258,12 +258,16 @@ mod tests {
258258
Notarization::from_notarizes(&schemes[0], &notarizes).unwrap()
259259
}
260260

261-
fn setup_network(context: deterministic::Context) -> Oracle<K> {
261+
fn setup_network(
262+
context: deterministic::Context,
263+
tracked_peer_sets: Option<usize>,
264+
) -> Oracle<K> {
262265
let (network, oracle) = Network::new(
263266
context.with_label("network"),
264267
simulated::Config {
265268
max_size: 1024 * 1024,
266269
disconnect_on_block: true,
270+
tracked_peer_sets,
267271
},
268272
);
269273
network.start();
@@ -313,7 +317,7 @@ mod tests {
313317
.with_timeout(Some(Duration::from_secs(300))),
314318
);
315319
runner.start(|mut context| async move {
316-
let mut oracle = setup_network(context.clone());
320+
let mut oracle = setup_network(context.clone(), Some(3));
317321
let Fixture {
318322
participants,
319323
schemes,
@@ -324,11 +328,12 @@ mod tests {
324328
let mut applications = BTreeMap::new();
325329
let mut actors = Vec::new();
326330

331+
// Register the initial peer set.
332+
oracle.update(0, participants.clone().into()).await;
327333
for (i, validator) in participants.iter().enumerate() {
328334
let (application, actor) = setup_validator(
329335
context.with_label(&format!("validator-{i}")),
330336
&mut oracle,
331-
p2p::mocks::Coordinator::new(participants.clone()),
332337
validator.clone(),
333338
schemes[i].clone().into(),
334339
)
@@ -424,7 +429,7 @@ mod tests {
424429
fn test_subscribe_basic_block_delivery() {
425430
let runner = deterministic::Runner::timed(Duration::from_secs(60));
426431
runner.start(|mut context| async move {
427-
let mut oracle = setup_network(context.clone());
432+
let mut oracle = setup_network(context.clone(), None);
428433
let Fixture {
429434
participants,
430435
schemes,
@@ -436,7 +441,6 @@ mod tests {
436441
let (_application, actor) = setup_validator(
437442
context.with_label(&format!("validator-{i}")),
438443
&mut oracle,
439-
p2p::mocks::Coordinator::new(vec![]),
440444
validator.clone(),
441445
schemes[i].clone().into(),
442446
)
@@ -476,7 +480,7 @@ mod tests {
476480
fn test_subscribe_multiple_subscriptions() {
477481
let runner = deterministic::Runner::timed(Duration::from_secs(60));
478482
runner.start(|mut context| async move {
479-
let mut oracle = setup_network(context.clone());
483+
let mut oracle = setup_network(context.clone(), None);
480484
let Fixture {
481485
participants,
482486
schemes,
@@ -488,7 +492,6 @@ mod tests {
488492
let (_application, actor) = setup_validator(
489493
context.with_label(&format!("validator-{i}")),
490494
&mut oracle,
491-
p2p::mocks::Coordinator::new(participants.clone()),
492495
validator.clone(),
493496
schemes[i].clone().into(),
494497
)
@@ -548,7 +551,7 @@ mod tests {
548551
fn test_subscribe_canceled_subscriptions() {
549552
let runner = deterministic::Runner::timed(Duration::from_secs(60));
550553
runner.start(|mut context| async move {
551-
let mut oracle = setup_network(context.clone());
554+
let mut oracle = setup_network(context.clone(), None);
552555
let Fixture {
553556
participants,
554557
schemes,
@@ -560,7 +563,6 @@ mod tests {
560563
let (_application, actor) = setup_validator(
561564
context.with_label(&format!("validator-{i}")),
562565
&mut oracle,
563-
p2p::mocks::Coordinator::new(participants.clone()),
564566
validator.clone(),
565567
schemes[i].clone().into(),
566568
)
@@ -612,7 +614,7 @@ mod tests {
612614
fn test_subscribe_blocks_from_different_sources() {
613615
let runner = deterministic::Runner::timed(Duration::from_secs(60));
614616
runner.start(|mut context| async move {
615-
let mut oracle = setup_network(context.clone());
617+
let mut oracle = setup_network(context.clone(), None);
616618
let Fixture {
617619
participants,
618620
schemes,
@@ -624,7 +626,6 @@ mod tests {
624626
let (_application, actor) = setup_validator(
625627
context.with_label(&format!("validator-{i}")),
626628
&mut oracle,
627-
p2p::mocks::Coordinator::new(participants.clone()),
628629
validator.clone(),
629630
schemes[i].clone().into(),
630631
)
@@ -714,7 +715,7 @@ mod tests {
714715
fn test_get_info_basic_queries_present_and_missing() {
715716
let runner = deterministic::Runner::timed(Duration::from_secs(60));
716717
runner.start(|mut context| async move {
717-
let mut oracle = setup_network(context.clone());
718+
let mut oracle = setup_network(context.clone(), None);
718719
let Fixture {
719720
participants,
720721
schemes,
@@ -726,7 +727,6 @@ mod tests {
726727
let (_application, mut actor) = setup_validator(
727728
context.with_label("validator-0"),
728729
&mut oracle,
729-
p2p::mocks::Coordinator::new(vec![]),
730730
me,
731731
schemes[0].clone().into(),
732732
)
@@ -775,7 +775,7 @@ mod tests {
775775
fn test_get_info_latest_progression_multiple_finalizations() {
776776
let runner = deterministic::Runner::timed(Duration::from_secs(60));
777777
runner.start(|mut context| async move {
778-
let mut oracle = setup_network(context.clone());
778+
let mut oracle = setup_network(context.clone(), None);
779779
let Fixture {
780780
participants,
781781
schemes,
@@ -787,7 +787,6 @@ mod tests {
787787
let (_application, mut actor) = setup_validator(
788788
context.with_label("validator-0"),
789789
&mut oracle,
790-
p2p::mocks::Coordinator::new(vec![]),
791790
me,
792791
schemes[0].clone().into(),
793792
)
@@ -852,7 +851,7 @@ mod tests {
852851
fn test_get_block_by_height_and_latest() {
853852
let runner = deterministic::Runner::timed(Duration::from_secs(60));
854853
runner.start(|mut context| async move {
855-
let mut oracle = setup_network(context.clone());
854+
let mut oracle = setup_network(context.clone(), None);
856855
let Fixture {
857856
participants,
858857
schemes,
@@ -863,7 +862,6 @@ mod tests {
863862
let (_application, mut actor) = setup_validator(
864863
context.with_label("validator-0"),
865864
&mut oracle,
866-
p2p::mocks::Coordinator::new(vec![]),
867865
me,
868866
schemes[0].clone().into(),
869867
)
@@ -910,7 +908,7 @@ mod tests {
910908
fn test_get_block_by_commitment_from_sources_and_missing() {
911909
let runner = deterministic::Runner::timed(Duration::from_secs(60));
912910
runner.start(|mut context| async move {
913-
let mut oracle = setup_network(context.clone());
911+
let mut oracle = setup_network(context.clone(), None);
914912
let Fixture {
915913
participants,
916914
schemes,
@@ -921,7 +919,6 @@ mod tests {
921919
let (_application, mut actor) = setup_validator(
922920
context.with_label("validator-0"),
923921
&mut oracle,
924-
p2p::mocks::Coordinator::new(participants),
925922
me,
926923
schemes[0].clone().into(),
927924
)
@@ -969,7 +966,7 @@ mod tests {
969966
fn test_get_finalization_by_height() {
970967
let runner = deterministic::Runner::timed(Duration::from_secs(60));
971968
runner.start(|mut context| async move {
972-
let mut oracle = setup_network(context.clone());
969+
let mut oracle = setup_network(context.clone(), None);
973970
let Fixture {
974971
participants,
975972
schemes,
@@ -980,7 +977,6 @@ mod tests {
980977
let (_application, mut actor) = setup_validator(
981978
context.with_label("validator-0"),
982979
&mut oracle,
983-
p2p::mocks::Coordinator::new(vec![]),
984980
me,
985981
schemes[0].clone().into(),
986982
)

consensus/src/marshal/resolver/p2p.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@ use crate::{
55
Block,
66
};
77
use commonware_cryptography::PublicKey;
8-
use commonware_p2p::{utils::requester, Receiver, Sender};
9-
use commonware_resolver::p2p::{self, Coordinator};
8+
use commonware_p2p::{utils::requester, Manager, Receiver, Sender};
9+
use commonware_resolver::p2p;
1010
use commonware_runtime::{Clock, Metrics, Spawner};
1111
use futures::channel::mpsc;
1212
use governor::clock::Clock as GClock;
1313
use rand::Rng;
1414
use std::time::Duration;
1515

1616
/// Configuration for the P2P [Resolver](commonware_resolver::Resolver).
17-
pub struct Config<P: PublicKey, C: Coordinator<PublicKey = P>> {
17+
pub struct Config<P: PublicKey, C: Manager<PublicKey = P>> {
1818
/// The public key to identify this node.
1919
pub public_key: P,
2020

21-
/// The coordinator of peers that can be consulted for fetching data.
22-
pub coordinator: C,
21+
/// The provider of peers that can be consulted for fetching data.
22+
pub manager: C,
2323

2424
/// The size of the request mailbox backlog.
2525
pub mailbox_size: usize,
@@ -48,7 +48,7 @@ pub fn init<E, C, B, S, R, P>(
4848
)
4949
where
5050
E: Rng + Spawner + Clock + GClock + Metrics,
51-
C: Coordinator<PublicKey = P>,
51+
C: Manager<PublicKey = P>,
5252
B: Block,
5353
S: Sender<PublicKey = P>,
5454
R: Receiver<PublicKey = P>,
@@ -59,7 +59,7 @@ where
5959
let (resolver_engine, resolver) = p2p::Engine::new(
6060
ctx.with_label("resolver"),
6161
p2p::Config {
62-
coordinator: config.coordinator,
62+
manager: config.manager,
6363
consumer: handler.clone(),
6464
producer: handler,
6565
mailbox_size: config.mailbox_size,

0 commit comments

Comments
 (0)