Skip to content

Commit 5ba20f8

Browse files
authored
[consensus/marshal] Cheaper CodedBlock::clone (#3708)
1 parent a7f1730 commit 5ba20f8

9 files changed

Lines changed: 82 additions & 113 deletions

File tree

consensus/src/marshal/coding/shards/engine.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ use rand::Rng;
174174
use std::{
175175
collections::{BTreeMap, VecDeque},
176176
num::NonZeroUsize,
177-
sync::Arc,
178177
};
179178
use thiserror::Error;
180179
use tracing::{debug, warn};
@@ -331,8 +330,7 @@ where
331330
/// An ephemeral cache of reconstructed blocks, keyed by commitment.
332331
///
333332
/// These blocks are evicted after a durability signal from the marshal.
334-
/// Wrapped in [`Arc`] to enable cheap cloning when serving multiple subscribers.
335-
reconstructed_blocks: BTreeMap<Commitment, Arc<CodedBlock<B, C, H>>>,
333+
reconstructed_blocks: BTreeMap<Commitment, CodedBlock<B, C, H>>,
336334

337335
/// Open subscriptions for assigned shard verification for the keyed
338336
/// [`Commitment`].
@@ -350,7 +348,7 @@ where
350348
/// the keyed [`Commitment`].
351349
#[allow(clippy::type_complexity)]
352350
block_subscriptions:
353-
BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<Arc<CodedBlock<B, C, H>>>>>,
351+
BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<CodedBlock<B, C, H>>>>,
354352

355353
/// Metrics for the shard engine.
356354
metrics: ShardMetrics<P>,
@@ -580,9 +578,9 @@ where
580578
fn try_reconstruct(
581579
&mut self,
582580
commitment: Commitment,
583-
) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
581+
) -> Result<Option<CodedBlock<B, C, H>>, Error<C>> {
584582
if let Some(block) = self.reconstructed_blocks.get(&commitment) {
585-
return Ok(Some(Arc::clone(block)));
583+
return Ok(Some(block.clone()));
586584
}
587585
let Some(state) = self.state.get_mut(&commitment) else {
588586
return Ok(None);
@@ -635,8 +633,8 @@ where
635633

636634
// Construct a coding block with a _trusted_ commitment. `S::decode` verified the blob's
637635
// integrity against the commitment, so shards can be lazily re-constructed if need be.
638-
let block = Arc::new(CodedBlock::new_trusted(inner, commitment));
639-
self.cache_block(Arc::clone(&block));
636+
let block = CodedBlock::new_trusted(inner, commitment);
637+
self.cache_block(block.clone());
640638
self.metrics.blocks_reconstructed_total.inc();
641639
Ok(Some(block))
642640
}
@@ -744,10 +742,9 @@ where
744742
}
745743

746744
/// Cache a block and notify all subscribers waiting on it.
747-
fn cache_block(&mut self, block: Arc<CodedBlock<B, C, H>>) {
745+
fn cache_block(&mut self, block: CodedBlock<B, C, H>) {
748746
let commitment = block.commitment();
749-
self.reconstructed_blocks
750-
.insert(commitment, Arc::clone(&block));
747+
self.reconstructed_blocks.insert(commitment, block.clone());
751748
self.notify_block_subscribers(block);
752749
}
753750

@@ -825,7 +822,6 @@ where
825822
}
826823

827824
// Cache the block so we don't have to reconstruct it again.
828-
let block = Arc::new(block);
829825
self.cache_block(block);
830826

