diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs
index 509691efd3..9b3d8930fa 100644
--- a/crates/core/src/delta_datafusion/find_files.rs
+++ b/crates/core/src/delta_datafusion/find_files.rs
@@ -290,10 +290,10 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
         .collect_vec();

     let batch = snapshot.add_actions_table(true)?;
-    let mut arrays = Vec::new();
-    let mut fields = Vec::new();
-    let schema = batch.schema();
+    let schema = batch.schema();
+    let mut arrays = Vec::with_capacity(schema.fields().len());
+    let mut fields = Vec::with_capacity(schema.fields().len());

     arrays.push(
         batch
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 566190b3dc..7bb7157360 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -597,7 +597,7 @@ impl DeltaDataChecker {
     /// Return true if all the nullability checks are valid
     fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
-        let mut violations = Vec::new();
+        let mut violations = Vec::with_capacity(self.non_nullable_columns.len());
         for col in self.non_nullable_columns.iter() {
             if let Some(arr) = record_batch.column_by_name(col) {
                 if arr.null_count() > 0 {
@@ -633,7 +633,7 @@ impl DeltaDataChecker {
         let table_name: String = uuid::Uuid::new_v4().to_string();
         self.ctx.register_table(&table_name, Arc::new(table))?;

-        let mut violations: Vec<String> = Vec::new();
+        let mut violations: Vec<String> = Vec::with_capacity(checks.len());

         for check in checks {
             if check.get_name().contains('.') {
diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs
index 452aa798ce..7a1ac88781 100644
--- a/crates/core/src/delta_datafusion/table_provider.rs
+++ b/crates/core/src/delta_datafusion/table_provider.rs
@@ -438,7 +438,7 @@ impl<'a> DeltaScanBuilder<'a> {
         )?;

         let logical_schema = if let Some(used_columns) = self.projection {
-            let mut fields = vec![];
+            let mut fields = Vec::with_capacity(used_columns.len());
             for idx in used_columns {
                 fields.push(logical_schema.field(*idx).to_owned());
             }
@@ -517,9 +517,9 @@ impl<'a> DeltaScanBuilder<'a> {
         // needed to enforce limit and deal with missing statistics
         // rust port of https://github.com/delta-io/delta/pull/1495
-        let mut pruned_without_stats = vec![];
+        let mut pruned_without_stats = Vec::new();
         let mut rows_collected = 0;
-        let mut files = vec![];
+        let mut files = Vec::with_capacity(num_containers);

         let file_actions: Vec<_> = self
             .snapshot
diff --git a/crates/core/src/kernel/snapshot/iterators.rs b/crates/core/src/kernel/snapshot/iterators.rs
index cbe0217562..cc1cce66bd 100644
--- a/crates/core/src/kernel/snapshot/iterators.rs
+++ b/crates/core/src/kernel/snapshot/iterators.rs
@@ -334,8 +334,8 @@ where
             Scalar::Timestamp(v) => Scalar::Timestamp(func(v)),
             Scalar::TimestampNtz(v) => Scalar::TimestampNtz(func(v)),
             Scalar::Struct(struct_data) => {
-                let mut fields = Vec::new();
-                let mut scalars = Vec::new();
+                let mut fields = Vec::with_capacity(struct_data.fields().len());
+                let mut scalars = Vec::with_capacity(struct_data.values().len());
                 for (field, value) in struct_data.fields().iter().zip(struct_data.values().iter())
                 {
                     fields.push(field.clone());
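Every hunk so far follows the same pattern: the final element count is known before the loop, so the `Vec` can be allocated once up front instead of growing geometrically as elements are pushed. A standalone sketch of the difference (illustrative only, not delta-rs code):

```rust
fn main() {
    let n = 1_000;

    // Growing from empty: push() reallocates whenever capacity is exhausted,
    // copying the existing elements into the new allocation each time.
    let mut grown = Vec::new();
    for i in 0..n {
        grown.push(i);
    }

    // Pre-sized: a single allocation up front, no copies on push.
    let mut sized = Vec::with_capacity(n);
    for i in 0..n {
        sized.push(i);
    }

    // with_capacity guarantees space for at least n elements,
    // so no reallocation occurred in the second loop.
    assert!(sized.capacity() >= n);
    assert_eq!(grown, sized);
}
```

The saving per vector is small, but these sites sit in per-batch and per-file loops, which appears to be why the patch targets them.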
diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs
index 682ae58457..1d4fdc6047 100644
--- a/crates/core/src/operations/merge/barrier.rs
+++ b/crates/core/src/operations/merge/barrier.rs
@@ -16,7 +16,7 @@ use std::{
     task::{Context, Poll},
 };

-use arrow::array::{builder::UInt64Builder, ArrayRef, RecordBatch};
+use arrow::array::{builder::UInt64Builder, Array, ArrayRef, RecordBatch};
 use arrow::datatypes::SchemaRef;
 use dashmap::DashSet;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -253,7 +253,7 @@ impl Stream for MergeBarrierStream {
                         // However this approach exposes the cost of hashing so we want to minimize that as much as possible.
                         // A map from an arrow dictionary key to the correct index of `file_partition` is created for each batch that's processed.
                         // This ensures we only need to hash each file path at most once per batch.
-                        let mut key_map = Vec::new();
+                        let mut key_map = Vec::with_capacity(file_dictionary.len());

                         for file_name in file_dictionary.values().into_iter() {
                             let key = match file_name {
@@ -273,9 +273,11 @@ impl Stream for MergeBarrierStream {
                             key_map.push(key)
                         }

-                        let mut indices: Vec<_> = (0..(self.file_partitions.len()))
-                            .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-                            .collect();
+                        let mut indices: Vec<_> =
+                            Vec::with_capacity(self.file_partitions.len());
+                        for _ in 0..self.file_partitions.len() {
+                            indices.push(UInt64Builder::with_capacity(batch.num_rows()));
+                        }

                         for (idx, key) in file_dictionary.keys().iter().enumerate() {
                             match key {
diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 1e7030cc41..c852ea4093 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -327,7 +327,7 @@ pub(crate) async fn write_execution_plan_v2(
         });

         // spawn one worker per partition stream to drive DataFusion concurrently
-        let mut worker_handles = Vec::new();
+        let mut worker_handles = Vec::with_capacity(partition_streams.len());
         let scan_start = std::time::Instant::now();
         for mut partition_stream in partition_streams {
             let tx_clone = tx.clone();
@@ -444,7 +444,7 @@ pub(crate) async fn write_execution_plan_v2(
         });

         // spawn partition workers that split batches and send to appropriate writer channel
-        let mut worker_handles = Vec::new();
+        let mut worker_handles = Vec::with_capacity(partition_streams.len());
         let scan_start = std::time::Instant::now();
         for mut partition_stream in partition_streams {
             let txn = tx_normal.clone();
diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs
index 40026c02b1..e652b2272d 100644
--- a/crates/core/src/operations/write/mod.rs
+++ b/crates/core/src/operations/write/mod.rs
@@ -508,7 +508,8 @@ impl std::future::IntoFuture for WriteBuilder {
                     }
                 }
                 if let Some(new_schema) = new_schema {
-                    let mut schema_evolution_projection = Vec::new();
+                    let mut schema_evolution_projection =
+                        Vec::with_capacity(new_schema.fields().len());
                     for field in new_schema.fields() {
                         // If field exist in source data, we cast to new datatype
                         if source_schema.index_of(field.name()).is_ok() {
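The two `write_execution_plan_v2` hunks size `worker_handles` from `partition_streams.len()`, since exactly one task is spawned per stream and every handle is retained until joined. A minimal sketch of that spawn-then-join shape, with hypothetical names and a plain `Vec<u64>` standing in for a record-batch stream (assumes the tokio runtime; this is not the delta-rs code):

```rust
use tokio::task::JoinHandle;

// One worker task per input stream; the handle vector never reallocates
// because its final length equals the stream count, known up front.
async fn drive_streams(streams: Vec<Vec<u64>>) -> u64 {
    let mut handles: Vec<JoinHandle<u64>> = Vec::with_capacity(streams.len());
    for stream in streams {
        handles.push(tokio::spawn(async move { stream.into_iter().sum() }));
    }
    let mut total = 0;
    for handle in handles {
        total += handle.await.expect("worker panicked");
    }
    total
}

#[tokio::main]
async fn main() {
    let total = drive_streams(vec![vec![1, 2], vec![3, 4, 5]]).await;
    assert_eq!(total, 15);
}
```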
diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index ed02f66b71..1118f13e41 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -375,7 +375,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
     #[instrument(skip(self), fields(writer_count = 0))]
     async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
         let writers = std::mem::take(&mut self.arrow_writers);
-        let mut actions = Vec::new();
+        let mut actions = Vec::with_capacity(writers.len());

         Span::current().record("writer_count", writers.len());
@@ -432,8 +432,8 @@ fn quarantine_failed_parquet_rows(
     arrow_schema: Arc<ArrowSchema>,
     values: Vec<Value>,
 ) -> Result<(Vec<Value>, Vec<(Value, ParquetError)>), DeltaWriterError> {
-    let mut good: Vec<Value> = Vec::new();
-    let mut bad: Vec<(Value, ParquetError)> = Vec::new();
+    let mut good: Vec<Value> = Vec::with_capacity(values.len());
+    let mut bad: Vec<(Value, ParquetError)> = Vec::with_capacity(values.len());

     for value in values {
         let record_batch =
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index db1de9f2b3..ead66443d9 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -274,7 +274,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
     /// Writes the existing parquet bytes to storage and resets internal state to handle another file.
     async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
         let writers = std::mem::take(&mut self.arrow_writers);
-        let mut actions = Vec::new();
+        let mut actions = Vec::with_capacity(writers.len());

         for (_, writer) in writers {
             let metadata = writer.arrow_writer.close()?;
diff --git a/crates/core/src/writer/utils.rs b/crates/core/src/writer/utils.rs
index 2f810db520..98c8ba823e 100644
--- a/crates/core/src/writer/utils.rs
+++ b/crates/core/src/writer/utils.rs
@@ -75,7 +75,7 @@ pub(crate) fn record_batch_without_partitions(
     record_batch: &RecordBatch,
     partition_columns: &[String],
 ) -> Result<RecordBatch, DeltaWriterError> {
-    let mut non_partition_columns = Vec::new();
+    let mut non_partition_columns = Vec::with_capacity(record_batch.schema().fields().len());
     for (i, field) in record_batch.schema().fields().iter().enumerate() {
         if !partition_columns.contains(field.name()) {
             non_partition_columns.push(i);
diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs
index c07442a500..7207ca6896 100644
--- a/python/src/filesystem.rs
+++ b/python/src/filesystem.rs
@@ -141,7 +141,7 @@ impl DeltaFileSystemHandler {
             fs.call_method("FileInfo", (loc, type_), Some(&kwargs.into_py_dict(py)?))
         };

-        let mut infos = Vec::new();
+        let mut infos = Vec::with_capacity(paths.len());
         for file_path in paths {
             let path = Self::parse_path(&file_path);
             let listed = py.allow_threads(|| {
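One caveat worth noting on the `quarantine_failed_parquet_rows` hunk: both `good` and `bad` are sized to `values.len()`, so the combined reservation is twice the input and one side's capacity typically goes unused; the trade is that neither vector ever reallocates regardless of the good/bad split. A toy version of the pattern (hypothetical names, `i64` standing in for `Value`, a `String` standing in for the parquet error):

```rust
// Partition inputs into successes and failures, pre-sizing both sides to the
// input length so pushes never reallocate. Illustrative only.
fn quarantine(values: Vec<i64>) -> (Vec<i64>, Vec<(i64, String)>) {
    let mut good = Vec::with_capacity(values.len());
    let mut bad = Vec::with_capacity(values.len());
    for v in values {
        if v >= 0 {
            good.push(v);
        } else {
            bad.push((v, format!("negative value: {v}")));
        }
    }
    (good, bad)
}

fn main() {
    let (good, bad) = quarantine(vec![3, -1, 7]);
    assert_eq!(good, vec![3, 7]);
    assert_eq!(bad.len(), 1);
}
```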