18 changes: 18 additions & 0 deletions kernel/src/actions/mod.rs
@@ -89,6 +89,13 @@ static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
)]))
});

static LOG_REMOVE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
REMOVE_NAME,
Remove::to_schema(),
)]))
});

static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
COMMIT_INFO_NAME,
@@ -120,6 +127,10 @@ pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
&LOG_ADD_SCHEMA
}

pub(crate) fn get_log_remove_schema() -> &'static SchemaRef {
&LOG_REMOVE_SCHEMA
}

pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
&LOG_COMMIT_INFO_SCHEMA
}
@@ -781,6 +792,12 @@ pub(crate) struct Remove {
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,
@@ -1127,6 +1144,7 @@ mod tests {
StructField::nullable("extendedFileMetadata", DataType::BOOLEAN),
partition_values_field(),
StructField::nullable("size", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
tags_field(),
deletion_vector_field(),
StructField::nullable("baseRowId", DataType::LONG),
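The hunks above mirror the existing Add plumbing for Remove: a log schema wrapping a single nullable "remove" struct, an accessor for it, and a stats field carrying per-file statistics as raw JSON. A minimal sketch of how crate-internal code might exercise the new pieces — the construction style follows the updated unit test, while the test name, the path literal, and the assumed-in-scope imports (Remove, get_log_remove_schema) are illustrative only, not part of the diff:

    // Hypothetical unit test in the style of the existing actions tests.
    #[test]
    fn remove_carries_stats() {
        // LOG_REMOVE_SCHEMA wraps a single nullable "remove" struct, mirroring
        // LOG_ADD_SCHEMA, so callers can project only remove actions from the log.
        let _remove_schema = get_log_remove_schema();

        // Remove now carries per-file statistics as an opaque JSON string, the
        // same representation Add uses.
        let remove = Remove {
            path: "part-00000-abc.parquet".to_string(),
            data_change: true,
            stats: Some(r#"{"numRecords":1}"#.to_string()),
            ..Default::default()
        };
        assert_eq!(remove.stats.as_deref(), Some(r#"{"numRecords":1}"#));
    }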
12 changes: 7 additions & 5 deletions kernel/src/actions/visitors.rs
@@ -178,7 +178,7 @@ impl RemoveVisitor {
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Remove> {
require!(
getters.len() == 14,
getters.len() == 15,
Error::InternalError(format!(
"Wrong number of RemoveVisitor getters: {}",
getters.len()
@@ -194,10 +194,10 @@
getters[4].get_opt(row_index, "remove.partitionValues")?;

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;
let stats: Option<String> = getters[6].get_opt(row_index, "remove.stats")?;
// TODO(nick) tags are skipped in getters[7]

// TODO(nick) tags are skipped in getters[6]

let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..])?;

let base_row_id: Option<i64> = getters[12].get_opt(row_index, "remove.baseRowId")?;
let default_row_commit_version: Option<i64> =
@@ -210,6 +210,7 @@
extended_file_metadata,
partition_values,
size,
stats,
tags: None,
deletion_vector,
base_row_id,
@@ -834,7 +835,7 @@ mod tests {
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452,"stats":"{\"numRecords\":1}"}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
@@ -850,6 +851,7 @@
("c2".to_string(), "c".to_string()),
])),
size: Some(452),
stats: Some(r#"{"numRecords":1}"#.to_string()),
..Default::default()
};
assert_eq!(
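For readers tracking the getter-count bump from 14 to 15, a sketch of the leaf layout RemoveVisitor now assumes — the indices below are inferred from the diff and from Remove::to_schema(), and the deletion-vector leaf breakdown is an assumption, not quoted from the PR:

    // RemoveVisitor getter layout after this change (15 leaves total):
    //   0: remove.path                     6: remove.stats  (new)
    //   1: remove.deletionTimestamp        7: remove.tags   (still skipped)
    //   2: remove.dataChange               8..: remove.deletionVector.* leaves,
    //   3: remove.extendedFileMetadata         consumed by visit_deletion_vector_at
    //   4: remove.partitionValues
    //   5: remove.size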
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_expression/evaluate_expression.rs
@@ -225,9 +225,9 @@ pub fn evaluate_expression(
(Struct(fields), Some(DataType::Struct(output_schema))) => {
evaluate_struct_expression(fields, batch, output_schema)
}
(Struct(_), _) => Err(Error::generic(
"Data type is required to evaluate struct expressions",
)),
(Struct(_), dt) => Err(Error::Generic(format!(
"Struct expression expects a DataType::Struct result, but got {dt:?}"
))),
(Transform(transform), Some(DataType::Struct(output_schema))) => {
evaluate_transform_expression(transform, batch, output_schema)
}
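A minimal sketch of the diagnostic this arm now produces. The evaluate_expression call shape, the in-scope batch, and the exact Debug rendering of the type are assumptions for illustration only:

    // Asking a Struct expression to produce a non-struct result type now names
    // the offending type instead of raising a generic complaint.
    let expr = Expression::Struct(vec![column_expr_ref!("add.partitionValues")]);
    let err = evaluate_expression(&expr, &batch, Some(&DataType::LONG)).unwrap_err();
    // The error message now reads roughly:
    //   "Struct expression expects a DataType::Struct result, but got Some(Primitive(Long))"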
34 changes: 28 additions & 6 deletions kernel/src/scan/log_replay.rs
@@ -260,21 +260,37 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
}
}

pub(crate) static FILE_CONSTANT_VALUES_NAME: &str = "fileConstantValues";
pub(crate) static BASE_ROW_ID_NAME: &str = "baseRowId";
pub(crate) static DEFAULT_ROW_COMMIT_VERSION_NAME: &str = "defaultRowCommitVersion";
pub(crate) static TAGS_NAME: &str = "tags";

// NB: If you update this schema, ensure you update the comment describing it in the doc comment
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
// indexes will be off, and [`get_add_transform_expr`] below to match it.
pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
// Note that fields projected out of a nullable struct must be nullable
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values =
StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]);
let file_constant_values = StructType::new_unchecked([
StructField::nullable("partitionValues", partition_values),
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
StructField::nullable(
"tags",
MapType::new(
DataType::STRING,
DataType::STRING,
/*valueContainsNull*/ true,
),
),
]);
Arc::new(StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("size", DataType::LONG),
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
StructField::nullable("fileConstantValues", file_constant_values),
StructField::nullable(FILE_CONSTANT_VALUES_NAME, file_constant_values),
]))
});

@@ -290,9 +306,12 @@ fn get_add_transform_expr() -> ExpressionRef {
column_expr_ref!("add.modificationTime"),
column_expr_ref!("add.stats"),
column_expr_ref!("add.deletionVector"),
Arc::new(Expression::Struct(vec![column_expr_ref!(
"add.partitionValues"
)])),
Arc::new(Expression::Struct(vec![
column_expr_ref!("add.partitionValues"),
column_expr_ref!("add.baseRowId"),
column_expr_ref!("add.defaultRowCommitVersion"),
column_expr_ref!("add.tags"),
])),
]))
});
EXPR.clone()
@@ -311,6 +330,9 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
column_expr_ref!("modificationTime"),
column_expr_ref!("stats"),
column_expr_ref!("deletionVector"),
column_expr_ref!("fileConstantValues.tags"),
column_expr_ref!("fileConstantValues.baseRowId"),
column_expr_ref!("fileConstantValues.defaultRowCommitVersion"),
],
))]))
});
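The new name constants keep SCAN_ROW_SCHEMA, the transform expressions, and downstream column references in lockstep, which is exactly the drift the NB comment above warns about. A small sketch of the intended use — the format!-built path is illustrative, and the literal column reference simply mirrors get_scan_metadata_transform_expr:

    // Build a nested column reference from the shared constants instead of
    // repeating the string literal at each use site.
    let tags_column = format!("{FILE_CONSTANT_VALUES_NAME}.{TAGS_NAME}");
    assert_eq!(tags_column, "fileConstantValues.tags");

    // Equivalent literal form, as used in get_scan_metadata_transform_expr above.
    let _tags_expr = column_expr_ref!("fileConstantValues.tags");

Note that every field nested under fileConstantValues is declared nullable, following the rule stated in the schema comment: fields projected out of a nullable struct must themselves be nullable.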
14 changes: 13 additions & 1 deletion kernel/src/scan/mod.rs
@@ -22,6 +22,7 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResol
use crate::listed_log_files::ListedLogFiles;
use crate::log_replay::{ActionsBatch, HasSelectionVector};
use crate::log_segment::LogSegment;
use crate::scan::log_replay::BASE_ROW_ID_NAME;
use crate::scan::state::{DvInfo, Stats};
use crate::schema::ToSchema as _;
use crate::schema::{
@@ -498,6 +499,8 @@ impl Scan {
_existing_predicate: Option<PredicateRef>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
use crate::scan::log_replay::DEFAULT_ROW_COMMIT_VERSION_NAME;

let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
DataType::struct_type_unchecked(vec![StructField::nullable(
"add",
@@ -508,6 +511,12 @@
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
StructField::nullable(
"tags",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
]),
)])
});
@@ -737,7 +746,10 @@ impl Scan {
/// cardinality: long,
/// },
/// fileConstantValues: {
/// partitionValues: map<string, string>
/// partitionValues: map<string, string>,
/// tags: map<string, string>,
/// baseRowId: long,
/// defaultRowCommitVersion: long,
/// }
/// }
/// ```
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
@@ -179,7 +179,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 10,
getters.len() == 13,
Error::InternalError(format!(
"Wrong number of ScanFileVisitor getters: {}",
getters.len()
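For reference, the jump from 10 to 13 getters follows directly from the widened SCAN_ROW_SCHEMA; a sketch of the leaf arithmetic, where the five deletion-vector leaves are an assumption based on DeletionVectorDescriptor::to_schema:

    // Leaf getters visited per scan row by ScanFileVisitor:
    //   path, size, modificationTime, stats                    ->  4
    //   deletionVector.{storageType, pathOrInlineDv,
    //                   offset, sizeInBytes, cardinality}      ->  5
    //   fileConstantValues.{partitionValues, baseRowId,
    //                       defaultRowCommitVersion, tags}     ->  4
    //                                                   total  -> 13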