Skip to content

Commit 4e85927

Browse files
[stream] use buffer pool (#3033)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent bdfc9d5 commit 4e85927

14 files changed

Lines changed: 415 additions & 89 deletions

File tree

cryptography/src/handshake.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ mod key_exchange;
4949
use key_exchange::{EphemeralPublicKey, SecretKey};
5050

5151
mod cipher;
52-
pub use cipher::{RecvCipher, SendCipher, CIPHERTEXT_OVERHEAD};
52+
pub use cipher::{RecvCipher, SendCipher, TAG_SIZE};
5353

5454
#[cfg(all(test, feature = "arbitrary"))]
5555
mod conformance;

cryptography/src/handshake/cipher.rs

Lines changed: 153 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ use rand_core::CryptoRngCore;
44
use std::vec::Vec;
55
use zeroize::Zeroizing;
66

7-
/// The amount of overhead in a ciphertext, compared to the plain message.
8-
/// ChaCha20-Poly1305 uses a 128-bit (16 byte) authentication tag.
9-
pub const CIPHERTEXT_OVERHEAD: usize = 16;
7+
/// Size of the ChaCha20-Poly1305 authentication tag.
8+
///
9+
/// This tag is the overhead added to each ciphertext and must be transmitted
10+
/// alongside it for the receiver to verify integrity and authenticity.
11+
pub const TAG_SIZE: usize = 16;
1012

1113
/// How many bytes are in a nonce.
1214
/// ChaCha20-Poly1305 uses a 96-bit (12 byte) nonce.
@@ -55,36 +57,33 @@ cfg_if::cfg_if! {
5557
Self(LessSafeKey::new(unbound_key))
5658
}
5759

58-
fn encrypt(
60+
fn encrypt_in_place(
5961
&self,
6062
nonce: &[u8; NONCE_SIZE_BYTES],
61-
data: &[u8],
62-
) -> Result<Vec<u8>, Error> {
63+
data: &mut [u8],
64+
) -> Result<[u8; TAG_SIZE], Error> {
6365
let nonce = aead::Nonce::assume_unique_for_key(*nonce);
64-
let mut scratch = Vec::with_capacity(data.len() + CIPHERTEXT_OVERHEAD);
65-
scratch.extend_from_slice(data);
66-
self.0
67-
.seal_in_place_append_tag(nonce, aead::Aad::empty(), &mut scratch)
66+
let tag = self
67+
.0
68+
.seal_in_place_separate_tag(nonce, aead::Aad::empty(), data)
6869
.map_err(|_| Error::EncryptionFailed)?;
69-
Ok(scratch)
70+
Ok(tag.as_ref().try_into().expect("tag size mismatch"))
7071
}
7172

72-
fn decrypt(
73+
fn decrypt_in_place(
7374
&self,
7475
nonce: &[u8; NONCE_SIZE_BYTES],
75-
data: &[u8],
76-
) -> Result<Vec<u8>, Error> {
76+
data: &mut [u8],
77+
) -> Result<usize, Error> {
7778
let nonce = aead::Nonce::assume_unique_for_key(*nonce);
78-
let mut scratch = data.to_vec();
7979
self.0
80-
.open_in_place(nonce, aead::Aad::empty(), &mut scratch)
80+
.open_in_place(nonce, aead::Aad::empty(), data)
8181
.map_err(|_| Error::DecryptionFailed)?;
82-
scratch.truncate(data.len() - CIPHERTEXT_OVERHEAD);
83-
Ok(scratch)
82+
Ok(data.len() - TAG_SIZE)
8483
}
8584
}
8685
} else {
87-
use chacha20poly1305::{aead::Aead, ChaCha20Poly1305, KeyInit as _};
86+
use chacha20poly1305::{aead::AeadInPlace, ChaCha20Poly1305, KeyInit as _};
8887

8988
struct Cipher(ChaCha20Poly1305);
9089

@@ -93,24 +92,36 @@ cfg_if::cfg_if! {
9392
Self(ChaCha20Poly1305::new(key.into()))
9493
}
9594

96-
fn encrypt(
95+
fn encrypt_in_place(
9796
&self,
9897
nonce: &[u8; NONCE_SIZE_BYTES],
99-
data: &[u8],
100-
) -> Result<Vec<u8>, Error> {
101-
self.0
102-
.encrypt(nonce.into(), data)
103-
.map_err(|_| Error::EncryptionFailed)
98+
data: &mut [u8],
99+
) -> Result<[u8; TAG_SIZE], Error> {
100+
let tag = self
101+
.0
102+
.encrypt_in_place_detached(nonce.into(), &[], data)
103+
.map_err(|_| Error::EncryptionFailed)?;
104+
Ok(tag.into())
104105
}
105106

106-
fn decrypt(
107+
fn decrypt_in_place(
107108
&self,
108109
nonce: &[u8; NONCE_SIZE_BYTES],
109-
data: &[u8],
110-
) -> Result<Vec<u8>, Error> {
110+
data: &mut [u8],
111+
) -> Result<usize, Error> {
112+
let plaintext_len = data.len() - TAG_SIZE;
113+
let tag: [u8; TAG_SIZE] = data[plaintext_len..]
114+
.try_into()
115+
.map_err(|_| Error::DecryptionFailed)?;
111116
self.0
112-
.decrypt(nonce.into(), data)
113-
.map_err(|_| Error::DecryptionFailed)
117+
.decrypt_in_place_detached(
118+
nonce.into(),
119+
&[],
120+
&mut data[..plaintext_len],
121+
&tag.into(),
122+
)
123+
.map_err(|_| Error::DecryptionFailed)?;
124+
Ok(plaintext_len)
114125
}
115126
}
116127
}
@@ -133,10 +144,23 @@ impl SendCipher {
133144
}
134145
}
135146

147+
/// Encrypts `data` in-place and returns the authentication tag.
148+
///
149+
/// The caller is responsible for appending the returned tag to the buffer.
150+
#[inline]
151+
pub fn send_in_place(&mut self, data: &mut [u8]) -> Result<[u8; TAG_SIZE], Error> {
152+
let nonce = self.nonce.inc()?;
153+
self.inner
154+
.expose(|cipher| cipher.encrypt_in_place(&nonce, data))
155+
}
156+
136157
/// Encrypts data and returns the ciphertext.
137158
pub fn send(&mut self, data: &[u8]) -> Result<Vec<u8>, Error> {
138-
let nonce = self.nonce.inc()?;
139-
self.inner.expose(|cipher| cipher.encrypt(&nonce, data))
159+
let mut buf = vec![0u8; data.len() + TAG_SIZE];
160+
buf[..data.len()].copy_from_slice(data);
161+
let tag = self.send_in_place(&mut buf[..data.len()])?;
162+
buf[data.len()..].copy_from_slice(&tag);
163+
Ok(buf)
140164
}
141165
}
142166

