Skip to content

Commit 0cec8f8

Browse files
authored
fix: configurable fallback when parquet vectorized reader is disabled (#4352) (#4355)
1 parent b1c586a commit 0cec8f8

6 files changed

Lines changed: 35 additions & 140 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,19 @@ object CometConf extends ShimCometConf {
783783
.booleanConf
784784
.createWithDefault(true)
785785

786+
val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] =
787+
conf("spark.comet.scan.allowDisabledParquetVectorizedReader")
788+
.category(CATEGORY_SCAN)
789+
.doc(
790+
"Whether to allow Comet's native scan to replace the Parquet scan when Spark's " +
791+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " +
792+
"(false), Comet falls back to Spark in that case, because Comet's native readers " +
793+
"mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " +
794+
"(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " +
795+
s"that Comet has no equivalent for. $COMPAT_GUIDE.")
796+
.booleanConf
797+
.createWithDefault(false)
798+
786799
val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
787800
conf("spark.comet.exec.strictFloatingPoint")
788801
.category(CATEGORY_EXEC)

dev/diffs/4.0.2.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2877,18 +2877,9 @@ index 0acb21f3e6f..e7c65429119 100644
28772877
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
28782878
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
28792879
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2880-
index 09ed6955a51..82924c83eb5 100644
2880+
index 09ed6955a51..236a4e99824 100644
28812881
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
28822882
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2883-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
2884-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2885-
2886-
import org.apache.spark.SparkException
2887-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
2888-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
2889-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2890-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
2891-
import org.apache.spark.sql.functions.col
28922883
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
28932884
withClue(
28942885
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -2919,66 +2910,6 @@ index 09ed6955a51..82924c83eb5 100644
29192910
)
29202911
}
29212912
test(s"parquet widening conversion $fromType -> $toType") {
2922-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
2923-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
2924-
)
2925-
}
2926-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2927-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2928-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2929-
checkAllParquetReaders(values, fromType, toType, expectError = true)
2930-
}
2931-
2932-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
2933-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
2934-
)
2935-
}
2936-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2937-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2938-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2939-
checkAllParquetReaders(values, fromType, toType,
2940-
expectError =
2941-
// parquet-mr allows reading decimals into a smaller precision decimal type without
2942-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
2943-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
2944-
outputTimestampType <- ParquetOutputTimestampType.values
2945-
}
2946-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
2947-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
2948-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2949-
withSQLConf(
2950-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
2951-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
2952-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
2953-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
2954-
}
2955-
test(
2956-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
2957-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
2958-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2959-
checkAllParquetReaders(
2960-
values = Seq("1.23", "10.34"),
2961-
fromType = DecimalType(fromPrecision, 2),
2962-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
2963-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
2964-
}
2965-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
2966-
- s"Decimal($toPrecision, $toScale)"
2967-
+ s"Decimal($toPrecision, $toScale)",
2968-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
2969-
) {
2970-
checkAllParquetReaders(
2971-
values = Seq("1.23", "10.34"),
2972-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
2973-
)
2974-
}
2975-
2976-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
2977-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
2978-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2979-
withTempDir { dir =>
2980-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
2981-
writeParquetFiles(
29822913
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
29832914
index b8f3ea3c6f3..bbd44221288 100644
29842915
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

dev/diffs/4.1.1.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3036,18 +3036,9 @@ index 56076175d60..5872d9962cc 100644
30363036
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
30373037
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
30383038
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3039-
index 09ed6955a51..82924c83eb5 100644
3039+
index 09ed6955a51..236a4e99824 100644
30403040
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
30413041
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3042-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
3043-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
3044-
3045-
import org.apache.spark.SparkException
3046-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
3047-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
3048-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3049-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
3050-
import org.apache.spark.sql.functions.col
30513042
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
30523043
withClue(
30533044
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -3078,66 +3069,6 @@ index 09ed6955a51..82924c83eb5 100644
30783069
)
30793070
}
30803071
test(s"parquet widening conversion $fromType -> $toType") {
3081-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
3082-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
3083-
)
3084-
}
3085-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3086-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3087-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3088-
checkAllParquetReaders(values, fromType, toType, expectError = true)
3089-
}
3090-
3091-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
3092-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
3093-
)
3094-
}
3095-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3096-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3097-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3098-
checkAllParquetReaders(values, fromType, toType,
3099-
expectError =
3100-
// parquet-mr allows reading decimals into a smaller precision decimal type without
3101-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
3102-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
3103-
outputTimestampType <- ParquetOutputTimestampType.values
3104-
}
3105-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
3106-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
3107-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3108-
withSQLConf(
3109-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
3110-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
3111-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
3112-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
3113-
}
3114-
test(
3115-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
3116-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
3117-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3118-
checkAllParquetReaders(
3119-
values = Seq("1.23", "10.34"),
3120-
fromType = DecimalType(fromPrecision, 2),
3121-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
3122-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
3123-
}
3124-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
3125-
- s"Decimal($toPrecision, $toScale)"
3126-
+ s"Decimal($toPrecision, $toScale)",
3127-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
3128-
) {
3129-
checkAllParquetReaders(
3130-
values = Seq("1.23", "10.34"),
3131-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
3132-
)
3133-
}
3134-
3135-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
3136-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
3137-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3138-
withTempDir { dir =>
3139-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
3140-
writeParquetFiles(
31413072
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
31423073
index 1cc6d3afbee..8275727fbb4 100644
31433074
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala

docs/source/user-guide/latest/compatibility/scans.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b
7979
- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns)
8080
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
8181
matching Spark's behavior.
82+
- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into
83+
Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader
84+
does not replicate. By default Comet falls back to Spark in this case. Set
85+
`spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the
86+
`native_datafusion` scan regardless. See
87+
[#4352](https://github.com/apache/datafusion-comet/issues/4352).
8288

8389
The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
8490
without falling back to Spark:

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession)
207207
s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
208208
return None
209209
}
210+
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
211+
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
212+
// backend, so by default fall back to Spark. Users can opt in to letting Comet
213+
// replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.
214+
if (!conf.parquetVectorizedReaderEnabled &&
215+
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
216+
withInfo(
217+
scanExec,
218+
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
219+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
220+
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
221+
return None
222+
}
210223
if (!CometNativeScan.isSupported(scanExec)) {
211224
return None
212225
}

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ abstract class CometTestBase
8686
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
8787
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
8888
conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
89+
conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true")
8990
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
9091
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
9192
// SortOrder is incompatible for mixed zero and negative zero floating point values, but

0 commit comments

Comments
 (0)