Skip to content

Commit 54ee26b

Browse files
[Codec] [Runtime] [p2p] Multi-buffer encoding for zero-copy of Bytes fields (#3491)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 44f9bc8 commit 54ee26b

20 files changed

Lines changed: 801 additions & 29 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codec/src/codec.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ pub trait FixedSize {
2222
pub trait EncodeSize {
2323
/// Returns the encoded size of this value (in bytes).
2424
fn encode_size(&self) -> usize;
25+
26+
/// Returns the encoded size excluding bytes passed to [`BufsMut::push`]
27+
/// during [`Write::write_bufs`]. Used to size the working buffer for inline
28+
/// writes. Override alongside [`Write::write_bufs`] for types where large
29+
/// [`Bytes`] fields go via push; failing to do so will over-allocate.
30+
fn encode_inline_size(&self) -> usize {
31+
self.encode_size()
32+
}
2533
}
2634

2735
// Automatically implement `EncodeSize` for types that are `FixedSize`.
@@ -37,6 +45,13 @@ pub trait Write {
3745
///
3846
/// Implementations should panic if the buffer doesn't have enough capacity.
3947
fn write(&self, buf: &mut impl BufMut);
48+
49+
/// Writes to a [`BufsMut`], allowing existing [`Bytes`] chunks to be
50+
/// appended via [`BufsMut::push`] instead of written inline. Must encode
51+
/// to the same format as [`Write::write`]. Defaults to [`Write::write`].
52+
fn write_bufs(&self, buf: &mut impl BufsMut) {
53+
self.write(buf);
54+
}
4055
}
4156

4257
/// Trait for types that can be read (decoded) from a byte buffer.
@@ -194,6 +209,13 @@ pub trait CodecFixedShared: CodecFixed<Cfg = ()> + Send + Sync {}
194209
// Automatically implement `CodecFixedShared` for types that meet all bounds.
195210
impl<T: CodecFixed<Cfg = ()> + Send + Sync> CodecFixedShared for T {}
196211

212+
/// A [`BufMut`] that can also append pre-existing [`Bytes`] chunks.
213+
pub trait BufsMut: BufMut {
214+
/// Appends a [`Bytes`] chunk instead of writing its contents inline into
215+
/// the destination buffer.
216+
fn push(&mut self, bytes: impl Into<Bytes>);
217+
}
218+
197219
#[cfg(test)]
198220
mod tests {
199221
use super::*;

codec/src/types/btree_map.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
extern crate alloc;
77

88
use crate::{
9-
codec::{EncodeSize, Read, Write},
9+
codec::{BufsMut, EncodeSize, Read, Write},
1010
error::Error,
1111
types::read_ordered_map,
1212
RangeCfg,
@@ -28,6 +28,16 @@ impl<K: Ord + Eq + Write, V: Write> Write for BTreeMap<K, V> {
2828
v.write(buf);
2929
}
3030
}
31+
32+
fn write_bufs(&self, buf: &mut impl BufsMut) {
33+
self.len().write(buf);
34+
35+
// Keys are already sorted in BTreeMap, so we can iterate directly
36+
for (k, v) in self {
37+
k.write_bufs(buf);
38+
v.write_bufs(buf);
39+
}
40+
}
3141
}
3242

3343
impl<K: Ord + Eq + EncodeSize, V: EncodeSize> EncodeSize for BTreeMap<K, V> {
@@ -42,6 +52,18 @@ impl<K: Ord + Eq + EncodeSize, V: EncodeSize> EncodeSize for BTreeMap<K, V> {
4252
}
4353
size
4454
}
55+
56+
fn encode_inline_size(&self) -> usize {
57+
// Start with the size of the length prefix
58+
let mut size = self.len().encode_size();
59+
60+
// Add the encoded size of each key and value
61+
for (k, v) in self {
62+
size += k.encode_inline_size();
63+
size += v.encode_inline_size();
64+
}
65+
size
66+
}
4567
}
4668

4769
impl<K: Read + Clone + Ord + Eq, V: Read + Clone> Read for BTreeMap<K, V> {

codec/src/types/btree_set.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
extern crate alloc;
77

88
use crate::{
9-
codec::{EncodeSize, Read, Write},
9+
codec::{BufsMut, EncodeSize, Read, Write},
1010
error::Error,
1111
types::read_ordered_set,
1212
RangeCfg,
@@ -27,6 +27,15 @@ impl<K: Ord + Eq + Write> Write for BTreeSet<K> {
2727
item.write(buf);
2828
}
2929
}
30+
31+
fn write_bufs(&self, buf: &mut impl BufsMut) {
32+
self.len().write(buf);
33+
34+
// Items are already sorted in BTreeSet, so we can iterate directly
35+
for item in self {
36+
item.write_bufs(buf);
37+
}
38+
}
3039
}
3140

3241
impl<K: Ord + Eq + EncodeSize> EncodeSize for BTreeSet<K> {
@@ -37,6 +46,14 @@ impl<K: Ord + Eq + EncodeSize> EncodeSize for BTreeSet<K> {
3746
}
3847
size
3948
}
49+
50+
fn encode_inline_size(&self) -> usize {
51+
let mut size = self.len().encode_size();
52+
for item in self {
53+
size += item.encode_inline_size();
54+
}
55+
size
56+
}
4057
}
4158

