Skip to content

Commit 627a5af

Browse files
optimizing logic in column rename
1 parent 9e8cb22 commit 627a5af

1 file changed

Lines changed: 35 additions & 94 deletions

File tree

src/column_rename.rs

Lines changed: 35 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ use std::pin::Pin;
1111
use std::sync::Arc;
1212
use std::task::{Context, Poll};
1313

14-
use arrow::datatypes::{Field, Schema, SchemaRef};
14+
use arrow::datatypes::SchemaRef;
1515
use arrow::record_batch::RecordBatch;
1616
use datafusion::error::{DataFusionError, Result as DataFusionResult};
1717
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
@@ -29,8 +29,10 @@ pub struct ColumnRenameExec {
2929
input: Arc<dyn ExecutionPlan>,
3030
/// Output schema with renamed columns
3131
output_schema: SchemaRef,
32-
/// Mapping from old column names to new column names (for display purposes)
32+
/// Mapping from old (Parquet) column names to new (DuckLake) column names
3333
name_mapping: HashMap<String, String>,
34+
/// Reverse mapping: new name -> old name, for looking up input columns
35+
reverse_mapping: Arc<HashMap<String, String>>,
3436
/// Cached plan properties with updated schema
3537
properties: PlanProperties,
3638
}
@@ -42,33 +44,33 @@ impl ColumnRenameExec {
4244
name_mapping: HashMap<String, String>,
4345
) -> Self {
4446
// PlanProperties must use output schema for DataFusion schema validation
45-
let eq_props = EquivalenceProperties::new(output_schema.clone());
47+
let eq_props = EquivalenceProperties::new(Arc::clone(&output_schema));
4648
let properties = PlanProperties::new(
4749
eq_props,
4850
input.output_partitioning().clone(),
4951
input.pipeline_behavior(),
5052
Boundedness::Bounded,
5153
);
5254

55+
// Pre-compute reverse mapping once (new_name -> old_name)
56+
let reverse_mapping: HashMap<String, String> = name_mapping
57+
.iter()
58+
.map(|(old, new)| (new.clone(), old.clone()))
59+
.collect();
60+
5361
Self {
5462
input,
5563
output_schema,
5664
name_mapping,
65+
reverse_mapping: Arc::new(reverse_mapping),
5766
properties,
5867
}
5968
}
6069
}
6170

6271
impl DisplayAs for ColumnRenameExec {
63-
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64-
match t {
65-
DisplayFormatType::Default | DisplayFormatType::Verbose => {
66-
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
67-
},
68-
DisplayFormatType::TreeRender => {
69-
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
70-
},
71-
}
72+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
73+
write!(f, "ColumnRenameExec: renames={}", self.name_mapping.len())
7274
}
7375
}
7476

@@ -99,11 +101,14 @@ impl ExecutionPlan for ColumnRenameExec {
99101
));
100102
}
101103

102-
Ok(Arc::new(ColumnRenameExec::new(
103-
children[0].clone(),
104-
self.output_schema.clone(),
105-
self.name_mapping.clone(),
106-
)))
104+
// Reuse existing reverse_mapping since name_mapping hasn't changed
105+
Ok(Arc::new(Self {
106+
input: Arc::clone(&children[0]),
107+
output_schema: Arc::clone(&self.output_schema),
108+
name_mapping: self.name_mapping.clone(),
109+
reverse_mapping: Arc::clone(&self.reverse_mapping),
110+
properties: self.properties.clone(),
111+
}))
107112
}
108113

109114
fn execute(
@@ -113,17 +118,10 @@ impl ExecutionPlan for ColumnRenameExec {
113118
) -> DataFusionResult<SendableRecordBatchStream> {
114119
let input_stream = self.input.execute(partition, context)?;
115120

116-
// Invert mapping: new_name -> old_name (for looking up input columns)
117-
let reverse_mapping: HashMap<String, String> = self
118-
.name_mapping
119-
.iter()
120-
.map(|(old, new)| (new.clone(), old.clone()))
121-
.collect();
122-
123121
Ok(Box::pin(ColumnRenameStream {
124122
input: input_stream,
125-
output_schema: self.output_schema.clone(),
126-
reverse_mapping,
123+
output_schema: Arc::clone(&self.output_schema),
124+
reverse_mapping: Arc::clone(&self.reverse_mapping),
127125
}))
128126
}
129127
}
@@ -133,7 +131,7 @@ struct ColumnRenameStream {
133131
input: SendableRecordBatchStream,
134132
output_schema: SchemaRef,
135133
/// Mapping from output column name -> input column name (for renamed columns only)
136-
reverse_mapping: HashMap<String, String>,
134+
reverse_mapping: Arc<HashMap<String, String>>,
137135
}
138136

