Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 179 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,17 @@ exclude = [
]

[workspace.dependencies]
arrow = "57.1.0"
arrow-array = {version = "57.1.0", features = ["chrono-tz"]}
arrow-buffer = "57.1.0"
arrow-csv = "57.1.0"
arrow-data = "57.1.0"
arrow-flight = "57.1.0"
arrow-ipc = {version = "57.1.0", features = ["lz4", "zstd"]}
arrow-json = "57.1.0"
arrow-row = "57.1.0"
arrow-schema = "57.1.0"
arrow-select = "57.1.0"
arrow = "59.0.0"
arrow-array = {version = "59.0.0", features = ["chrono-tz"]}
arrow-buffer = "59.0.0"
arrow-csv = "59.0.0"
arrow-data = "59.0.0"
arrow-flight = "59.0.0"
arrow-ipc = {version = "59.0.0", features = ["lz4", "zstd"]}
arrow-json = "59.0.0"
arrow-row = "59.0.0"
arrow-schema = "59.0.0"
arrow-select = "59.0.0"
approx = "0.5.1"
arc-swap = "1.7.1"
async-compat = "0.2.5"
Expand Down Expand Up @@ -334,7 +334,7 @@ opentelemetry = {version = "0.31", features = ["trace", "metrics", "logs"]}
opentelemetry-otlp = {version = "0.31", features = ["grpc-tonic", "logs"]}
opentelemetry_sdk = {version = "0.31", features = ["logs"]}
parking_lot = "0.12.5"
parquet = "57.1.0"
parquet = "59.0.0"
path_macro = "1.0.0"
pretty_assertions = "1.4.1"
proptest = "1.9.0"
Expand Down
9 changes: 8 additions & 1 deletion src/daft-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ ignored = [
"arrow-data-56",
"arrow-data-57",
"arrow-data-58",
"arrow-data-59",
"arrow-schema-56",
"arrow-schema-57",
"arrow-schema-58",
"arrow-schema-59",
"arrow-array-56",
"arrow-array-57",
"arrow-array-58"
"arrow-array-58",
"arrow-array-59"
]

[features]
arrow-56 = ["dep:arrow-schema-56", "dep:arrow-data-56", "dep:arrow-array-56"]
arrow-57 = ["dep:arrow-schema-57", "dep:arrow-data-57", "dep:arrow-array-57"]
arrow-58 = ["dep:arrow-schema-58", "dep:arrow-data-58", "dep:arrow-array-58"]
arrow-59 = ["dep:arrow-schema-59", "dep:arrow-data-59", "dep:arrow-array-59"]

[dependencies]
daft-ext-macros = {path = "../daft-ext-macros", version = "0.1.1"}
Expand All @@ -39,6 +43,9 @@ arrow-array-57 = {package = "arrow-array", version = "57", features = ["ffi"], o
arrow-schema-58 = {package = "arrow-schema", version = "58", features = ["ffi"], optional = true}
arrow-data-58 = {package = "arrow-data", version = "58", features = ["ffi"], optional = true}
arrow-array-58 = {package = "arrow-array", version = "58", features = ["ffi"], optional = true}
arrow-schema-59 = {package = "arrow-schema", version = "59", features = ["ffi"], optional = true}
arrow-data-59 = {package = "arrow-data", version = "59", features = ["ffi"], optional = true}
arrow-array-59 = {package = "arrow-array", version = "59", features = ["ffi"], optional = true}

[lints]
workspace = true
5 changes: 4 additions & 1 deletion src/daft-ext/src/abi/compat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Feature-gated conversions for arrow-rs FFI types.
//!
//! Enable one of `arrow-56`, `arrow-57`, or `arrow-58` to get:
//! Enable one of `arrow-56`, `arrow-57`, `arrow-58`, or `arrow-59` to get:
//! - Safe `.into()` between `FFI_ArrowArray`/`FFI_ArrowSchema` and our types
//! - `TryFrom` between `arrow_schema::Field` and our `ArrowSchema`

Expand Down Expand Up @@ -105,3 +105,6 @@ impl_arrow_conversions!(arrow_schema_57, arrow_data_57);

#[cfg(feature = "arrow-58")]
impl_arrow_conversions!(arrow_schema_58, arrow_data_58);

#[cfg(feature = "arrow-59")]
impl_arrow_conversions!(arrow_schema_59, arrow_data_59);
16 changes: 12 additions & 4 deletions src/daft-ext/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Convenience helpers for converting between `daft-ext` FFI types and arrow-rs types.
//!
//! Requires one of the `arrow-56`, `arrow-57`, or `arrow-58` feature flags.
//! Requires one of the `arrow-56`, `arrow-57`, `arrow-58`, or `arrow-59` feature flags.

#[allow(unused_macros)]
macro_rules! impl_helpers {
Expand Down Expand Up @@ -97,15 +97,23 @@ macro_rules! impl_helpers {

// When multiple arrow features are enabled (e.g. --all-features), pick exactly one.
// Prefer the highest version.
#[cfg(feature = "arrow-58")]
#[cfg(feature = "arrow-59")]
impl_helpers!(arrow_schema_59, arrow_data_59, arrow_array_59);

#[cfg(all(feature = "arrow-58", not(feature = "arrow-59")))]
impl_helpers!(arrow_schema_58, arrow_data_58, arrow_array_58);

#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
#[cfg(all(
feature = "arrow-57",
not(feature = "arrow-58"),
not(feature = "arrow-59")
))]
impl_helpers!(arrow_schema_57, arrow_data_57, arrow_array_57);

