Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.flink.cdc.common.types.DataTypes;

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.ZoneId;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -136,4 +138,26 @@ public void testParseDefaultValueFunctionExpression() {
IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()", DataTypes.BIGINT());
assertThat(result).isNull();
}

@Test
public void testConvertTimestampWithLocalTimeZoneToIcebergType() {
// TIMESTAMPTZ in PostgreSQL should map to TIMESTAMP_LTZ in CDC types,
// and then to Iceberg's TimestampType.withZone().
// This verifies the type conversion doesn't use TIMESTAMP_WITH_TIME_ZONE
// (ZonedTimestampType), which would cause a serialization mismatch in
// BinaryRecordData — the Debezium deserializer produces
// LocalZonedTimestampData, not ZonedTimestampData.
assertThat(IcebergTypeUtils.convertCDCTypeToIcebergType(DataTypes.TIMESTAMP_LTZ(6)))
.isEqualTo(Types.TimestampType.withZone());
}

@Test
public void testCreateFieldGetterForTimestampLtz() {
// Verify that createFieldGetter for TIMESTAMP_WITH_LOCAL_TIME_ZONE
// produces a valid field getter (not throwing ClassCastException or
// NumberFormatException when reading through BinaryRecordData).
org.apache.flink.cdc.common.data.RecordData.FieldGetter getter =
IcebergTypeUtils.createFieldGetter(DataTypes.TIMESTAMP_LTZ(6), 0, ZoneId.of("UTC"));
assertThat(getter).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.table.types.logical.DecimalType;

import io.debezium.config.CommonConnectorConfig;
Expand Down Expand Up @@ -167,9 +166,9 @@ private static DataType convertFromColumn(
return DataTypes.ARRAY(
handleTimestampWithTemporalMode(temporalPrecisionMode, scale));
case PgOid.TIMESTAMPTZ:
return new ZonedTimestampType(scale);
return DataTypes.TIMESTAMP_LTZ(scale);
case PgOid.TIMESTAMPTZ_ARRAY:
return DataTypes.ARRAY(new ZonedTimestampType(scale));
return DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(scale));
case PgOid.TIME:
return handleTimeWithTemporalMode(temporalPrecisionMode, scale);
case PgOid.TIME_ARRAY:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.utils;

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;

import io.debezium.connector.postgresql.PgOid;
import io.debezium.relational.Column;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Test that TIMESTAMPTZ maps to TIMESTAMP_WITH_LOCAL_TIME_ZONE (not TIMESTAMP_WITH_TIME_ZONE),
* aligning with the Debezium deserializer's converter which produces LocalZonedTimestampData for
* TIMESTAMPTZ columns.
*
* <p>Using TIMESTAMP_WITH_TIME_ZONE (ZonedTimestampType) causes a type mismatch: the BinaryWriter
* expects ZonedTimestampData but the deserializer produces LocalZonedTimestampData, resulting in
* binary data corruption and NumberFormatException in the Iceberg sink's
* IcebergTypeUtils.createFieldGetter.
*/
public class PostgresTypeUtilsTimestamptzTest {

@Test
public void testTimestamptzMapsToLocalZonedTimestamp() {
Column column =
Column.editor()
.name("created_at")
.jdbcType(java.sql.Types.TIMESTAMP_WITH_TIMEZONE)
.nativeType(PgOid.TIMESTAMPTZ)
.type("timestamptz")
.length(Column.UNSET_INT_VALUE)
.scale(6)
.optional(true)
.create();

// Pass null for dbzConfig and typeRegistry since the TIMESTAMPTZ code path
// only uses the column's nativeType and scale.
DataType result = PostgresTypeUtils.fromDbzColumn(column, null, null);

assertThat(result.getTypeRoot())
.as(
"TIMESTAMPTZ should map to TIMESTAMP_WITH_LOCAL_TIME_ZONE, "
+ "not TIMESTAMP_WITH_TIME_ZONE, to match the Debezium deserializer")
.isEqualTo(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
assertThat(result).isInstanceOf(LocalZonedTimestampType.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand Down Expand Up @@ -199,6 +200,8 @@ protected DeserializationRuntimeConverter createNotNullConverter(DataType type)
return this::convertToTimestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return this::convertToLocalTimeZoneTimestamp;
case TIMESTAMP_WITH_TIME_ZONE:
return this::convertToZonedTimestamp;
case FLOAT:
return this::convertToFloat;
case DOUBLE:
Expand Down Expand Up @@ -382,6 +385,33 @@ protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
+ dbzObj.getClass().getName());
}

/**
* Converts a Debezium TIMESTAMPTZ value to a {@link ZonedTimestampData}.
*
* <p>Debezium's pgoutput plugin sends TIMESTAMPTZ as an ISO-8601 string with offset (e.g.
* "2026-03-31T12:03:46.125062+00:00"). We parse this to extract milliseconds, sub-millisecond
* nanos, and the zone offset.
*/
protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
java.time.OffsetDateTime odt =
ZonedTimestamp.FORMATTER.parse(str, java.time.OffsetDateTime::from);
Instant instant = odt.toInstant();
long millisecond = instant.toEpochMilli();
int nanoOfMillisecond = instant.getNano() % 1_000_000;
String zoneId = odt.getOffset().getId();
ZonedTimestampData result =
ZonedTimestampData.of(millisecond, nanoOfMillisecond, zoneId);
return result;
}
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP_TZ from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}

protected Object convertToString(Object dbzObj, Schema schema) {
return BinaryStringData.fromString(dbzObj.toString());
}
Expand Down