Commit a1787c9

refactor: async writer + multipart

Signed-off-by: Ion Koutsouris <[email protected]>
1 parent: a5eb11a

File tree

3 files changed: +170, -18 lines

crates/core/src/operations/write/writer.rs

Lines changed: 71 additions & 18 deletions
```diff
@@ -1,6 +1,8 @@
 //! Abstractions and implementations for writing data to delta tables
 
 use std::collections::HashMap;
+use std::future::Future;
+use std::sync::OnceLock;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,25 +11,50 @@ use delta_kernel::expressions::Scalar;
 use futures::{StreamExt, TryStreamExt};
 use indexmap::IndexMap;
 use object_store::{path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
+use tokio::task::{JoinError, JoinSet};
 use tracing::debug;
 
 use crate::crate_version;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, PartitionsExt};
 use crate::storage::ObjectStoreRef;
+use crate::writer::async_utils::AsyncShareableBuffer;
 use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
 use crate::writer::stats::create_add;
 use crate::writer::utils::{
     arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
-    ShareableBuffer,
 };
 
 // TODO databricks often suggests a file size of 100mb, should we set this default?
 const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
 const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
+const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;
+
+fn upload_part_size() -> usize {
+    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_UPLOAD_PART_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .map(|size| {
+                if size < DEFAULT_UPLOAD_PART_SIZE {
+                    // Minimum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
+                    DEFAULT_UPLOAD_PART_SIZE
+                } else if size > 1024 * 1024 * 1024 * 5 {
+                    // Maximum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
+                    1024 * 1024 * 1024 * 5
+                } else {
+                    size
+                }
+            })
+            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
+    })
+}
 
 #[derive(thiserror::Error, Debug)]
 enum WriteError {
@@ -122,7 +149,6 @@ impl WriterConfig {
     }
 }
 
-#[derive(Debug)]
 /// A parquet writer implementation tailored to the needs of writing data to a delta table.
 pub struct DeltaWriter {
     /// An object store pointing at Delta table root
@@ -286,13 +312,12 @@ impl PartitionWriterConfig {
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
 /// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
-#[derive(Debug)]
 pub struct PartitionWriter {
     object_store: ObjectStoreRef,
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
-    buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    buffer: AsyncShareableBuffer,
+    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
     part_counter: usize,
     files_written: Vec<Add>,
     /// Num index cols to collect stats for
@@ -309,8 +334,8 @@ impl PartitionWriter {
         num_indexed_cols: i32,
         stats_columns: Option<Vec<String>>,
     ) -> DeltaResult<Self> {
-        let buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+        let buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             buffer.clone(),
             config.file_schema.clone(),
             Some(config.writer_properties.clone()),
@@ -340,9 +365,11 @@ impl PartitionWriter {
         )
     }
 
-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+    fn reset_writer(
+        &mut self,
+    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
+        let new_buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
             Some(self.config.writer_properties.clone()),
@@ -353,20 +380,20 @@ impl PartitionWriter {
         ))
     }
 
-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        Ok(self.arrow_writer.write(batch).await?)
     }
 
     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+        let metadata = writer.close().await?;
         // don't write empty file
         if metadata.num_rows == 0 {
             return Ok(());
         }
 
-        let buffer = match buffer.into_inner() {
+        let mut buffer = match buffer.into_inner().await {
             Some(buffer) => Bytes::from(buffer),
             None => return Ok(()), // Nothing to write
         };
@@ -376,7 +403,33 @@ impl PartitionWriter {
         let file_size = buffer.len() as i64;
 
         // write file to object store
-        self.object_store.put(&path, buffer.into()).await?;
+        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
+        let part_size = upload_part_size();
+        let mut tasks = JoinSet::new();
+        let max_concurrent_tasks = 10; // TODO: make configurable
+
+        while buffer.len() > part_size {
+            let part = buffer.split_to(part_size);
+            let upload_future = multi_part_upload.put_part(part.into());
+
+            // wait until one spot frees up before spawning new task
+            if tasks.len() >= max_concurrent_tasks {
+                tasks.join_next().await;
+            }
+            tasks.spawn(upload_future);
+        }
+
+        if !buffer.is_empty() {
+            let upload_future = multi_part_upload.put_part(buffer.into());
+            tasks.spawn(upload_future);
+        }
+
+        // wait for all remaining tasks to complete
+        while let Some(result) = tasks.join_next().await {
+            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+        }
+
+        multi_part_upload.complete().await?;
 
         self.files_written.push(
             create_add(
@@ -412,9 +465,9 @@ impl PartitionWriter {
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
            let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
             if estimated_size >= self.config.target_file_size {
                 debug!(
                     "Writing file with estimated size {:?} to disk.",
```

crates/core/src/writer/async_utils.rs

Lines changed: 98 additions & 0 deletions
```diff
@@ -0,0 +1,98 @@
+//! An async, shareable in-memory buffer used when writing to delta tables
+//!
+
+use std::io::Write;
+use std::sync::Arc;
+
+use futures::TryFuture;
+
+use std::io::Result as IoResult;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::RwLock as TokioRwLock;
+
+/// An in-memory buffer that allows for shared ownership and interior mutability.
+/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
+/// allows multiple owners to have access to the same underlying buffer.
+#[derive(Debug, Default, Clone)]
+pub struct AsyncShareableBuffer {
+    buffer: Arc<TokioRwLock<Vec<u8>>>,
+}
+
+impl AsyncShareableBuffer {
+    /// Consumes this instance and returns the underlying buffer.
+    /// Returns `None` if there are other references to the instance.
+    pub async fn into_inner(self) -> Option<Vec<u8>> {
+        Arc::try_unwrap(self.buffer)
+            .ok()
+            .map(|lock| lock.into_inner())
+    }
+
+    /// Returns a clone of the underlying buffer as a `Vec`.
+    pub async fn to_vec(&self) -> Vec<u8> {
+        let inner = self.buffer.read().await;
+        inner.clone()
+    }
+
+    /// Returns the number of bytes in the underlying buffer.
+    pub async fn len(&self) -> usize {
+        let inner = self.buffer.read().await;
+        inner.len()
+    }
+
+    /// Returns `true` if the underlying buffer is empty.
+    pub async fn is_empty(&self) -> bool {
+        let inner = self.buffer.read().await;
+        inner.is_empty()
+    }
+
+    /// Creates a new instance with the buffer initialized from the provided bytes.
+    pub fn from_bytes(bytes: &[u8]) -> Self {
+        Self {
+            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
+        }
+    }
+}
+
+impl Write for AsyncShareableBuffer {
+    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
+        // Blocking call! This should not be used in async contexts.
+        // Prefer using `AsyncWrite` instead.
+        let mut inner = futures::executor::block_on(self.buffer.write());
+        inner.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> IoResult<()> {
+        Ok(())
+    }
+}
+
+impl AsyncWrite for AsyncShareableBuffer {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::io::Result<usize>> {
+        let this = self.clone();
+        let buf = buf.to_vec();
+
+        let fut = async move {
+            let mut buffer = this.buffer.write().await;
+            buffer.extend_from_slice(&buf);
+            Ok(buf.len())
+        };
+
+        tokio::pin!(fut);
+        fut.try_poll(cx)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+}
```
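For orientation, here is a hedged usage sketch pairing the new buffer with parquet's `AsyncArrowWriter`, mirroring how `PartitionWriter` uses it above. The schema, batch, and import path are illustrative assumptions, not part of the commit:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::AsyncArrowWriter;
// Path assumed from the `pub mod async_utils;` export in mod.rs below.
use deltalake_core::writer::async_utils::AsyncShareableBuffer;

async fn buffer_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The writer owns a clone; both clones point at the same Vec<u8>.
    let buffer = AsyncShareableBuffer::default();
    let mut writer = AsyncArrowWriter::try_new(buffer.clone(), schema, None)?;
    writer.write(&batch).await?;
    writer.close().await?; // consumes the writer and drops its clone

    // Only one reference remains, so into_inner() yields the parquet bytes.
    let bytes = buffer.into_inner().await.expect("no other references");
    assert!(!bytes.is_empty());
    Ok(())
}
```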

crates/core/src/writer/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -16,6 +16,7 @@ pub use json::JsonWriter;
 pub use record_batch::RecordBatchWriter;
 pub use stats::create_add;
 
+pub mod async_utils;
 pub mod json;
 pub mod record_batch;
 pub(crate) mod stats;
```
