Skip to content

Commit b5c3cea

Browse files
committed
fix: reject Parquet TimestampLTZ as TimestampNTZ on Spark 3.x for native_datafusion
Pre-Spark-4 (SPARK-36182) rejects reading a Parquet TimestampLTZ column as TimestampNTZ; native_datafusion previously did not, and silently returned the UTC instant. Plumb a per-Spark-version flag from ShimCometConf through the NativeScan proto into SparkParquetOptions, and gate a new rejection arm in the schema adapter on it. INT96 remains a gap because DataFusion's coerce_int96 strips the source timezone before the schema adapter runs, so it is indistinguishable from a true TIMESTAMP_NTZ source. Compatibility guide updated to describe the correctness implications.
1 parent 3209b98 commit b5c3cea

11 files changed

Lines changed: 85 additions & 14 deletions

File tree

common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,11 @@ trait ShimCometConf {
2828
* SQL conf were removed in favor of this per-version constant; see #4298.
2929
*/
3030
val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = false
31+
32+
/**
33+
* Whether reading a Parquet TimestampLTZ column as TimestampNTZ is permitted. Spark 3.x rejects
34+
* this read (SPARK-36182), so Comet matches by defaulting to false on 3.x; Spark 4.0
35+
* (SPARK-47447) lifted the restriction. See #4219.
36+
*/
37+
val COMET_ALLOW_TIMESTAMP_LTZ_AS_NTZ: Boolean = false
3138
}

common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,11 @@ trait ShimCometConf {
2828
* per-version constant; see #4298.
2929
*/
3030
val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = true
31+
32+
/**
33+
* Whether reading a Parquet TimestampLTZ column as TimestampNTZ is permitted. Spark 4.0+
34+
* (SPARK-47447) lifted the pre-4.0 SPARK-36182 rejection, so Comet matches with true. See
35+
* #4219.
36+
*/
37+
val COMET_ALLOW_TIMESTAMP_LTZ_AS_NTZ: Boolean = true
3138
}

docs/source/user-guide/latest/compatibility/scans.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,18 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b
8383
The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
8484
without falling back to Spark:
8585

