Skip to content

Commit 4bcb25c

Browse files
committed
fix: configurable fallback when parquet vectorized reader is disabled (#4352)
Comet's native_datafusion scan rejects the same Parquet-to-Spark conversions that Spark's vectorized reader rejects, whereas Spark's parquet-mr (non-vectorized) path silently overflows or returns null. Disabling spark.sql.parquet.enableVectorizedReader opts into parquet-mr semantics that Comet has no equivalent for, so by default Comet now falls back to Spark in that case. Users who want Comet to handle the scan regardless can opt in.

- New config spark.comet.scan.allowDisabledParquetVectorizedReader (default false: fall back to Spark when the vectorized reader is disabled).
- CometScanRule.nativeDataFusionScan skips itself when the vectorized reader is disabled and the opt-in flag is false.
- CometTestBase sets the flag to true so existing Comet tests continue to exercise the native scan.
- Re-enables (un-ignores) the affected ParquetTypeWideningSuite tests in the 4.0.2 and 4.1.1 diffs.
1 parent dc08a96 commit 4bcb25c

5 files changed

Lines changed: 29 additions & 140 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,19 @@ object CometConf extends ShimCometConf {
783783
.booleanConf
784784
.createWithDefault(true)
785785

786+
val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] =
787+
conf("spark.comet.scan.allowDisabledParquetVectorizedReader")
788+
.category(CATEGORY_SCAN)
789+
.doc(
790+
"Whether to allow Comet's native scan to replace the Parquet scan when Spark's " +
791+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " +
792+
"(false), Comet falls back to Spark in that case, because Comet's native readers " +
793+
"mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " +
794+
"(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " +
795+
s"that Comet has no equivalent for. $COMPAT_GUIDE.")
796+
.booleanConf
797+
.createWithDefault(false)
798+
786799
val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
787800
conf("spark.comet.exec.strictFloatingPoint")
788801
.category(CATEGORY_EXEC)

dev/diffs/4.0.2.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2877,18 +2877,9 @@ index 0acb21f3e6f..e7c65429119 100644
28772877
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
28782878
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
28792879
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2880-
index 09ed6955a51..82924c83eb5 100644
2880+
index 09ed6955a51..236a4e99824 100644
28812881
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
28822882
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2883-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
2884-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2885-
2886-
import org.apache.spark.SparkException
2887-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
2888-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
2889-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2890-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
2891-
import org.apache.spark.sql.functions.col
28922883
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
28932884
withClue(
28942885
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -2919,66 +2910,6 @@ index 09ed6955a51..82924c83eb5 100644
29192910
)
29202911
}
29212912
test(s"parquet widening conversion $fromType -> $toType") {
2922-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
2923-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
2924-
)
2925-
}
2926-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2927-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2928-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2929-
checkAllParquetReaders(values, fromType, toType, expectError = true)
2930-
}
2931-
2932-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
2933-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
2934-
)
2935-
}
2936-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2937-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2938-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2939-
checkAllParquetReaders(values, fromType, toType,
2940-
expectError =
2941-
// parquet-mr allows reading decimals into a smaller precision decimal type without
2942-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
2943-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
2944-
outputTimestampType <- ParquetOutputTimestampType.values
2945-
}
2946-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
2947-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
2948-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2949-
withSQLConf(
2950-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
2951-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
2952-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
2953-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
2954-
}
2955-
test(
2956-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
2957-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
2958-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2959-
checkAllParquetReaders(
2960-
values = Seq("1.23", "10.34"),
2961-
fromType = DecimalType(fromPrecision, 2),
2962-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
2963-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
2964-
}
2965-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
2966-
- s"Decimal($toPrecision, $toScale)"
2967-
+ s"Decimal($toPrecision, $toScale)",
2968-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
2969-
) {
2970-
checkAllParquetReaders(
2971-
values = Seq("1.23", "10.34"),
2972-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
2973-
)
2974-
}
2975-
2976-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
2977-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
2978-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2979-
withTempDir { dir =>
2980-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
2981-
writeParquetFiles(
29822913
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
29832914
index b8f3ea3c6f3..bbd44221288 100644
29842915
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

dev/diffs/4.1.1.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3036,18 +3036,9 @@ index 56076175d60..5872d9962cc 100644
30363036
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
30373037
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
30383038
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3039-
index 09ed6955a51..82924c83eb5 100644
3039+
index 09ed6955a51..236a4e99824 100644
30403040
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
30413041
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3042-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
3043-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
3044-
3045-
import org.apache.spark.SparkException
3046-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
3047-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
3048-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3049-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
3050-
import org.apache.spark.sql.functions.col
30513042
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
30523043
withClue(
30533044
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -3078,66 +3069,6 @@ index 09ed6955a51..82924c83eb5 100644
30783069
)
30793070
}
30803071
test(s"parquet widening conversion $fromType -> $toType") {
3081-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
3082-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
3083-
)
3084-
}
3085-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3086-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3087-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3088-
checkAllParquetReaders(values, fromType, toType, expectError = true)
3089-
}
3090-
3091-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
3092-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
3093-
)
3094-
}
3095-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3096-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3097-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3098-
checkAllParquetReaders(values, fromType, toType,
3099-
expectError =
3100-
// parquet-mr allows reading decimals into a smaller precision decimal type without
3101-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
3102-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
3103-
outputTimestampType <- ParquetOutputTimestampType.values
3104-
}
3105-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
3106-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
3107-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3108-
withSQLConf(
3109-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
3110-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
3111-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
3112-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
3113-
}
3114-
test(
3115-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
3116-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
3117-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3118-
checkAllParquetReaders(
3119-
values = Seq("1.23", "10.34"),
3120-
fromType = DecimalType(fromPrecision, 2),
3121-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
3122-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
3123-
}
3124-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
3125-
- s"Decimal($toPrecision, $toScale)"
3126-
+ s"Decimal($toPrecision, $toScale)",
3127-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
3128-
) {
3129-
checkAllParquetReaders(
3130-
values = Seq("1.23", "10.34"),
3131-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
3132-
)
3133-
}
3134-
3135-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
3136-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
3137-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3138-
withTempDir { dir =>
3139-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
3140-
writeParquetFiles(
31413072
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
31423073
index 1cc6d3afbee..8275727fbb4 100644
31433074
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession)
207207
s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
208208
return None
209209
}
210+
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
211+
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
212+
// backend, so by default fall back to Spark. Users can opt in to letting Comet
213+
// replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.
214+
if (!conf.parquetVectorizedReaderEnabled &&
215+
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
216+
withInfo(
217+
scanExec,
218+
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
219+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
220+
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
221+
return None
222+
}
210223
if (!CometNativeScan.isSupported(scanExec)) {
211224
return None
212225
}

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ abstract class CometTestBase
8686
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
8787
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
8888
conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
89+
conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true")
8990
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
9091
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
9192
// SortOrder is incompatible for mixed zero and negative zero floating point values, but

0 commit comments

Comments
 (0)