Skip to content

Commit 8506d6d

Browse files
committed
Stream multipart assembly instead of buffering entire file in memory
CompleteMultipartUpload previously read all temp parts into a single Vec, then wrote the assembled data via put_object (sequential 64KB writes). For a 1GB file: 1GB memory allocation + 16,384 serial round-trips. Now streams each part through a WalWriter: pipelined reads (64 per batch) from the temp part → pipelined writes (64 per batch) to a WAL temp file → atomic rename. Memory usage is bounded to one pipeline buffer (~4MB) regardless of total file size.
1 parent 264f2bb commit 8506d6d

2 files changed

Lines changed: 62 additions & 14 deletions

File tree

src/s3/router.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,21 +1075,14 @@ async fn handle_complete_multipart_upload(
10751075
}
10761076
}
10771077

1078-
// Concatenate parts in order and write the final object
1079-
let mut final_data = Vec::new();
1080-
for pn in &part_numbers {
1081-
if let Some(part) = upload.parts.get(pn) {
1082-
match state.share.read_temp(&part.temp_path).await {
1083-
Ok(data) => final_data.extend_from_slice(&data),
1084-
Err(e) => {
1085-
return io_to_s3_error(&e);
1086-
}
1087-
}
1088-
}
1089-
}
1078+
// Stream parts through a WAL writer (pipelined reads → pipelined writes
1079+
// → atomic rename). Never buffers the whole file in memory.
1080+
let temp_paths: Vec<&str> = part_numbers
1081+
.iter()
1082+
.filter_map(|pn| upload.parts.get(pn).map(|p| p.temp_path.as_str()))
1083+
.collect();
10901084

1091-
// Write the assembled object
1092-
let meta = match state.share.put_object(key, &final_data).await {
1085+
let meta = match state.share.assemble_parts(key, &temp_paths).await {
10931086
Ok(m) => m,
10941087
Err(e) => return io_to_s3_error(&e),
10951088
};

src/smb/ops.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,61 @@ impl ShareSession {
461461
Ok(data)
462462
}
463463

464+
/// Assemble multipart upload parts into a single file via streaming.
465+
///
466+
/// Reads each temp part using pipelined reads and writes through a WalWriter
467+
/// (pipelined writes + atomic rename). Never holds more than one pipeline
468+
/// buffer in memory — supports arbitrarily large files.
469+
pub async fn assemble_parts(
470+
&self,
471+
key: &str,
472+
temp_paths: &[&str],
473+
) -> io::Result<ObjectMeta> {
474+
let mut wal = self.open_wal_write(key).await?;
475+
let max_read = self.pool.max_read_size;
476+
477+
for &temp_path in temp_paths {
478+
// Open the part file for reading on any pool connection
479+
let (client, tree_id) = self.pick();
480+
let cr = client
481+
.create(
482+
tree_id,
483+
temp_path,
484+
DesiredAccess::GenericRead as u32,
485+
ShareAccess::All as u32,
486+
CreateDisposition::Open as u32,
487+
CreateOptions::NonDirectoryFile as u32,
488+
)
489+
.await?;
490+
491+
let file_size = cr.file_size;
492+
let file_id = cr.file_id;
493+
494+
// Stream part data to the WalWriter using pipelined reads
495+
let mut offset = 0u64;
496+
while offset < file_size {
497+
let remaining = file_size - offset;
498+
let batch = remaining
499+
.div_ceil(max_read as u64)
500+
.min(PIPELINE_DEPTH as u64) as usize;
501+
let chunks = client
502+
.pipelined_read(tree_id, &file_id, offset, max_read, batch)
503+
.await?;
504+
if chunks.is_empty() {
505+
break;
506+
}
507+
for chunk in &chunks {
508+
wal.write(chunk).await?;
509+
offset += chunk.len() as u64;
510+
}
511+
}
512+
513+
let _ = client.close(tree_id, &file_id).await;
514+
}
515+
516+
wal.commit(self).await
517+
}
518+
464519
/// Delete a temp file (best effort).
465520
pub async fn delete_temp(&self, smb_path: &str) {
466521
let (client, tree_id) = self.pick();

0 commit comments

Comments
 (0)