Skip to content

Commit 509db2b

Browse files
peasee and Copilot
authored and committed
fix: Ensure target file size is properly respected in vortex sink (#33)
* fix: Ensure target file sizes are respected
* fix: Improve robustness for streams split over multiple cores
* wip
* wip
* wip
* wip
* refactor: Improve robustness
* refactor: Improve concurrent writer robustness for memory bounds
* Update vortex-datafusion/src/persistent/sink.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update vortex-datafusion/src/persistent/format.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update vortex-datafusion/src/persistent/sink.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update vortex-datafusion/src/persistent/sink.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update vortex-datafusion/src/tests/nested_projection.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* review: Address comments
* Update vortex-datafusion/src/persistent/sink.rs
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* review: Address comments
* Revert "Update vortex-datafusion/src/persistent/sink.rs"
  This reverts commit 6c3be0f.
* refactor: Simplify
* test: Add file writer finish notify test
* review: Address comments
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 9363d69 commit 509db2b

4 files changed

Lines changed: 1573 additions & 83 deletions

File tree

vortex-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ object_store = { workspace = true }
3636
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
3737
tokio-stream = { workspace = true }
3838
tracing = { workspace = true, features = ["std", "attributes"] }
39+
uuid = { workspace = true, features = ["v7"] }
3940
vortex = { workspace = true, features = ["object_store", "tokio", "files"] }
4041
vortex-utils = { workspace = true, features = ["dashmap"] }
4142

vortex-datafusion/src/lib.rs

Lines changed: 82 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,3 +48,85 @@ where
4848
}
4949
}
5050
}
51+
52+
#[cfg(test)]
53+
mod common_tests {
54+
use std::sync::Arc;
55+
use std::sync::LazyLock;
56+
57+
use datafusion::arrow::array::RecordBatch;
58+
use datafusion::datasource::provider::DefaultTableFactory;
59+
use datafusion::execution::SessionStateBuilder;
60+
use datafusion::prelude::SessionContext;
61+
use datafusion_common::GetExt;
62+
use object_store::ObjectStore;
63+
use object_store::memory::InMemory;
64+
use url::Url;
65+
use vortex::VortexSessionDefault;
66+
use vortex::array::ArrayRef;
67+
use vortex::array::arrow::FromArrowArray;
68+
use vortex::file::WriteOptionsSessionExt;
69+
use vortex::io::ObjectStoreWriter;
70+
use vortex::io::VortexWrite;
71+
use vortex::session::VortexSession;
72+
73+
use crate::VortexFormatFactory;
74+
use crate::VortexOptions;
75+
76+
/// Vortex session shared by the test helpers in this module; built lazily on first
/// access from `VortexSession::default`.
static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
77+
78+
/// Test fixture bundling a DataFusion session with the object store registered on it.
pub struct TestSessionContext {
79+
/// Object store that `new` registers on the session for the `file://` URL.
pub store: Arc<dyn ObjectStore>,
80+
/// DataFusion session built by `new` with the Vortex format factory added.
pub session: SessionContext,
81+
}
82+
83+
/// The default test context has projection pushdown disabled.
impl Default for TestSessionContext {
84+
/// Delegates to `TestSessionContext::new` with `projection_pushdown = false`.
fn default() -> Self {
85+
Self::new(false)
86+
}
87+
}
88+
89+
impl TestSessionContext {
90+
/// Create a new test session context with the given projection pushdown setting.
///
/// The context is backed by a fresh in-memory object store registered for the
/// `file://` URL. The Vortex format factory is configured with `projection_pushdown`
/// (all other options left at their defaults), and URL tables are enabled on the
/// resulting session.
///
/// # Panics
///
/// Panics if `"file://"` cannot be converted into a `Url`.
91+
pub fn new(projection_pushdown: bool) -> Self {
92+
let store = Arc::new(InMemory::new());
93+
let opts = VortexOptions {
94+
projection_pushdown,
95+
..Default::default()
96+
};
97+
let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
98+
// Register the default table factory under the upper-cased value of
// `factory.get_ext()`, and route `file://` URLs to the in-memory store.
let mut session_state_builder = SessionStateBuilder::new()
99+
.with_default_features()
100+
.with_table_factory(
101+
factory.get_ext().to_uppercase(),
102+
Arc::new(DefaultTableFactory::new()),
103+
)
104+
.with_object_store(&Url::try_from("file://").unwrap(), store.clone());
105+
106+
// Append the Vortex factory only when the builder exposes a file-format list;
// if `file_formats()` returns `None` the factory is not added.
if let Some(file_formats) = session_state_builder.file_formats() {
107+
file_formats.push(factory as _);
108+
}
109+
110+
let session: SessionContext =
111+
SessionContext::new_with_state(session_state_builder.build()).enable_url_table();
112+
113+
Self { store, session }
114+
}
115+
116+
/// Write arrow data into a vortex file.
///
/// Converts `batch` into a Vortex array and writes it as an array stream to `path`
/// in this context's object store, using the write options of the shared
/// `VX_SESSION`, then shuts the writer down.
///
/// # Errors
///
/// Returns an error if creating the object-store writer, writing the array
/// stream, or shutting down the writer fails.
117+
pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
118+
where
119+
P: Into<object_store::path::Path>,
120+
{
121+
// NOTE(review): the `false` flag presumably marks the converted array as
// non-nullable — confirm against `FromArrowArray::from_arrow`.
let array = ArrayRef::from_arrow(batch, false);
122+
let mut write = ObjectStoreWriter::new(self.store.clone(), &path.into()).await?;
123+
VX_SESSION
124+
.write_options()
125+
.write(&mut write, array.to_array_stream())
126+
.await?;
127+
// Explicitly shut the writer down so a failure here is returned to the caller.
write.shutdown().await?;
128+
129+
Ok(())
130+
}
131+
}
132+
}

