-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathacks.rs
More file actions
192 lines (167 loc) · 6.1 KB
/
acks.rs
File metadata and controls
192 lines (167 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
use super::Variant;
use crate::types::Height;
use commonware_utils::{futures::OptionFuture, Acknowledgement};
use futures::FutureExt;
use pin_project::pin_project;
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// An acknowledgement the application has yet to deliver for a dispatched block.
///
/// Pairs the waiter with the height and commitment of the block it belongs to.
#[pin_project]
pub(super) struct PendingAck<V, A>
where
    V: Variant,
    A: Acknowledgement,
{
    /// Height of the block awaiting acknowledgement.
    pub(super) height: Height,
    /// Commitment of the block awaiting acknowledgement.
    pub(super) commitment: V::Commitment,
    /// Waiter that is polled to observe the acknowledgement outcome.
    #[pin]
    pub(super) receiver: A::Waiter,
}
/// Polling a [`PendingAck`] polls its pinned `receiver` and forwards that waiter's output.
impl<V, A> Future for PendingAck<V, A>
where
    V: Variant,
    A: Acknowledgement,
{
    type Output = <A::Waiter as Future>::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project through the pin to reach the structurally pinned waiter.
        let this = self.project();
        this.receiver.poll(cx)
    }
}
/// FIFO tracker for application acknowledgements that are still in flight.
///
/// At most one ack is armed for polling at a time; acks dispatched after it wait in a queue
/// and are armed in dispatch order.
pub(super) struct PendingAcks<V, A>
where
    V: Variant,
    A: Acknowledgement,
{
    /// The ack currently armed for polling, if any.
    current: OptionFuture<PendingAck<V, A>>,
    /// Acks dispatched after `current`, oldest at the front.
    queue: VecDeque<PendingAck<V, A>>,
    /// Maximum number of acks tracked at once, counting `current`.
    max: usize,
}
impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
    /// Creates a new pending-ack tracker with a maximum in-flight capacity.
    ///
    /// `max` counts the armed ack as well as the queued ones. The queue is preallocated with
    /// room for `max` entries.
    pub(super) fn new(max: usize) -> Self {
        Self {
            current: None.into(),
            queue: VecDeque::with_capacity(max),
            max,
        }
    }

    /// Drops the current ack and all queued acks.
    pub(super) fn clear(&mut self) {
        self.current = None.into();
        self.queue.clear();
    }

    /// Returns the currently armed ack future (if any) for `select_loop!`.
    pub(super) const fn current(&mut self) -> &mut OptionFuture<PendingAck<V, A>> {
        &mut self.current
    }

    /// Returns whether we can dispatch another block without exceeding capacity.
    ///
    /// The armed ack (if any) occupies one of the `max` slots; the remainder bounds the queue.
    pub(super) fn has_capacity(&self) -> bool {
        let reserved = usize::from(self.current.is_some());
        // Saturate rather than subtract: with `max == 0` and an armed ack, a plain
        // `max - reserved` would underflow (panic in debug builds, wrap to a huge limit in
        // release builds). Saturating yields a limit of zero, i.e. no capacity.
        self.queue.len() < self.max.saturating_sub(reserved)
    }

    /// Returns the next height to dispatch while preserving sequential order.
    ///
    /// This is the height after the most recently enqueued ack: the back of the queue if it is
    /// non-empty, otherwise the armed ack. When nothing is tracked, `start_height` is returned.
    pub(super) fn next_dispatch_height(&self, start_height: Height) -> Height {
        self.queue
            .back()
            .map(|ack| ack.height.next())
            .or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
            .unwrap_or(start_height)
    }

    /// Enqueues a newly dispatched ack, arming it immediately when idle.
    ///
    /// Capacity is not checked here; callers consult [`Self::has_capacity`] beforehand.
    pub(super) fn enqueue(&mut self, ack: PendingAck<V, A>) {
        if self.current.is_none() {
            // Nothing is armed, so this ack becomes the head of the FIFO order.
            self.current.replace(ack);
        } else {
            self.queue.push_back(ack);
        }
    }

    /// Returns metadata for a completed current ack and arms the next queued ack.
    ///
    /// `result` is the output already obtained from the current ack's waiter; it is returned
    /// unchanged alongside the ack's height and commitment.
    ///
    /// # Panics
    ///
    /// Panics if no ack is currently armed.
    pub(super) fn complete_current(
        &mut self,
        result: <A::Waiter as Future>::Output,
    ) -> (Height, V::Commitment, <A::Waiter as Future>::Output) {
        let PendingAck {
            height, commitment, ..
        } = self.current.take().expect("ack state must be present");

        // Promote the oldest queued ack so acks keep resolving in dispatch order.
        if let Some(next) = self.queue.pop_front() {
            self.current.replace(next);
        }

        (height, commitment, result)
    }

    /// If the current ack is already resolved, takes it and arms the next ack.
    ///
    /// Returns `None` when no ack is armed or when the armed ack's waiter is not ready yet.
    /// Queued acks are never inspected, so a resolved ack behind an unresolved one is not
    /// returned until it becomes the current ack.
    pub(super) fn pop_ready(
        &mut self,
    ) -> Option<(Height, V::Commitment, <A::Waiter as Future>::Output)> {
        let pending = self.current.as_mut()?;
        // Poll the waiter exactly once without waiting for it.
        let result = Pin::new(&mut pending.receiver).now_or_never()?;
        Some(self.complete_current(result))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        marshal::{mocks::block::Block, standard::Standard},
        types::Height,
    };
    use commonware_cryptography::sha256::{Digest, Sha256};
    use commonware_utils::acknowledgement::Exact;

    type TestBlock = Block<Digest, ()>;
    type TestVariant = Standard<TestBlock>;

    /// Produces the test commitment that `Sha256::fill` derives from `byte`.
    fn digest(byte: u8) -> Digest {
        Sha256::fill(byte)
    }

    /// Builds a pending ack at `height` whose commitment is `digest(byte)`, returning it
    /// together with the `Exact` handle used to acknowledge it.
    fn pending_ack(height: u64, byte: u8) -> (PendingAck<TestVariant, Exact>, Exact) {
        let (handle, waiter) = Exact::handle();
        let pending = PendingAck {
            height: Height::new(height),
            commitment: digest(byte),
            receiver: waiter,
        };
        (pending, handle)
    }

    #[test]
    fn enqueue_tracks_capacity_and_fifo_ready_order() {
        let mut acks = PendingAcks::<TestVariant, Exact>::new(2);

        // An empty tracker has room and dispatches from the supplied start height.
        assert!(acks.has_capacity());
        assert_eq!(acks.next_dispatch_height(Height::new(8)), Height::new(8));

        // The first ack is armed directly and leaves one slot free.
        let (ack_8, handle_8) = pending_ack(8, 1);
        acks.enqueue(ack_8);
        assert!(acks.has_capacity());
        assert_eq!(acks.next_dispatch_height(Height::new(8)), Height::new(9));

        // The second ack is queued behind the first and exhausts capacity.
        let (ack_9, handle_9) = pending_ack(9, 2);
        acks.enqueue(ack_9);
        assert!(!acks.has_capacity());
        assert_eq!(acks.next_dispatch_height(Height::new(8)), Height::new(10));

        // Acknowledging only the queued ack must not surface anything.
        handle_9.acknowledge();
        assert!(acks.pop_ready().is_none());

        // Once the armed ack resolves it is returned first.
        handle_8.acknowledge();
        let (ready_height, ready_commitment, outcome) =
            acks.pop_ready().expect("first ack should be ready");
        assert_eq!(ready_height, Height::new(8));
        assert_eq!(ready_commitment, digest(1));
        assert!(outcome.is_ok());

        // The already-acknowledged queued ack has been armed and is ready immediately.
        let (ready_height, ready_commitment, outcome) = acks
            .pop_ready()
            .expect("queued ready ack should be armed next");
        assert_eq!(ready_height, Height::new(9));
        assert_eq!(ready_commitment, digest(2));
        assert!(outcome.is_ok());

        assert!(acks.has_capacity());
    }

    #[test]
    fn clear_drops_all_pending_acks() {
        let mut acks = PendingAcks::<TestVariant, Exact>::new(2);

        // Fill the tracker: one armed ack plus one queued ack.
        let (ack_3, handle_3) = pending_ack(3, 1);
        let (ack_4, handle_4) = pending_ack(4, 2);
        acks.enqueue(ack_3);
        acks.enqueue(ack_4);
        assert!(!acks.has_capacity());

        acks.clear();

        // Acknowledgements arriving after the clear must not resurface.
        handle_3.acknowledge();
        handle_4.acknowledge();
        assert!(acks.pop_ready().is_none());

        // The tracker is back to its empty state.
        assert!(acks.has_capacity());
        assert_eq!(acks.next_dispatch_height(Height::new(10)), Height::new(10));
    }
}