@@ -157,6 +181,33 @@ impl RecvCipher {
157181
}
158182
}
159183

184+
/// Decrypts `encrypted_data` in-place and returns the plaintext length.
185+
///
186+
/// The buffer must contain ciphertext with the authentication tag appended
187+
/// (last `TAG_SIZE` bytes). After decryption, the plaintext is in
188+
/// `encrypted_data[..returned_len]`.
189+
///
190+
/// # Errors
191+
///
192+
/// Returns an error if:
193+
/// - `encrypted_data.len() < TAG_SIZE`
194+
/// - Too many messages have been received with this cipher
195+
/// - The ciphertext was corrupted or tampered with
196+
///
197+
/// In the last two cases, the `RecvCipher` will no longer be able to return
198+
/// valid ciphertexts, and will always return an error on subsequent calls
199+
/// to [`Self::recv`]. Terminating (and optionally reestablishing) the connection
200+
/// is a simple (and safe) way to handle this scenario.
201+
#[inline]
202+
pub fn recv_in_place(&mut self, encrypted_data: &mut [u8]) -> Result<usize, Error> {
203+
if encrypted_data.len() < TAG_SIZE {
204+
return Err(Error::DecryptionFailed);
205+
}
206+
let nonce = self.nonce.inc()?;
207+
self.inner
208+
.expose(|cipher| cipher.decrypt_in_place(&nonce, encrypted_data))
209+
}
210+
160211
/// Decrypts ciphertext and returns the original data.
161212
///
162213
/// # Errors
@@ -171,9 +222,10 @@ impl RecvCipher {
171222
/// to [`Self::recv`]. Terminating (and optionally reestablishing) the connection
172223
/// is a simple (and safe) way to handle this scenario.
173224
pub fn recv(&mut self, encrypted_data: &[u8]) -> Result<Vec<u8>, Error> {
174-
let nonce = self.nonce.inc()?;
175-
self.inner
176-
.expose(|cipher| cipher.decrypt(&nonce, encrypted_data))
225+
let mut buf = encrypted_data.to_vec();
226+
let plaintext_len = self.recv_in_place(&mut buf)?;
227+
buf.truncate(plaintext_len);
228+
Ok(buf)
177229
}
178230
}
179231

