Skip to content

Commit e31e915

Browse files
committed
fix: correctly pull partition values out of column-mapped tables
Juggling this unit test took a *long* time, but in the process I found a few other places that needed to use the right names 😩 Signed-off-by: R. Tyler Croy <[email protected]>
1 parent 5f79945 commit e31e915

File tree

1 file changed

+118
-4
lines changed

1 file changed

+118
-4
lines changed

crates/core/src/kernel/snapshot/replay.rs

Lines changed: 118 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
88
use delta_kernel::expressions::Scalar;
99
use delta_kernel::schema::DataType;
1010
use delta_kernel::schema::PrimitiveType;
11+
use tracing::log::*;
1112

1213
use super::parse::collect_map;
1314
use crate::kernel::arrow::extract::{self as ex};
@@ -19,6 +20,7 @@ pub(crate) fn parse_partitions(
1920
partition_schema: &StructType,
2021
raw_path: &str,
2122
) -> DeltaResult<StructArray> {
23+
trace!("parse_partitions: batch: {batch:?}\npartition_schema: {partition_schema:?}\npath: {raw_path}");
2224
let partitions =
2325
ex::extract_and_cast_opt::<MapArray>(batch, raw_path).ok_or(DeltaTableError::generic(
2426
"No partitionValues column found in files batch. This is unexpected.",
@@ -29,7 +31,7 @@ pub(crate) fn parse_partitions(
2931
.fields()
3032
.map(|f| {
3133
(
32-
f.name().to_string(),
34+
f.physical_name().to_string(),
3335
Vec::<Scalar>::with_capacity(partitions.len()),
3436
)
3537
})
@@ -69,17 +71,17 @@ pub(crate) fn parse_partitions(
6971

7072
partition_schema.fields().for_each(|f| {
7173
let value = data
72-
.get(f.name())
74+
.get(f.physical_name())
7375
.cloned()
7476
.unwrap_or(Scalar::Null(f.data_type().clone()));
75-
values.get_mut(f.name()).unwrap().push(value);
77+
values.get_mut(f.physical_name()).unwrap().push(value);
7678
});
7779
}
7880

7981
let columns = partition_schema
8082
.fields()
8183
.map(|f| {
82-
let values = values.get(f.name()).unwrap();
84+
let values = values.get(f.physical_name()).unwrap();
8385
match f.data_type() {
8486
DataType::Primitive(p) => {
8587
// Safety: we created the Scalars above using the parsing function of the same PrimitiveType
@@ -201,3 +203,115 @@ pub(crate) fn parse_partitions(
201203
num_rows,
202204
)?)
203205
}
206+
207+
#[cfg(test)]
208+
mod tests {
209+
use super::*;
210+
use arrow::array::MapBuilder;
211+
use arrow::array::MapFieldNames;
212+
use arrow::array::StringBuilder;
213+
use arrow::datatypes::DataType as ArrowDataType;
214+
use arrow::datatypes::Field as ArrowField;
215+
use arrow::datatypes::Schema as ArrowSchema;
216+
use arrow_schema::Field;
217+
use delta_kernel::schema::{MapType, MetadataValue, SchemaRef, StructField};
218+
219+
#[test]
220+
fn test_physical_partition_name_mapping() -> crate::DeltaResult<()> {
221+
let physical_partition_name = "col-173b4db9-b5ad-427f-9e75-516aae37fbbb".to_string();
222+
let schema: SchemaRef = delta_kernel::scan::scan_row_schema().project(&[
223+
"path",
224+
"size",
225+
"fileConstantValues",
226+
])?;
227+
let partition_schema = StructType::new(vec![StructField::nullable(
228+
"Company Very Short",
229+
DataType::STRING,
230+
)
231+
.with_metadata(vec![
232+
(
233+
"delta.columnMapping.id".to_string(),
234+
MetadataValue::Number(1),
235+
),
236+
(
237+
"delta.columnMapping.physicalName".to_string(),
238+
MetadataValue::String(physical_partition_name.clone()),
239+
),
240+
])]);
241+
242+
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
243+
let file_constant_values: SchemaRef = Arc::new(StructType::new([StructField::nullable(
244+
"partitionValues",
245+
partition_values,
246+
)]));
247+
// Sanity check: the file_constant_values schema must convert to an Arrow schema; the converted value itself is discarded.
248+
let _: ArrowSchema = file_constant_values.as_ref().try_into_arrow()?;
249+
250+
// Build the partitionValues map column by hand with a MapBuilder.
251+
// Constructing nested Arrow types this way is verbose, hence the boilerplate below.
252+
//
253+
// The single map entry mirrors what should be coming off the log, keyed by the physical column name:
254+
// "col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"
255+
let keys_builder = StringBuilder::new();
256+
let values_builder = StringBuilder::new();
257+
let map_fields = MapFieldNames {
258+
entry: "key_value".into(),
259+
key: "key".into(),
260+
value: "value".into(),
261+
};
262+
let mut partitions = MapBuilder::new(Some(map_fields), keys_builder, values_builder);
263+
264+
// The map key is the physical name; the assertions below expect parse_partitions to expose the column under the schema's logical name instead.
265+
partitions
266+
.keys()
267+
.append_value(physical_partition_name.clone());
268+
partitions.values().append_value("BMS");
269+
partitions.append(true).unwrap();
270+
let partitions = partitions.finish();
271+
272+
let struct_fields = Fields::from(vec![
273+
Field::new("key", ArrowDataType::Utf8, false),
274+
Field::new("value", ArrowDataType::Utf8, true),
275+
]);
276+
let map_field = Arc::new(ArrowField::new(
277+
"key_value",
278+
ArrowDataType::Struct(struct_fields),
279+
false,
280+
));
281+
282+
let parts = StructArray::from(vec![(
283+
Arc::new(ArrowField::new(
284+
"partitionValues",
285+
ArrowDataType::Map(map_field, false),
286+
true,
287+
)),
288+
Arc::new(partitions) as ArrayRef,
289+
)]);
290+
291+
let batch = RecordBatch::try_new(
292+
Arc::new(schema.as_ref().try_into_arrow()?),
293+
vec![
294+
// path
295+
Arc::new(StringArray::from(vec!["foo.parquet".to_string()])),
296+
// size
297+
Arc::new(Int64Array::from(vec![1])),
298+
// fileConstantValues
299+
Arc::new(parts),
300+
],
301+
)?;
302+
303+
let raw_path = "fileConstantValues.partitionValues";
304+
let partitions = parse_partitions(&batch, &partition_schema, raw_path)?;
305+
assert_eq!(
306+
None,
307+
partitions.column_by_name(&physical_partition_name),
308+
"Should not have found the physical column name"
309+
);
310+
assert_ne!(
311+
None,
312+
partitions.column_by_name("Company Very Short"),
313+
"Should have found the renamed column"
314+
);
315+
Ok(())
316+
}
317+
}

0 commit comments

Comments
 (0)