Skip to content

Commit 2be791c

Browse files
committed
fix mapping issue with native_datafusion
1 parent 1413355 commit 2be791c

2 files changed

Lines changed: 95 additions & 6 deletions

File tree

native/core/src/parquet/schema_adapter.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,12 +296,26 @@ impl SparkPhysicalExprAdapter {
296296
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
297297
expr.transform(|e| {
298298
if let Some(column) = e.as_any().downcast_ref::<Column>() {
299-
let col_idx = column.index();
300299
let col_name = column.name();
301300

302-
let logical_field = self.logical_file_schema.fields().get(col_idx);
303-
// Look up physical field by name instead of index for correctness
304-
// when logical and physical schemas have different column orderings
301+
// Resolve fields by name because this is the fallback path
302+
// that runs on the original expression when the default
303+
// adapter fails. The original expression was built against
304+
// the required (pruned) schema, so column indices refer to
305+
// that schema — not the logical or physical file schemas.
306+
// DataFusion's DefaultPhysicalExprAdapter::resolve_physical_column
307+
// also resolves by name for the same reason.
308+
let logical_field = if self.parquet_options.case_sensitive {
309+
self.logical_file_schema
310+
.fields()
311+
.iter()
312+
.find(|f| f.name() == col_name)
313+
} else {
314+
self.logical_file_schema
315+
.fields()
316+
.iter()
317+
.find(|f| f.name().eq_ignore_ascii_case(col_name))
318+
};
305319
let physical_field = if self.parquet_options.case_sensitive {
306320
self.physical_file_schema
307321
.fields()
@@ -314,19 +328,40 @@ impl SparkPhysicalExprAdapter {
314328
.find(|f| f.name().eq_ignore_ascii_case(col_name))
315329
};
316330

317-
if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
331+
// Remap the column index to the physical file schema so
332+
// downstream evaluation reads the correct column from the
333+
// parquet batch.
334+
let physical_index = if self.parquet_options.case_sensitive {
335+
self.physical_file_schema.index_of(col_name).ok()
336+
} else {
337+
self.physical_file_schema
338+
.fields()
339+
.iter()
340+
.position(|f| f.name().eq_ignore_ascii_case(col_name))
341+
};
342+
343+
if let (Some(logical_field), Some(physical_field), Some(phys_idx)) =
344+
(logical_field, physical_field, physical_index)
318345
{
346+
let remapped: Arc<dyn PhysicalExpr> = if column.index() != phys_idx {
347+
Arc::new(Column::new(col_name, phys_idx))
348+
} else {
349+
Arc::clone(&e)
350+
};
351+
319352
if logical_field.data_type() != physical_field.data_type() {
320353
let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(
321354
CometCastColumnExpr::new(
322-
Arc::clone(&e),
355+
remapped,
323356
Arc::clone(physical_field),
324357
Arc::clone(logical_field),
325358
None,
326359
)
327360
.with_parquet_options(self.parquet_options.clone()),
328361
);
329362
return Ok(Transformed::yes(cast_expr));
363+
} else if column.index() != phys_idx {
364+
return Ok(Transformed::yes(remapped));
330365
}
331366
}
332367
}

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.comet.exec
2121

22+
import org.apache.hadoop.fs.Path
2223
import org.scalactic.source.Position
2324
import org.scalatest.Tag
2425

@@ -602,4 +603,57 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
602603
|""".stripMargin,
603604
"select array(array(1, 2, null), array(), array(10), null, array(null)) from tbl")
604605
}
606+
607+
test("native reader - nested schema pruning with array of struct and filter") {
608+
// Regression test found during DataFusion 53 upgrade (PR #3629).
609+
// Spark's SchemaPruningSuite tests (e.g. "select a single complex field array
610+
// and in clause", "select explode of nested field of array of struct",
611+
// "SPARK-34638: nested column prune on generator output") were failing with:
612+
// native panic: called `Result::unwrap()` on an `Err` value:
613+
// Internal("Unexpected data type in GetArrayStructFields: Int32")
614+
// The root cause was wrap_all_type_mismatches in Comet's schema adapter
615+
// looking up the logical field by column index instead of by name. When
616+
// filter expressions are created against the pruned required_schema (where
617+
// "friends" is at index 0), the fallback would index into the full
618+
// logical_file_schema and get "id: Int32" instead of "friends: List<...>".
619+
withTempDir { dir =>
620+
val path = new Path(dir.toURI.toString, "test").toUri.toString
621+
622+
// Create a table with multiple columns so that nested schema pruning
623+
// can prune away unneeded columns. The friends column is an array of
624+
// structs with first/middle/last, but the query only needs first and middle.
625+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
626+
spark.sql(
627+
"""
628+
|select
629+
| 0 as id,
630+
| named_struct('first', 'Jane', 'middle', 'X.', 'last', 'Doe') as name,
631+
| '123 Main Street' as address,
632+
| 1 as pets,
633+
| array(
634+
| named_struct('first', 'Susan', 'middle', 'Z.', 'last', 'Smith')
635+
| ) as friends
636+
|union all
637+
|select
638+
| 1 as id,
639+
| named_struct('first', 'John', 'middle', 'Y.', 'last', 'Doe') as name,
640+
| '321 Wall Street' as address,
641+
| 3 as pets,
642+
| array(
643+
| named_struct('first', 'Alice', 'middle', 'A.', 'last', 'Jones')
644+
| ) as friends
645+
|""".stripMargin).repartition(1).write.parquet(path)
646+
}
647+
648+
val schema = spark.read.parquet(path).schema
649+
650+
readParquetFile(path, Some(schema)) { df =>
651+
df.createOrReplaceTempView("tbl")
652+
}
653+
654+
val query = "select friends.middle from tbl where friends.first[0] = 'Susan'"
655+
val df = sql(query)
656+
checkSparkAnswer(df)
657+
}
658+
}
605659
}

0 commit comments

Comments
 (0)