
Commit d992e13

age: Buffer as many STREAM chunks as we have logical CPUs
1 parent 4804e1e commit d992e13

File tree: Cargo.lock, age/Cargo.toml, age/src/primitives/stream.rs

3 files changed: +35, -27 lines


Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered by default.)

age/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -75,6 +75,9 @@ i18n-embed-fl = "0.5"
 lazy_static = "1"
 rust-embed = "5"
 
+# Performance
+num_cpus = "1.0"
+
 # Common CLI dependencies
 console = { version = "0.14", optional = true }
 pinentry = { version = "0.4", optional = true }
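The only change here is the new num_cpus dependency under a "# Performance" heading; it supplies the logical-CPU count that the buffer sizes in stream.rs below are derived from. As a quick illustration (the main function is hypothetical and not part of age; output is machine-dependent), the call the crate exposes is:

    fn main() {
        // num_cpus::get() returns the number of logical CPUs (hyperthreads included).
        println!("logical CPUs: {}", num_cpus::get());
    }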

age/src/primitives/stream.rs

Lines changed: 31 additions & 27 deletions
@@ -4,6 +4,7 @@ use chacha20poly1305::{
     aead::{generic_array::GenericArray, AeadInPlace, NewAead},
     ChaChaPoly1305,
 };
+use lazy_static::lazy_static;
 use pin_project::pin_project;
 use secrecy::{ExposeSecret, SecretVec};
 use std::cmp;
@@ -24,6 +25,11 @@ const CHUNK_SIZE: usize = 64 * 1024;
 const TAG_SIZE: usize = 16;
 const ENCRYPTED_CHUNK_SIZE: usize = CHUNK_SIZE + TAG_SIZE;
 
+lazy_static! {
+    static ref CHUNKS_SIZE: usize = num_cpus::get() * CHUNK_SIZE;
+    static ref ENCRYPTED_CHUNKS_SIZE: usize = num_cpus::get() * ENCRYPTED_CHUNK_SIZE;
+}
+
 pub(crate) struct PayloadKey(
     pub(crate) GenericArray<u8, <ChaChaPoly1305<c2_chacha::Ietf> as NewAead>::KeySize>,
 );
@@ -112,7 +118,7 @@ impl Stream {
         StreamWriter {
             stream: Self::new(key),
             inner,
-            chunks: Vec::with_capacity(CHUNK_SIZE),
+            chunks: Vec::with_capacity(*CHUNKS_SIZE),
             #[cfg(feature = "async")]
             encrypted_chunks: None,
         }
@@ -130,7 +136,7 @@ impl Stream {
         StreamWriter {
             stream: Self::new(key),
             inner,
-            chunks: Vec::with_capacity(CHUNK_SIZE),
+            chunks: Vec::with_capacity(*CHUNKS_SIZE),
             encrypted_chunks: None,
         }
     }
@@ -146,7 +152,7 @@ impl Stream {
         StreamReader {
             stream: Self::new(key),
             inner,
-            encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE],
+            encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE],
             encrypted_pos: 0,
             start: StartPos::Implicit(0),
             plaintext_len: None,
@@ -167,7 +173,7 @@ impl Stream {
         StreamReader {
             stream: Self::new(key),
             inner,
-            encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE],
+            encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE],
             encrypted_pos: 0,
             start: StartPos::Implicit(0),
             plaintext_len: None,
@@ -284,13 +290,13 @@ impl<W: Write> Write for StreamWriter<W> {
         let mut bytes_written = 0;
 
         while !buf.is_empty() {
-            let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len());
+            let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len());
             self.chunks.extend_from_slice(&buf[..to_write]);
             bytes_written += to_write;
             buf = &buf[to_write..];
 
-            // At this point, either buf is empty, or we have a full chunk.
-            assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE);
+            // At this point, either buf is empty, or we have a full set of chunks.
+            assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE);
 
             // Only encrypt the chunk if we have more data to write, as the last
             // chunk must be written in finish().
@@ -342,16 +348,16 @@ impl<W: AsyncWrite> AsyncWrite for StreamWriter<W> {
     ) -> Poll<io::Result<usize>> {
         ready!(self.as_mut().poll_flush_chunk(cx))?;
 
-        let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len());
+        let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len());
 
         self.as_mut()
             .project()
             .chunks
             .extend_from_slice(&buf[..to_write]);
         buf = &buf[to_write..];
 
-        // At this point, either buf is empty, or we have a full chunk.
-        assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE);
+        // At this point, either buf is empty, or we have a full set of chunks.
+        assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE);
 
         // Only encrypt the chunk if we have more data to write, as the last
         // chunk must be written in poll_close().
@@ -445,7 +451,7 @@ impl<R> StreamReader<R> {
         // multiple of the chunk size. In that case, we try decrypting twice on a
        // decryption failure.
         // TODO: Generalise to multiple chunks.
-        let last = chunks.len() < ENCRYPTED_CHUNK_SIZE;
+        let last = chunks.len() < *ENCRYPTED_CHUNKS_SIZE;
 
         self.chunks = match (self.stream.decrypt_chunks(chunks, last), last) {
             (Ok(chunk), _) => Some(chunk),
@@ -465,16 +471,16 @@ impl<R> StreamReader<R> {
             return 0;
         }
 
-        // TODO: Generalise to multiple chunks.
-        let chunk = self.chunks.as_ref().unwrap();
-        let cur_chunk_offset = self.cur_plaintext_pos as usize % CHUNK_SIZE;
+        let chunks = self.chunks.as_ref().unwrap();
+        let cur_chunks_offset = self.cur_plaintext_pos as usize % *CHUNKS_SIZE;
 
-        let to_read = cmp::min(chunk.expose_secret().len() - cur_chunk_offset, buf.len());
+        let to_read = cmp::min(chunks.expose_secret().len() - cur_chunks_offset, buf.len());
 
-        buf[..to_read]
-            .copy_from_slice(&chunk.expose_secret()[cur_chunk_offset..cur_chunk_offset + to_read]);
+        buf[..to_read].copy_from_slice(
+            &chunks.expose_secret()[cur_chunks_offset..cur_chunks_offset + to_read],
+        );
         self.cur_plaintext_pos += to_read as u64;
-        if self.cur_plaintext_pos % CHUNK_SIZE as u64 == 0 {
+        if self.cur_plaintext_pos % *CHUNKS_SIZE as u64 == 0 {
             // We've finished with the current chunks.
             self.chunks = None;
         }
@@ -486,7 +492,7 @@ impl<R> StreamReader<R> {
 impl<R: Read> Read for StreamReader<R> {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         if self.chunks.is_none() {
-            while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE {
+            while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE {
                 match self
                     .inner
                     .read(&mut self.encrypted_chunks[self.encrypted_pos..])
@@ -514,7 +520,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for StreamReader<R> {
         buf: &mut [u8],
     ) -> Poll<Result<usize, Error>> {
         if self.chunks.is_none() {
-            while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE {
+            while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE {
                 let this = self.as_mut().project();
                 match ready!(this
                     .inner
@@ -630,12 +636,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {
             }
         };
 
-        // TODO: Generalise to multiple chunks.
-
-        let cur_chunk_index = self.cur_plaintext_pos / CHUNK_SIZE as u64;
+        let cur_chunk_index = self.cur_plaintext_pos / *CHUNKS_SIZE as u64;
 
-        let target_chunk_index = target_pos / CHUNK_SIZE as u64;
-        let target_chunk_offset = target_pos % CHUNK_SIZE as u64;
+        let target_chunk_index = target_pos / *CHUNKS_SIZE as u64;
+        let target_chunk_offset = target_pos % *CHUNKS_SIZE as u64;
 
         if target_chunk_index == cur_chunk_index {
             // We just need to reposition ourselves within the current chunk.
@@ -646,10 +650,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {
 
             // Seek to the beginning of the target chunk
             self.inner.seek(SeekFrom::Start(
-                start + (target_chunk_index * ENCRYPTED_CHUNK_SIZE as u64),
+                start + (target_chunk_index * *ENCRYPTED_CHUNKS_SIZE as u64),
             ))?;
             self.stream.nonce.set_counter(target_chunk_index);
-            self.cur_plaintext_pos = target_chunk_index * CHUNK_SIZE as u64;
+            self.cur_plaintext_pos = target_chunk_index * *CHUNKS_SIZE as u64;
 
             // Read and drop bytes from the chunk to reach the target position.
             if target_chunk_offset > 0 {
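To summarise the arithmetic this commit introduces, here is a self-contained sketch: the constants and the lazy_static block mirror the diff above, while buffer_for_writing and main are hypothetical additions for illustration only and are not part of the age API. It assumes the lazy_static and num_cpus crates declared in Cargo.toml are available.

    use lazy_static::lazy_static;

    // Per-chunk sizes, as in age's STREAM implementation.
    const CHUNK_SIZE: usize = 64 * 1024;
    const TAG_SIZE: usize = 16;
    const ENCRYPTED_CHUNK_SIZE: usize = CHUNK_SIZE + TAG_SIZE;

    lazy_static! {
        // Evaluated once on first use: buffer one plaintext chunk per logical CPU.
        static ref CHUNKS_SIZE: usize = num_cpus::get() * CHUNK_SIZE;
        // The ciphertext side grows by a 16-byte Poly1305 tag per chunk.
        static ref ENCRYPTED_CHUNKS_SIZE: usize = num_cpus::get() * ENCRYPTED_CHUNK_SIZE;
    }

    // Hypothetical helper mirroring how StreamWriter sizes its plaintext buffer.
    fn buffer_for_writing() -> Vec<u8> {
        // lazy_static values are reached through Deref, hence the `*`.
        Vec::with_capacity(*CHUNKS_SIZE)
    }

    fn main() {
        let buf = buffer_for_writing();
        // On an 8-logical-CPU machine: 8 * 64 KiB = 512 KiB of plaintext,
        // 8 * (64 KiB + 16 B) of ciphertext.
        println!(
            "plaintext buffer: {} bytes, ciphertext buffer: {} bytes ({} CPUs)",
            buf.capacity(),
            *ENCRYPTED_CHUNKS_SIZE,
            num_cpus::get()
        );
    }

This also explains why most of the diff is mechanical: every buffering boundary that used CHUNK_SIZE or ENCRYPTED_CHUNK_SIZE now uses *CHUNKS_SIZE or *ENCRYPTED_CHUNKS_SIZE, the extra * being the deref that lazy_static requires.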

Comments (0)