#[cfg(all(
feature = "arrow-56",
not(feature = "arrow-57"),
not(feature = "arrow-58")
not(feature = "arrow-58"),
not(feature = "arrow-59")
))]
impl_helpers!(arrow_schema_56, arrow_data_56, arrow_array_56);
7 changes: 6 additions & 1 deletion src/daft-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ pub mod session;

mod ffi;

#[cfg(any(feature = "arrow-56", feature = "arrow-57", feature = "arrow-58"))]
#[cfg(any(
feature = "arrow-56",
feature = "arrow-57",
feature = "arrow-58",
feature = "arrow-59"
))]
pub mod helpers;
pub use daft_ext_macros::*;
7 changes: 6 additions & 1 deletion src/daft-ext/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
pub use daft_ext_macros::{daft_extension, daft_func, daft_func_batch};

#[cfg(any(feature = "arrow-56", feature = "arrow-57", feature = "arrow-58"))]
#[cfg(any(
feature = "arrow-56",
feature = "arrow-57",
feature = "arrow-58",
feature = "arrow-59"
))]
pub use crate::helpers::{export_array, export_field, import_array, import_field};
pub use crate::{
abi::{ArrowArray, ArrowArrayStream, ArrowData, ArrowSchema, ffi::strings::free_string},
Expand Down
12 changes: 6 additions & 6 deletions src/daft-parquet/benches/parquet_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn write_single_col_int64(compression: Compression) -> PathBuf {
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]));
let props = WriterProperties::builder()
.set_compression(compression)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand Down Expand Up @@ -154,7 +154,7 @@ fn write_wide_table() -> PathBuf {
let schema = Arc::new(Schema::new(fields));
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand Down Expand Up @@ -185,7 +185,7 @@ fn write_string_dict_encoded() -> PathBuf {
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand Down Expand Up @@ -215,7 +215,7 @@ fn write_nested_list_of_int() -> PathBuf {
)]));
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand Down Expand Up @@ -246,7 +246,7 @@ fn write_nested_struct() -> PathBuf {
)]));
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand All @@ -273,7 +273,7 @@ fn write_boolean_column() -> PathBuf {
)]));
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(NUM_ROWS)
.set_max_row_group_row_count(Some(NUM_ROWS))
.build();
let file = fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
Expand Down
16 changes: 12 additions & 4 deletions src/daft-parquet/src/reader/field_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,47 +101,55 @@ fn build_primitive_leaf_reader(
));

let reader: Box<dyn ArrayReader> = if matches!(arrow_type, arrow::datatypes::DataType::Null) {
Box::new(NullArrayReader::<Int32Type>::new(pages, col_descr)?)
Box::new(NullArrayReader::<Int32Type>::new(
pages, col_descr, total_rows,
)?)
} else {
match physical_type {
PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::INT32 => Box::new(PrimitiveArrayReader::<Int32Type>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
pages,
col_descr,
Some(arrow_type),
total_rows,
)?),
PhysicalType::BYTE_ARRAY => match arrow_type {
arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView => {
make_byte_view_array_reader(pages, col_descr, Some(arrow_type))?
make_byte_view_array_reader(pages, col_descr, Some(arrow_type), total_rows)?
}
_ => make_byte_array_reader(pages, col_descr, Some(arrow_type))?,
_ => make_byte_array_reader(pages, col_descr, Some(arrow_type), total_rows)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
make_fixed_len_byte_array_reader(pages, col_descr, Some(arrow_type))?
make_fixed_len_byte_array_reader(pages, col_descr, Some(arrow_type), total_rows)?
}
}
};
Comment on lines 103 to 155

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 total_rows as batch_size may cause over-allocation

