1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -80,6 +80,7 @@ indexmap = "2.2.1"
itertools = "0.14"
parking_lot = "0.12"
percent-encoding = "2"
pin-project-lite = "^0.2.7"
I'm not sure why this dependency crept back in, I'll just have to remove it again 😆

tracing = { workspace = true }
rand = "0.8"
sqlparser = { version = "0.59.0" }
22 changes: 16 additions & 6 deletions crates/core/src/delta_datafusion/table_provider.rs
@@ -46,10 +46,8 @@ use datafusion::{
prelude::Expr,
scalar::ScalarValue,
};
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectMeta;

use serde::{Deserialize, Serialize};

use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
@@ -214,7 +212,7 @@ impl DataSink for DeltaDataSink {
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut actions = if self.save_mode == SaveMode::Overwrite {
let current_files = self.snapshot.log_data().iter().map(|f| f.add_action());
let current_files = self.snapshot.log_data().into_iter().map(|f| f.add_action());
current_files
.into_iter()
.map(|add| {
@@ -651,9 +649,21 @@ impl<'a> DeltaScanBuilder<'a> {
// Should we update datafusion_table_statistics to optionally take the mask?
let stats = if let Some(mask) = pruning_mask {
let es = self.snapshot.snapshot();
let pruned_stats =
filter_record_batch(&self.snapshot.files, &BooleanArray::from(mask))?;
LogDataHandler::new(&pruned_stats, es.table_configuration()).statistics()
let mut pruned_batches = Vec::new();
let mut mask_offset = 0;

for batch in &self.snapshot.files {
let batch_size = batch.num_rows();
let batch_mask = &mask[mask_offset..mask_offset + batch_size];
let batch_mask_array = BooleanArray::from(batch_mask.to_vec());
let pruned_batch = filter_record_batch(batch, &batch_mask_array)?;
if pruned_batch.num_rows() > 0 {
pruned_batches.push(pruned_batch);
}
mask_offset += batch_size;
}

LogDataHandler::new(&pruned_batches, es.table_configuration()).statistics()
Comment on lines +652 to +666
This was not really nice before and unfortunately got a bit less nice.

Longer term I think we may have to decide if we need additional skipping from DataFusion, or rely on the file skipping in delta-kernel to be selective (we have no reason to believe it would not be :)).
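
For reference, a self-contained sketch of the per-batch masking this hunk does, assuming the flat pruning mask covers all batches back to back; the function name and the plain `arrow` paths are illustrative, not the actual `DeltaScanBuilder` code:

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Apply a flat boolean mask across several record batches, keeping only
/// non-empty results. Assumes `mask.len()` equals the total row count.
fn apply_pruning_mask(
    batches: &[RecordBatch],
    mask: &[bool],
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut pruned = Vec::new();
    let mut offset = 0;
    for batch in batches {
        let rows = batch.num_rows();
        // Each batch consumes its own contiguous slice of the mask.
        let batch_mask = BooleanArray::from(mask[offset..offset + rows].to_vec());
        let filtered = filter_record_batch(batch, &batch_mask)?;
        if filtered.num_rows() > 0 {
            pruned.push(filtered);
        }
        offset += rows;
    }
    Ok(pruned)
}
```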

} else {
self.snapshot.log_data().statistics()
};
76 changes: 23 additions & 53 deletions crates/core/src/kernel/arrow/engine_ext.rs
@@ -3,7 +3,6 @@
use std::borrow::Cow;
use std::sync::Arc;

use arrow_array::{Array, StructArray};
use arrow_schema::{
DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
@@ -12,7 +11,6 @@ use delta_kernel::arrow::compute::filter_record_batch;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::parse_json;
use delta_kernel::expressions::{ColumnName, Scalar, StructData};
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::schema::{
@@ -27,7 +25,6 @@
use itertools::Itertools;

use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
use crate::kernel::replay::parse_partitions;
use crate::kernel::SCAN_ROW_ARROW_SCHEMA;

/// [`ScanMetadata`] contains (1) a [`RecordBatch`] specifying data files to be scanned
@@ -118,9 +115,6 @@ pub(crate) trait SnapshotExt {
/// computations by delta-rs. Specifically the `stats_parsed` and
/// `partitionValues_parsed` fields are added.
fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef>;

/// Parse stats column into a struct array.
fn parse_stats_column(&self, batch: &RecordBatch) -> DeltaResultLocal<RecordBatch>;
}

impl SnapshotExt for Snapshot {
@@ -150,14 +144,15 @@ impl SnapshotExt for Snapshot {
/// scan row (file data).
fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef> {
let mut fields = SCAN_ROW_ARROW_SCHEMA.fields().to_vec();
let stats_idx = SCAN_ROW_ARROW_SCHEMA.index_of("stats").unwrap();

let stats_schema = self.stats_schema()?;
let stats_schema: ArrowSchema = stats_schema.as_ref().try_into_arrow()?;
fields.push(Arc::new(Field::new(
fields[stats_idx] = Arc::new(Field::new(
We are now replacing the existing `stats` field with `stats_parsed` rather than appending the parsed field alongside it.
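
As a standalone sketch of the idea (the helper name here is illustrative, not the snapshot API itself): look up the existing index and overwrite the field in place instead of pushing a second one.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

/// Swap the raw `stats` field for a parsed struct field at the same position,
/// so the schema ends up with a single stats column.
fn replace_stats_field(schema: &Schema, parsed_type: DataType) -> Schema {
    let mut fields = schema.fields().to_vec();
    // Assumes the schema actually has a `stats` column.
    let stats_idx = schema.index_of("stats").expect("schema has a `stats` column");
    fields[stats_idx] = Arc::new(Field::new("stats_parsed", parsed_type, true));
    Schema::new(fields)
}
```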

"stats_parsed",
ArrowDataType::Struct(stats_schema.fields().to_owned()),
true,
)));
));

if let Some(partition_schema) = self.partitions_schema()? {
let partition_schema: ArrowSchema = partition_schema.as_ref().try_into_arrow()?;
@@ -171,51 +166,6 @@ impl SnapshotExt for Snapshot {
let schema = Arc::new(ArrowSchema::new(fields));
Ok(schema)
}

fn parse_stats_column(&self, batch: &RecordBatch) -> DeltaResultLocal<RecordBatch> {
let Some((stats_idx, _)) = batch.schema_ref().column_with_name("stats") else {
return Err(DeltaTableError::SchemaMismatch {
msg: "stats column not found".to_string(),
});
};

let mut columns = batch.columns().to_vec();
let mut fields = batch.schema().fields().to_vec();

let stats_schema = self.stats_schema()?;
let stats_batch = batch.project(&[stats_idx])?;
let stats_data = Box::new(ArrowEngineData::new(stats_batch));

let parsed = parse_json(stats_data, stats_schema)?;
let parsed: RecordBatch = ArrowEngineData::try_from_engine_data(parsed)?.into();

let stats_array: Arc<StructArray> = Arc::new(parsed.into());
fields.push(Arc::new(Field::new(
"stats_parsed",
stats_array.data_type().to_owned(),
true,
)));
columns.push(stats_array.clone());

if let Some(partition_schema) = self.partitions_schema()? {
let partition_array = parse_partitions(
batch,
partition_schema.as_ref(),
"fileConstantValues.partitionValues",
)?;
fields.push(Arc::new(Field::new(
"partitionValues_parsed",
partition_array.data_type().to_owned(),
false,
)));
columns.push(Arc::new(partition_array));
}

Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(fields)),
columns,
)?)
}
}

fn partitions_schema(
@@ -916,4 +866,24 @@ mod tests {
assert_eq!(None, result);
Ok(())
}

#[tokio::test]
async fn test_partition_schema2() {
let schema = StructType::try_new(vec![
StructField::new("id", DataType::LONG, true),
StructField::new("name", DataType::STRING, true),
StructField::new("date", DataType::DATE, true),
])
.unwrap();

let partition_columns = vec!["date".to_string()];
let expected =
StructType::try_new(vec![StructField::new("date", DataType::DATE, true)]).unwrap();
assert_eq!(
partitions_schema(&schema, &partition_columns).unwrap(),
Some(expected)
);

assert_eq!(partitions_schema(&schema, &[]).unwrap(), None);
}
}
21 changes: 10 additions & 11 deletions crates/core/src/kernel/snapshot/iterators.rs
@@ -8,6 +8,7 @@ use arrow_array::types::Int64Type;
use arrow_array::{Array, RecordBatch, StructArray};
use arrow_schema::DataType as ArrowDataType;
use chrono::{DateTime, Utc};
use delta_kernel::engine::arrow_expression::evaluate_expression::to_json;
use delta_kernel::expressions::{Scalar, StructData};
use delta_kernel::scan::scan_row_schema;
use delta_kernel::schema::DataType;
@@ -19,14 +20,15 @@ use crate::kernel::scalars::ScalarExt;
use crate::kernel::{Add, DeletionVectorDescriptor, Remove};
use crate::{DeltaResult, DeltaTableError};

pub(crate) use self::scan_row::{scan_row_in_eval, ScanRowOutStream};
pub use self::tombstones::TombstoneView;

mod scan_row;
mod tombstones;

const FIELD_NAME_PATH: &str = "path";
const FIELD_NAME_SIZE: &str = "size";
const FIELD_NAME_MODIFICATION_TIME: &str = "modificationTime";
const FIELD_NAME_STATS: &str = "stats";
const FIELD_NAME_STATS_PARSED: &str = "stats_parsed";
const FIELD_NAME_PARTITION_VALUES_PARSED: &str = "partitionValues_parsed";
const FIELD_NAME_DELETION_VECTOR: &str = "deletionVector";
@@ -55,9 +57,6 @@ static FIELD_INDICES: LazyLock<HashMap<&'static str, usize>> = LazyLock::new(||
let modification_time_idx = schema.index_of(FIELD_NAME_MODIFICATION_TIME).unwrap();
indices.insert(FIELD_NAME_MODIFICATION_TIME, modification_time_idx);

let stats_idx = schema.index_of(FIELD_NAME_STATS).unwrap();
indices.insert(FIELD_NAME_STATS, stats_idx);

indices
});

Expand Down Expand Up @@ -158,12 +157,12 @@ impl LogicalFileView {
}

/// Returns the raw JSON statistics string for this file, if available.
pub fn stats(&self) -> Option<&str> {
get_string_value(
self.files
.column(*FIELD_INDICES.get(FIELD_NAME_STATS).unwrap()),
self.index,
)
pub fn stats(&self) -> Option<String> {
let stats = self.stats_parsed()?.slice(self.index, 1);
let value = to_json(&stats)
.ok()
.map(|arr| arr.as_string::<i32>().value(0).to_string());
value.and_then(|v| (!v.is_empty()).then_some(v))
Comment on lines +160 to +165
We now need to serialise the parsed stats fields back to JSON to produce the stats string (for Add actions), since we are no longer keeping the raw stats column.
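
A rough sketch of that round trip, using arrow's JSON writer instead of the kernel's `to_json` helper (so the exact output formatting and null handling here are only approximate):

```rust
use arrow_array::{RecordBatch, StructArray};
use arrow_json::writer::LineDelimitedWriter;
use arrow_schema::ArrowError;

/// Serialise a single row of a parsed stats struct back into a JSON string,
/// roughly what an Add action expects in its `stats` field.
fn stats_row_to_json(stats: &StructArray, row: usize) -> Result<String, ArrowError> {
    // Wrap a one-row slice in a RecordBatch so the JSON writer accepts it.
    let batch = RecordBatch::from(stats.slice(row, 1));
    let mut writer = LineDelimitedWriter::new(Vec::<u8>::new());
    writer.write(&batch)?;
    writer.finish()?;
    let bytes = writer.into_inner();
    Ok(String::from_utf8_lossy(&bytes).trim_end().to_string())
}
```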

}

/// Returns the parsed partition values as structured data.
Expand Down Expand Up @@ -275,7 +274,7 @@ impl LogicalFileView {
size: self.size(),
modification_time: self.modification_time(),
data_change: true,
stats: self.stats().map(|v| v.to_string()),
stats: self.stats(),
tags: None,
deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
base_row_id: None,