Skip to content

Commit ac49c16

Browse files
committed
fix: Always write with custom sink
1 parent 5ba7677 commit ac49c16

2 files changed

Lines changed: 48 additions & 79 deletions

File tree

vortex-datafusion/src/persistent/format.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ config_namespace! {
9898
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
9999
/// Target file size in megabytes for written Vortex files.
100100
///
101-
/// When set to a value greater than 0, the writer will attempt to split output
102-
/// into multiple files, each approximately this size. A value of 0
103-
/// means no file size limit is applied and file sizes are determined by the
104-
/// incoming data stream.
101+
/// The writer will attempt to split output into multiple files, each approximately
102+
/// this size. Defaults to 16 MB. A value of 0 is treated as the default (16 MB).
105103
pub target_file_size_mb: usize, default = 16
106104
}
107105
}
@@ -424,11 +422,12 @@ impl FileFormat for VortexFormat {
424422
return not_impl_err!("Overwrites are not implemented yet for Vortex");
425423
}
426424

427-
let target_file_size = if self.opts.target_file_size_mb > 0 {
428-
Some(self.opts.target_file_size_mb as u64 * 1024 * 1024)
425+
let target_file_size_mb = if self.opts.target_file_size_mb > 0 {
426+
self.opts.target_file_size_mb
429427
} else {
430-
None
428+
16 // Default to 16 MB when set to 0
431429
};
430+
let target_file_size = target_file_size_mb as u64 * 1024 * 1024;
432431

433432
let schema = conf.output_schema().clone();
434433
let sink = Arc::new(VortexSink::new(

vortex-datafusion/src/persistent/sink.rs

Lines changed: 42 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ pub struct VortexSink {
4242
config: FileSinkConfig,
4343
schema: SchemaRef,
4444
session: VortexSession,
45-
/// Target file size in bytes. When set, the writer will split output files
45+
/// Target file size in bytes. The writer will split output files
4646
/// when they reach approximately this size.
47-
target_file_size: Option<u64>,
47+
target_file_size: u64,
4848
}
4949

5050
impl VortexSink {
5151
pub fn new(
5252
config: FileSinkConfig,
5353
schema: SchemaRef,
5454
session: VortexSession,
55-
target_file_size: Option<u64>,
55+
target_file_size: u64,
5656
) -> Self {
5757
Self {
5858
config,
@@ -201,17 +201,15 @@ impl DataSink for VortexSink {
201201
data: SendableRecordBatchStream,
202202
context: &Arc<TaskContext>,
203203
) -> DFResult<u64> {
204-
match self.target_file_size {
205-
Some(target_size) if self.config.table_partition_cols.is_empty() => {
206-
// When target file size is set and no partitioning, bypass the demuxer
207-
// and write files directly with size-based splitting.
208-
self.write_all_with_target_size(data, context, target_size)
209-
.await
210-
}
211-
_ => {
212-
// Default path: use the FileSink/demuxer flow
213-
FileSink::write_all(self, data, context).await
214-
}
204+
if self.config.table_partition_cols.is_empty() {
205+
// Non-partitioned: bypass the demuxer and write files directly
206+
// with size-based splitting.
207+
self.write_all_with_target_size(data, context, self.target_file_size)
208+
.await
209+
} else {
210+
// Partitioned: use the FileSink/demuxer flow, which will call
211+
// spawn_writer_tasks_and_join where we also apply size limits.
212+
FileSink::write_all(self, data, context).await
215213
}
216214
}
217215
}
@@ -247,22 +245,16 @@ impl FileSink for VortexSink {
247245
// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
248246
// the demux task might deadlock itself.
249247
file_write_tasks.spawn(async move {
250-
if let Some(target_size) = target_file_size {
251-
write_with_file_size_limit(
252-
session,
253-
row_counter,
254-
object_store,
255-
dtype,
256-
path,
257-
rx,
258-
target_size,
259-
)
260-
.await
261-
} else {
262-
write_single_file(session, row_counter, object_store, dtype, path, rx)
263-
.await
264-
.map(|p| vec![p])
265-
}
248+
write_with_file_size_limit(
249+
session,
250+
row_counter,
251+
object_store,
252+
dtype,
253+
path,
254+
rx,
255+
target_file_size,
256+
)
257+
.await
266258
});
267259
}
268260

@@ -305,41 +297,6 @@ fn split_path(original: &Path, sub_index: usize) -> Path {
305297
}
306298
}
307299

308-
/// Write the entire stream to a single Vortex file (original behavior).
309-
async fn write_single_file(
310-
session: VortexSession,
311-
row_counter: Arc<AtomicU64>,
312-
object_store: Arc<dyn ObjectStore>,
313-
dtype: DType,
314-
path: Path,
315-
rx: tokio::sync::mpsc::Receiver<datafusion_common::arrow::array::RecordBatch>,
316-
) -> DFResult<Path> {
317-
let stream = ReceiverStream::new(rx).map(move |rb| {
318-
row_counter.fetch_add(rb.num_rows() as u64, Ordering::Relaxed);
319-
VortexResult::Ok(ArrayRef::from_arrow(rb, false))
320-
});
321-
322-
let stream_adapter = ArrayStreamAdapter::new(dtype, stream);
323-
324-
let mut sink = ObjectStoreWriter::new(object_store.clone(), &path)
325-
.await
326-
.map_err(|e| {
327-
DataFusionError::Execution(format!("Failed to create ObjectStoreWriter: {e}"))
328-
})?;
329-
330-
session
331-
.write_options()
332-
.write(&mut sink, stream_adapter)
333-
.await
334-
.map_err(|e| DataFusionError::Execution(format!("Failed to write Vortex file: {e}")))?;
335-
336-
sink.shutdown().await.map_err(|e| {
337-
DataFusionError::Execution(format!("Failed to shutdown Vortex writer: {e}"))
338-
})?;
339-
340-
Ok(path)
341-
}
342-
343300
/// Write the stream to multiple Vortex files, splitting when the target file size is reached.
344301
///
345302
/// Splits the input record batches into groups based on estimated uncompressed size, then writes
@@ -1429,14 +1386,14 @@ mod tests {
14291386
}
14301387

14311388
// -----------------------------------------------------------------------
1432-
// Integration test: target_file_size_mb = 0 uses default demuxer path
1389+
// Integration test: target_file_size_mb = 0 defaults to 16 MB
14331390
// -----------------------------------------------------------------------
14341391

1435-
/// With `target_file_size_mb=0`, the writer should fall through to the
1436-
/// DataFusion demuxer path (FileSink::write_all) instead of the custom
1437-
/// size-based splitting. Data integrity should be preserved regardless.
1392+
/// With `target_file_size_mb=0`, the writer should default to 16 MB and
1393+
/// still use the custom size-based writer. Data integrity should be
1394+
/// preserved regardless.
14381395
#[tokio::test]
1439-
async fn test_target_file_size_zero_uses_demuxer() -> anyhow::Result<()> {
1396+
async fn test_target_file_size_zero_defaults_to_16mb() -> anyhow::Result<()> {
14401397
use datafusion::arrow::array::Int64Array;
14411398

14421399
let dir = TempDir::new()?;
@@ -1459,6 +1416,7 @@ mod tests {
14591416
))
14601417
.await?;
14611418

1419+
// 100K Int8 values is well below 16 MB, so should produce a single file.
14621420
let entries: usize = 100_000;
14631421
let batch = RecordBatch::try_new(
14641422
Arc::new(Schema::new(vec![Field::new("a", DataType::Int8, false)])),
@@ -1472,8 +1430,7 @@ mod tests {
14721430
.collect()
14731431
.await?;
14741432

1475-
// Verify data integrity — the demuxer controls file count, so we only
1476-
// check that all rows are present.
1433+
// Verify data integrity.
14771434
let result = session
14781435
.sql("SELECT COUNT(*) as cnt FROM nosplit_tbl")
14791436
.await?
@@ -1489,6 +1446,19 @@ mod tests {
14891446

14901447
assert_eq!(count_value, entries as i64);
14911448

1449+
// Small data with a 16 MB default should produce exactly 1 file.
1450+
let files: Vec<_> = std::fs::read_dir(dir.path())?
1451+
.filter_map(|e| e.ok())
1452+
.filter(|e| e.path().extension().map_or(false, |ext| ext == "vortex"))
1453+
.collect();
1454+
1455+
assert_eq!(
1456+
files.len(),
1457+
1,
1458+
"Small data with default 16 MB target (from target_file_size_mb=0) should produce 1 file, got {}",
1459+
files.len()
1460+
);
1461+
14921462
Ok(())
14931463
}
14941464

0 commit comments

Comments
 (0)