total_rows here is rg.num_rows() as usize — the full row-group size — yet the actual reads are chunked at the chunk_size level in decode_one_streaming. If batch_size controls internal buffer pre-allocation inside parquet-rs (as is typical), passing the full row-group count (potentially millions of rows) instead of the chunk_size hint could cause the reader to pre-allocate far more memory than is ever needed per decode cycle.

Consider threading chunk_size down through build_top_field_readerbuild_primitive_leaf_reader so each reader pre-allocates only what will actually be consumed per batch.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like a reasonable point. wdyt?

Expand Down
32 changes: 12 additions & 20 deletions src/daft-parquet/src/statistics/column_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,11 @@ fn convert_int64_arrowrs(
upper: i64,
col_descr: &ArrowColumnDescriptor,
) -> super::Result<ColumnRangeStatistics> {
if let Some(ArrowLogicalType::Timestamp {
is_adjusted_to_u_t_c,
unit,
}) = col_descr.logical_type_ref()
{
let daft_tu = timeunit_to_daft(*unit);
if let Some(ArrowLogicalType::Timestamp(ts)) = col_descr.logical_type_ref() {
let daft_tu = timeunit_to_daft(ts.unit);
return make_timestamp_column_range_statistics(
daft_tu,
*is_adjusted_to_u_t_c,
ts.is_adjusted_to_u_t_c,
lower,
upper,
);
Expand Down Expand Up @@ -229,17 +225,13 @@ fn convert_int96_arrowrs(
let lower_raw: [u32; 3] = [lower_data[0], lower_data[1], lower_data[2]];
let upper_raw: [u32; 3] = [upper_data[0], upper_data[1], upper_data[2]];

if let Some(ArrowLogicalType::Timestamp {
is_adjusted_to_u_t_c,
unit,
}) = col_descr.logical_type_ref()
{
let daft_tu = timeunit_to_daft(*unit);
if let Some(ArrowLogicalType::Timestamp(ts)) = col_descr.logical_type_ref() {
let daft_tu = timeunit_to_daft(ts.unit);
let lower_ts = convert_i96_to_i64_timestamp(lower_raw, daft_tu);
let upper_ts = convert_i96_to_i64_timestamp(upper_raw, daft_tu);
return make_timestamp_column_range_statistics(
daft_tu,
*is_adjusted_to_u_t_c,
ts.is_adjusted_to_u_t_c,
lower_ts,
upper_ts,
);
Expand Down Expand Up @@ -289,10 +281,10 @@ fn convert_byte_array_arrowrs(
Utf8Array::from_slice("upper", [upper_str.as_str()].as_slice()).into_series();
return Ok(ColumnRangeStatistics::new(Some(lower), Some(upper))?);
}
ArrowLogicalType::Decimal { precision, scale } => {
ArrowLogicalType::Decimal(d) => {
return make_decimal_column_range_statistics(
*precision as usize,
*scale as usize,
d.precision as usize,
d.scale as usize,
lower_bytes,
upper_bytes,
);
Expand Down Expand Up @@ -335,10 +327,10 @@ fn convert_fixed_len_arrowrs(
let lower_bytes = lower.data();
let upper_bytes = upper.data();

if let Some(ArrowLogicalType::Decimal { precision, scale }) = col_descr.logical_type_ref() {
if let Some(ArrowLogicalType::Decimal(d)) = col_descr.logical_type_ref() {
return make_decimal_column_range_statistics(
*precision as usize,
*scale as usize,
d.precision as usize,
d.scale as usize,
lower_bytes,
upper_bytes,
);
Expand Down
2 changes: 1 addition & 1 deletion src/daft-sketch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
arrow-array = {workspace = true}
arrow-schema = {workspace = true}
common-error = {path = "../common/error", default-features = false}
serde_arrow = {version = "0.14.2", features = ["arrow-57"]}
serde_arrow = {version = "0.14.2", features = ["arrow-59"]}
sketches-ddsketch = {workspace = true}
snafu = {workspace = true}

Expand Down
Loading