feat(delta-lake): support column mapping for reads #7005
Read Delta tables with `delta.columnMapping.mode = id|name` by routing the existing Parquet field-id rename path through delta-rs's column metadata.

- `_iter_mapped_fields` walks the Delta schema (incl. nested structs/arrays/maps) and builds a `field_id -> PyField(logical_name, dtype)` map, passed to `ParquetSourceConfig.field_id_mapping` so the Rust reader renames physical parquet columns to logical names at read time.
- Stats columns from `add_actions` (keyed by physical name) get translated to logical names via a top-level `physical -> logical` map.
- Strip the embedded `ARROW:schema` hint after a field-id rename (`metadata.rs`): the hint still carries physical names and disagrees with the renamed parquet type tree, causing arrow-rs to fail with `incompatible arrow schema, expected field named <physical> got <logical>`.
- Tests hand-craft `_delta_log` JSON + parquet (delta-rs 1.5.x cannot enable columnMapping via the Python writer). Covers `id` / `name` modes across flat, projection, partitioned, nested-struct, and malformed-table cases.

Closes Eventual-Inc#1955
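To make the schema walk concrete, here is a minimal, standalone sketch of the idea, not the PR's code. It operates on the Delta schema as plain JSON-style dicts rather than on `deltalake.Schema` objects, and it maps field ids to logical names only (the PR maps to a `PyField` with the dtype as well). The function names `iter_mapped_fields`, `field_id_to_logical`, and `physical_to_logical`, and the `col-*` physical names, are made up for illustration; the metadata keys `delta.columnMapping.id` and `delta.columnMapping.physicalName` come from the Delta protocol.

```python
from __future__ import annotations

from typing import Any, Iterator

CM_ID_KEY = "delta.columnMapping.id"
CM_PHYSICAL_KEY = "delta.columnMapping.physicalName"


def iter_mapped_fields(dtype: Any) -> Iterator[dict]:
    """Yield every struct field, at any depth, that carries a column-mapping id."""
    if not isinstance(dtype, dict):
        return  # primitive type such as "long" or "string"
    kind = dtype.get("type")
    if kind == "struct":
        for field in dtype["fields"]:
            if (field.get("metadata") or {}).get(CM_ID_KEY) is not None:
                yield field
            yield from iter_mapped_fields(field["type"])
    elif kind == "array":
        # Array elements are anonymous, but may contain mapped struct fields.
        yield from iter_mapped_fields(dtype["elementType"])
    elif kind == "map":
        yield from iter_mapped_fields(dtype["keyType"])
        yield from iter_mapped_fields(dtype["valueType"])


def field_id_to_logical(schema: dict) -> dict[int, str]:
    """Map every column-mapping id in the schema to its logical field name."""
    return {f["metadata"][CM_ID_KEY]: f["name"] for f in iter_mapped_fields(schema)}


def physical_to_logical(schema: dict) -> dict[str, str]:
    """Top-level only: physical column name -> logical column name."""
    return {
        f["metadata"][CM_PHYSICAL_KEY]: f["name"]
        for f in schema["fields"]
        if CM_PHYSICAL_KEY in (f.get("metadata") or {})
    }


# Hypothetical schema: a flat column plus an array<struct<...>> column.
schema = {
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": True,
         "metadata": {CM_ID_KEY: 1, CM_PHYSICAL_KEY: "col-a1"}},
        {"name": "items", "nullable": True,
         "metadata": {CM_ID_KEY: 2, CM_PHYSICAL_KEY: "col-b2"},
         "type": {"type": "array", "containsNull": True, "elementType": {
             "type": "struct", "fields": [
                 {"name": "sku", "type": "string", "nullable": True,
                  "metadata": {CM_ID_KEY: 3, CM_PHYSICAL_KEY: "col-c3"}}]}}},
    ],
}

id_map = field_id_to_logical(schema)
top_level = physical_to_logical(schema)
```

The nested `sku` field is reached only because the walk recurses through the anonymous array element, which is the case the nested tests are meant to cover.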
Greptile Summary
This PR enables reading Delta Lake tables with
Confidence Score: 5/5
Safe to merge; changes are well-scoped to column-mapped tables and reuse the existing Iceberg field-ID rename path without touching non-column-mapped reads. Column-mapping logic is correctly isolated behind a cm_mode guard, existing table reads are unaffected, and the ARROW:schema stripping flag is explicitly threaded so it cannot regress the raw-string or Iceberg paths. The stats physical→logical translation in delta_lake_scan.py is the one path not exercised by any test fixture; worth revisiting when stats-aware fixtures are added.
Sequence Diagram
sequenceDiagram
participant User
participant DeltaLakeScanOperator
participant delta_rs
participant ParquetReader
participant metadata_rs
User->>DeltaLakeScanOperator: read_deltalake(path)
DeltaLakeScanOperator->>delta_rs: DeltaTable(path)
delta_rs-->>DeltaLakeScanOperator: table + schema
DeltaLakeScanOperator->>DeltaLakeScanOperator: _column_mapping_maps(schema)
DeltaLakeScanOperator->>delta_rs: get_add_actions()
delta_rs-->>DeltaLakeScanOperator: add_actions (stats keyed by physical names)
loop per add action
DeltaLakeScanOperator->>DeltaLakeScanOperator: translate stats physical to logical
DeltaLakeScanOperator-->>User: ScanTask with field_id_mapping
end
User->>ParquetReader: execute ScanTask
ParquetReader->>metadata_rs: apply_field_ids_to_arrowrs_parquet_metadata
metadata_rs->>metadata_rs: rewrite parquet type tree
metadata_rs->>metadata_rs: strip ARROW schema hint
metadata_rs-->>ParquetReader: renamed ParquetMetaData
ParquetReader-->>User: DataFrame with logical column names
Reviews (2): Last reviewed commit: "style: hoist inline imports to top of fi..."
def _delta_field_to_pyfield(field: deltalake.schema.Field) -> PyField:
    """Convert a Delta `Field` to a Daft `PyField` carrying its logical name and real dtype."""
    from deltalake.schema import Schema

    from daft.io.delta_lake._deltalake import delta_schema_to_pyarrow

    pa_field = delta_schema_to_pyarrow(Schema([field])).field(0)
    return PyField.create(field.name, DataType.from_arrow_type(pa_field.type)._dtype)


def _iter_mapped_fields(schema: deltalake.Schema) -> Iterator[deltalake.schema.Field]:
    """Yield every Delta `Field` in the schema that carries column-mapping metadata.

    Per Delta protocol, list elements / map keys / map values are anonymous (no
    `columnMapping.id`), but their *element type* may still contain mapped struct
    fields (e.g. `array<struct<...>>`). We recurse through container types but only
    yield fields that actually carry the mapping metadata.
    """
    from deltalake.schema import ArrayType, MapType, StructType

    def walk_type(t: object) -> Iterator[deltalake.schema.Field]:
        if isinstance(t, StructType):
            for sub in t.fields:
                if (sub.metadata or {}).get(_CM_ID_KEY) is not None:
                    yield sub
                yield from walk_type(sub.type)
        elif isinstance(t, ArrayType):
            yield from walk_type(t.element_type)
        elif isinstance(t, MapType):
            yield from walk_type(t.key_type)
            yield from walk_type(t.value_type)

    for field in schema.fields:
        if (field.metadata or {}).get(_CM_ID_KEY) is not None:
            yield field
        yield from walk_type(field.type)
Inline imports inside module-level functions
_delta_field_to_pyfield and _iter_mapped_fields both place import statements inside their function bodies. Per the project's import style guide, imports should live at the top of the file rather than inline within functions or methods. The deltalake package is already imported at the module level on line 10, so ArrayType, MapType, StructType, and Schema from its sub-modules can be moved up. delta_schema_to_pyarrow from ._deltalake can also be hoisted (it's already imported inline in __init__ as a pre-existing violation, but new code should not extend the pattern). If deferring is intentional for circular-import reasons, a comment should explain it.
Rule Used: Import statements should be placed at the top of t... (source)
Learned From
Eventual-Inc/Daft#5078
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
"type": "long",
"nullable": True,
"metadata": {
Inline import inside test function
from deltalake import DeltaTable appears inside the test body rather than at the top of the file. Even though pytest.importorskip("deltalake") guards the test, importorskip both skips and returns the module, so the import can be pulled to the module level or obtained from the importorskip return value. Having the import inline violates the project import style guide that applies here as well.
Rule Used: Import statements should be placed at the top of t... (source)
Learned From
Eventual-Inc/Daft#5078
cckellogg left a comment
The physical→logical min/max stats rename (delta_lake_scan.py:361-384) isn't exercised by any test: none of the fixtures emit stats in the add actions, so that block never runs in CI. Could you add a predicate-pushdown test on a column-mapped table? Write two data files with disjoint ranges, filter with where(df["col"] == ), and assert the result.
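A rough, library-free sketch of what such a test would be checking. This is not the code in `delta_lake_scan.py`; the helper names `translate_stats` and `files_matching_eq`, the file paths, and the `col-a1` physical name are hypothetical. Only the `minValues` / `maxValues` stats keys follow the Delta protocol. The point it illustrates: per-file min/max stats are keyed by physical name, so pruning on a logical column name only works after the rename.

```python
def translate_stats(stats: dict, physical_to_logical: dict) -> dict:
    """Rename top-level stats keys from physical to logical names; keep unknown keys."""
    return {physical_to_logical.get(key, key): value for key, value in stats.items()}


def files_matching_eq(files: list, physical_to_logical: dict, column: str, value) -> list:
    """Return paths of files whose [min, max] range may contain `value` for `column`."""
    kept = []
    for f in files:
        mins = translate_stats(f["minValues"], physical_to_logical)
        maxs = translate_stats(f["maxValues"], physical_to_logical)
        if column not in mins or column not in maxs:
            kept.append(f["path"])  # no usable stats: the file cannot be pruned
        elif mins[column] <= value <= maxs[column]:
            kept.append(f["path"])
    return kept


# Two hypothetical data files with disjoint ranges for logical column "id".
files = [
    {"path": "part-0.parquet", "minValues": {"col-a1": 0}, "maxValues": {"col-a1": 9}},
    {"path": "part-1.parquet", "minValues": {"col-a1": 100}, "maxValues": {"col-a1": 109}},
]

with_rename = files_matching_eq(files, {"col-a1": "id"}, "id", 105)
without_rename = files_matching_eq(files, {}, "id", 105)
```

With the rename, only `part-1.parquet` survives; without it, the stats are never matched to the column and both files are scanned. A real test would additionally assert on the rows returned by the query.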
const FOOTER_SIZE: usize = 8;

/// Parquet key-value metadata key for the embedded Arrow schema hint written by
/// pyarrow/arrow-cpp. See `parquet-format-thrift` KeyValue.
Do other arrow clients write this as well?
actions = [
    {
        "protocol": {
Changes Made
Read Delta Lake tables with `delta.columnMapping.mode = id` or `name`. Previously raised at scan-op construction.

How it works.
Builds a `field_id → PyField(logical_name, dtype)` map from delta-rs's column metadata and threads it through `ParquetSourceConfig.field_id_mapping`, reusing the Parquet field-id rename path originally added for Iceberg.
Stats columns from `add_actions` (keyed by physical names) are translated to logical names via a top-level `physical → logical` lookup.

ARROW:schema fix.
After a field-id rename the embedded `ARROW:schema` hint still carries physical names and disagrees with the renamed parquet type tree, so arrow-rs errors with `incompatible arrow schema, expected field named <physical> got <logical>`.
Strip the hint so arrow-rs re-infers from the renamed parquet types. `rebuild_file_metadata` gains a `strip_arrow_schema: bool` flag; the raw-string strip path passes `false` (no rename, hint stays valid).

Test fixture.
Hand-crafts `_delta_log` JSON + parquet because deltalake 1.5.x cannot enable column mapping through the Python writer (`CommitFailedError: Unsupported table features required: [ColumnMapping]`).

Coverage.
Both `id` and `name` modes across flat reads, projection, partitioning, top-level nested struct (exercises walk recursion + nested rename), and a malformed-table error path.

Related Issues
Closes #1955
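For readers unfamiliar with hand-crafted Delta logs, the following is a hedged sketch of how a first commit for a column-mapped table could be written with only the standard library. It is not the PR's fixture: the helper `write_commit`, the table id, and the `col-a1` physical name are assumptions for illustration, and the `add` action plus the parquet data file it would reference are omitted. The action shapes (`protocol`, `metaData`, `schemaString` as a JSON string, reader version 2 / writer version 5 for column mapping, zero-padded 20-digit commit file names) follow the Delta protocol.

```python
import json
import tempfile
from pathlib import Path

# Logical schema with column-mapping metadata on the single field.
schema = {
    "type": "struct",
    "fields": [
        {
            "name": "id",
            "type": "long",
            "nullable": True,
            "metadata": {
                "delta.columnMapping.id": 1,
                "delta.columnMapping.physicalName": "col-a1",  # hypothetical name
            },
        }
    ],
}

actions = [
    {"protocol": {"minReaderVersion": 2, "minWriterVersion": 5}},
    {
        "metaData": {
            "id": "00000000-0000-0000-0000-000000000000",  # placeholder table id
            "format": {"provider": "parquet", "options": {}},
            "schemaString": json.dumps(schema),  # the schema is embedded as a string
            "partitionColumns": [],
            "configuration": {
                "delta.columnMapping.mode": "name",
                "delta.columnMapping.maxColumnId": "1",
            },
            "createdTime": 0,
        }
    },
]


def write_commit(table_dir: str, actions: list, version: int = 0) -> Path:
    """Write one newline-delimited JSON commit file under `_delta_log`."""
    log_dir = Path(table_dir) / "_delta_log"
    log_dir.mkdir(parents=True, exist_ok=True)
    commit = log_dir / f"{version:020d}.json"
    commit.write_text("\n".join(json.dumps(a) for a in actions) + "\n")
    return commit


table_dir = tempfile.mkdtemp()
commit_path = write_commit(table_dir, actions)
written = [json.loads(line) for line in commit_path.read_text().splitlines()]
```

A complete fixture would also write the parquet file using the physical column name and append a matching `add` action to the commit.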