Skip to content

Commit c7574ac

Browse files
spike
1 parent 205dbe0 commit c7574ac

7 files changed

Lines changed: 64 additions & 98 deletions

File tree

consensus/src/marshal/coding/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,19 @@ mod tests {
156156
None
157157
}
158158

159-
fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
159+
fn subscribe_by_digest(&self, _digest: D) -> Option<oneshot::Receiver<TestCodedBlock>> {
160160
let (sender, receiver) = oneshot::channel();
161161
self.digest_subscriptions.lock().push(sender);
162-
receiver
162+
Some(receiver)
163163
}
164164

165165
fn subscribe_by_commitment(
166166
&self,
167167
_commitment: Commitment,
168-
) -> oneshot::Receiver<TestCodedBlock> {
168+
) -> Option<oneshot::Receiver<TestCodedBlock>> {
169169
let (sender, receiver) = oneshot::channel();
170170
self.commitment_subscriptions.lock().push(sender);
171-
receiver
171+
Some(receiver)
172172
}
173173

174174
fn finalized(&self, _commitment: Commitment) {}

consensus/src/marshal/coding/variant.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ where
9595
fn subscribe_by_digest(
9696
&self,
9797
digest: <CodedBlock<B, C, H> as Digestible>::Digest,
98-
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
99-
self.subscribe_by_digest(digest)
98+
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
99+
Some(self.subscribe_by_digest(digest))
100100
}
101101

102102
fn subscribe_by_commitment(
103103
&self,
104104
commitment: Commitment,
105-
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
106-
self.subscribe(commitment)
105+
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
106+
Some(self.subscribe(commitment))
107107
}
108108

