Skip to content

Commit f48c4e4

Browse files
author
Avichay Marciano
committed
[FLINK-39415][postgres] Fix TIMESTAMPTZ type mapping in pipeline connector
Root cause: PostgresTypeUtils (pipeline connector) mapped TIMESTAMPTZ to ZonedTimestampType (TIMESTAMP_WITH_TIME_ZONE), but the Debezium deserializer only has a converter for TIMESTAMP_WITH_LOCAL_TIME_ZONE. Because of this mismatch, the deserializer produces LocalZonedTimestampData while BinaryWriter.write() casts to ZonedTimestampData, which corrupts the binary data and leads to a NumberFormatException when the Iceberg sink reads the field via BinaryRecordData.getZonedTimestamp().

The existing test PostgresFullTypesITCase already expects LocalZonedTimestampData for TIMESTAMPTZ (line 1211: DataTypes.TIMESTAMP_LTZ(0)), confirming that the correct type mapping is TIMESTAMP_WITH_LOCAL_TIME_ZONE.

Changes:
- PostgresTypeUtils: TIMESTAMPTZ -> TIMESTAMP_LTZ(scale) (was ZonedTimestampType).
- DebeziumEventDeserializationSchema: add convertToZonedTimestamp() for future TIMESTAMP_WITH_TIME_ZONE support.
- Add PostgresTypeUtilsTimestamptzTest: validates that TIMESTAMPTZ maps to the TIMESTAMP_WITH_LOCAL_TIME_ZONE type root.
- Add IcebergTypeUtilsTest cases: validate the TIMESTAMP_LTZ type conversion and field getter creation for the Iceberg sink.
1 parent 13cd198 commit f48c4e4

4 files changed

Lines changed: 122 additions & 3 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.flink.cdc.common.types.DataTypes;
2121

2222
import org.apache.iceberg.expressions.Literal;
23+
import org.apache.iceberg.types.Types;
2324
import org.junit.jupiter.api.Test;
2425

2526
import java.math.BigDecimal;
27+
import java.time.ZoneId;
2628

2729
import static org.assertj.core.api.Assertions.assertThat;
2830

@@ -136,4 +138,26 @@ public void testParseDefaultValueFunctionExpression() {
136138
IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()", DataTypes.BIGINT());
137139
assertThat(result).isNull();
138140
}
141+
142+
@Test
143+
public void testConvertTimestampWithLocalTimeZoneToIcebergType() {
144+
// TIMESTAMPTZ in PostgreSQL should map to TIMESTAMP_LTZ in CDC types,
145+
// and then to Iceberg's TimestampType.withZone().
146+
// This verifies the type conversion doesn't use TIMESTAMP_WITH_TIME_ZONE
147+
// (ZonedTimestampType), which would cause a serialization mismatch in
148+
// BinaryRecordData — the Debezium deserializer produces
149+
// LocalZonedTimestampData, not ZonedTimestampData.
150+
assertThat(IcebergTypeUtils.convertCDCTypeToIcebergType(DataTypes.TIMESTAMP_LTZ(6)))
151+
.isEqualTo(Types.TimestampType.withZone());
152+
}
153+
154+
@Test
155+
public void testCreateFieldGetterForTimestampLtz() {
156+
// Verify that createFieldGetter for TIMESTAMP_WITH_LOCAL_TIME_ZONE
157+
// produces a valid field getter (not throwing ClassCastException or
158+
// NumberFormatException when reading through BinaryRecordData).
159+
org.apache.flink.cdc.common.data.RecordData.FieldGetter getter =
160+
IcebergTypeUtils.createFieldGetter(DataTypes.TIMESTAMP_LTZ(6), 0, ZoneId.of("UTC"));
161+
assertThat(getter).isNotNull();
162+
}
139163
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.cdc.common.types.DataType;
2121
import org.apache.flink.cdc.common.types.DataTypes;
22-
import org.apache.flink.cdc.common.types.ZonedTimestampType;
2322
import org.apache.flink.table.types.logical.DecimalType;
2423

