//! Abstractions and implementations for writing data to delta tables

use std::collections::HashMap;
+use std::sync::OnceLock;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
@@ -9,11 +10,13 @@ use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
+use tokio::task::JoinSet;
use tracing::debug;

+use super::async_utils::AsyncShareableBuffer;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, PartitionsExt};
@@ -22,12 +25,35 @@ use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
    arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
-    ShareableBuffer,
};

// TODO databricks often suggests a file size of 100mb, should we set this default?
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
+const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;
+
+fn upload_part_size() -> usize {
+    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("DELTARS_UPLOAD_PART_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .map(|size| {
+                if size < DEFAULT_UPLOAD_PART_SIZE {
+                    // Minimum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
+                    DEFAULT_UPLOAD_PART_SIZE
+                } else if size > 1024 * 1024 * 1024 * 5 {
+                    // Maximum part size in GCS and S3
+                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
+                    1024 * 1024 * 1024 * 5
+                } else {
+                    size
+                }
+            })
+            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
+    })
+}

#[derive(thiserror::Error, Debug)]
enum WriteError {
@@ -122,7 +148,6 @@ impl WriterConfig {
    }
}

-#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
    /// An object store pointing at Delta table root
@@ -286,13 +311,12 @@ impl PartitionWriterConfig {
/// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
/// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
/// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
-#[derive(Debug)]
pub struct PartitionWriter {
    object_store: ObjectStoreRef,
    writer_id: uuid::Uuid,
    config: PartitionWriterConfig,
-    buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    buffer: AsyncShareableBuffer,
+    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
    part_counter: usize,
    files_written: Vec<Add>,
    /// Num index cols to collect stats for
@@ -309,8 +333,8 @@ impl PartitionWriter {
        num_indexed_cols: i32,
        stats_columns: Option<Vec<String>>,
    ) -> DeltaResult<Self> {
-        let buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+        let buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
            buffer.clone(),
            config.file_schema.clone(),
            Some(config.writer_properties.clone()),
@@ -340,9 +364,11 @@ impl PartitionWriter {
        )
    }

-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::default();
-        let arrow_writer = ArrowWriter::try_new(
+    fn reset_writer(
+        &mut self,
+    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
+        let new_buffer = AsyncShareableBuffer::default();
+        let arrow_writer = AsyncArrowWriter::try_new(
            new_buffer.clone(),
            self.config.file_schema.clone(),
            Some(self.config.writer_properties.clone()),
@@ -353,20 +379,20 @@ impl PartitionWriter {
        ))
    }

-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        Ok(self.arrow_writer.write(batch).await?)
    }

    async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
        // replace counter / buffers and close the current writer
        let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+        let metadata = writer.close().await?;
        // don't write empty file
        if metadata.num_rows == 0 {
            return Ok(());
        }

-        let buffer = match buffer.into_inner() {
+        let mut buffer = match buffer.into_inner().await {
            Some(buffer) => Bytes::from(buffer),
            None => return Ok(()), // Nothing to write
        };
@@ -376,7 +402,33 @@ impl PartitionWriter {
        let file_size = buffer.len() as i64;

        // write file to object store
-        self.object_store.put(&path, buffer.into()).await?;
+        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
+        let part_size = upload_part_size();
+        let mut tasks = JoinSet::new();
+        let max_concurrent_tasks = 10; // TODO: make configurable
+
+        while buffer.len() > part_size {
+            let part = buffer.split_to(part_size);
+            let upload_future = multi_part_upload.put_part(part.into());
+
+            // wait until one spot frees up before spawning new task
+            if tasks.len() >= max_concurrent_tasks {
+                tasks.join_next().await;
+            }
+            tasks.spawn(upload_future);
+        }
+
+        if !buffer.is_empty() {
+            let upload_future = multi_part_upload.put_part(buffer.into());
+            tasks.spawn(upload_future);
+        }
+
+        // wait for all remaining tasks to complete
+        while let Some(result) = tasks.join_next().await {
+            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
+        }
+
+        multi_part_upload.complete().await?;

        self.files_written.push(
            create_add(
@@ -412,9 +464,9 @@ impl PartitionWriter {
        let max_offset = batch.num_rows();
        for offset in (0..max_offset).step_by(self.config.write_batch_size) {
            let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
            // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
            if estimated_size >= self.config.target_file_size {
                debug!("Writing file with estimated size {estimated_size:?} to disk.");
                self.flush_arrow_writer().await?;
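Note: the chunked upload above works because `MultipartUpload::put_part` returns a `'static` future, so parts can be uploaded concurrently on a `JoinSet` while the writer keeps slicing the buffer. Below is a minimal standalone sketch of that same pattern, assuming the `object_store`, `bytes`, and `tokio` crates with an in-memory store; the part size, buffer contents, and path here are illustrative and not part of this diff.

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed part size: 5 MiB, mirroring DEFAULT_UPLOAD_PART_SIZE above.
    let part_size = 5 * 1024 * 1024;
    let store = InMemory::new();
    let path = Path::from("part-00000.parquet");

    // Stand-in for the flushed parquet buffer (12 MiB of zeros).
    let mut buffer = Bytes::from(vec![0u8; 12 * 1024 * 1024]);

    let mut upload = store.put_multipart(&path).await?;
    let mut tasks = JoinSet::new();
    let max_concurrent_tasks = 10;

    // Slice the buffer into parts, keeping at most 10 uploads in flight.
    while buffer.len() > part_size {
        let part = buffer.split_to(part_size);
        if tasks.len() >= max_concurrent_tasks {
            tasks.join_next().await;
        }
        tasks.spawn(upload.put_part(part.into()));
    }
    // Upload whatever remains as the final (possibly smaller) part.
    if !buffer.is_empty() {
        tasks.spawn(upload.put_part(buffer.into()));
    }
    // Surface both join errors and upload errors before completing.
    while let Some(result) = tasks.join_next().await {
        result??;
    }
    upload.complete().await?;
    Ok(())
}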