5 changes: 2 additions & 3 deletions crates/core/src/delta_datafusion/find_files.rs
@@ -290,10 +290,9 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
.collect_vec();

let batch = snapshot.add_actions_table(true)?;
-let mut arrays = Vec::new();
-let mut fields = Vec::new();

let schema = batch.schema();
+let mut arrays = Vec::with_capacity(schema.fields().len());
+let mut fields = Vec::with_capacity(schema.fields().len());

arrays.push(
batch
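This hunk shows the pattern the whole PR applies: when a `Vec`'s final length is known up front (here, the number of schema fields), `Vec::with_capacity` reserves it once instead of letting `Vec::new` grow by repeated doubling. A minimal standalone sketch of the difference:

```rust
// Minimal sketch: `Vec::new` starts at capacity 0 and reallocates as it
// grows, while `Vec::with_capacity` reserves the known final size once.
fn main() {
    let n = 1_000;

    let mut grown: Vec<u64> = Vec::new();
    for i in 0..n {
        grown.push(i); // may reallocate and move the buffer several times
    }

    let mut reserved: Vec<u64> = Vec::with_capacity(n as usize);
    for i in 0..n {
        reserved.push(i); // capacity is already sufficient: no reallocation
    }

    assert_eq!(grown, reserved);
    assert!(reserved.capacity() >= n as usize);
}
```

Beyond saving the copies, reserving keeps the buffer from moving, which matters most in the per-batch hot paths touched below.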
4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
@@ -597,7 +597,7 @@ impl DeltaDataChecker {

/// Return true if all the nullability checks are valid
fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
-let mut violations = Vec::new();
+let mut violations = Vec::with_capacity(self.non_nullable_columns.len());
for col in self.non_nullable_columns.iter() {
if let Some(arr) = record_batch.column_by_name(col) {
if arr.null_count() > 0 {
@@ -633,7 +633,7 @@ impl DeltaDataChecker {
let table_name: String = uuid::Uuid::new_v4().to_string();
self.ctx.register_table(&table_name, Arc::new(table))?;

-let mut violations: Vec<String> = Vec::new();
+let mut violations: Vec<String> = Vec::with_capacity(checks.len());

for check in checks {
if check.get_name().contains('.') {
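Here the capacity is an upper bound rather than an exact count: each non-nullable column can contribute at most one violation. A self-contained sketch of the same check against the `arrow` crate (the function name and error type are illustrative, not the delta-rs API):

```rust
use arrow::array::{Array, RecordBatch};

// Sketch: at most one violation per non-nullable column, so the column
// count is a safe upper bound for the capacity.
fn check_nullability(batch: &RecordBatch, non_nullable: &[&str]) -> Result<(), String> {
    let mut violations: Vec<String> = Vec::with_capacity(non_nullable.len());
    for col in non_nullable {
        if let Some(arr) = batch.column_by_name(col) {
            if arr.null_count() > 0 {
                violations.push(col.to_string());
            }
        }
    }
    if violations.is_empty() {
        Ok(())
    } else {
        Err(format!("non-nullable columns contain nulls: {violations:?}"))
    }
}
```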
6 changes: 3 additions & 3 deletions crates/core/src/delta_datafusion/table_provider.rs
@@ -438,7 +438,7 @@ impl<'a> DeltaScanBuilder<'a> {
)?;

let logical_schema = if let Some(used_columns) = self.projection {
-let mut fields = vec![];
+let mut fields = Vec::with_capacity(used_columns.len());
for idx in used_columns {
fields.push(logical_schema.field(*idx).to_owned());
}
@@ -517,9 +517,9 @@ impl<'a> DeltaScanBuilder<'a> {

// needed to enforce limit and deal with missing statistics
// rust port of https://github.com/delta-io/delta/pull/1495
-let mut pruned_without_stats = vec![];
+let mut pruned_without_stats = Vec::new();
let mut rows_collected = 0;
-let mut files = vec![];
+let mut files = Vec::with_capacity(num_containers);

let file_actions: Vec<_> = self
.snapshot
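The three vectors in this file are treated differently on purpose: the projected field list is exactly `used_columns.len()` long, `files` is bounded by `num_containers`, while `pruned_without_stats` stays unsized, presumably because files missing statistics are expected to be the exception. A sketch of the projection loop using arrow's `Schema` (the function name is illustrative):

```rust
use arrow::datatypes::{Field, Schema};

// Sketch: the output holds exactly one field per requested column index.
fn project_fields(schema: &Schema, used_columns: &[usize]) -> Vec<Field> {
    let mut fields = Vec::with_capacity(used_columns.len());
    for idx in used_columns {
        fields.push(schema.field(*idx).clone());
    }
    fields
}
```

arrow's `Schema::project` offers the same projection as a single call when a `Schema` rather than a `Vec<Field>` is wanted.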
4 changes: 2 additions & 2 deletions crates/core/src/kernel/snapshot/iterators.rs
@@ -334,8 +334,8 @@ where
Scalar::Timestamp(v) => Scalar::Timestamp(func(v)),
Scalar::TimestampNtz(v) => Scalar::TimestampNtz(func(v)),
Scalar::Struct(struct_data) => {
-let mut fields = Vec::new();
-let mut scalars = Vec::new();
+let mut fields = Vec::with_capacity(struct_data.fields().len());
+let mut scalars = Vec::with_capacity(struct_data.values().len());

for (field, value) in struct_data.fields().iter().zip(struct_data.values().iter()) {
fields.push(field.clone());
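In the struct branch, `fields()` and `values()` are parallel lists, so both output lengths are known before the zip begins. A sketch of the transform's shape with a stand-in `Scalar` enum (not the kernel's type):

```rust
// Stand-in type: a struct scalar keeps parallel field/value lists.
enum Scalar {
    Timestamp(i64),
    Struct { fields: Vec<String>, values: Vec<Scalar> },
}

// Sketch: rebuild a struct scalar by mapping each value, preallocating
// both parallel output vectors to their known lengths.
fn map_struct(fields: &[String], values: &[Scalar], func: impl Fn(&Scalar) -> Scalar) -> Scalar {
    let mut out_fields = Vec::with_capacity(fields.len());
    let mut out_values = Vec::with_capacity(values.len());
    for (field, value) in fields.iter().zip(values.iter()) {
        out_fields.push(field.clone());
        out_values.push(func(value));
    }
    Scalar::Struct { fields: out_fields, values: out_values }
}
```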
12 changes: 7 additions & 5 deletions crates/core/src/operations/merge/barrier.rs
@@ -16,7 +16,7 @@ use std::{
task::{Context, Poll},
};

-use arrow::array::{builder::UInt64Builder, ArrayRef, RecordBatch};
+use arrow::array::{builder::UInt64Builder, Array, ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use dashmap::DashSet;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -253,7 +253,7 @@ impl Stream for MergeBarrierStream {
// However this approach exposes the cost of hashing so we want to minimize that as much as possible.
// A map from an arrow dictionary key to the correct index of `file_partition` is created for each batch that's processed.
// This ensures we only need to hash each file path at most once per batch.
-let mut key_map = Vec::new();
+let mut key_map = Vec::with_capacity(file_dictionary.len());

for file_name in file_dictionary.values().into_iter() {
let key = match file_name {
Expand All @@ -273,9 +273,11 @@ impl Stream for MergeBarrierStream {
key_map.push(key)
}

-let mut indices: Vec<_> = (0..(self.file_partitions.len()))
-    .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-    .collect();
+let mut indices: Vec<_> =
+    Vec::with_capacity(self.file_partitions.len());
+for _ in 0..self.file_partitions.len() {
+    indices.push(UInt64Builder::with_capacity(batch.num_rows()));
+}

for (idx, key) in file_dictionary.keys().iter().enumerate() {
match key {
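The comment block in this hunk carries the real logic: file paths arrive dictionary-encoded, so the stream resolves each distinct path to a partition slot once per batch (`key_map`), and individual rows are then routed by integer dictionary key without further hashing. A hedged standalone sketch of that two-level lookup (`route_rows` and `partitions` are illustrative names, not the operator's API):

```rust
use std::collections::HashMap;

use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::UInt16Type;

// Sketch: hash each distinct file path at most once per batch, then route
// rows by cheap integer dictionary keys.
fn route_rows(
    paths: &DictionaryArray<UInt16Type>,
    partitions: &HashMap<String, usize>,
) -> Vec<Option<usize>> {
    // One entry per distinct dictionary value.
    let values = paths.values().as_any().downcast_ref::<StringArray>().unwrap();
    let mut key_map = Vec::with_capacity(values.len());
    for name in values.iter() {
        key_map.push(name.and_then(|n| partitions.get(n).copied()));
    }
    // Per-row routing is now two array lookups, no hashing.
    paths
        .keys()
        .iter()
        .map(|key| key.and_then(|k| key_map[k as usize]))
        .collect()
}
```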
4 changes: 2 additions & 2 deletions crates/core/src/operations/write/execution.rs
@@ -327,7 +327,7 @@ pub(crate) async fn write_execution_plan_v2(
});

// spawn one worker per partition stream to drive DataFusion concurrently
-let mut worker_handles = Vec::new();
+let mut worker_handles = Vec::with_capacity(partition_streams.len());
let scan_start = std::time::Instant::now();
for mut partition_stream in partition_streams {
let tx_clone = tx.clone();
@@ -444,7 +444,7 @@ pub(crate) async fn write_execution_plan_v2(
});

// spawn partition workers that split batches and send to appropriate writer channel
-let mut worker_handles = Vec::new();
+let mut worker_handles = Vec::with_capacity(partition_streams.len());
let scan_start = std::time::Instant::now();
for mut partition_stream in partition_streams {
let txn = tx_normal.clone();
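Both hunks in this file size the handle list to the number of partition streams, which is known before any worker is spawned. A sketch of the fan-out shape under that assumption (a plain `Vec<u64>` stands in for a DataFusion partition stream; names are illustrative):

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Sketch: one spawned worker per partition stream, handle list sized up front.
async fn fan_out(partition_streams: Vec<Vec<u64>>) -> u64 {
    let (tx, mut rx) = mpsc::channel::<u64>(16);
    let mut worker_handles: Vec<JoinHandle<()>> =
        Vec::with_capacity(partition_streams.len());

    for partition_stream in partition_streams {
        let tx_clone = tx.clone();
        worker_handles.push(tokio::spawn(async move {
            for batch in partition_stream {
                // stand-in for polling the stream and forwarding batches
                let _ = tx_clone.send(batch).await;
            }
        }));
    }
    drop(tx); // the receiver sees the channel close once all workers finish

    let mut total = 0;
    while let Some(v) = rx.recv().await {
        total += v;
    }
    for handle in worker_handles {
        let _ = handle.await;
    }
    total
}
```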
3 changes: 2 additions & 1 deletion crates/core/src/operations/write/mod.rs
@@ -508,7 +508,8 @@ impl std::future::IntoFuture for WriteBuilder {
}
}
if let Some(new_schema) = new_schema {
-let mut schema_evolution_projection = Vec::new();
+let mut schema_evolution_projection =
+    Vec::with_capacity(new_schema.fields().len());
for field in new_schema.fields() {
// If field exist in source data, we cast to new datatype
if source_schema.index_of(field.name()).is_ok() {
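The evolved schema contributes exactly one projection expression per field, whether it is cast from the source or filled with nulls, so the capacity here is exact. A sketch of that decision loop (`Expr` is a stand-in enum, not DataFusion's expression type):

```rust
use arrow::datatypes::Schema;

// Stand-in for the projection expressions built during schema evolution.
enum Expr {
    CastColumn { name: String },
    NullLiteral { name: String },
}

// Sketch: one expression per field of the evolved schema, exact capacity.
fn evolution_projection(source: &Schema, new_schema: &Schema) -> Vec<Expr> {
    let mut projection = Vec::with_capacity(new_schema.fields().len());
    for field in new_schema.fields() {
        if source.index_of(field.name()).is_ok() {
            // field exists in the source: cast it to the evolved datatype
            projection.push(Expr::CastColumn { name: field.name().clone() });
        } else {
            // field is new: fill with nulls of the evolved datatype
            projection.push(Expr::NullLiteral { name: field.name().clone() });
        }
    }
    projection
}
```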
6 changes: 3 additions & 3 deletions crates/core/src/writer/json.rs
@@ -375,7 +375,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
#[instrument(skip(self), fields(writer_count = 0))]
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
let writers = std::mem::take(&mut self.arrow_writers);
-let mut actions = Vec::new();
+let mut actions = Vec::with_capacity(writers.len());

Span::current().record("writer_count", writers.len());

@@ -432,8 +432,8 @@ fn quarantine_failed_parquet_rows(
arrow_schema: Arc<ArrowSchema>,
values: Vec<Value>,
) -> Result<(Vec<Value>, Vec<BadValue>), DeltaWriterError> {
-let mut good: Vec<Value> = Vec::new();
-let mut bad: Vec<BadValue> = Vec::new();
+let mut good: Vec<Value> = Vec::with_capacity(values.len());
+let mut bad: Vec<BadValue> = Vec::with_capacity(values.len());

for value in values {
let record_batch =
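For the quarantine split, `values.len()` is an upper bound on both outputs since every row lands in exactly one of the two vectors; the trade-off is transient over-allocation in exchange for zero reallocations on either side. A generic sketch of the split:

```rust
// Sketch: route every input into exactly one of two vectors, each sized
// to the worst case (all good or all bad).
fn quarantine<T>(values: Vec<T>, is_good: impl Fn(&T) -> bool) -> (Vec<T>, Vec<T>) {
    let mut good = Vec::with_capacity(values.len());
    let mut bad = Vec::with_capacity(values.len());
    for value in values {
        if is_good(&value) {
            good.push(value);
        } else {
            bad.push(value);
        }
    }
    (good, bad)
}
```

std's `Iterator::partition` expresses the same split, but builds both collections from empty.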
2 changes: 1 addition & 1 deletion crates/core/src/writer/record_batch.rs
@@ -274,7 +274,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
let writers = std::mem::take(&mut self.arrow_writers);
-let mut actions = Vec::new();
+let mut actions = Vec::with_capacity(writers.len());

for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
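The flush pattern matches the JSON writer above: `std::mem::take` moves the writer map out, leaving a fresh empty map so the writer can accept more data, and the action list is sized to the number of writers being closed. A simplified sketch, with a byte buffer standing in for a parquet writer and `usize` for an `Add` action:

```rust
use std::collections::HashMap;

// Sketch: drain all in-flight writers, producing one record per writer.
fn flush(writers_slot: &mut HashMap<String, Vec<u8>>) -> Vec<usize> {
    // take() leaves an empty map behind, resetting state for the next file
    let writers = std::mem::take(writers_slot);
    let mut actions = Vec::with_capacity(writers.len());
    for (_path, buffer) in writers {
        actions.push(buffer.len()); // stand-in for close() plus building an Add
    }
    actions
}
```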
2 changes: 1 addition & 1 deletion crates/core/src/writer/utils.rs
@@ -75,7 +75,7 @@ pub(crate) fn record_batch_without_partitions(
record_batch: &RecordBatch,
partition_columns: &[String],
) -> Result<RecordBatch, DeltaWriterError> {
-let mut non_partition_columns = Vec::new();
+let mut non_partition_columns = Vec::with_capacity(record_batch.schema().fields().len());
for (i, field) in record_batch.schema().fields().iter().enumerate() {
if !partition_columns.contains(field.name()) {
non_partition_columns.push(i);
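The kept-index list holds at most one entry per field, so the full field count is a safe upper bound, exact whenever there are no partition columns. A sketch pairing the loop above with arrow's `RecordBatch::project` (the function name is illustrative):

```rust
use arrow::array::RecordBatch;
use arrow::error::ArrowError;

// Sketch: keep every column that is not a partition column.
fn without_partitions(
    batch: &RecordBatch,
    partition_columns: &[String],
) -> Result<RecordBatch, ArrowError> {
    let schema = batch.schema();
    let mut keep = Vec::with_capacity(schema.fields().len());
    for (i, field) in schema.fields().iter().enumerate() {
        if !partition_columns.contains(field.name()) {
            keep.push(i);
        }
    }
    batch.project(&keep)
}
```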
2 changes: 1 addition & 1 deletion python/src/filesystem.rs
@@ -141,7 +141,7 @@ impl DeltaFileSystemHandler {
fs.call_method("FileInfo", (loc, type_), Some(&kwargs.into_py_dict(py)?))
};

-let mut infos = Vec::new();
+let mut infos = Vec::with_capacity(paths.len());
for file_path in paths {
let path = Self::parse_path(&file_path);
let listed = py.allow_threads(|| {