139137
impl Stream for ColumnRenameStream {
@@ -146,7 +144,11 @@ impl Stream for ColumnRenameStream {
146144
// COUNT(*) case: preserve row count with empty schema
147145
use arrow::record_batch::RecordBatchOptions;
148146
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
149-
RecordBatch::try_new_with_options(self.output_schema.clone(), vec![], &options)
147+
RecordBatch::try_new_with_options(
148+
Arc::clone(&self.output_schema),
149+
vec![],
150+
&options,
151+
)
150152
} else {
151153
// Build columns by looking up each output field in the input batch
152154
let input_schema = batch.schema();
@@ -169,7 +171,7 @@ impl Stream for ColumnRenameStream {
169171
.collect();
170172

171173
match columns {
172-
Ok(cols) => RecordBatch::try_new(self.output_schema.clone(), cols),
174+
Ok(cols) => RecordBatch::try_new(Arc::clone(&self.output_schema), cols),
173175
Err(e) => Err(e),
174176
}
175177
};
@@ -190,77 +192,16 @@ impl Stream for ColumnRenameStream {
190192

191193
impl RecordBatchStream for ColumnRenameStream {
192194
fn schema(&self) -> SchemaRef {
193-
self.output_schema.clone()
195+
Arc::clone(&self.output_schema)
194196
}
195197
}
196198

197-
/// Build a schema with renamed fields, preserving data types and metadata.
198-
pub fn build_renamed_schema(
199-
input_schema: &Schema,
200-
name_mapping: &HashMap<String, String>,
201-
) -> Schema {
202-
let renamed_fields: Vec<Field> = input_schema
203-
.fields()
204-
.iter()
205-
.map(|field| {
206-
let new_name = name_mapping
207-
.get(field.name())
208-
.cloned()
209-
.unwrap_or_else(|| field.name().clone());
210-
Field::new(new_name, field.data_type().clone(), field.is_nullable())
211-
.with_metadata(field.metadata().clone())
212-
})
213-
.collect();
214-
215-
Schema::new(renamed_fields).with_metadata(input_schema.metadata().clone())
216-
}
217-
218199
#[cfg(test)]
219200
mod tests {
220201
use super::*;
221-
use arrow::datatypes::DataType;
202+
use arrow::datatypes::{DataType, Field, Schema};
222203
use datafusion::physical_plan::EmptyRecordBatchStream;
223204

224-
#[test]
225-
fn test_build_renamed_schema() {
226-
let input_schema = Schema::new(vec![
227-
Field::new("old_id", DataType::Int32, false),
228-
Field::new("old_name", DataType::Utf8, true),
229-
]);
230-
231-
let mut name_mapping = HashMap::new();
232-
name_mapping.insert("old_id".to_string(), "new_id".to_string());
233-
name_mapping.insert("old_name".to_string(), "new_name".to_string());
234-
235-
let renamed_schema = build_renamed_schema(&input_schema, &name_mapping);
236-
237-
assert_eq!(renamed_schema.fields().len(), 2);
238-
assert_eq!(renamed_schema.field(0).name(), "new_id");
239-
assert_eq!(renamed_schema.field(1).name(), "new_name");
240-
assert_eq!(renamed_schema.field(0).data_type(), &DataType::Int32);
241-
assert_eq!(renamed_schema.field(1).data_type(), &DataType::Utf8);
242-
}
243-
244-
#[test]
245-
fn test_build_renamed_schema_partial_mapping() {
246-
// Test when only some columns are renamed
247-
let input_schema = Schema::new(vec![
248-
Field::new("col1", DataType::Int32, false),
249-
Field::new("col2", DataType::Utf8, true),
250-
Field::new("col3", DataType::Float64, false),
251-
]);
252-
253-
let mut name_mapping = HashMap::new();
254-
name_mapping.insert("col1".to_string(), "renamed_col1".to_string());
255-
// col2 and col3 are not renamed
256-
257-
let renamed_schema = build_renamed_schema(&input_schema, &name_mapping);
258-
259-
assert_eq!(renamed_schema.field(0).name(), "renamed_col1");
260-
assert_eq!(renamed_schema.field(1).name(), "col2"); // unchanged
261-
assert_eq!(renamed_schema.field(2).name(), "col3"); // unchanged
262-
}
263-
264205
#[test]
265206
fn test_column_rename_stream_schema() {
266207
let input_schema = Arc::new(Schema::new(vec![Field::new(
@@ -280,8 +221,8 @@ mod tests {
280221

281222
let stream = ColumnRenameStream {
282223
input: Box::pin(EmptyRecordBatchStream::new(input_schema)),
283-
output_schema: output_schema.clone(),
284-
reverse_mapping,
224+
output_schema: Arc::clone(&output_schema),
225+
reverse_mapping: Arc::new(reverse_mapping),
285226
};
286227

287228
// The stream should report the output schema

0 commit comments

Comments
 (0)