Skip to content

Commit 05c15f2

Browse files
Fix: Correctly resolve renamed columns using Parquet field_id → DuckLake column_id mapping (#24)
1 parent c82ddad commit 05c15f2

8 files changed

Lines changed: 870 additions & 32 deletions

src/column_rename.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
//! Custom execution plan for renaming columns
2+
//!
3+
//! This module implements a DataFusion execution plan that wraps a scan
4+
//! and renames columns from their original Parquet names to current DuckLake names.
5+
//! This is needed when columns have been renamed in DuckLake metadata but the
6+
//! Parquet files still have the original column names.
7+
8+
use std::any::Any;
9+
use std::collections::HashMap;
10+
use std::pin::Pin;
11+
use std::sync::Arc;
12+
use std::task::{Context, Poll};
13+
14+
use arrow::datatypes::SchemaRef;
15+
use arrow::record_batch::RecordBatch;
16+
use datafusion::error::{DataFusionError, Result as DataFusionResult};
17+
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
18+
use datafusion::physical_expr::EquivalenceProperties;
19+
use datafusion::physical_plan::execution_plan::Boundedness;
20+
use datafusion::physical_plan::{
21+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
22+
};
23+
use futures::Stream;
24+
25+
/// Custom execution plan that renames columns from Parquet file names to current DuckLake names
26+
#[derive(Debug)]
27+
pub struct ColumnRenameExec {
28+
/// The input execution plan (typically ParquetExec)
29+
input: Arc<dyn ExecutionPlan>,
30+
/// Output schema with renamed columns
31+
output_schema: SchemaRef,
32+
/// Mapping from old (Parquet) column names to new (DuckLake) column names
33+
name_mapping: HashMap<String, String>,
34+
/// Reverse mapping: new name -> old name, for looking up input columns
35+
reverse_mapping: Arc<HashMap<String, String>>,
36+
/// Cached plan properties with updated schema
37+
properties: PlanProperties,
38+
}
39+
40+
impl ColumnRenameExec {
41+
pub fn new(
42+
input: Arc<dyn ExecutionPlan>,
43+
output_schema: SchemaRef,
44+
name_mapping: HashMap<String, String>,
45+
) -> Self {
46+
// PlanProperties must use output schema for DataFusion schema validation
47+
let eq_props = EquivalenceProperties::new(Arc::clone(&output_schema));
48+
let properties = PlanProperties::new(
49+
eq_props,
50+
input.output_partitioning().clone(),
51+
input.pipeline_behavior(),
52+
Boundedness::Bounded,
53+
);
54+
55+
// Pre-compute reverse mapping once (new_name -> old_name)
56+
let reverse_mapping: HashMap<String, String> = name_mapping
57+
.iter()
58+
.map(|(old, new)| (new.clone(), old.clone()))
59+
.collect();
60+
61+
Self {
62+
input,
63+
output_schema,
64+
name_mapping,
65+
reverse_mapping: Arc::new(reverse_mapping),
66+
properties,
67+
}
68+
}
69+
}
70+
71+
impl DisplayAs for ColumnRenameExec {
72+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
73+
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
74+
}
75+
}
76+
77+
impl ExecutionPlan for ColumnRenameExec {
78+
fn name(&self) -> &str {
79+
"ColumnRenameExec"
80+
}
81+
82+
fn as_any(&self) -> &dyn Any {
83+
self
84+
}
85+
86+
fn properties(&self) -> &PlanProperties {
87+
&self.properties
88+
}
89+
90+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
91+
vec![&self.input]
92+
}
93+
94+
fn with_new_children(
95+
self: Arc<Self>,
96+
children: Vec<Arc<dyn ExecutionPlan>>,
97+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
98+
if children.len() != 1 {
99+
return Err(DataFusionError::Internal(
100+
"ColumnRenameExec expects exactly one child".into(),
101+
));
102+
}
103+
104+
// Must call new() to rebuild properties from new child's partitioning
105+
Ok(Arc::new(ColumnRenameExec::new(
106+
Arc::clone(&children[0]),
107+
Arc::clone(&self.output_schema),
108+
self.name_mapping.clone(),
109+
)))
110+
}
111+
112+
fn execute(
113+
&self,
114+
partition: usize,
115+
context: Arc<TaskContext>,
116+
) -> DataFusionResult<SendableRecordBatchStream> {
117+
let input_stream = self.input.execute(partition, context)?;
118+
119+
Ok(Box::pin(ColumnRenameStream {
120+
input: input_stream,
121+
output_schema: Arc::clone(&self.output_schema),
122+
reverse_mapping: Arc::clone(&self.reverse_mapping),
123+
}))
124+
}
125+
}
126+
127+
/// Stream that renames columns in output batches
128+
struct ColumnRenameStream {
129+
input: SendableRecordBatchStream,
130+
output_schema: SchemaRef,
131+
/// Mapping from output column name -> input column name (for renamed columns only)
132+
reverse_mapping: Arc<HashMap<String, String>>,
133+
}
134+
135+
impl Stream for ColumnRenameStream {
136+
type Item = DataFusionResult<RecordBatch>;
137+
138+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
139+
match Pin::new(&mut self.input).poll_next(cx) {
140+
Poll::Ready(Some(Ok(batch))) => {
141+
let result = if batch.num_columns() == 0 {
142+
// COUNT(*) case: preserve row count with empty schema
143+
use arrow::record_batch::RecordBatchOptions;
144+
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
145+
RecordBatch::try_new_with_options(
146+
Arc::clone(&self.output_schema),
147+
vec![],
148+
&options,
149+
)
150+
} else {
151+
// Build columns by looking up each output field in the input batch
152+
let input_schema = batch.schema();
153+
let columns: Result<Vec<_>, _> = self
154+
.output_schema
155+
.fields()
156+
.iter()
157+
.map(|output_field| {
158+
// Check if this column was renamed (new_name -> old_name)
159+
let input_name = self
160+
.reverse_mapping
161+
.get(output_field.name())
162+
.map(|s| s.as_str())
163+
.unwrap_or_else(|| output_field.name().as_str());
164+
165+
input_schema
166+
.index_of(input_name)
167+
.map(|idx| batch.column(idx).clone())
168+
})
169+
.collect();
170+
171+
match columns {
172+
Ok(cols) => RecordBatch::try_new(Arc::clone(&self.output_schema), cols),
173+
Err(e) => Err(e),
174+
}
175+
};
176+
177+
match result {
178+
Ok(renamed_batch) => Poll::Ready(Some(Ok(renamed_batch))),
179+
Err(e) => {
180+
Poll::Ready(Some(Err(DataFusionError::ArrowError(Box::new(e), None))))
181+
},
182+
}
183+
},
184+
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
185+
Poll::Ready(None) => Poll::Ready(None),
186+
Poll::Pending => Poll::Pending,
187+
}
188+
}
189+
}
190+
191+
impl RecordBatchStream for ColumnRenameStream {
192+
fn schema(&self) -> SchemaRef {
193+
Arc::clone(&self.output_schema)
194+
}
195+
}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use super::*;
200+
use arrow::datatypes::{DataType, Field, Schema};
201+
use datafusion::physical_plan::EmptyRecordBatchStream;
202+
203+
#[test]
204+
fn test_column_rename_stream_schema() {
205+
let input_schema = Arc::new(Schema::new(vec![Field::new(
206+
"old_col",
207+
DataType::Int32,
208+
false,
209+
)]));
210+
211+
let output_schema = Arc::new(Schema::new(vec![Field::new(
212+
"new_col",
213+
DataType::Int32,
214+
false,
215+
)]));
216+
217+
let mut reverse_mapping = HashMap::new();
218+
reverse_mapping.insert("new_col".to_string(), "old_col".to_string());
219+
220+
let stream = ColumnRenameStream {
221+
input: Box::pin(EmptyRecordBatchStream::new(input_schema)),
222+
output_schema: Arc::clone(&output_schema),
223+
reverse_mapping: Arc::new(reverse_mapping),
224+
};
225+
226+
// The stream should report the output schema
227+
assert_eq!(stream.schema().field(0).name(), "new_col");
228+
}
229+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
//! ```
3737
3838
pub mod catalog;
39+
pub mod column_rename;
3940
pub mod delete_filter;
4041
pub mod encryption;
4142
pub mod error;

