Commit e096e70

ion-elgreco authored and rtyler committed
refactor: async writer + multipart
Signed-off-by: Ion Koutsouris <[email protected]>
1 parent 17ce84f commit e096e70

File tree

3 files changed: +153 −18 lines changed

crates/core/src/operations/write/async_utils.rs

Lines changed: 82 additions & 0 deletions

@@ -0,0 +1,82 @@
+//! Async shareable buffer for the async writer
+//!
+
+use std::sync::Arc;
+
+use futures::TryFuture;
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::RwLock as TokioRwLock;
+
+/// An in-memory buffer that allows for shared ownership and interior mutability.
+/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
+/// allows multiple owners to have access to the same underlying buffer.
+#[derive(Debug, Default, Clone)]
+pub struct AsyncShareableBuffer {
+    buffer: Arc<TokioRwLock<Vec<u8>>>,
+}
+
+impl AsyncShareableBuffer {
+    /// Consumes this instance and returns the underlying buffer.
+    /// Returns `None` if there are other references to the instance.
+    pub async fn into_inner(self) -> Option<Vec<u8>> {
+        Arc::try_unwrap(self.buffer)
+            .ok()
+            .map(|lock| lock.into_inner())
+    }
+
+    /// Returns a clone of the underlying buffer as a `Vec`.
+    pub async fn to_vec(&self) -> Vec<u8> {
+        let inner = self.buffer.read().await;
+        inner.clone()
+    }
+
+    /// Returns the number of bytes in the underlying buffer.
+    pub async fn len(&self) -> usize {
+        let inner = self.buffer.read().await;
+        inner.len()
+    }
+
+    /// Returns `true` if the underlying buffer is empty.
+    pub async fn is_empty(&self) -> bool {
+        let inner = self.buffer.read().await;
+        inner.is_empty()
+    }
+
+    /// Creates a new instance with the buffer initialized from the provided bytes.
+    pub fn from_bytes(bytes: &[u8]) -> Self {
+        Self {
+            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
+        }
+    }
+}
+
+impl AsyncWrite for AsyncShareableBuffer {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::io::Result<usize>> {
+        let this = self.clone();
+        let buf = buf.to_vec();
+
+        let fut = async move {
+            let mut buffer = this.buffer.write().await;
+            buffer.extend_from_slice(&buf);
+            Ok(buf.len())
+        };
+
+        tokio::pin!(fut);
+        fut.try_poll(cx)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+}
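A note on the new `AsyncShareableBuffer`: cloning is shallow, so the parquet writer can own one clone while the partition writer keeps another to watch the buffered size. A minimal sketch of that behavior, assuming it lives in a test module next to the code above (illustrative only, not part of the commit):

// Hypothetical test module beside async_utils.rs; not in the commit.
#[cfg(test)]
mod sketch_tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn clones_share_one_buffer() -> std::io::Result<()> {
        let mut writer = AsyncShareableBuffer::from_bytes(b"head");
        let reader = writer.clone(); // same Arc<RwLock<Vec<u8>>> underneath

        writer.write_all(b"-tail").await?; // goes through poll_write above
        assert_eq!(reader.to_vec().await, b"head-tail".to_vec());

        // into_inner yields the Vec only after every other clone is dropped.
        drop(reader);
        assert_eq!(writer.into_inner().await, Some(b"head-tail".to_vec()));
        Ok(())
    }
}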

crates/core/src/operations/write/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 //! let table = ops.write(vec![batch]).await?;
 //! ````
 
+pub(crate) mod async_utils;
 pub mod configs;
 pub(crate) mod execution;
 pub(crate) mod generated_columns;
crates/core/src/operations/write/writer.rs

Lines changed: 70 additions & 18 deletions
@@ -1,6 +1,7 @@
 //! Abstractions and implementations for writing data to delta tables
 
 use std::collections::HashMap;
+use std::sync::OnceLock;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,11 +10,13 @@ use delta_kernel::expressions::Scalar;
 use futures::{StreamExt, TryStreamExt};
 use indexmap::IndexMap;
 use object_store::{path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
+use tokio::task::JoinSet;
 use tracing::debug;
 
+use super::async_utils::AsyncShareableBuffer;
 use crate::crate_version;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, PartitionsExt};
@@ -22,12 +25,35 @@ use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
 use crate::writer::stats::create_add;
 use crate::writer::utils::{
     arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
-    ShareableBuffer,
 };
 
 // TODO databricks often suggests a file size of 100mb, should we set this default?
 const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
 const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
+const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;
+
+fn upload_part_size() -> usize {
+    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_UPLOAD_PART_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .map(|size| {
+                if size < DEFAULT_UPLOAD_PART_SIZE {
+                    // Minimum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
+                    DEFAULT_UPLOAD_PART_SIZE
+                } else if size > 1024 * 1024 * 1024 * 5 {
+                    // Maximum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
+                    1024 * 1024 * 1024 * 5
+                } else {
+                    size
+                }
+            })
+            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
+    })
+}
 
 #[derive(thiserror::Error, Debug)]
 enum WriteError {
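The `upload_part_size` helper above reads `DELTARS_UPLOAD_PART_SIZE` once per process (the `OnceLock` caches it, so the variable must be set before the first write) and keeps the value inside the 5 MiB floor and 5 GiB cap that S3 and GCS impose on multipart parts; unset or unparsable values fall back to the 5 MiB default. The bounds check behaves like this sketch (illustrative names, not code from the commit):

// Standalone sketch of the same bounds check.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5 MiB: S3/GCS minimum part size
const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5 GiB: S3/GCS maximum part size

fn clamp_part_size(requested: usize) -> usize {
    requested.clamp(MIN_PART_SIZE, MAX_PART_SIZE)
}

fn main() {
    assert_eq!(clamp_part_size(1024 * 1024), MIN_PART_SIZE); // below floor: raised
    assert_eq!(clamp_part_size(16 * 1024 * 1024), 16 * 1024 * 1024); // in range: kept
}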
@@ -122,7 +148,6 @@ impl WriterConfig {
     }
 }
 
-#[derive(Debug)]
 /// A parquet writer implementation tailored to the needs of writing data to a delta table.
 pub struct DeltaWriter {
     /// An object store pointing at Delta table root
@@ -286,13 +311,12 @@ impl PartitionWriterConfig {
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
 /// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
-#[derive(Debug)]
 pub struct PartitionWriter {
     object_store: ObjectStoreRef,
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
-    buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    buffer: AsyncShareableBuffer,
+    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
     part_counter: usize,
     files_written: Vec<Add>,
     /// Num index cols to collect stats for
@@ -309,8 +333,8 @@ impl PartitionWriter {
         num_indexed_cols: i32,
         stats_columns: Option<Vec<String>>,
     ) -> DeltaResult<Self> {
-        let buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+        let buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             buffer.clone(),
             config.file_schema.clone(),
             Some(config.writer_properties.clone()),
@@ -340,9 +364,11 @@
         )
     }
 
-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+    fn reset_writer(
+        &mut self,
+    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
+        let new_buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
             Some(self.config.writer_properties.clone()),
@@ -353,20 +379,20 @@ impl PartitionWriter {
         ))
     }
 
-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        Ok(self.arrow_writer.write(batch).await?)
     }
 
     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+        let metadata = writer.close().await?;
         // don't write empty file
         if metadata.num_rows == 0 {
             return Ok(());
         }
 
-        let buffer = match buffer.into_inner() {
+        let mut buffer = match buffer.into_inner().await {
             Some(buffer) => Bytes::from(buffer),
             None => return Ok(()), // Nothing to write
         };
@@ -376,7 +402,33 @@
         let file_size = buffer.len() as i64;
 
         // write file to object store
-        self.object_store.put(&path, buffer.into()).await?;
+        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
+        let part_size = upload_part_size();
+        let mut tasks = JoinSet::new();
+        let max_concurrent_tasks = 10; // TODO: make configurable
+
+        while buffer.len() > part_size {
+            let part = buffer.split_to(part_size);
+            let upload_future = multi_part_upload.put_part(part.into());
+
+            // wait until one spot frees up before spawning new task
+            if tasks.len() >= max_concurrent_tasks {
+                tasks.join_next().await;
+            }
+            tasks.spawn(upload_future);
+        }
+
+        if !buffer.is_empty() {
+            let upload_future = multi_part_upload.put_part(buffer.into());
+            tasks.spawn(upload_future);
+        }
+
+        // wait for all remaining tasks to complete
+        while let Some(result) = tasks.join_next().await {
+            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+        }
+
+        multi_part_upload.complete().await?;
 
         self.files_written.push(
             create_add(
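The hunk above replaces the single `put` with a multipart upload that keeps at most ten part uploads in flight: once the `JoinSet` is full, it awaits one completion before spawning the next `put_part` future, uploads any remainder smaller than a part, drains the set, and finally calls `complete()`. One caveat worth noting: the `join_next` inside the loop discards its result, so an early part failure only surfaces during the final drain or at `complete()`. The bounded-concurrency pattern in isolation (a sketch; the spawned future stands in for `put_part`):

// Sketch: cap in-flight async tasks with a JoinSet.
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let max_concurrent_tasks = 10;
    let mut tasks: JoinSet<usize> = JoinSet::new();

    for part_number in 0..100usize {
        // wait until one spot frees up before spawning a new task
        if tasks.len() >= max_concurrent_tasks {
            tasks.join_next().await;
        }
        tasks.spawn(async move { part_number }); // stand-in for put_part(...)
    }

    // drain the remaining tasks, as the writer does before complete()
    while let Some(result) = tasks.join_next().await {
        result.expect("upload task panicked");
    }
}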
@@ -412,9 +464,9 @@
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
             if estimated_size >= self.config.target_file_size {
                 debug!("Writing file with estimated size {estimated_size:?} to disk.");
                 self.flush_arrow_writer().await?;
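Note that the size check in this last hunk becomes `self.buffer.len().await` because the buffered length now sits behind the tokio `RwLock`. The slicing arithmetic itself is unchanged; with illustrative numbers it walks a batch like this (standalone sketch, not commit code):

// Sketch: a 2500-row batch sliced with write_batch_size = 1024.
fn main() {
    let write_batch_size: usize = 1024;
    let max_offset: usize = 2500; // batch.num_rows()
    for offset in (0..max_offset).step_by(write_batch_size) {
        let length = usize::min(write_batch_size, max_offset - offset);
        println!("slice(offset={offset}, length={length})");
    }
    // prints offsets 0, 1024, 2048 with lengths 1024, 1024, 452
}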