vortex-datafusion/src/persistent/format.rs

Lines changed: 57 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ use datafusion_common::Result as DFResult;
1717
use datafusion_common::Statistics;
1818
use datafusion_common::config::ConfigField;
1919
use datafusion_common::config_namespace;
20+
use datafusion_common::internal_datafusion_err;
2021
use datafusion_common::not_impl_err;
2122
use datafusion_common::parsers::CompressionTypeVariant;
2223
use datafusion_common::stats::Precision;
@@ -33,6 +34,8 @@ use datafusion_datasource::source::DataSourceExec;
3334
use datafusion_expr::dml::InsertOp;
3435
use datafusion_physical_expr::LexRequirement;
3536
use datafusion_physical_plan::ExecutionPlan;
37+
use datafusion_physical_plan::ExecutionPlanProperties;
38+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3639
use futures::FutureExt;
3740
use futures::StreamExt as _;
3841
use futures::TryStreamExt as _;
@@ -64,6 +67,7 @@ use crate::PrecisionExt as _;
6467
use crate::convert::TryToDataFusion;
6568

6669
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
70+
/// Default value, in megabytes, of the `target_file_size_mb` option.
const DEFAULT_TARGET_FILE_SIZE_MB: usize = 128;
6771

6872
/// Vortex implementation of a DataFusion [`FileFormat`].
6973
pub struct VortexFormat {
@@ -96,6 +100,24 @@ config_namespace! {
96100
/// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
97101
/// during footer parsing.
98102
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
103+
/// Target file size in megabytes for written Vortex files.
104+
///
105+
/// When greater than 0 for non-partitioned writes, Vortex bypasses
106+
/// DataFusion's file demuxer and splits output files based on
107+
/// approximate byte size rather than row count.
108+
pub target_file_size_mb: usize, default = DEFAULT_TARGET_FILE_SIZE_MB
109+
/// Whether to enable projection pushdown into the underlying Vortex scan.
110+
///
111+
/// When enabled, projection expressions may be partially evaluated during
112+
/// the scan. When disabled, Vortex reads only the referenced columns and
113+
/// all expressions are evaluated after the scan.
114+
pub projection_pushdown: bool, default = false
115+
/// The intra-partition scan concurrency, controlling the number of row splits to process
116+
/// concurrently per-thread within each file.
117+
///
118+
/// This does not affect the overall parallelism
119+
/// across partitions, which is controlled by DataFusion's execution configuration.
120+
pub scan_concurrency: Option<usize>, default = None
99121
}
100122
}
101123

@@ -417,8 +439,42 @@ impl FileFormat for VortexFormat {
417439
return not_impl_err!("Overwrites are not implemented yet for Vortex");
418440
}
419441

442+
let target_file_size = (self.opts.target_file_size_mb > 0)
443+
.then(|| {
444+
u64::try_from(self.opts.target_file_size_mb)
445+
.map_err(|e| {
446+
internal_datafusion_err!(
447+
"target_file_size_mb cannot be represented as u64: {e}"
448+
)
449+
})
450+
.map(|v| v.saturating_mul(1024 * 1024).max(1))
451+
})
452+
.transpose()?;
453+
454+
// For non-partitioned writes, force a single input stream so VortexSink
455+
// performs one coordinated write per statement instead of one
456+
// independent write per CPU/input partition.
457+
//
458+
// Use coalescing rather than repartitioning to avoid introducing a
459+
// shuffle/dispatcher step that can interleave batches from different
460+
// input partitions.
461+
//
462+
// For partitioned writes, keep DataFusion's demuxer behavior.
463+
let input: Arc<dyn ExecutionPlan> = if conf.table_partition_cols.is_empty()
464+
&& input.output_partitioning().partition_count() > 1
465+
{
466+
Arc::new(CoalescePartitionsExec::new(input))
467+
} else {
468+
input
469+
};
470+
420471
let schema = conf.output_schema().clone();
421-
let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
472+
let sink = Arc::new(VortexSink::new(
473+
conf,
474+
schema,
475+
self.session.clone(),
476+
target_file_size,
477+
));
422478

423479
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
424480
}

0 commit comments

Comments
 (0)