109109
fn finalized(&self, commitment: Commitment) {

consensus/src/marshal/core/actor.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use super::{
66
mailbox::{CommitmentFallback, Mailbox, Message},
77
stream::Stream,
88
subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
9-
variant::OptionalBuffer,
109
Buffer, Variant,
1110
};
1211
use crate::{
@@ -296,7 +295,7 @@ where
296295
pub fn start<R, Buf>(
297296
mut self,
298297
application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
299-
buffer: Option<Buf>,
298+
buffer: Buf,
300299
resolver: (handler::Receiver<V::Commitment>, R),
301300
) -> Handle<()>
302301
where
@@ -307,15 +306,14 @@ where
307306
>,
308307
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
309308
{
310-
let buffer = OptionalBuffer::new(buffer);
311309
spawn_cell!(self.context, self.run(application, buffer, resolver))
312310
}
313311

314312
/// Run the application actor.
315313
async fn run<R, Buf>(
316314
mut self,
317315
mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
318-
mut buffer: OptionalBuffer<V, Buf>,
316+
mut buffer: Buf,
319317
(mut resolver_rx, mut resolver): (handler::Receiver<V::Commitment>, R),
320318
) where
321319
R: Resolver<
@@ -436,7 +434,7 @@ where
436434
&mut self,
437435
result: <A::Waiter as Future>::Output,
438436
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
439-
buffer: &mut OptionalBuffer<V, Buf>,
437+
buffer: &mut Buf,
440438
resolver: &mut R,
441439
) where
442440
Buf: Buffer<V>,
@@ -491,7 +489,7 @@ where
491489
message: Message<P::Scheme, V>,
492490
resolver: &mut R,
493491
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
494-
buffer: &mut OptionalBuffer<V, Buf>,
492+
buffer: &mut Buf,
495493
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
496494
) where
497495
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -764,7 +762,7 @@ where
764762
message: handler::Message<V::Commitment>,
765763
resolver_rx: &mut handler::Receiver<V::Commitment>,
766764
resolver: &mut R,
767-
buffer: &mut OptionalBuffer<V, Buf>,
765+
buffer: &mut Buf,
768766
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
769767
) where
770768
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -850,7 +848,7 @@ where
850848
&self,
851849
key: ResolverRequestFor<V>,
852850
response: oneshot::Sender<Bytes>,
853-
buffer: &OptionalBuffer<V, Buf>,
851+
buffer: &Buf,
854852
) {
855853
match key {
856854
Key::Block(commitment) => {
@@ -898,7 +896,7 @@ where
898896
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
899897
>,
900898
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
901-
buffer: &mut OptionalBuffer<V, Buf>,
899+
buffer: &mut Buf,
902900
) {
903901
let digest = match key {
904902
SubscriptionKey::Digest(digest) => digest,
@@ -985,7 +983,7 @@ where
985983
finalization: Finalization<P::Scheme, V::Commitment>,
986984
skip_if_superseded: bool,
987985
resolver: &mut R,
988-
buffer: &mut OptionalBuffer<V, Buf>,
986+
buffer: &mut Buf,
989987
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
990988
) where
991989
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -1052,7 +1050,7 @@ where
10521050
async fn apply_floor_anchor<Buf: Buffer<V>>(
10531051
&mut self,
10541052
block: &V::Block,
1055-
buffer: &mut OptionalBuffer<V, Buf>,
1053+
buffer: &mut Buf,
10561054
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
10571055
resolver: &mut impl Resolver<
10581056
Key = ResolverRequestFor<V>,
@@ -1171,7 +1169,7 @@ where
11711169
&mut self,
11721170
message: ResolverDelivery<V>,
11731171
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
1174-
buffer: &mut OptionalBuffer<V, Buf>,
1172+
buffer: &mut Buf,
11751173
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
11761174
resolver: &mut impl Resolver<
11771175
Key = ResolverRequestFor<V>,
@@ -1349,7 +1347,7 @@ where
13491347
async fn verify_delivered<Buf: Buffer<V>>(
13501348
&mut self,
13511349
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1352-
buffer: &mut OptionalBuffer<V, Buf>,
1350+
buffer: &mut Buf,
13531351
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
13541352
resolver: &mut impl Resolver<
13551353
Key = ResolverRequestFor<V>,
@@ -1848,7 +1846,7 @@ where
18481846
/// parent links).
18491847
async fn find_block_by_digest<Buf: Buffer<V>>(
18501848
&self,
1851-
buffer: &OptionalBuffer<V, Buf>,
1849+
buffer: &Buf,
18521850
digest: <V::Block as Digestible>::Digest,
18531851
) -> Option<V::Block> {
18541852
if let Some(block) = buffer.find_by_digest(digest).await {
@@ -1863,7 +1861,7 @@ where
18631861
/// Having the full commitment may enable additional retrieval mechanisms.
18641862
async fn find_block_by_commitment<Buf: Buffer<V>>(
18651863
&self,
1866-
buffer: &OptionalBuffer<V, Buf>,
1864+
buffer: &Buf,
18671865
commitment: V::Commitment,
18681866
) -> Option<V::Block> {
18691867
if let Some(block) = buffer.find_by_commitment(commitment).await {
@@ -1888,7 +1886,7 @@ where
18881886
/// needs a subsequent [`sync_finalized`](Self::sync_finalized).
18891887
async fn try_repair_gaps<Buf: Buffer<V>>(
18901888
&mut self,
1891-
buffer: &mut OptionalBuffer<V, Buf>,
1889+
buffer: &mut Buf,
18921890
resolver: &mut impl Resolver<
18931891
Key = ResolverRequestFor<V>,
18941892
Subscriber = Annotation,

consensus/src/marshal/core/subscriptions.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{variant::OptionalBuffer, Buffer, Variant};
1+
use super::{Buffer, Variant};
22
use commonware_cryptography::Digestible;
33
use commonware_utils::{
44
channel::{fallible::OneshotExt, oneshot},
@@ -71,7 +71,7 @@ impl<V: Variant> Subscriptions<V> {
7171
key: KeyFor<V>,
7272
response: oneshot::Sender<V::Block>,
7373
waiters: &mut AbortablePool<Result<V::Block, KeyFor<V>>>,
74-
buffer: &OptionalBuffer<V, Buf>,
74+
buffer: &Buf,
7575
) {
7676
match self.entries.entry(key) {
7777
Entry::Occupied(mut entry) => {
@@ -147,16 +147,16 @@ mod tests {
147147
None
148148
}
149149

150-
fn subscribe_by_digest(&self, _digest: Digest) -> oneshot::Receiver<TestBlock> {
150+
fn subscribe_by_digest(&self, _digest: Digest) -> Option<oneshot::Receiver<TestBlock>> {
151151
let (sender, receiver) = oneshot::channel();
152152
self.digest_subscribers.lock().push(sender);
153-
receiver
153+
Some(receiver)
154154
}
155155

156-
fn subscribe_by_commitment(&self, _commitment: Digest) -> oneshot::Receiver<TestBlock> {
156+
fn subscribe_by_commitment(&self, _commitment: Digest) -> Option<oneshot::Receiver<TestBlock>> {
157157
let (sender, receiver) = oneshot::channel();
158158
self.commitment_subscribers.lock().push(sender);
159-
receiver
159+
Some(receiver)
160160
}
161161

162162
fn finalized(&self, _commitment: Digest) {}
@@ -179,7 +179,7 @@ mod tests {
179179
#[test]
180180
fn insert_coalesces_duplicate_keys() {
181181
let test_buffer = TestBuffer::default();
182-
let buffer = OptionalBuffer::new(Some(test_buffer.clone()));
182+
let buffer = Some(test_buffer.clone());
183183
let mut waiters = TestWaiters::default();
184184
let mut subscriptions = Subscriptions::<TestVariant>::new();
185185
let block = block(1, 10);
@@ -211,7 +211,7 @@ mod tests {
211211
#[test]
212212
fn notify_wakes_digest_and_commitment_subscribers() {
213213
let test_buffer = TestBuffer::default();
214-
let buffer = OptionalBuffer::new(Some(test_buffer.clone()));
214+
let buffer = Some(test_buffer.clone());
215215
let mut waiters = TestWaiters::default();
216216
let mut subscriptions = Subscriptions::<TestVariant>::new();
217217
let block = block(2, 20);
@@ -243,7 +243,7 @@ mod tests {
243243

244244
#[test]
245245
fn retain_open_drops_closed_subscribers_and_keeps_open_ones() {
246-
let buffer = OptionalBuffer::new(Some(TestBuffer::default()));
246+
let buffer = Some(TestBuffer::default());
247247
let mut waiters = TestWaiters::default();
248248
let mut subscriptions = Subscriptions::<TestVariant>::new();
249249
let block = block(3, 30);
@@ -279,7 +279,7 @@ mod tests {
279279
#[test]
280280
fn remove_drops_waiter_and_aborts_buffer_waiter() {
281281
deterministic::Runner::default().start(|context| async move {
282-
let buffer = OptionalBuffer::new(Some(TestBuffer::default()));
282+
let buffer = Some(TestBuffer::default());
283283
let mut waiters = TestWaiters::default();
284284
let mut subscriptions = Subscriptions::<TestVariant>::new();
285285
let block = block(4, 40);
@@ -307,16 +307,11 @@ mod tests {
307307
fn insert_without_buffer_keeps_local_subscriber() {
308308
let mut waiters = TestWaiters::default();
309309
let mut subscriptions = Subscriptions::<TestVariant>::new();
310-
let buffer = OptionalBuffer::<TestVariant, TestBuffer>::new(None);
310+
let buffer: Option<TestBuffer> = None;
311311
let block = block(5, 50);
312312

313313
let (sender, receiver) = oneshot::channel();
314-
subscriptions.insert::<TestBuffer>(
315-
Key::Digest(block.digest()),
316-
sender,
317-
&mut waiters,
318-
&buffer,
319-
);
314+
subscriptions.insert(Key::Digest(block.digest()), sender, &mut waiters, &buffer);
320315

321316
assert_eq!(subscriptions.entries.len(), 1);
322317
subscriptions.notify(&block);

0 commit comments

Comments
 (0)