-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathmailbox.rs
More file actions
222 lines (210 loc) · 7.47 KB
/
mailbox.rs
File metadata and controls
222 lines (210 loc) · 7.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
//! Mailbox for the shard buffer engine.
use crate::{
marshal::coding::types::CodedBlock,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Hasher, PublicKey};
use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
use std::sync::Arc;
/// A message that can be sent to the coding [`Engine`].
///
/// Variants fall into three groups: notifications that carry no reply
/// channel ([`Message::Discovered`], [`Message::Prune`]), one-shot queries
/// answered with an `Option` ([`Message::GetByCommitment`],
/// [`Message::GetByDigest`]), and subscriptions whose reply channel is
/// signaled later (the `Subscribe*` variants). [`Message::Proposed`] carries
/// an ack channel rather than a data response.
///
/// [`Engine`]: super::Engine
pub enum Message<B, C, H, P>
where
B: CertifiableBlock,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
/// A request to broadcast a proposed [`CodedBlock`] to all peers.
Proposed {
/// The erasure coded block.
block: CodedBlock<B, C, H>,
/// The round in which the block was proposed.
round: Round,
/// A channel signaled once the engine has accepted the proposal for
/// dissemination. Callers rely on this ack to know that the shards
/// have been enqueued onto the network.
ack: oneshot::Sender<()>,
},
/// A notification from consensus that a [`Commitment`] has been discovered.
Discovered {
/// The [`Commitment`] of the proposed block.
commitment: Commitment,
/// The leader's public key.
leader: P,
/// The round in which the commitment was proposed.
round: Round,
},
/// A request to get a reconstructed block, if available.
GetByCommitment {
/// The [`Commitment`] of the block to get.
commitment: Commitment,
/// The channel on which the engine answers with the block, if available.
response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
},
/// A request to get a reconstructed block by its digest, if available.
GetByDigest {
/// The digest of the block to get.
digest: B::Digest,
/// The channel on which the engine answers with the block, if available.
response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
},
/// A request to open a subscription for assigned shard verification.
///
/// For participants, this resolves once the leader-delivered shard for
/// the local participant index has been verified. Reconstructing the full
/// block from gossiped shards does not resolve this subscription: that
/// block may still be used for later certification, but it is not enough
/// to claim the participant received the shard it is expected to echo.
///
/// For proposers, this resolves immediately after the locally built block
/// is cached because they trivially have all shards.
SubscribeAssignedShardVerified {
/// The block's commitment.
commitment: Commitment,
/// The channel signaled (with no payload) when the subscription resolves.
response: oneshot::Sender<()>,
},
/// A request to open a subscription for the reconstruction of a [`CodedBlock`]
/// by its [`Commitment`].
SubscribeByCommitment {
/// The [`Commitment`] of the block to subscribe to.
commitment: Commitment,
/// The channel on which the reconstructed block is delivered.
response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
},
/// A request to open a subscription for the reconstruction of a [`CodedBlock`]
/// by its digest.
SubscribeByDigest {
/// The block's digest.
digest: B::Digest,
/// The channel on which the reconstructed block is delivered.
response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
},
/// A request to prune all caches at and below the given commitment.
Prune {
/// Inclusive prune target [`Commitment`].
through: Commitment,
},
}
/// A mailbox for sending messages to the [`Engine`].
///
/// The mailbox is a thin wrapper around the sending half of an `mpsc`
/// channel of [`Message`]s; the derived `Clone` duplicates that sender.
///
/// [`Engine`]: super::Engine
#[derive(Clone)]
pub struct Mailbox<B, C, H, P>
where
B: CertifiableBlock,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
/// Sending half of the channel carrying [`Message`]s. Visible to the
/// parent module only (`pub(super)`).
pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
}
impl<B, C, H, P> Mailbox<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Wrap the given channel `sender` in a new [`Mailbox`].
    pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
        Self { sender }
    }

    /// Ask the engine to broadcast the shards of a proposed erasure coded
    /// block to the participants.
    ///
    /// Returns `true` once the engine has acknowledged the proposal, and
    /// `false` if no acknowledgement arrives (the engine has shut down or
    /// dropped the ack before accepting).
    pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) -> bool {
        let acked = self
            .sender
            .request(|responder| Message::Proposed {
                block,
                round,
                ack: responder,
            })
            .await;
        acked.is_some()
    }

    /// Notify the engine of an externally proposed [`Commitment`], together
    /// with the `leader` that proposed it and the `round` it belongs to.
    ///
    /// No reply is awaited; the outcome of the send is ignored.
    pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
        self.sender
            .send_lossy(Message::Discovered {
                commitment,
                leader,
                round,
            })
            .await;
    }

    /// Look up a reconstructed block by its [`Commitment`].
    ///
    /// Yields `None` both when the engine answers with `None` and when no
    /// answer is received at all.
    pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
        let reply = self
            .sender
            .request(|response| Message::GetByCommitment {
                commitment,
                response,
            })
            .await;
        reply.flatten()
    }

    /// Look up a reconstructed block by its digest.
    ///
    /// Yields `None` both when the engine answers with `None` and when no
    /// answer is received at all.
    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
        let reply = self
            .sender
            .request(|response| Message::GetByDigest { digest, response })
            .await;
        reply.flatten()
    }

    /// Open a subscription for assigned shard verification of `commitment`.
    ///
    /// For participants, the subscription resolves once the leader-delivered
    /// shard for the local participant index has been verified.
    /// Reconstructing the full block from gossiped shards does not resolve
    /// it: such a block may still be used for later certification, but it is
    /// not enough to claim the participant received the shard it is expected
    /// to echo.
    ///
    /// For proposers, the subscription resolves immediately after the locally
    /// built block is cached, because they trivially have all shards.
    ///
    /// The receiver is returned regardless of the outcome of the send.
    pub async fn subscribe_assigned_shard_verified(
        &self,
        commitment: Commitment,
    ) -> oneshot::Receiver<()> {
        let (response, receiver) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeAssignedShardVerified {
                commitment,
                response,
            })
            .await;
        receiver
    }

    /// Open a subscription for the reconstruction of a [`CodedBlock`],
    /// identified by its [`Commitment`].
    ///
    /// The receiver is returned regardless of the outcome of the send.
    pub async fn subscribe(
        &self,
        commitment: Commitment,
    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
        let (response, receiver) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeByCommitment {
                commitment,
                response,
            })
            .await;
        receiver
    }

    /// Open a subscription for the reconstruction of a [`CodedBlock`],
    /// identified by its digest.
    ///
    /// The receiver is returned regardless of the outcome of the send.
    pub async fn subscribe_by_digest(
        &self,
        digest: B::Digest,
    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
        let (response, receiver) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeByDigest { digest, response })
            .await;
        receiver
    }

    /// Ask the engine to prune all caches at and below `through` (inclusive).
    ///
    /// No reply is awaited; the outcome of the send is ignored.
    pub async fn prune(&self, through: Commitment) {
        self.sender.send_lossy(Message::Prune { through }).await;
    }
}