4259
impl<K: Read + Clone + Ord + Eq> Read for BTreeSet<K> {

codec/src/types/bytes.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! For portability and consistency between architectures,
44
//! the length of the [Bytes] must fit within a [u32].
55
6-
use crate::{util::at_least, EncodeSize, Error, RangeCfg, Read, Write};
6+
use crate::{util::at_least, BufsMut, EncodeSize, Error, RangeCfg, Read, Write};
77
use bytes::{Buf, BufMut, Bytes};
88

99
impl Write for Bytes {
@@ -12,13 +12,24 @@ impl Write for Bytes {
1212
self.len().write(buf);
1313
buf.put_slice(self);
1414
}
15+
16+
#[inline]
17+
fn write_bufs(&self, buf: &mut impl BufsMut) {
18+
self.len().write(buf);
19+
buf.push(self.clone());
20+
}
1521
}
1622

1723
impl EncodeSize for Bytes {
1824
#[inline]
1925
fn encode_size(&self) -> usize {
2026
self.len().encode_size() + self.len()
2127
}
28+
29+
#[inline]
30+
fn encode_inline_size(&self) -> usize {
31+
self.len().encode_size()
32+
}
2233
}
2334

2435
impl Read for Bytes {

codec/src/types/hash_map.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! the size of the map must fit within a [u32].
55
66
use crate::{
7-
codec::{EncodeSize, Read, Write},
7+
codec::{BufsMut, EncodeSize, Read, Write},
88
error::Error,
99
types::read_ordered_map,
1010
RangeCfg,
@@ -28,6 +28,18 @@ impl<K: Ord + Hash + Eq + Write, V: Write> Write for HashMap<K, V> {
2828
v.write(buf);
2929
}
3030
}
31+
32+
fn write_bufs(&self, buf: &mut impl BufsMut) {
33+
self.len().write(buf);
34+
35+
// Sort the keys to ensure deterministic encoding
36+
let mut entries: Vec<_> = self.iter().collect();
37+
entries.sort_by(|a, b| a.0.cmp(b.0));
38+
for (k, v) in entries {
39+
k.write_bufs(buf);
40+
v.write_bufs(buf);
41+
}
42+
}
3143
}
3244

3345
impl<K: Ord + Hash + Eq + EncodeSize, V: EncodeSize> EncodeSize for HashMap<K, V> {
@@ -43,6 +55,19 @@ impl<K: Ord + Hash + Eq + EncodeSize, V: EncodeSize> EncodeSize for HashMap<K, V
4355
}
4456
size
4557
}
58+
59+
fn encode_inline_size(&self) -> usize {
60+
// Start with the size of the length prefix
61+
let mut size = self.len().encode_size();
62+
63+
// Add the encoded size of each key and value
64+
// Note: Iteration order doesn't matter for size calculation.
65+
for (k, v) in self {
66+
size += k.encode_inline_size();
67+
size += v.encode_inline_size();
68+
}
69+
size
70+
}
4671
}
4772

4873
// Read implementation for HashMap