@@ -189,7 +241,7 @@ mod tests {
189241

190242
let plaintext = b"hello world";
191243
let ciphertext = send.send(plaintext).unwrap();
192-
assert_eq!(ciphertext.len(), plaintext.len() + CIPHERTEXT_OVERHEAD);
244+
assert_eq!(ciphertext.len(), plaintext.len() + TAG_SIZE);
193245

194246
let decrypted = recv.recv(&ciphertext).unwrap();
195247
assert_eq!(decrypted, plaintext);
@@ -211,7 +263,7 @@ mod tests {
211263
fn test_recv_ciphertext_too_short() {
212264
let mut rng = test_rng();
213265
let mut recv = RecvCipher::new(&mut rng);
214-
let short_data = vec![0u8; CIPHERTEXT_OVERHEAD - 1];
266+
let short_data = vec![0u8; TAG_SIZE - 1];
215267
assert!(matches!(
216268
recv.recv(&short_data),
217269
Err(Error::DecryptionFailed)
@@ -222,7 +274,70 @@ mod tests {
222274
fn test_recv_ciphertext_exactly_overhead() {
223275
let mut rng = test_rng();
224276
let mut recv = RecvCipher::new(&mut rng);
225-
let tag_only = vec![0u8; CIPHERTEXT_OVERHEAD];
277+
let tag_only = vec![0u8; TAG_SIZE];
226278
assert!(matches!(recv.recv(&tag_only), Err(Error::DecryptionFailed)));
227279
}
280+
281+
#[test]
282+
fn test_send_recv_in_place_roundtrip() {
283+
let mut send = SendCipher::new(&mut test_rng());
284+
let mut recv = RecvCipher::new(&mut test_rng());
285+
286+
let plaintext = b"hello world";
287+
let mut buf = vec![0u8; plaintext.len() + TAG_SIZE];
288+
buf[..plaintext.len()].copy_from_slice(plaintext);
289+
290+
// Encrypt plaintext in place, get tag back
291+
let tag = send.send_in_place(&mut buf[..plaintext.len()]).unwrap();
292+
// Append tag to buffer
293+
buf[plaintext.len()..].copy_from_slice(&tag);
294+
295+
// Decrypt ciphertext+tag in place, get plaintext length back
296+
let plaintext_len = recv.recv_in_place(&mut buf).unwrap();
297+
298+
assert_eq!(plaintext_len, plaintext.len());
299+
assert_eq!(&buf[..plaintext_len], plaintext);
300+
}
301+
302+
#[test]
303+
fn test_recv_in_place_ciphertext_too_short() {
304+
let mut recv = RecvCipher::new(&mut test_rng());
305+
306+
// Buffer smaller than tag size
307+
let mut buf = vec![0u8; TAG_SIZE - 1];
308+
assert!(matches!(
309+
recv.recv_in_place(&mut buf),
310+
Err(Error::DecryptionFailed)
311+
));
312+
}
313+
314+
#[test]
315+
fn test_send_in_place_recv_compatibility() {
316+
let mut send = SendCipher::new(&mut test_rng());
317+
let mut recv = RecvCipher::new(&mut test_rng());
318+
319+
let plaintext = b"cross-api test";
320+
let mut buf = vec![0u8; plaintext.len() + TAG_SIZE];
321+
buf[..plaintext.len()].copy_from_slice(plaintext);
322+
323+
let tag = send.send_in_place(&mut buf[..plaintext.len()]).unwrap();
324+
buf[plaintext.len()..].copy_from_slice(&tag);
325+
326+
// Use allocating recv on in-place encrypted data
327+
let decrypted = recv.recv(&buf).unwrap();
328+
assert_eq!(decrypted, plaintext);
329+
}
330+
331+
#[test]
332+
fn test_send_recv_in_place_compatibility() {
333+
let mut send = SendCipher::new(&mut test_rng());
334+
let mut recv = RecvCipher::new(&mut test_rng());
335+
336+
let plaintext = b"cross-api test";
337+
let mut ciphertext = send.send(plaintext).unwrap();
338+
339+
// Use in-place recv on allocating send data
340+
let plaintext_len = recv.recv_in_place(&mut ciphertext).unwrap();
341+
assert_eq!(&ciphertext[..plaintext_len], plaintext);
342+
}
228343
}

p2p/src/authenticated/discovery/actors/dialer.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::authenticated::{
1414
use commonware_cryptography::Signer;
1515
use commonware_macros::select_loop;
1616
use commonware_runtime::{
17-
spawn_cell, Clock, ContextCell, Handle, Metrics, Network, Resolver, SinkOf, Spawner, StreamOf,
17+
spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Resolver, SinkOf,
18+
Spawner, StreamOf,
1819
};
1920
use commonware_stream::encrypted::{dial, Config as StreamConfig};
2021
use commonware_utils::SystemTimeExt;
@@ -64,7 +65,11 @@ pub struct Actor<E: Spawner + Clock + Network + Resolver + Metrics, C: Signer> {
6465
attempts: Family<metrics::Peer, Counter>,
6566
}
6667

67-
impl<E: Spawner + Clock + Network + Resolver + CryptoRngCore + Metrics, C: Signer> Actor<E, C> {
68+
impl<
69+
E: Spawner + BufferPooler + Clock + Network + Resolver + CryptoRngCore + Metrics,
70+
C: Signer,
71+
> Actor<E, C>
72+
{
6873
pub fn new(context: E, cfg: Config<C>) -> Self {
6974
let attempts = Family::<metrics::Peer, Counter>::default();
7075
context.register(

p2p/src/authenticated/discovery/actors/listener.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use crate::authenticated::{
88
use commonware_cryptography::Signer;
99
use commonware_macros::select_loop;
1010
use commonware_runtime::{
11-
spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota,
12-
SinkOf, Spawner, StreamOf,
11+
spawn_cell, BufferPooler, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics,
12+
Network, Quota, SinkOf, Spawner, StreamOf,
1313
};
1414
use commonware_stream::encrypted::{listen, Config as StreamConfig};
1515
use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt};
@@ -34,7 +34,7 @@ pub struct Config<C: Signer> {
3434
pub allowed_handshake_rate_per_subnet: Quota,
3535
}
3636

37-
pub struct Actor<E: Spawner + Clock + Network + CryptoRngCore + Metrics, C: Signer> {
37+
pub struct Actor<E: Spawner + BufferPooler + Clock + Network + CryptoRngCore + Metrics, C: Signer> {
3838
context: ContextCell<E>,
3939

4040
address: SocketAddr,
@@ -49,7 +49,7 @@ pub struct Actor<E: Spawner + Clock + Network + CryptoRngCore + Metrics, C: Sign
4949
handshakes_subnet_rate_limited: Counter,
5050
}
5151

52-
impl<E: Spawner + Clock + Network + CryptoRngCore + Metrics, C: Signer> Actor<E, C> {
52+
impl<E: Spawner + BufferPooler + Clock + Network + CryptoRngCore + Metrics, C: Signer> Actor<E, C> {
5353
pub fn new(context: E, cfg: Config<C>) -> Self {
5454
// Create metrics
5555
let handshakes_blocked = Counter::default();

p2p/src/authenticated/discovery/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ mod tests {
242242
use commonware_cryptography::{ed25519, Signer as _};
243243
use commonware_macros::{select, select_loop, test_group, test_traced};
244244
use commonware_runtime::{
245-
count_running_tasks, deterministic, tokio, Clock, Handle, IoBuf, Metrics,
245+
count_running_tasks, deterministic, tokio, BufferPooler, Clock, Handle, IoBuf, Metrics,
246246
Network as RNetwork, Quota, Resolver, Runner, Spawner,
247247
};
248248
use commonware_utils::{channel::mpsc, hostname, ordered::Set, TryCollect, NZU32};
@@ -284,7 +284,7 @@ mod tests {
284284
/// We set a unique `base_port` for each test to avoid "address already in use"
285285
/// errors when tests are run immediately after each other.
286286
async fn run_network(
287-
context: impl Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
287+
context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
288288
max_message_size: u32,
289289
base_port: u16,
290290
n: usize,

p2p/src/authenticated/discovery/network.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use crate::{
1313
use commonware_cryptography::Signer;
1414
use commonware_macros::select;
1515
use commonware_runtime::{
16-
spawn_cell, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota, Resolver, Spawner,
16+
spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
17+
Resolver, Spawner,
1718
};
1819
use commonware_stream::encrypted::Config as StreamConfig;
1920
use commonware_utils::union;
@@ -27,7 +28,10 @@ const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
2728
const STREAM_SUFFIX: &[u8] = b"_STREAM";
2829

2930
/// Implementation of an `authenticated` network.
30-
pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> {
31+
pub struct Network<
32+
E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
33+
C: Signer,
34+
> {
3135
context: ContextCell<E>,
3236
cfg: Config<C>,
3337

@@ -39,7 +43,11 @@ pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Me
3943
info_verifier: InfoVerifier<C::PublicKey>,
4044
}
4145

42-
impl<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> Network<E, C> {
46+
impl<
47+
E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
48+
C: Signer,
49+
> Network<E, C>
50+
{
4351
/// Create a new instance of an `authenticated` network.
4452
///
4553
/// # Parameters

0 commit comments

Comments
 (0)