src/metadata_provider.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub const SQL_LIST_TABLES: &str =
1717
AND ? >= begin_snapshot
1818
AND (? < end_snapshot OR end_snapshot IS NULL)";
1919

20-
pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type
20+
pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type, nulls_allowed
2121
FROM ducklake_column
2222
WHERE table_id = ?
2323
ORDER BY column_order";
@@ -118,7 +118,8 @@ pub const SQL_LIST_ALL_COLUMNS: &str = "
118118
t.table_name,
119119
c.column_id,
120120
c.column_name,
121-
c.column_type
121+
c.column_type,
122+
c.nulls_allowed
122123
FROM ducklake_schema s
123124
JOIN ducklake_table t ON s.schema_id = t.schema_id
124125
JOIN ducklake_column c ON t.table_id = c.table_id
@@ -236,14 +237,22 @@ pub struct DuckLakeTableColumn {
236237
pub column_name: String,
237238
/// DuckLake type string (e.g., "varchar", "int64", "decimal(10,2)")
238239
pub column_type: String,
240+
/// Whether this column allows NULL values
241+
pub is_nullable: bool,
239242
}
240243

241244
impl DuckLakeTableColumn {
242-
pub fn new(column_id: i64, column_name: String, column_type: String) -> Self {
245+
pub fn new(
246+
column_id: i64,
247+
column_name: String,
248+
column_type: String,
249+
is_nullable: bool,
250+
) -> Self {
243251
Self {
244252
column_id,
245253
column_name,
246254
column_type,
255+
is_nullable,
247256
}
248257
}
249258
}

src/metadata_provider_duckdb.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,12 @@ impl MetadataProvider for DuckdbMetadataProvider {
145145
let column_id: i64 = row.get(0)?;
146146
let column_name: String = row.get(1)?;
147147
let column_type: String = row.get(2)?;
148+
let nulls_allowed: Option<bool> = row.get(3)?;
148149
Ok(DuckLakeTableColumn::new(
149150
column_id,
150151
column_name,
151152
column_type,
153+
nulls_allowed.unwrap_or(true),
152154
))
153155
})?
154156
.collect::<Result<Vec<_>, _>>()?;
@@ -306,10 +308,12 @@ impl MetadataProvider for DuckdbMetadataProvider {
306308
|row| {
307309
let schema_name: String = row.get(0)?;
308310
let table_name: String = row.get(1)?;
311+
let nulls_allowed: Option<bool> = row.get(5)?;
309312
let column = DuckLakeTableColumn {
310313
column_id: row.get(2)?,
311314
column_name: row.get(3)?,
312315
column_type: row.get(4)?,
316+
is_nullable: nulls_allowed.unwrap_or(true),
313317
};
314318
Ok(ColumnWithTable {
315319
schema_name,

src/metadata_provider_postgres.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl MetadataProvider for PostgresMetadataProvider {
174174
fn get_table_structure(&self, table_id: i64) -> Result<Vec<DuckLakeTableColumn>> {
175175
block_on(async {
176176
let rows = sqlx::query(
177-
"SELECT column_id, column_name, column_type
177+
"SELECT column_id, column_name, column_type, nulls_allowed
178178
FROM ducklake_column
179179
WHERE table_id = $1
180180
ORDER BY column_order",
@@ -185,10 +185,12 @@ impl MetadataProvider for PostgresMetadataProvider {
185185

186186
rows.into_iter()
187187
.map(|row| {
188+
let nulls_allowed: Option<bool> = row.try_get(3)?;
188189
Ok(DuckLakeTableColumn {
189190
column_id: row.try_get(0)?,
190191
column_name: row.try_get(1)?,
191192
column_type: row.try_get(2)?,
193+
is_nullable: nulls_allowed.unwrap_or(true),
192194
})
193195
})
194196
.collect()
@@ -390,7 +392,7 @@ impl MetadataProvider for PostgresMetadataProvider {
390392
fn list_all_columns(&self, snapshot_id: i64) -> Result<Vec<ColumnWithTable>> {
391393
block_on(async {
392394
let rows = sqlx::query(
393-
"SELECT s.schema_name, t.table_name, c.column_id, c.column_name, c.column_type
395+
"SELECT s.schema_name, t.table_name, c.column_id, c.column_name, c.column_type, c.nulls_allowed
394396
FROM ducklake_schema s
395397
JOIN ducklake_table t ON s.schema_id = t.schema_id
396398
JOIN ducklake_column c ON t.table_id = c.table_id
@@ -411,10 +413,12 @@ impl MetadataProvider for PostgresMetadataProvider {
411413
.map(|row| {
412414
let schema_name: String = row.try_get(0)?;
413415
let table_name: String = row.try_get(1)?;
416+
let nulls_allowed: Option<bool> = row.try_get(5)?;
414417
let column = DuckLakeTableColumn {
415418
column_id: row.try_get(2)?,
416419
column_name: row.try_get(3)?,
417420
column_type: row.try_get(4)?,
421+
is_nullable: nulls_allowed.unwrap_or(true),
418422
};
419423
Ok(ColumnWithTable {
420424
schema_name,

0 commit comments

Comments
 (0)