831827
// Local proposals bypass reconstruction, so shard subscribers waiting
@@ -937,7 +933,7 @@ where
937933
fn handle_block_subscription(
938934
&mut self,
939935
key: BlockSubscriptionKey<B::Digest>,
940-
response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
936+
response: oneshot::Sender<CodedBlock<B, C, H>>,
941937
) {
942938
let block = match key {
943939
BlockSubscriptionKey::Commitment(commitment) => {
@@ -951,7 +947,7 @@ where
951947

952948
// Answer immediately if we have the block cached.
953949
if let Some(block) = block {
954-
response.send_lossy(Arc::clone(block));
950+
response.send_lossy(block.clone());
955951
return;
956952
}
957953

@@ -975,7 +971,7 @@ where
975971
}
976972

977973
/// Notifies and cleans up any subscriptions for a reconstructed block.
978-
fn notify_block_subscribers(&mut self, block: Arc<CodedBlock<B, C, H>>) {
974+
fn notify_block_subscribers(&mut self, block: CodedBlock<B, C, H>) {
979975
let commitment = block.commitment();
980976
let digest = block.digest();
981977

@@ -985,7 +981,7 @@ where
985981
.remove(&BlockSubscriptionKey::Commitment(commitment))
986982
{
987983
for subscriber in subscribers.drain(..) {
988-
subscriber.send_lossy(Arc::clone(&block));
984+
subscriber.send_lossy(block.clone());
989985
}
990986
}
991987

@@ -995,7 +991,7 @@ where
995991
.remove(&BlockSubscriptionKey::Digest(digest))
996992
{
997993
for subscriber in subscribers.drain(..) {
998-
subscriber.send_lossy(Arc::clone(&block));
994+
subscriber.send_lossy(block.clone());
999995
}
1000996
}
1001997
}
@@ -1541,7 +1537,10 @@ mod tests {
15411537
future::Future,
15421538
marker::PhantomData,
15431539
num::NonZeroU32,
1544-
sync::atomic::{AtomicIsize, Ordering},
1540+
sync::{
1541+
atomic::{AtomicIsize, Ordering},
1542+
Arc,
1543+
},
15451544
time::Duration,
15461545
};
15471546

consensus/src/marshal/coding/shards/mailbox.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::{
88
use commonware_coding::Scheme as CodingScheme;
99
use commonware_cryptography::{Hasher, PublicKey};
1010
use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
11-
use std::sync::Arc;
1211

1312
/// A message that can be sent to the coding [`Engine`].
1413
///
@@ -41,14 +40,14 @@ where
4140
/// The [`Commitment`] of the block to get.
4241
commitment: Commitment,
4342
/// The response channel.
44-
response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
43+
response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
4544
},
4645
/// A request to get a reconstructed block by its digest, if available.
4746
GetByDigest {
4847
/// The digest of the block to get.
4948
digest: B::Digest,
5049
/// The response channel.
51-
response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
50+
response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
5251
},
5352
/// A request to open a subscription for assigned shard verification.
5453
///
@@ -72,15 +71,15 @@ where
7271
/// The block's digest.
7372
commitment: Commitment,
7473
/// The response channel.
75-
response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
74+
response: oneshot::Sender<CodedBlock<B, C, H>>,
7675
},
7776
/// A request to open a subscription for the reconstruction of a [`CodedBlock`]
7877
/// by its digest.
7978
SubscribeByDigest {
8079
/// The block's digest.
8180
digest: B::Digest,
8281
/// The response channel.
83-
response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
82+
response: oneshot::Sender<CodedBlock<B, C, H>>,
8483
},
8584
/// A request to prune all caches at and below the given commitment.
8685
Prune {
@@ -132,7 +131,7 @@ where
132131
}
133132

134133
/// Request a reconstructed block by its [`Commitment`].
135-
pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
134+
pub async fn get(&self, commitment: Commitment) -> Option<CodedBlock<B, C, H>> {
136135
self.sender
137136
.request(|tx| Message::GetByCommitment {
138137
commitment,
@@ -143,7 +142,7 @@ where
143142
}
144143

145144
/// Request a reconstructed block by its digest.
146-
pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
145+
pub async fn get_by_digest(&self, digest: B::Digest) -> Option<CodedBlock<B, C, H>> {
147146
self.sender
148147
.request(|tx| Message::GetByDigest {
149148
digest,
@@ -180,7 +179,7 @@ where
180179
pub async fn subscribe(
181180
&self,
182181
commitment: Commitment,
183-
) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
182+
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
184183
let (responder, receiver) = oneshot::channel();
185184
let msg = Message::SubscribeByCommitment {
186185
commitment,
@@ -194,7 +193,7 @@ where
194193
pub async fn subscribe_by_digest(
195194
&self,
196195
digest: B::Digest,
197-
) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
196+
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
198197
let (responder, receiver) = oneshot::channel();
199198
let msg = Message::SubscribeByDigest {
200199
digest,

consensus/src/marshal/coding/types.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use commonware_coding::{Config as CodingConfig, Scheme};
99
use commonware_cryptography::{Committable, Digestible, Hasher};
1010
use commonware_parallel::{Sequential, Strategy};
1111
use commonware_utils::{Faults, N3f1, NZU16};
12-
use std::marker::PhantomData;
12+
use std::{marker::PhantomData, sync::Arc};
1313

1414
/// A broadcastable shard of erasure coded data, including the coding commitment and
1515
/// the configuration used to code the data.
@@ -142,7 +142,7 @@ where
142142
#[derive(Debug)]
143143
pub struct CodedBlock<B: Block, C: Scheme, H: Hasher> {
144144
/// The inner block type.
145-
inner: B,
145+
inner: Arc<B>,
146146
/// The erasure coding configuration.
147147
config: CodingConfig,
148148
/// The erasure coding commitment.
@@ -152,7 +152,7 @@ pub struct CodedBlock<B: Block, C: Scheme, H: Hasher> {
152152
/// These shards are optional to enable lazy construction. If the block is
153153
/// constructed with [`Self::new_trusted`], the shards are computed lazily
154154
/// via [`Self::shards`].
155-
shards: Option<Vec<C::Shard>>,
155+
shards: Option<Arc<[C::Shard]>>,
156156
/// Phantom data for the hasher.
157157
_hasher: PhantomData<H>,
158158
}
@@ -175,18 +175,18 @@ impl<B: Block, C: Scheme, H: Hasher> CodedBlock<B, C, H> {
175175
pub fn new(inner: B, config: CodingConfig, strategy: &impl Strategy) -> Self {
176176
let (commitment, shards) = Self::encode(&inner, config, strategy);
177177
Self {
178-
inner,
178+
inner: Arc::new(inner),
179179
config,
180180
commitment,
181-
shards: Some(shards),
181+
shards: Some(shards.into()),
182182
_hasher: PhantomData,
183183
}
184184
}
185185

186186
/// Create a new [`CodedBlock`] from a [`Block`] and trusted [`Commitment`].
187187
pub fn new_trusted(inner: B, commitment: Commitment) -> Self {
188188
Self {
189-
inner,
189+
inner: Arc::new(inner),
190190
config: commitment.config(),
191191
commitment: commitment.root(),
192192
shards: None,
@@ -213,7 +213,7 @@ impl<B: Block, C: Scheme, H: Hasher> CodedBlock<B, C, H> {
213213
"coded block constructed with trusted commitment does not match commitment"
214214
);
215215

216-
self.shards = Some(shards);
216+
self.shards = Some(shards.into());
217217
self.shards.as_ref().unwrap()
218218
}
219219
}
@@ -232,28 +232,28 @@ impl<B: Block, C: Scheme, H: Hasher> CodedBlock<B, C, H> {
232232
}
233233

234234
/// Returns a reference to the inner [`Block`].
235-
pub const fn inner(&self) -> &B {
235+
pub fn inner(&self) -> &B {
236236
&self.inner
237237
}
238238

239239
/// Takes the inner [`Block`].
240240
pub fn into_inner(self) -> B {
241-
self.inner
241+
Arc::unwrap_or_clone(self.inner)
242242
}
243243
}
244244

245-
impl<B: CertifiableBlock, C: Scheme, H: Hasher> From<CodedBlock<B, C, H>>
245+
impl<B: CertifiableBlock + Clone, C: Scheme, H: Hasher> From<CodedBlock<B, C, H>>
246246
for StoredCodedBlock<B, C, H>
247247
{
248248
fn from(block: CodedBlock<B, C, H>) -> Self {
249249
Self::new(block)
250250
}
251251
}
252252

253-
impl<B: Block + Clone, C: Scheme, H: Hasher> Clone for CodedBlock<B, C, H> {
253+
impl<B: Block, C: Scheme, H: Hasher> Clone for CodedBlock<B, C, H> {
254254
fn clone(&self) -> Self {
255255
Self {
256-
inner: self.inner.clone(),
256+
inner: Arc::clone(&self.inner),
257257
config: self.config,
258258
commitment: self.commitment,
259259
shards: self.shards.clone(),
@@ -315,10 +315,10 @@ impl<B: Block, C: Scheme, H: Hasher> Read for CodedBlock<B, C, H> {
315315
})?;
316316

317317
Ok(Self {
318-
inner,
318+
inner: Arc::new(inner),
319319
config,
320320
commitment,
321-
shards: Some(shards),
321+
shards: Some(shards.into()),
322322
_hasher: PhantomData,
323323
})
324324
}
@@ -380,15 +380,15 @@ pub struct StoredCodedBlock<B: Block, C: Scheme, H: Hasher> {
380380
_scheme: PhantomData<(C, H)>,
381381
}
382382

383-
impl<B: CertifiableBlock, C: Scheme, H: Hasher> StoredCodedBlock<B, C, H> {
383+
impl<B: CertifiableBlock + Clone, C: Scheme, H: Hasher> StoredCodedBlock<B, C, H> {
384384
/// Create a [`StoredCodedBlock`] from a verified [`CodedBlock`].
385385
///
386386
/// The caller must ensure the [`CodedBlock`] has been properly verified
387387
/// (i.e., its commitment was computed or validated against a trusted source).
388388
pub fn new(block: CodedBlock<B, C, H>) -> Self {
389389
Self {
390390
commitment: block.commitment(),
391-
inner: block.inner,
391+
inner: block.into_inner(),
392392
_scheme: PhantomData,
393393
}
394394
}
@@ -621,6 +621,24 @@ mod test {
621621
assert!(coded_block == decoded);
622622
}
623623

624+
#[test]
625+
fn test_coded_block_clone_shares_storage() {
626+
const CONFIG: CodingConfig = CodingConfig {
627+
minimum_shards: NZU16!(1),
628+
extra_shards: NZU16!(2),
629+
};
630+
631+
let block = Block::new::<Sha256>((), Sha256::hash(b"parent"), Height::new(42), 1_234_567);
632+
let coded_block = CodedBlock::<Block, RS, H>::new(block, CONFIG, &Sequential);
633+
let cloned = coded_block.clone();
634+
635+
assert!(Arc::ptr_eq(&coded_block.inner, &cloned.inner));
636+
assert!(Arc::ptr_eq(
637+
coded_block.shards.as_ref().unwrap(),
638+
cloned.shards.as_ref().unwrap()
639+
));
640+
}
641+
624642
#[test]
625643
fn test_stored_coded_block_codec_roundtrip() {
626644
const CONFIG: CodingConfig = CodingConfig {

consensus/src/marshal/coding/variant.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use commonware_coding::Scheme as CodingScheme;
1414
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
1515
use commonware_p2p::Recipients;
1616
use commonware_utils::channel::oneshot;
17-
use std::sync::Arc;
1817

1918
/// The coding variant of Marshal, which uses erasure coding for block dissemination.
2019
///
@@ -68,30 +67,29 @@ where
6867
P: PublicKey,
6968
{
7069
type PublicKey = P;
71-
type CachedBlock = Arc<CodedBlock<B, C, H>>;
7270

7371
async fn find_by_digest(
7472
&self,
7573
digest: <CodedBlock<B, C, H> as Digestible>::Digest,
76-
) -> Option<Self::CachedBlock> {
74+
) -> Option<CodedBlock<B, C, H>> {
7775
self.get_by_digest(digest).await
7876
}
7977

80-
async fn find_by_commitment(&self, commitment: Commitment) -> Option<Self::CachedBlock> {
78+
async fn find_by_commitment(&self, commitment: Commitment) -> Option<CodedBlock<B, C, H>> {
8179
self.get(commitment).await
8280
}
8381

8482
async fn subscribe_by_digest(
8583
&self,
8684
digest: <CodedBlock<B, C, H> as Digestible>::Digest,
87-
) -> oneshot::Receiver<Self::CachedBlock> {
85+
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
8886
self.subscribe_by_digest(digest).await
8987
}
9088

9189
async fn subscribe_by_commitment(
9290
&self,
9391
commitment: Commitment,
94-
) -> oneshot::Receiver<Self::CachedBlock> {
92+
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
9593
self.subscribe(commitment).await
9694
}
9795

0 commit comments

Comments
 (0)