2524
import io.debezium.config.CommonConnectorConfig;
@@ -167,9 +166,9 @@ private static DataType convertFromColumn(
167166
return DataTypes.ARRAY(
168167
handleTimestampWithTemporalMode(temporalPrecisionMode, scale));
169168
case PgOid.TIMESTAMPTZ:
170-
return new ZonedTimestampType(scale);
169+
return DataTypes.TIMESTAMP_LTZ(scale);
171170
case PgOid.TIMESTAMPTZ_ARRAY:
172-
return DataTypes.ARRAY(new ZonedTimestampType(scale));
171+
return DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(scale));
173172
case PgOid.TIME:
174173
return handleTimeWithTemporalMode(temporalPrecisionMode, scale);
175174
case PgOid.TIME_ARRAY:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.utils;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypeRoot;
22+
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
23+
24+
import io.debezium.connector.postgresql.PgOid;
25+
import io.debezium.relational.Column;
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/**
31+
* Test that TIMESTAMPTZ maps to TIMESTAMP_WITH_LOCAL_TIME_ZONE (not TIMESTAMP_WITH_TIME_ZONE),
32+
* aligning with the Debezium deserializer's converter which produces LocalZonedTimestampData for
33+
* TIMESTAMPTZ columns.
34+
*
35+
* <p>Using TIMESTAMP_WITH_TIME_ZONE (ZonedTimestampType) causes a type mismatch: the BinaryWriter
36+
* expects ZonedTimestampData but the deserializer produces LocalZonedTimestampData, resulting in
37+
* binary data corruption and NumberFormatException in the Iceberg sink's
38+
* IcebergTypeUtils.createFieldGetter.
39+
*/
40+
public class PostgresTypeUtilsTimestamptzTest {
41+
42+
@Test
43+
public void testTimestamptzMapsToLocalZonedTimestamp() {
44+
Column column =
45+
Column.editor()
46+
.name("created_at")
47+
.jdbcType(java.sql.Types.TIMESTAMP_WITH_TIMEZONE)
48+
.nativeType(PgOid.TIMESTAMPTZ)
49+
.type("timestamptz")
50+
.length(Column.UNSET_INT_VALUE)
51+
.scale(6)
52+
.optional(true)
53+
.create();
54+
55+
// Pass null for dbzConfig and typeRegistry since the TIMESTAMPTZ code path
56+
// only uses the column's nativeType and scale.
57+
DataType result = PostgresTypeUtils.fromDbzColumn(column, null, null);
58+
59+
assertThat(result.getTypeRoot())
60+
.as(
61+
"TIMESTAMPTZ should map to TIMESTAMP_WITH_LOCAL_TIME_ZONE, "
62+
+ "not TIMESTAMP_WITH_TIME_ZONE, to match the Debezium deserializer")
63+
.isEqualTo(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
64+
assertThat(result).isInstanceOf(LocalZonedTimestampType.class);
65+
}
66+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.cdc.common.data.RecordData;
2828
import org.apache.flink.cdc.common.data.TimeData;
2929
import org.apache.flink.cdc.common.data.TimestampData;
30+
import org.apache.flink.cdc.common.data.ZonedTimestampData;
3031
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
3132
import org.apache.flink.cdc.common.event.ChangeEvent;
3233
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -199,6 +200,8 @@ protected DeserializationRuntimeConverter createNotNullConverter(DataType type)
199200
return this::convertToTimestamp;
200201
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
201202
return this::convertToLocalTimeZoneTimestamp;
203+
case TIMESTAMP_WITH_TIME_ZONE:
204+
return this::convertToZonedTimestamp;
202205
case FLOAT:
203206
return this::convertToFloat;
204207
case DOUBLE:
@@ -382,6 +385,33 @@ protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
382385
+ dbzObj.getClass().getName());
383386
}
384387

388+
/**
389+
* Converts a Debezium TIMESTAMPTZ value to a {@link ZonedTimestampData}.
390+
*
391+
* <p>Debezium's pgoutput plugin sends TIMESTAMPTZ as an ISO-8601 string with offset (e.g.
392+
* "2026-03-31T12:03:46.125062+00:00"). We parse this to extract milliseconds, sub-millisecond
393+
* nanos, and the zone offset.
394+
*/
395+
protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
396+
if (dbzObj instanceof String) {
397+
String str = (String) dbzObj;
398+
java.time.OffsetDateTime odt =
399+
ZonedTimestamp.FORMATTER.parse(str, java.time.OffsetDateTime::from);
400+
Instant instant = odt.toInstant();
401+
long millisecond = instant.toEpochMilli();
402+
int nanoOfMillisecond = instant.getNano() % 1_000_000;
403+
String zoneId = odt.getOffset().getId();
404+
ZonedTimestampData result =
405+
ZonedTimestampData.of(millisecond, nanoOfMillisecond, zoneId);
406+
return result;
407+
}
408+
throw new IllegalArgumentException(
409+
"Unable to convert to TIMESTAMP_TZ from unexpected value '"
410+
+ dbzObj
411+
+ "' of type "
412+
+ dbzObj.getClass().getName());
413+
}
414+
385415
protected Object convertToString(Object dbzObj, Schema schema) {
386416
return BinaryStringData.fromString(dbzObj.toString());
387417
}

0 commit comments

Comments
 (0)