codec/src/types/hash_set.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! the size of the set must fit within a [u32].
55
66
use crate::{
7-
codec::{EncodeSize, Read, Write},
7+
codec::{BufsMut, EncodeSize, Read, Write},
88
error::Error,
99
types::read_ordered_set,
1010
RangeCfg,
@@ -25,6 +25,17 @@ impl<K: Ord + Hash + Eq + Write> Write for HashSet<K> {
2525
item.write(buf);
2626
}
2727
}
28+
29+
fn write_bufs(&self, buf: &mut impl BufsMut) {
30+
self.len().write(buf);
31+
32+
// Sort the items to ensure deterministic encoding
33+
let mut items: Vec<_> = self.iter().collect();
34+
items.sort();
35+
for item in items {
36+
item.write_bufs(buf);
37+
}
38+
}
2839
}
2940

3041
impl<K: Ord + Hash + Eq + EncodeSize> EncodeSize for HashSet<K> {
@@ -37,6 +48,16 @@ impl<K: Ord + Hash + Eq + EncodeSize> EncodeSize for HashSet<K> {
3748
}
3849
size
3950
}
51+
52+
fn encode_inline_size(&self) -> usize {
53+
let mut size = self.len().encode_size();
54+
55+
// Note: Iteration order doesn't matter for size calculation.
56+
for item in self {
57+
size += item.encode_inline_size();
58+
}
59+
size
60+
}
4061
}
4162

4263
impl<K: Read + Clone + Ord + Hash + Eq> Read for HashSet<K> {

codec/src/types/lazy.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! This module exports the [`Lazy`] type.
22
3-
use crate::{Decode, Encode, EncodeSize, FixedSize, Read, Write};
3+
use crate::{BufsMut, Decode, Encode, EncodeSize, FixedSize, Read, Write};
44
use bytes::{Buf, Bytes};
55
use core::hash::Hash;
66
#[cfg(feature = "std")]
@@ -160,6 +160,15 @@ impl<T: Read + EncodeSize> EncodeSize for Lazy<T> {
160160
.expect("Lazy should have a value if pending is None")
161161
.encode_size()
162162
}
163+
164+
fn encode_inline_size(&self) -> usize {
165+
if self.pending.is_some() {
166+
return 0;
167+
}
168+
self.get()
169+
.expect("Lazy should have a value if pending is None")
170+
.encode_inline_size()
171+
}
163172
}
164173

165174
impl<T: Read + Write> Write for Lazy<T> {
@@ -173,6 +182,17 @@ impl<T: Read + Write> Write for Lazy<T> {
173182
.expect("Lazy should have a value if pending is None")
174183
.write(buf);
175184
}
185+
186+
fn write_bufs(&self, buf: &mut impl BufsMut) {
187+
if let Some(pending) = &self.pending {
188+
// Write raw bytes without length prefix (Bytes::write_bufs adds a length prefix)
189+
buf.push(pending.bytes.clone());
190+
return;
191+
}
192+
self.get()
193+
.expect("Lazy should have a value if pending is None")
194+
.write_bufs(buf);
195+
}
176196
}
177197

178198
impl<T: Read + FixedSize> Read for Lazy<T> {

codec/src/types/primitives.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
//! endian ambiguity.
2020
2121
use crate::{
22-
util::at_least, varint::UInt, EncodeSize, Error, FixedSize, RangeCfg, Read, ReadExt, Write,
22+
util::at_least, varint::UInt, BufsMut, EncodeSize, Error, FixedSize, RangeCfg, Read, ReadExt,
23+
Write,
2324
};
2425
use bytes::{Buf, BufMut};
2526
use core::num::{NonZeroU16, NonZeroU32, NonZeroU64};
@@ -196,13 +197,27 @@ impl<T: Write> Write for Option<T> {
196197
inner.write(buf);
197198
}
198199
}
200+
201+
#[inline]
202+
fn write_bufs(&self, buf: &mut impl BufsMut) {
203+
self.is_some().write(buf);
204+
if let Some(inner) = self {
205+
inner.write_bufs(buf);
206+
}
207+
}
199208
}
200209

201210
impl<T: EncodeSize> EncodeSize for Option<T> {
202211
#[inline]
203212
fn encode_size(&self) -> usize {
204213
self.as_ref().map_or(1, |inner| 1 + inner.encode_size())
205214
}
215+
216+
#[inline]
217+
fn encode_inline_size(&self) -> usize {
218+
self.as_ref()
219+
.map_or(1, |inner| 1 + inner.encode_inline_size())
220+
}
206221
}
207222

208223
impl<T: Read> Read for Option<T> {

0 commit comments

Comments
 (0)