Skip to content

Commit c26c407

Browse files
authored
fix: reorder MySQL result columns to match projected schema (#568)
When `SqlTable` is created with `new_with_schema`, the projected schema's field order may differ from MySQL's physical column order. Since the upgrade to DataFusion 52, the new `BatchCoalescer` assumes that the column order of each batch matches the plan schema, and it panics on a type mismatch when it does not. To fix this, `rows_to_arrow` now reorders the `RecordBatch` columns by name so that they match the projected schema's field order.
1 parent 27b773f commit c26c407

2 files changed

Lines changed: 132 additions & 2 deletions

File tree

core/src/sql/arrow_sql_gen/mysql.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ pub enum Error {
5858

5959
#[snafu(display("No column name for index: {index}"))]
6060
NoColumnNameForIndex { index: usize },
61+
62+
#[snafu(display("Projected schema field \"{field_name}\" not found in query result"))]
63+
FieldNotFoundInResult { field_name: String },
6164
}
6265

6366
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -567,8 +570,57 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
567570
.collect::<Vec<ArrayRef>>();
568571
let arrow_fields = arrow_fields.into_iter().flatten().collect::<Vec<Field>>();
569572
let options = &RecordBatchOptions::new().with_row_count(Some(rows.len()));
570-
RecordBatch::try_new_with_options(Arc::new(Schema::new(arrow_fields)), columns, options)
571-
.map_err(|err| Error::FailedToBuildRecordBatch { source: err })
573+
let batch =
574+
RecordBatch::try_new_with_options(Arc::new(Schema::new(arrow_fields)), columns, options)
575+
.map_err(|err| Error::FailedToBuildRecordBatch { source: err })?;
576+
577+
// When SqlTable is created with new_with_schema, the projected schema field
578+
// order may differ from MySQL's physical column order. Reorder the result
579+
// columns to match the projected schema. After upgrading to DataFusion 52,
580+
// the new BatchCoalescer assumes column order matches the plan schema and
581+
// panics on type mismatch.
582+
if let Some(schema) = projected_schema {
583+
let batch_schema = batch.schema();
584+
let needs_reorder = schema
585+
.fields()
586+
.iter()
587+
.zip(batch_schema.fields())
588+
.any(|(expected, actual)| expected.name() != actual.name());
589+
590+
if needs_reorder {
591+
let mut reordered_columns = Vec::with_capacity(schema.fields().len());
592+
let mut reordered_fields = Vec::with_capacity(schema.fields().len());
593+
594+
for field in schema.fields() {
595+
let idx = batch_schema.index_of(field.name()).map_err(|_| {
596+
Error::FieldNotFoundInResult {
597+
field_name: field.name().clone(),
598+
}
599+
})?;
600+
601+
reordered_columns.push(batch.column(idx).clone());
602+
reordered_fields.push(batch_schema.field(idx).clone());
603+
}
604+
605+
for (idx, field) in batch_schema.fields().iter().enumerate() {
606+
if schema.index_of(field.name()).is_err() {
607+
reordered_columns.push(batch.column(idx).clone());
608+
reordered_fields.push(field.as_ref().clone());
609+
}
610+
}
611+
612+
let options = &RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
613+
614+
return RecordBatch::try_new_with_options(
615+
Arc::new(Schema::new(reordered_fields)),
616+
reordered_columns,
617+
options,
618+
)
619+
.map_err(|err| Error::FailedToBuildRecordBatch { source: err });
620+
}
621+
}
622+
623+
Ok(batch)
572624
}
573625

574626
#[allow(clippy::unnecessary_wraps)]

core/tests/mysql/mod.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,83 @@ async fn test_arrow_mysql_roundtrip(
821821
.await;
822822
}
823823

824+
/// When SqlTable is created with new_with_schema, the projected schema field
825+
/// order may differ from MySQL's physical column order. rows_to_arrow must
826+
/// reorder the result columns to match the projected schema. After upgrading
827+
/// to DataFusion 52, the new BatchCoalescer assumes column order matches the
828+
/// plan schema and panics on type mismatch.
829+
async fn test_mysql_projected_schema_column_reorder(port: usize) {
830+
let create_table_stmt = "
831+
CREATE TABLE reorder_table (
832+
a INT,
833+
b VARCHAR(50),
834+
c DOUBLE
835+
);
836+
";
837+
let insert_table_stmt = "
838+
INSERT INTO reorder_table (a, b, c) VALUES (1, 'hello', 3.14);
839+
";
840+
841+
// Schema with reversed column order compared to MySQL's physical order
842+
let reordered_schema = Arc::new(Schema::new(vec![
843+
Field::new("c", DataType::Float64, true),
844+
Field::new("b", DataType::Utf8, true),
845+
Field::new("a", DataType::Int32, true),
846+
]));
847+
848+
let ctx = SessionContext::new();
849+
let pool = common::get_mysql_connection_pool(port)
850+
.await
851+
.expect("MySQL connection pool should be created");
852+
853+
let db_conn = pool
854+
.connect_direct()
855+
.await
856+
.expect("Connection should be established");
857+
858+
let _ = db_conn
859+
.execute(create_table_stmt, &[])
860+
.await
861+
.expect("MySQL table should be created");
862+
863+
let _ = db_conn
864+
.execute(insert_table_stmt, &[])
865+
.await
866+
.expect("MySQL table data should be inserted");
867+
868+
let sqltable_pool: Arc<
869+
dyn DbConnectionPool<mysql_async::Conn, &'static (dyn ToValue + Sync)>
870+
+ Send
871+
+ Sync
872+
+ 'static,
873+
> = Arc::new(pool);
874+
875+
// Use new_with_schema with a different column order than the physical table
876+
let table = SqlTable::new_with_schema(
877+
"mysql",
878+
&sqltable_pool,
879+
reordered_schema.clone(),
880+
"reorder_table",
881+
);
882+
883+
ctx.register_table("reorder_table", Arc::new(table))
884+
.expect("Table should be registered");
885+
886+
let df = ctx
887+
.sql("SELECT * FROM reorder_table")
888+
.await
889+
.expect("DataFrame should be created from query");
890+
891+
let record_batch = df.collect().await.expect("RecordBatch should be collected");
892+
assert_eq!(record_batch.len(), 1);
893+
894+
let batch = &record_batch[0];
895+
// Verify columns are in the projected schema order (c, b, a)
896+
assert_eq!(batch.schema().field(0).name(), "c");
897+
assert_eq!(batch.schema().field(1).name(), "b");
898+
assert_eq!(batch.schema().field(2).name(), "a");
899+
}
900+
824901
#[rstest]
825902
#[test_log::test(tokio::test)]
826903
async fn test_mysql_arrow_oneway() {
@@ -836,6 +913,7 @@ async fn test_mysql_arrow_oneway() {
836913
test_mysql_decimal_types_to_decimal128(port).await;
837914
test_mysql_decimal_types_to_decimal256(port).await;
838915
test_mysql_zero_date_type(port).await;
916+
test_mysql_projected_schema_column_reorder(port).await;
839917

840918
mysql_container.remove().await.expect("container to stop");
841919
}

0 commit comments

Comments
 (0)