Skip to content

Commit 004a781

Browse files
Shefeek JinnahShefeek Jinnah
authored andcommitted
fix: handle renamed columns by matching Parquet field_id to DuckLake column_id (#24)
1 parent e5157f6 commit 004a781

9 files changed

Lines changed: 1608 additions & 41 deletions

Cargo.lock

Lines changed: 691 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/column_rename.rs

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
//! Custom execution plan for renaming columns
2+
//!
3+
//! This module implements a DataFusion execution plan that wraps a scan
4+
//! and renames columns from their original Parquet names to current DuckLake names.
5+
//! This is needed when columns have been renamed in DuckLake metadata but the
6+
//! Parquet files still have the original column names.
7+
8+
use std::any::Any;
9+
use std::collections::HashMap;
10+
use std::pin::Pin;
11+
use std::sync::Arc;
12+
use std::task::{Context, Poll};
13+
14+
use arrow::datatypes::{Field, Schema, SchemaRef};
15+
use arrow::record_batch::RecordBatch;
16+
use datafusion::error::{DataFusionError, Result as DataFusionResult};
17+
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
18+
use datafusion::physical_expr::EquivalenceProperties;
19+
use datafusion::physical_plan::execution_plan::Boundedness;
20+
use datafusion::physical_plan::{
21+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
22+
};
23+
use futures::Stream;
24+
25+
/// Custom execution plan that renames columns from Parquet file names to current DuckLake names
26+
#[derive(Debug)]
27+
pub struct ColumnRenameExec {
28+
/// The input execution plan (typically ParquetExec)
29+
input: Arc<dyn ExecutionPlan>,
30+
/// Output schema with renamed columns
31+
output_schema: SchemaRef,
32+
/// Mapping from old column names to new column names (for display purposes)
33+
name_mapping: HashMap<String, String>,
34+
/// Cached plan properties with updated schema
35+
properties: PlanProperties,
36+
}
37+
38+
impl ColumnRenameExec {
39+
pub fn new(
40+
input: Arc<dyn ExecutionPlan>,
41+
output_schema: SchemaRef,
42+
name_mapping: HashMap<String, String>,
43+
) -> Self {
44+
// PlanProperties must use output schema for DataFusion schema validation
45+
let eq_props = EquivalenceProperties::new(output_schema.clone());
46+
let properties = PlanProperties::new(
47+
eq_props,
48+
input.output_partitioning().clone(),
49+
input.pipeline_behavior(),
50+
Boundedness::Bounded,
51+
);
52+
53+
Self {
54+
input,
55+
output_schema,
56+
name_mapping,
57+
properties,
58+
}
59+
}
60+
}
61+
62+
impl DisplayAs for ColumnRenameExec {
63+
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64+
match t {
65+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
66+
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
67+
},
68+
DisplayFormatType::TreeRender => {
69+
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
70+
},
71+
}
72+
}
73+
}
74+
75+
impl ExecutionPlan for ColumnRenameExec {
76+
fn name(&self) -> &str {
77+
"ColumnRenameExec"
78+
}
79+
80+
fn as_any(&self) -> &dyn Any {
81+
self
82+
}
83+
84+
fn properties(&self) -> &PlanProperties {
85+
&self.properties
86+
}
87+
88+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
89+
vec![&self.input]
90+
}
91+
92+
fn with_new_children(
93+
self: Arc<Self>,
94+
children: Vec<Arc<dyn ExecutionPlan>>,
95+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
96+
if children.len() != 1 {
97+
return Err(DataFusionError::Internal(
98+
"ColumnRenameExec expects exactly one child".into(),
99+
));
100+
}
101+
102+
Ok(Arc::new(ColumnRenameExec::new(
103+
children[0].clone(),
104+
self.output_schema.clone(),
105+
self.name_mapping.clone(),
106+
)))
107+
}
108+
109+
fn execute(
110+
&self,
111+
partition: usize,
112+
context: Arc<TaskContext>,
113+
) -> DataFusionResult<SendableRecordBatchStream> {
114+
let input_stream = self.input.execute(partition, context)?;
115+
116+
Ok(Box::pin(ColumnRenameStream {
117+
input: input_stream,
118+
output_schema: self.output_schema.clone(),
119+
}))
120+
}
121+
}
122+
123+
/// Stream that renames columns in output batches
124+
struct ColumnRenameStream {
125+
input: SendableRecordBatchStream,
126+
output_schema: SchemaRef,
127+
}
128+
129+
impl Stream for ColumnRenameStream {
130+
type Item = DataFusionResult<RecordBatch>;
131+
132+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133+
match Pin::new(&mut self.input).poll_next(cx) {
134+
Poll::Ready(Some(Ok(batch))) => {
135+
let result = if batch.num_columns() == 0 {
136+
// COUNT(*) case: preserve row count with empty schema
137+
use arrow::record_batch::RecordBatchOptions;
138+
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
139+
RecordBatch::try_new_with_options(self.output_schema.clone(), vec![], &options)
140+
} else {
141+
RecordBatch::try_new(self.output_schema.clone(), batch.columns().to_vec())
142+
};
143+
144+
match result {
145+
Ok(renamed_batch) => Poll::Ready(Some(Ok(renamed_batch))),
146+
Err(e) => {
147+
Poll::Ready(Some(Err(DataFusionError::ArrowError(Box::new(e), None))))
148+
},
149+
}
150+
},
151+
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
152+
Poll::Ready(None) => Poll::Ready(None),
153+
Poll::Pending => Poll::Pending,
154+
}
155+
}
156+
}
157+
158+
impl RecordBatchStream for ColumnRenameStream {
159+
fn schema(&self) -> SchemaRef {
160+
self.output_schema.clone()
161+
}
162+
}
163+
164+
/// Build a schema with renamed fields, preserving data types and metadata.
165+
pub fn build_renamed_schema(
166+
input_schema: &Schema,
167+
name_mapping: &HashMap<String, String>,
168+
) -> Schema {
169+
let renamed_fields: Vec<Field> = input_schema
170+
.fields()
171+
.iter()
172+
.map(|field| {
173+
let new_name = name_mapping
174+
.get(field.name())
175+
.cloned()
176+
.unwrap_or_else(|| field.name().clone());
177+
Field::new(new_name, field.data_type().clone(), field.is_nullable())
178+
.with_metadata(field.metadata().clone())
179+
})
180+
.collect();
181+
182+
Schema::new(renamed_fields).with_metadata(input_schema.metadata().clone())
183+
}
184+
185+
#[cfg(test)]
186+
mod tests {
187+
use super::*;
188+
use arrow::datatypes::DataType;
189+
use datafusion::physical_plan::EmptyRecordBatchStream;
190+
191+
#[test]
192+
fn test_build_renamed_schema() {
193+
let input_schema = Schema::new(vec![
194+
Field::new("old_id", DataType::Int32, false),
195+
Field::new("old_name", DataType::Utf8, true),
196+
]);
197+
198+
let mut name_mapping = HashMap::new();
199+
name_mapping.insert("old_id".to_string(), "new_id".to_string());
200+
name_mapping.insert("old_name".to_string(), "new_name".to_string());
201+
202+
let renamed_schema = build_renamed_schema(&input_schema, &name_mapping);
203+
204+
assert_eq!(renamed_schema.fields().len(), 2);
205+
assert_eq!(renamed_schema.field(0).name(), "new_id");
206+
assert_eq!(renamed_schema.field(1).name(), "new_name");
207+
assert_eq!(renamed_schema.field(0).data_type(), &DataType::Int32);
208+
assert_eq!(renamed_schema.field(1).data_type(), &DataType::Utf8);
209+
}
210+
211+
#[test]
212+
fn test_build_renamed_schema_partial_mapping() {
213+
// Test when only some columns are renamed
214+
let input_schema = Schema::new(vec![
215+
Field::new("col1", DataType::Int32, false),
216+
Field::new("col2", DataType::Utf8, true),
217+
Field::new("col3", DataType::Float64, false),
218+
]);
219+
220+
let mut name_mapping = HashMap::new();
221+
name_mapping.insert("col1".to_string(), "renamed_col1".to_string());
222+
// col2 and col3 are not renamed
223+
224+
let renamed_schema = build_renamed_schema(&input_schema, &name_mapping);
225+
226+
assert_eq!(renamed_schema.field(0).name(), "renamed_col1");
227+
assert_eq!(renamed_schema.field(1).name(), "col2"); // unchanged
228+
assert_eq!(renamed_schema.field(2).name(), "col3"); // unchanged
229+
}
230+
231+
#[test]
232+
fn test_column_rename_stream_schema() {
233+
let input_schema = Arc::new(Schema::new(vec![Field::new(
234+
"old_col",
235+
DataType::Int32,
236+
false,
237+
)]));
238+
239+
let output_schema = Arc::new(Schema::new(vec![Field::new(
240+
"new_col",
241+
DataType::Int32,
242+
false,
243+
)]));
244+
245+
let stream = ColumnRenameStream {
246+
input: Box::pin(EmptyRecordBatchStream::new(input_schema)),
247+
output_schema: output_schema.clone(),
248+
};
249+
250+
// The stream should report the output schema
251+
assert_eq!(stream.schema().field(0).name(), "new_col");
252+
}
253+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
//! ```
3737
3838
pub mod catalog;
39+
pub mod column_rename;
3940
pub mod delete_filter;
4041
pub mod encryption;
4142
pub mod error;

src/metadata_provider.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub const SQL_LIST_TABLES: &str =
1717
AND ? >= begin_snapshot
1818
AND (? < end_snapshot OR end_snapshot IS NULL)";
1919

20-
pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type
20+
pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type, nulls_allowed
2121
FROM ducklake_column
2222
WHERE table_id = ?
2323
ORDER BY column_order";
@@ -116,7 +116,8 @@ pub const SQL_LIST_ALL_COLUMNS: &str = "
116116
t.table_name,
117117
c.column_id,
118118
c.column_name,
119-
c.column_type
119+
c.column_type,
120+
c.nulls_allowed
120121
FROM ducklake_schema s
121122
JOIN ducklake_table t ON s.schema_id = t.schema_id
122123
JOIN ducklake_column c ON t.table_id = c.table_id
@@ -232,14 +233,22 @@ pub struct DuckLakeTableColumn {
232233
pub column_name: String,
233234
/// DuckLake type string (e.g., "varchar", "int64", "decimal(10,2)")
234235
pub column_type: String,
236+
/// Whether this column allows NULL values
237+
pub is_nullable: bool,
235238
}
236239

237240
impl DuckLakeTableColumn {
238-
pub fn new(column_id: i64, column_name: String, column_type: String) -> Self {
241+
pub fn new(
242+
column_id: i64,
243+
column_name: String,
244+
column_type: String,
245+
is_nullable: bool,
246+
) -> Self {
239247
Self {
240248
column_id,
241249
column_name,
242250
column_type,
251+
is_nullable,
243252
}
244253
}
245254
}

src/metadata_provider_duckdb.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,12 @@ impl MetadataProvider for DuckdbMetadataProvider {
145145
let column_id: i64 = row.get(0)?;
146146
let column_name: String = row.get(1)?;
147147
let column_type: String = row.get(2)?;
148+
let nulls_allowed: Option<bool> = row.get(3)?;
148149
Ok(DuckLakeTableColumn::new(
149150
column_id,
150151
column_name,
151152
column_type,
153+
nulls_allowed.unwrap_or(true),
152154
))
153155
})?
154156
.collect::<Result<Vec<_>, _>>()?;
@@ -303,10 +305,12 @@ impl MetadataProvider for DuckdbMetadataProvider {
303305
|row| {
304306
let schema_name: String = row.get(0)?;
305307
let table_name: String = row.get(1)?;
308+
let nulls_allowed: Option<bool> = row.get(5)?;
306309
let column = DuckLakeTableColumn {
307310
column_id: row.get(2)?,
308311
column_name: row.get(3)?,
309312
column_type: row.get(4)?,
313+
is_nullable: nulls_allowed.unwrap_or(true),
310314
};
311315
Ok(ColumnWithTable {
312316
schema_name,

src/metadata_provider_postgres.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl MetadataProvider for PostgresMetadataProvider {
163163
fn get_table_structure(&self, table_id: i64) -> Result<Vec<DuckLakeTableColumn>> {
164164
block_on(async {
165165
let rows = sqlx::query(
166-
"SELECT column_id, column_name, column_type
166+
"SELECT column_id, column_name, column_type, nulls_allowed
167167
FROM ducklake_column
168168
WHERE table_id = $1
169169
ORDER BY column_order",
@@ -174,10 +174,12 @@ impl MetadataProvider for PostgresMetadataProvider {
174174

175175
rows.into_iter()
176176
.map(|row| {
177+
let nulls_allowed: Option<bool> = row.try_get(3)?;
177178
Ok(DuckLakeTableColumn {
178179
column_id: row.try_get(0)?,
179180
column_name: row.try_get(1)?,
180181
column_type: row.try_get(2)?,
182+
is_nullable: nulls_allowed.unwrap_or(true),
181183
})
182184
})
183185
.collect()
@@ -375,7 +377,7 @@ impl MetadataProvider for PostgresMetadataProvider {
375377
fn list_all_columns(&self, snapshot_id: i64) -> Result<Vec<ColumnWithTable>> {
376378
block_on(async {
377379
let rows = sqlx::query(
378-
"SELECT s.schema_name, t.table_name, c.column_id, c.column_name, c.column_type
380+
"SELECT s.schema_name, t.table_name, c.column_id, c.column_name, c.column_type, c.nulls_allowed
379381
FROM ducklake_schema s
380382
JOIN ducklake_table t ON s.schema_id = t.schema_id
381383
JOIN ducklake_column c ON t.table_id = c.table_id
@@ -396,10 +398,12 @@ impl MetadataProvider for PostgresMetadataProvider {
396398
.map(|row| {
397399
let schema_name: String = row.try_get(0)?;
398400
let table_name: String = row.try_get(1)?;
401+
let nulls_allowed: Option<bool> = row.try_get(5)?;
399402
let column = DuckLakeTableColumn {
400403
column_id: row.try_get(2)?,
401404
column_name: row.try_get(3)?,
402405
column_type: row.try_get(4)?,
406+
is_nullable: nulls_allowed.unwrap_or(true),
403407
};
404408
Ok(ColumnWithTable {
405409
schema_name,

0 commit comments

Comments
 (0)