Skip to content

Commit 0a9f647

Browse files
Yaminyam and inureyes authored
perf: stream SFTP uploads/downloads instead of buffering whole file (#195)
* perf: stream SFTP uploads/downloads instead of buffering whole file

  Upload (`upload_file`, `upload_dir_recursive`) used `tokio::fs::read` to load the entire local file into a `Vec<u8>` before calling `write_all`, and download (`download_file`, `download_dir_recursive`) used `read_to_end` into a pooled buffer + `clone()` to a separate `Vec` before writing locally. For multi-GB transfers this means peak RSS scales with file size and large files OOM the client.

  Replace each path with a small `stream_copy()` helper that loops on 256 KiB reads and writes through the existing `AsyncRead`/`AsyncWrite` implementations on `tokio::fs::File` and `russh_sftp::client::fs::File`. Buffer size matches the SFTP MAX_WRITE_LENGTH so each chunk maps to a single SFTP packet without further fragmentation.

  Verified locally on macOS arm64 against `bssh-server` v2.1.3 over loopback with a 1 GiB file:

  | Op       | Build     | real   | RSS     |
  |----------|-----------|--------|---------|
  | upload   | unpatched | 38.65s | 3.23 GB |
  | upload   | streaming | 3.47s  | 20 MB   |
  | download | unpatched | 3.93s  | 2.17 GB |
  | download | streaming | 3.41s  | 16 MB   |

  Peak RSS drops ~160x and uploads complete ~11x faster (a single multi-MB `write_all` apparently serializes much worse through the SFTP pipeline than 256 KiB chunked writes).

* fix: align SFTP streaming chunking

  Match the streaming buffer to russh-sftp's 255 KiB packet limit so each chunk stays within one SFTP read/write request, explicitly close downloaded remote handles after successful copies, and cover the chunking behavior with a focused unit test.

---------

Co-authored-by: Jeongkyu Shin <inureyes@gmail.com>
1 parent 27d20e4 commit 0a9f647

3 files changed

Lines changed: 103 additions & 25 deletions

File tree

ARCHITECTURE.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ bssh (Backend.AI SSH / Broadcast SSH) is a high-performance parallel SSH command
1212
- SSH jump host support (-J)
1313
- SSH configuration file parsing (-F)
1414
- Interactive PTY sessions with single/multiplex modes
15-
- SFTP file transfers (upload/download)
15+
- SFTP file transfers (upload/download) with chunked streaming
1616
- Backend.AI cluster auto-detection
1717
- pdsh compatibility mode
1818

docs/architecture/ssh-client.md

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@
2929
- Support for SSH agent, key-based, and password authentication
3030
- Configurable timeouts and retry logic
3131
- Full SFTP support for file transfers
32+
- SFTP uploads/downloads stream file payloads in 255 KiB chunks, matching the
33+
default russh-sftp read/write packet limit and avoiding whole-file buffering
3234
- SSH keepalive support via `SshConnectionConfig`:
3335
- `keepalive_interval`: Interval between keepalive packets (default: 60s, 0 to disable)
3436
- `keepalive_max`: Maximum unanswered keepalive packets before disconnect (default: 3)

src/ssh/tokio_client/file_transfer.rs

Lines changed: 100 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -23,8 +23,31 @@ use russh_sftp::{client::SftpSession, protocol::OpenFlags};
2323
use std::path::Path;
2424
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2525

26+
/// Number of bytes moved per read/write step while streaming an SFTP transfer.
///
/// 255 KiB is chosen to line up with russh-sftp's default MAX_WRITE_LENGTH, so
/// that one chunk fits in a single SFTP WRITE/READ packet and is not split
/// any further.
const STREAM_CHUNK_SIZE: usize = 255 * 1024;
31+
32+
/// Stream `reader` to `writer` in fixed-size chunks so a single transfer never
33+
/// holds more than `STREAM_CHUNK_SIZE` of file payload in memory at once.
34+
async fn stream_copy<R, W>(reader: &mut R, writer: &mut W) -> std::io::Result<()>
35+
where
36+
R: tokio::io::AsyncRead + Unpin,
37+
W: tokio::io::AsyncWrite + Unpin,
38+
{
39+
let mut buf = vec![0u8; STREAM_CHUNK_SIZE];
40+
loop {
41+
let n = reader.read(&mut buf).await?;
42+
if n == 0 {
43+
break;
44+
}
45+
writer.write_all(&buf[..n]).await?;
46+
}
47+
Ok(())
48+
}
49+
2650
use super::connection::Client;
27-
use crate::utils::buffer_pool::global;
2851

2952
impl Client {
3053
/// Upload a file with sftp to the remote server.
@@ -46,19 +69,18 @@ impl Client {
4669
channel.request_subsystem(true, "sftp").await?;
4770
let sftp = SftpSession::new(channel.into_stream()).await?;
4871

49-
// read file contents locally
50-
let file_contents = tokio::fs::read(src_file_path)
72+
// Open local file for streaming reads (avoids loading whole file in memory).
73+
let mut local_file = tokio::fs::File::open(src_file_path)
5174
.await
5275
.map_err(super::Error::IoError)?;
5376

54-
// interaction with i/o
5577
let mut file = sftp
5678
.open_with_flags(
5779
dest_file_path,
5880
OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE | OpenFlags::READ,
5981
)
6082
.await?;
61-
file.write_all(&file_contents)
83+
stream_copy(&mut local_file, &mut file)
6284
.await
6385
.map_err(super::Error::IoError)?;
6486
file.flush().await.map_err(super::Error::IoError)?;
@@ -89,18 +111,16 @@ impl Client {
89111
.open_with_flags(remote_file_path, OpenFlags::READ)
90112
.await?;
91113

92-
// Use pooled buffer for reading file contents to reduce allocations
93-
let mut pooled_buffer = global::get_large_buffer();
94-
remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?;
95-
let contents = pooled_buffer.as_vec().clone(); // Clone to owned Vec for writing
96-
97-
// write contents to local file
114+
// Stream remote file directly to local disk to avoid buffering the
115+
// whole file in memory.
98116
let mut local_file = tokio::fs::File::create(local_file_path.as_ref())
99117
.await
100118
.map_err(super::Error::IoError)?;
101-
102-
local_file
103-
.write_all(&contents)
119+
stream_copy(&mut remote_file, &mut local_file)
120+
.await
121+
.map_err(super::Error::IoError)?;
122+
remote_file
123+
.shutdown()
104124
.await
105125
.map_err(super::Error::IoError)?;
106126
local_file.flush().await.map_err(super::Error::IoError)?;
@@ -173,8 +193,9 @@ impl Client {
173193
let _ = sftp.create_dir(&remote_path).await; // Ignore error if already exists
174194
self.upload_dir_recursive(sftp, &path, &remote_path).await?;
175195
} else if metadata.is_file() {
176-
// Upload file
177-
let file_contents = tokio::fs::read(&path)
196+
// Stream local file to remote in chunks instead of loading
197+
// the entire file in memory before send.
198+
let mut local_file = tokio::fs::File::open(&path)
178199
.await
179200
.map_err(super::Error::IoError)?;
180201

@@ -185,8 +206,7 @@ impl Client {
185206
)
186207
.await?;
187208

188-
remote_file
189-
.write_all(&file_contents)
209+
stream_copy(&mut local_file, &mut remote_file)
190210
.await
191211
.map_err(super::Error::IoError)?;
192212
remote_file.flush().await.map_err(super::Error::IoError)?;
@@ -265,21 +285,77 @@ impl Client {
265285
self.download_dir_recursive(sftp, &remote_path, &local_path)
266286
.await?;
267287
} else if metadata.file_type().is_file() {
268-
// Download file using pooled buffer
288+
// Stream remote file directly to local disk in chunks.
269289
let mut remote_file =
270290
sftp.open_with_flags(&remote_path, OpenFlags::READ).await?;
271291

272-
let mut pooled_buffer = global::get_large_buffer();
273-
remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?;
274-
let contents = pooled_buffer.as_vec().clone();
275-
276-
tokio::fs::write(&local_path, contents)
292+
let mut local_file = tokio::fs::File::create(&local_path)
293+
.await
294+
.map_err(super::Error::IoError)?;
295+
stream_copy(&mut remote_file, &mut local_file)
296+
.await
297+
.map_err(super::Error::IoError)?;
298+
remote_file
299+
.shutdown()
277300
.await
278301
.map_err(super::Error::IoError)?;
302+
local_file.flush().await.map_err(super::Error::IoError)?;
279303
}
280304
}
281305

282306
Ok(())
283307
})
284308
}
285309
}
310+
311+
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        io::Cursor,
        pin::Pin,
        task::{Context, Poll},
    };
    use tokio::io::AsyncWrite;

    /// In-memory `AsyncWrite` that accepts every write in full and records
    /// both the bytes received and the length of each individual write call.
    #[derive(Default)]
    struct RecordingWriter {
        bytes: Vec<u8>,
        write_lengths: Vec<usize>,
    }

    impl AsyncWrite for RecordingWriter {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            self.write_lengths.push(buf.len());
            self.bytes.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Run `stream_copy` over `input` and return the writer that recorded it.
    async fn copy_and_record(input: &[u8]) -> RecordingWriter {
        let mut reader = Cursor::new(input);
        let mut writer = RecordingWriter::default();
        stream_copy(&mut reader, &mut writer)
            .await
            .expect("copying between in-memory buffers cannot fail");
        writer
    }

    #[tokio::test]
    async fn stream_copy_writes_sftp_sized_chunks() {
        let input = vec![0xAB; STREAM_CHUNK_SIZE * 2 + 17];

        let writer = copy_and_record(&input).await;

        assert_eq!(writer.bytes, input);
        assert_eq!(
            writer.write_lengths,
            vec![STREAM_CHUNK_SIZE, STREAM_CHUNK_SIZE, 17]
        );
    }

    // An empty source must finish cleanly without issuing any write at all.
    #[tokio::test]
    async fn stream_copy_empty_input_writes_nothing() {
        let writer = copy_and_record(&[]).await;

        assert!(writer.bytes.is_empty());
        assert!(writer.write_lengths.is_empty());
    }

    // Input shorter than one chunk is forwarded as a single short write.
    #[tokio::test]
    async fn stream_copy_short_input_uses_single_write() {
        let input = vec![0x5A; 17];

        let writer = copy_and_record(&input).await;

        assert_eq!(writer.bytes, input);
        assert_eq!(writer.write_lengths, vec![17]);
    }

    // An exact multiple of the chunk size must not produce a trailing
    // zero-length write once the reader reports end-of-stream.
    #[tokio::test]
    async fn stream_copy_exact_multiple_has_no_trailing_write() {
        let input = vec![0xCD; STREAM_CHUNK_SIZE * 2];

        let writer = copy_and_record(&input).await;

        assert_eq!(writer.bytes, input);
        assert_eq!(
            writer.write_lengths,
            vec![STREAM_CHUNK_SIZE, STREAM_CHUNK_SIZE]
        );
    }
}

0 commit comments

Comments
 (0)