86-
- Reading `TimestampLTZ` as `TimestampNTZ`. On Spark 3.x, Spark raises an error per
87-
[SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because LTZ encodes UTC-adjusted instants
88-
that cannot be safely reinterpreted as timezone-free values. Comet does not raise this error and instead
89-
returns the raw UTC instant as a `TimestampNTZ` value. This applies to all LTZ physical encodings (INT96,
90-
TIMESTAMP_MICROS, TIMESTAMP_MILLIS). On Spark 4.0+, this read is permitted
91-
([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior.
86+
- Reading Parquet `INT96` as `TimestampNTZ` on Spark 3.x. Spark raises
87+
`SchemaColumnConvertNotSupportedException` for this read per
88+
[SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182); Comet does not, and silently
89+
returns the column's UTC instant as the `TimestampNTZ` value (the Spark 4.0+ semantics from
90+
[SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)). This is a correctness
91+
divergence on Spark 3.x: queries that Spark would have failed instead return values, and those
92+
values reflect UTC rather than the session-local wall clock a `TimestampNTZ` is normally
93+
understood as, so downstream filters, joins, and aggregations on the column may produce
94+
different results than running the same query without Comet. The annotated LTZ encodings
95+
(`TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` with `isAdjustedToUTC=true`) are rejected correctly.
96+
INT96 is a gap because DataFusion's `coerce_int96` strips the source timezone before Comet's
97+
schema adapter runs, leaving INT96 indistinguishable from a true `TIMESTAMP_NTZ` source.
9298
See [#4219](https://github.com/apache/datafusion-comet/issues/4219).
9399

94100
### Schema Mismatch Handling

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,6 +1373,7 @@ impl PhysicalPlanner {
13731373
common.case_sensitive,
13741374
common.return_null_struct_if_all_fields_missing,
13751375
common.allow_type_promotion,
1376+
common.allow_timestamp_ltz_to_ntz,
13761377
self.session_ctx(),
13771378
common.encryption_enabled,
13781379
common.use_field_id,

native/core/src/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
514514
case_sensitive != JNI_FALSE,
515515
return_null_struct_if_all_fields_missing != JNI_FALSE,
516516
true, // allow_type_promotion: JVM side already validated via TypeUtil.checkParquetType
517+
true, // allow_timestamp_ltz_to_ntz: JVM side already validated via TypeUtil.checkParquetType
517518
session_ctx,
518519
encryption_enabled,
519520
// The iceberg-compat path resolves IDs in the JVM via NativeBatchReader,

native/core/src/parquet/parquet_exec.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub(crate) fn init_datasource_exec(
7171
case_sensitive: bool,
7272
return_null_struct_if_all_fields_missing: bool,
7373
allow_type_promotion: bool,
74+
allow_timestamp_ltz_to_ntz: bool,
7475
session_ctx: &Arc<SessionContext>,
7576
encryption_enabled: bool,
7677
use_field_id: bool,
@@ -81,6 +82,7 @@ pub(crate) fn init_datasource_exec(
8182
case_sensitive,
8283
return_null_struct_if_all_fields_missing,
8384
allow_type_promotion,
85+
allow_timestamp_ltz_to_ntz,
8486
&object_store_url,
8587
encryption_enabled,
8688
);
@@ -200,6 +202,7 @@ fn get_options(
200202
case_sensitive: bool,
201203
return_null_struct_if_all_fields_missing: bool,
202204
allow_type_promotion: bool,
205+
allow_timestamp_ltz_to_ntz: bool,
203206
object_store_url: &ObjectStoreUrl,
204207
encryption_enabled: bool,
205208
) -> (TableParquetOptions, SparkParquetOptions) {
@@ -214,6 +217,7 @@ fn get_options(
214217
spark_parquet_options.return_null_struct_if_all_fields_missing =
215218
return_null_struct_if_all_fields_missing;
216219
spark_parquet_options.allow_type_promotion = allow_type_promotion;
220+
spark_parquet_options.allow_timestamp_ltz_to_ntz = allow_timestamp_ltz_to_ntz;
217221

218222
if encryption_enabled {
219223
table_parquet_options.crypto.configure_factory(

native/core/src/parquet/parquet_support.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ pub struct SparkParquetOptions {
9696
/// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64,
9797
/// FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled.
9898
pub allow_type_promotion: bool,
99+
/// When true, reading a Parquet TimestampLTZ column as TimestampNTZ is
100+
/// permitted (Spark 4.0+, SPARK-47447); when false, it is rejected
101+
/// (Spark 3.x, SPARK-36182). Mirrors Comet's per-Spark-version constant
102+
/// in ShimCometConf.
103+
pub allow_timestamp_ltz_to_ntz: bool,
99104
}
100105

101106
impl SparkParquetOptions {
@@ -112,6 +117,7 @@ impl SparkParquetOptions {
112117
use_field_id: false,
113118
ignore_missing_field_id: false,
114119
allow_type_promotion: false,
120+
allow_timestamp_ltz_to_ntz: false,
115121
}
116122
}
117123

@@ -128,6 +134,7 @@ impl SparkParquetOptions {
128134
use_field_id: false,
129135
ignore_missing_field_id: false,
130136
allow_type_promotion: false,
137+
allow_timestamp_ltz_to_ntz: false,
131138
}
132139
}
133140
}

native/core/src/parquet/schema_adapter.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,31 @@ impl SparkPhysicalExprAdapter {
807807
return Ok(Transformed::yes(rejection));
808808
}
809809

810+
// Spark 3.x refuses to read a Parquet TimestampLTZ column as
811+
// TimestampNTZ (SPARK-36182); Spark 4.0 (SPARK-47447) lifted that.
812+
// The flag tracks Comet's per-Spark-version constant in
813+
// ShimCometConf. Deferred to runtime so empty files (SPARK-26709)
814+
// still pass. See #4219.
815+
//
816+
// INT96 columns surface as `Timestamp(_, None)` after `coerce_int96`
817+
// strips the timezone, so this pattern only catches TIMESTAMP_MICROS
818+
// / TIMESTAMP_MILLIS reads. INT96 -> TimestampNTZ is handled elsewhere.
819+
if !self.parquet_options.allow_timestamp_ltz_to_ntz
820+
&& matches!(
821+
(physical_type, target_type),
822+
(DataType::Timestamp(_, Some(_)), DataType::Timestamp(_, None))
823+
)
824+
{
825+
let rejection = reject_on_non_empty_expr(
826+
Arc::clone(&child),
827+
cast.target_field(),
828+
cast.input_field().name(),
829+
physical_type,
830+
target_type,
831+
);
832+
return Ok(Transformed::yes(rejection));
833+
}
834+
810835
// Scalar/complex mismatch (e.g. TIMESTAMP read as ARRAY<TIMESTAMP>):
811836
// Spark's vectorized reader rejects with
812837
// SchemaColumnConvertNotSupportedException (SPARK-45604). Same-shape

native/proto/src/proto/operator.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ message NativeScanCommon {
127127
// with a disallowed promoted type throws an error matching Spark's
128128
// SchemaColumnConvertNotSupportedException behavior.
129129
bool allow_type_promotion = 17;
130+
// When true, reading a Parquet TimestampLTZ column as TimestampNTZ is
131+
// permitted (Spark 4.0+, SPARK-47447); when false, it is rejected with
132+
// SchemaColumnConvertNotSupportedException (Spark 3.x, SPARK-36182). Set
133+
// from Comet's per-Spark-version constant in ShimCometConf.
134+
bool allow_timestamp_ltz_to_ntz = 18;
130135
}
131136

132137
message NativeScan {

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
212212
scan.conf.getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID))
213213

214214
commonBuilder.setAllowTypePromotion(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED)
215+
commonBuilder.setAllowTimestampLtzToNtz(CometConf.COMET_ALLOW_TIMESTAMP_LTZ_AS_NTZ)
215216

216217
// Collect S3/cloud storage configurations
217218
val hadoopConf = scan.relation.sparkSession.sessionState

0 commit comments

Comments
 (0)