typeDefine = converter.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(SR_DATETIME, typeDefine.getColumnType());
+ Assertions.assertEquals(SR_DATETIME, typeDefine.getDataType());
+ }
+
@Test
public void testReconvertArray() {
Column column =
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
index 54c4401b8d0d..d85af7743e52 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
@@ -29,6 +29,8 @@
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
@@ -70,4 +72,24 @@ public void serialize() {
"{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"},\"timestamp\":\"2024-01-25 07:55:45.123\"}",
jsonString);
}
+
+ @Test
+ public void serializeTimestampTz() {
+ // TIMESTAMP_TZ (OffsetDateTime / LTZ) → StarRocks DATETIME string (wall-clock, no tz)
+ String[] fieldNames = {"id", "ts_tz"};
+ SeaTunnelDataType>[] fieldTypes = {
+ BasicType.LONG_TYPE, LocalTimeType.OFFSET_DATE_TIME_TYPE
+ };
+
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ StarRocksJsonSerializer serializer = new StarRocksJsonSerializer(seaTunnelRowType, false);
+
+ // 2026-04-15T04:15:23Z → toLocalDateTime() → "2026-04-15 04:15:23"
+ OffsetDateTime odt = OffsetDateTime.of(2026, 4, 15, 4, 15, 23, 0, ZoneOffset.UTC);
+ Object[] fields = {1L, odt};
+ SeaTunnelRow row = new SeaTunnelRow(fields);
+
+ String jsonString = serializer.serialize(row);
+ Assertions.assertEquals("{\"id\":1,\"ts_tz\":\"2026-04-15 04:15:23\"}", jsonString);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
index 11c147432d03..ce344d435a0f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
@@ -105,6 +105,19 @@
test
+
+ org.testcontainers
+ postgresql
+ ${testcontainer.version}
+ test
+
+
+
+ org.postgresql
+ postgresql
+ test
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/JdbcToIcebergTimestampIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/JdbcToIcebergTimestampIT.java
new file mode 100644
index 000000000000..fc233ddcefad
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/JdbcToIcebergTimestampIT.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.iceberg;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+/**
+ * E2E test verifying that NTZ (No Time Zone) and LTZ (Local Time Zone) timestamp columns from JDBC
+ * sources are stored with the correct Iceberg timestamp type:
+ *
+ *
+ * NTZ → Iceberg {@code TimestampType.withoutZone()} (e.g. MySQL DATETIME, PG timestamp)
+ * LTZ → Iceberg {@code TimestampType.withZone()} (e.g. MySQL TIMESTAMP, PG timestamptz)
+ *
+ *
+ * Covers the fix for https://github.com/apache/seatunnel/issues/10685
+ */
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason =
+ "Spark engine does not support TIMESTAMP_TZ (OffsetDateTime) natively; "
+ + "TIMESTAMP_TZ is serialized as a custom Decimal struct in Spark translation layer, "
+ + "which is incompatible with standard Sink connectors. "
+ + "Tested on Zeta and Flink engines only.")
+@DisabledOnOs(OS.WINDOWS)
+public class JdbcToIcebergTimestampIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger log = LoggerFactory.getLogger(JdbcToIcebergTimestampIT.class);
+
+ // -------------------------------------------------------------------------
+ // Catalog directories (inside the SeaTunnel container)
+ // -------------------------------------------------------------------------
+ private static final String MYSQL_CATALOG_DIR = "/tmp/seatunnel_mnt/iceberg/hadoop-ts-mysql/";
+
+ private static final String PG_CATALOG_DIR = "/tmp/seatunnel_mnt/iceberg/hadoop-ts-pg/";
+
+ // -------------------------------------------------------------------------
+ // MySQL container
+ // -------------------------------------------------------------------------
+ private static final String MYSQL_IMAGE = "mysql:8.0";
+ private static final String MYSQL_HOST = "mysql_timestamp_e2e";
+ private static final String MYSQL_DATABASE = "ts_test";
+ private static final String MYSQL_USER = "root";
+ private static final String MYSQL_PASSWORD = "root";
+
+ private static final MySQLContainer> MYSQL_CONTAINER =
+ new MySQLContainer<>(DockerImageName.parse(MYSQL_IMAGE))
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER)
+ .withPassword(MYSQL_PASSWORD)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger("mysql-timestamp-image")));
+
+ // -------------------------------------------------------------------------
+ // PostgreSQL container
+ // -------------------------------------------------------------------------
+ private static final String PG_IMAGE = "postgres:14-alpine";
+ private static final String PG_HOST = "pg_timestamp_e2e";
+ private static final String PG_DATABASE = "ts_test";
+ private static final String PG_USER = "postgres";
+ private static final String PG_PASSWORD = "postgres";
+
+ private static final PostgreSQLContainer> PG_CONTAINER =
+ new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+ .withDatabaseName(PG_DATABASE)
+ .withUsername(PG_USER)
+ .withPassword(PG_PASSWORD)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PG_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger("pg-timestamp-image")));
+
+ // -------------------------------------------------------------------------
+ // Driver / plugin JARs downloaded into the SeaTunnel container
+ // -------------------------------------------------------------------------
+ private static final String MYSQL_DRIVER_URL =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ private static final String PG_DRIVER_URL =
+ "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+ private static final String ZSTD_URL =
+ "https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.5-5/zstd-jni-1.5.5-5.jar";
+
+ // -------------------------------------------------------------------------
+ // Container setup: create Iceberg dirs + download driver JARs
+ // -------------------------------------------------------------------------
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ for (String dir :
+ new String[] {
+ MYSQL_CATALOG_DIR + "seatunnel_namespace/mysql_ts_sink/data",
+ MYSQL_CATALOG_DIR + "seatunnel_namespace/mysql_ts_sink/metadata",
+ PG_CATALOG_DIR + "seatunnel_namespace/pg_ts_sink/data",
+ PG_CATALOG_DIR + "seatunnel_namespace/pg_ts_sink/metadata",
+ }) {
+ container.execInContainer("sh", "-c", "mkdir -p " + dir);
+ }
+ container.execInContainer("sh", "-c", "chmod -R 777 /tmp/seatunnel_mnt/iceberg/");
+
+ // Download Iceberg compression codec
+ container.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib"
+ + " && cd /tmp/seatunnel/plugins/Iceberg/lib"
+ + " && wget -q "
+ + ZSTD_URL);
+
+ // Download JDBC drivers into the Jdbc plugin directory
+ container.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && cd /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && wget -q "
+ + MYSQL_DRIVER_URL
+ + " && wget -q "
+ + PG_DRIVER_URL);
+ };
+
+ // -------------------------------------------------------------------------
+ // Lifecycle
+ // -------------------------------------------------------------------------
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ log.info("Starting MySQL and PostgreSQL containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER, PG_CONTAINER)).join();
+ log.info("DB containers started. Initializing test data...");
+ initMysqlData();
+ initPostgresData();
+ log.info("Test data initialised.");
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ if (PG_CONTAINER != null) {
+ PG_CONTAINER.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Test: MySQL DATETIME (NTZ) → Iceberg withoutZone()
+ // -------------------------------------------------------------------------
+ @TestTemplate
+ public void testMysqlDatetimeToIcebergNtz(TestContainer container)
+ throws IOException, InterruptedException {
+ // Step 1: Run job to write data from MySQL to Iceberg
+ org.testcontainers.containers.Container.ExecResult result =
+ container.executeJob("/iceberg/mysql_jdbc_to_iceberg_timestamp.conf");
+ Assertions.assertEquals(
+ 0, result.getExitCode(), "Write job failed:\n" + result.getStderr());
+
+ // Step 2: Run verification job (Iceberg -> Assert)
+ // This job verifies that the data in Iceberg matches expected types and values
+ org.testcontainers.containers.Container.ExecResult verifyResult =
+ container.executeJob("/iceberg/mysql_iceberg_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ verifyResult.getExitCode(),
+ "Verification job failed:\n" + verifyResult.getStderr());
+ }
+
+ // -------------------------------------------------------------------------
+ // Test: PostgreSQL timestamp (NTZ) → Iceberg withoutZone()
+ // PostgreSQL timestamptz (LTZ) → Iceberg withZone()
+ // -------------------------------------------------------------------------
+ @TestTemplate
+ public void testPgTimestampToIceberg(TestContainer container)
+ throws IOException, InterruptedException {
+ // Step 1: Run job to write data from PostgreSQL to Iceberg
+ org.testcontainers.containers.Container.ExecResult result =
+ container.executeJob("/iceberg/pg_jdbc_to_iceberg_timestamp.conf");
+ Assertions.assertEquals(
+ 0, result.getExitCode(), "Write job failed:\n" + result.getStderr());
+
+ // Step 2: Run verification job (Iceberg -> Assert)
+ org.testcontainers.containers.Container.ExecResult verifyResult =
+ container.executeJob("/iceberg/pg_iceberg_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ verifyResult.getExitCode(),
+ "Verification job failed:\n" + verifyResult.getStderr());
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+ private void initMysqlData() throws Exception {
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE IF NOT EXISTS ts_table ("
+ + " id INT PRIMARY KEY,"
+ + " dt_col DATETIME,"
+ + " ts_col TIMESTAMP"
+ + ")");
+ stmt.execute(
+ "INSERT INTO ts_table (id, dt_col, ts_col) VALUES"
+ + " (1, '2026-01-01 00:00:00', '2026-01-01 00:00:00')");
+ }
+ }
+
+ private void initPostgresData() throws Exception {
+ try (Connection conn =
+ DriverManager.getConnection(
+ PG_CONTAINER.getJdbcUrl(), PG_USER, PG_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE IF NOT EXISTS ts_table ("
+ + " id INT PRIMARY KEY,"
+ + " ts_col TIMESTAMP WITHOUT TIME ZONE,"
+ + " tstz_col TIMESTAMP WITH TIME ZONE"
+ + ")");
+ stmt.execute(
+ "INSERT INTO ts_table (id, ts_col, tstz_col) VALUES"
+ + " (1, '2026-01-01 00:00:00', '2026-01-01 00:00:00+00')");
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index fcec73e5d01e..3e42c56f69de 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -34,7 +34,7 @@ source {
f5 = "float"
f6 = "double"
f7 = "date"
- f9 = "timestamp"
+ f9 = "timestamp_tz"
f10 = "timestamp"
f11 = "string"
f12 = "bytes"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_iceberg_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_iceberg_to_assert.conf
new file mode 100644
index 000000000000..3ba2afb306fa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_iceberg_to_assert.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Iceberg {
+ catalog_name = "seatunnel_test"
+ iceberg.catalog.config = {
+ "type" = "hadoop"
+ "warehouse" = "file:///tmp/seatunnel_mnt/iceberg/hadoop-ts-mysql/"
+ }
+ namespace = "seatunnel_namespace"
+ table = "mysql_ts_sink"
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ { rule_type = MIN_ROW, rule_value = 1 },
+ { rule_type = MAX_ROW, rule_value = 1 }
+ ]
+ field_rules = [
+ {
+ field_name = dt_col
+ field_type = timestamp
+ field_value = [
+ { rule_type = NOT_NULL, equals_to = "2026-01-01T00:00:00" }
+ ]
+ },
+ {
+ field_name = ts_col
+ field_type = timestamp_tz
+ field_value = [
+ { rule_type = NOT_NULL, equals_to = "2026-01-01T00:00:00Z" }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_jdbc_to_iceberg_timestamp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_jdbc_to_iceberg_timestamp.conf
new file mode 100644
index 000000000000..545a54e0e74b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_jdbc_to_iceberg_timestamp.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:mysql://mysql_timestamp_e2e:3306/ts_test?useSSL=false&serverTimezone=UTC"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "root"
+ query = "SELECT id, dt_col, ts_col FROM ts_table"
+ plugin_output = "jdbc_source"
+ }
+}
+
+sink {
+ Iceberg {
+ catalog_name = "seatunnel_test"
+ iceberg.catalog.config = {
+ "type" = "hadoop"
+ "warehouse" = "file:///tmp/seatunnel_mnt/iceberg/hadoop-ts-mysql/"
+ }
+ namespace = "seatunnel_namespace"
+ table = "mysql_ts_sink"
+ plugin_input = "jdbc_source"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_iceberg_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_iceberg_to_assert.conf
new file mode 100644
index 000000000000..647dc1da8e96
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_iceberg_to_assert.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Iceberg {
+ catalog_name = "seatunnel_test"
+ iceberg.catalog.config = {
+ "type" = "hadoop"
+ "warehouse" = "file:///tmp/seatunnel_mnt/iceberg/hadoop-ts-pg/"
+ }
+ namespace = "seatunnel_namespace"
+ table = "pg_ts_sink"
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ { rule_type = MIN_ROW, rule_value = 1 },
+ { rule_type = MAX_ROW, rule_value = 1 }
+ ]
+ field_rules = [
+ {
+ field_name = ts_col
+ field_type = timestamp
+ field_value = [
+ { rule_type = NOT_NULL, equals_to = "2026-01-01T00:00:00" }
+ ]
+ },
+ {
+ field_name = tstz_col
+ field_type = timestamp_tz
+ field_value = [
+ { rule_type = NOT_NULL, equals_to = "2026-01-01T00:00:00Z" }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_jdbc_to_iceberg_timestamp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_jdbc_to_iceberg_timestamp.conf
new file mode 100644
index 000000000000..7640356e601c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/pg_jdbc_to_iceberg_timestamp.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://pg_timestamp_e2e:5432/ts_test"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+ query = "SELECT id, ts_col, tstz_col FROM ts_table"
+ plugin_output = "jdbc_source"
+ }
+}
+
+sink {
+ Iceberg {
+ catalog_name = "seatunnel_test"
+ iceberg.catalog.config = {
+ "type" = "hadoop"
+ "warehouse" = "file:///tmp/seatunnel_mnt/iceberg/hadoop-ts-pg/"
+ }
+ namespace = "seatunnel_namespace"
+ table = "pg_ts_sink"
+ plugin_input = "jdbc_source"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
index 1430d77e505c..0be4ac65ed53 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -38,7 +38,7 @@ source {
f5 = "float"
f6 = "double"
f7 = "date"
- f9 = "timestamp"
+ f9 = "timestamp_tz"
f10 = "timestamp"
f11 = "string"
f12 = "bytes"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf
index 6b50aba96fbe..36386dd25f43 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -38,7 +38,7 @@ source {
f5 = "float"
f6 = "double"
f7 = "date"
- f9 = "timestamp"
+ f9 = "timestamp_tz"
f10 = "timestamp"
f11 = "string"
f12 = "bytes"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index c92c887ca6ef..0f18eb0deea9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -366,9 +366,25 @@ public void tearDown() throws SQLException {
}
}
+ /**
+ * Hook for subclasses to skip testJdbcDb for specific engine types without overriding
+ * the @TestTemplate method itself. Overriding a @TestTemplate method in a subclass causes JUnit
+ * 5 to register and run the test twice (once from the parent, once from the child), leading to
+ * duplicate data in the sink table and incorrect row-count assertions.
+ *
+ * @param container the current test container
+ * @return true if testJdbcDb should be skipped for this container
+ */
+ protected boolean isDisabledOnContainer(TestContainer container) {
+ return false;
+ }
+
@TestTemplate
public void testJdbcDb(TestContainer container)
throws IOException, InterruptedException, SQLException {
+ if (isDisabledOnContainer(container)) {
+ return;
+ }
List configFiles = jdbcCase.getConfigFile();
for (String configFile : configFiles) {
try {
@@ -572,7 +588,23 @@ private Object checkData(Object data) throws SQLException, IOException {
javaArray[index] = checkData(jdbcArray[index]);
}
return javaArray;
+ } else if (data instanceof java.time.OffsetDateTime) {
+ // Normalize OffsetDateTime to Timestamp for comparison
+ return java.sql.Timestamp.valueOf(((java.time.OffsetDateTime) data).toLocalDateTime());
} else {
+ // oracle.sql.TIMESTAMPLTZ / TIMESTAMPTZ objects do not override equals() correctly
+ // for cross-object comparison. Normalize to byte[] via toBytes() so that
+ // assertArrayEquals() can compare the raw timestamp bytes directly.
+ String className = data.getClass().getName();
+ if (className.equals("oracle.sql.TIMESTAMPLTZ")
+ || className.equals("oracle.sql.TIMESTAMPTZ")) {
+ try {
+ java.lang.reflect.Method toBytes = data.getClass().getMethod("toBytes");
+ return toBytes.invoke(data);
+ } catch (Exception e) {
+ return data;
+ }
+ }
return data;
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
index 01bcf2db2f07..c300b3dd2c23 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
@@ -503,6 +503,23 @@ private List> querySink(String sql) {
Object object = resultSet.getObject(i);
if (object instanceof NClob) {
objects.add(readNClobAsString((NClob) object));
+ } else if (object instanceof java.time.OffsetDateTime) {
+ // TIMESTAMP_TZ (OffsetDateTime) → normalize to Timestamp for comparison
+ // with MySQL source which returns java.sql.Timestamp
+ objects.add(
+ java.sql.Timestamp.valueOf(
+ ((java.time.OffsetDateTime) object).toLocalDateTime()));
+ } else if (object != null
+ && object.getClass().getName().equals("microsoft.sql.DateTimeOffset")) {
+ // SQL Server DATETIMEOFFSET → normalize to Timestamp for comparison
+ // microsoft.sql.DateTimeOffset.getTimestamp() returns java.sql.Timestamp
+ try {
+ java.lang.reflect.Method getTimestamp =
+ object.getClass().getMethod("getTimestamp");
+ objects.add(getTimestamp.invoke(object));
+ } catch (Exception e) {
+ objects.add(object);
+ }
} else {
objects.add(object);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlTimestampIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlTimestampIT.java
new file mode 100644
index 000000000000..7aaba3b8c1da
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlTimestampIT.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * E2E test verifying that MySQL NTZ/LTZ timestamp types are correctly distinguished by the JDBC
+ * connector after the fix for https://github.com/apache/seatunnel/issues/10685.
+ *
+ *
+ * MySQL {@code DATETIME} (NTZ) → SeaTunnel internal {@code TIMESTAMP} type
+ * MySQL {@code TIMESTAMP} (LTZ) → SeaTunnel internal {@code TIMESTAMP_TZ} type
+ *
+ *
+ * The Assert sink's {@code field_type} check is used to validate the internal type mapping.
+ */
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason =
+ "Spark engine does not support TIMESTAMP_TZ (OffsetDateTime) natively; "
+ + "TIMESTAMP_TZ is serialized as a custom Decimal struct in Spark translation layer, "
+ + "which is incompatible with standard Sink connectors. "
+ + "Tested on Zeta and Flink engines only.")
+@Slf4j
+public class JdbcMysqlTimestampIT extends TestSuiteBase implements TestResource {
+
+ private static final String MYSQL_IMAGE = "mysql:8.0";
+ private static final String MYSQL_HOST = "mysql_ts_e2e";
+ private static final String MYSQL_DATABASE = "ts_test";
+ private static final String MYSQL_USER = "root";
+ private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
+
+ private static final String MYSQL_DRIVER_URL =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ private MySQLContainer> mysqlContainer;
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult result =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && cd /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && wget -q "
+ + MYSQL_DRIVER_URL);
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "Failed to download MySQL driver: " + result.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ mysqlContainer =
+ new MySQLContainer<>(DockerImageName.parse(MYSQL_IMAGE))
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER)
+ .withPassword(MYSQL_PASSWORD)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_IMAGE)));
+
+ Startables.deepStart(Stream.of(mysqlContainer)).join();
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(() -> initMysqlData());
+ log.info("MySQL container started and test data initialised.");
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (mysqlContainer != null) {
+ mysqlContainer.close();
+ }
+ }
+
+ /**
+ * Verifies that MySQL {@code DATETIME} (NTZ) columns are read as SeaTunnel {@code TIMESTAMP}
+ * (i.e. {@code LOCAL_DATE_TIME_TYPE}), not {@code TIMESTAMP_TZ}.
+ *
+ *
The Assert sink's {@code field_type = timestamp} assertion will fail if the connector
+ * incorrectly maps {@code DATETIME} to {@code TIMESTAMP_TZ}.
+ */
+ @TestTemplate
+ public void testMysqlDatetimeIsNtz(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult result = container.executeJob("/jdbc_mysql_datetime_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "MySQL DATETIME (NTZ) assertion failed:\n" + result.getStderr());
+ }
+
+ /**
+ * Verifies that MySQL {@code TIMESTAMP} (LTZ) columns are read as SeaTunnel {@code
+ * TIMESTAMP_TZ} (i.e. {@code OFFSET_DATE_TIME_TYPE}).
+ *
+ *
The Assert sink's {@code field_type = timestamp_tz} assertion will fail if the connector
+ * incorrectly maps {@code TIMESTAMP} to plain {@code TIMESTAMP}.
+ */
+ @TestTemplate
+ public void testMysqlTimestampIsLtz(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult result = container.executeJob("/jdbc_mysql_timestamp_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "MySQL TIMESTAMP (LTZ) assertion failed:\n" + result.getStderr());
+ }
+
+ /**
+ * Core fix scenario: verifies that MySQL {@code TIMESTAMP} (LTZ) preserves the correct UTC
+ * instant when the JDBC connection uses a non-UTC {@code serverTimezone} (Asia/Seoul, +09:00).
+ *
+ *
Before the fix, {@code JdbcFieldTypeUtils.getOffsetDateTime()} applied the JVM default
+ * timezone during {@code ResultSet} traversal. In a Seoul-timezone session a value stored as
+ * UTC midnight would be shifted by +09:00 and read back incorrectly.
+ *
+ *
After the fix the UTC instant must be preserved regardless of the JDBC session timezone.
+ * If the fix regresses, the {@code field_type = timestamp_tz} assertion inside the Assert sink
+ * will fail because the column will be read as a plain {@code TIMESTAMP} (LocalDateTime)
+ * shifted by the Seoul offset.
+ */
+ @TestTemplate
+ public void testMysqlTimestampIsLtzInNonUtcSession(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult result =
+ container.executeJob("/jdbc_mysql_timestamp_non_utc_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "MySQL TIMESTAMP (LTZ) assertion failed with non-UTC serverTimezone (Asia/Seoul):\n"
+ + result.getStderr());
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ private void initMysqlData() throws Exception {
+ String jdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%d/%s?useSSL=false&serverTimezone=UTC",
+ mysqlContainer.getHost(),
+ mysqlContainer.getFirstMappedPort(),
+ MYSQL_DATABASE);
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, MYSQL_USER, MYSQL_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE IF NOT EXISTS ts_source ("
+ + " id INT PRIMARY KEY,"
+ + " dt_col DATETIME,"
+ + " ts_col TIMESTAMP NULL"
+ + ")");
+ // Insert a fixed wall-clock value: 2026-01-01 00:00:00
+ // DATETIME stores it as-is (NTZ); TIMESTAMP stores UTC and displays in session TZ.
+ stmt.execute(
+ "INSERT INTO ts_source (id, dt_col, ts_col) VALUES"
+ + " (1, '2026-01-01 00:00:00', '2026-01-01 00:00:00')");
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 255bcc9c00de..1f42bf9973e5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -28,6 +28,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.junit.jupiter.api.Assertions;
@@ -166,6 +167,16 @@ public void testSampleDataFromColumnSuccess() throws Exception {
dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 1024);
}
+ /**
+ * Disabled on Spark: TIMESTAMP WITH LOCAL TIME ZONE is now mapped to TIMESTAMP_TZ
+ * (OffsetDateTime). Spark encodes TIMESTAMP_TZ as DecimalType(18, 5) internally, causing
+ * byte-level mismatch on Oracle round-trip. See JdbcMysqlTimestampIT for the same limitation.
+ */
+ @Override
+ protected boolean isDisabledOnContainer(TestContainer container) {
+ return container.identifier().getEngineType() == EngineType.SPARK;
+ }
+
@TestTemplate
public void testOracleWithoutDecimalTypeNarrowing(TestContainer container) throws Exception {
Container.ExecResult execResult =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_datetime_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_datetime_to_assert.conf
new file mode 100644
index 000000000000..cb2d002a3a8f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_datetime_to_assert.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Verifies that MySQL DATETIME (NTZ) is read as SeaTunnel TIMESTAMP (LOCAL_DATE_TIME_TYPE).
+# Covers the fix for https://github.com/apache/seatunnel/issues/10685
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:mysql://mysql_ts_e2e:3306/ts_test?useSSL=false&serverTimezone=UTC"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "Abc!@#135_seatunnel"
+ query = "SELECT id, dt_col FROM ts_source"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ # dt_col is MySQL DATETIME → must map to SeaTunnel TIMESTAMP (NTZ)
+ field_name = dt_col
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2026-01-01T00:00:00"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_non_utc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_non_utc_to_assert.conf
new file mode 100644
index 000000000000..1e1e85b19f8e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_non_utc_to_assert.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Verifies that MySQL TIMESTAMP (LTZ) preserves the correct UTC instant even when the
+# JDBC connection is made with a non-UTC serverTimezone (Asia/Seoul, +09:00).
+#
+# Core fix scenario: before the fix, JdbcFieldTypeUtils.getOffsetDateTime() used the
+# JVM default timezone during ResultSet traversal, so a value stored as UTC midnight
+# would be read back as 09:00 (+09:00) in a Seoul-timezone environment — a 9-hour shift.
+# After the fix, the UTC instant must be preserved regardless of serverTimezone.
+#
+# Covers the fix for https://github.com/apache/seatunnel/issues/10685
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ # serverTimezone=Asia/Seoul simulates a non-UTC JDBC session.
+ # The stored UTC epoch must still be read correctly as TIMESTAMP_TZ.
+ url = "jdbc:mysql://mysql_ts_e2e:3306/ts_test?useSSL=false&serverTimezone=Asia%2FSeoul"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "Abc!@#135_seatunnel"
+ query = "SELECT id, ts_col FROM ts_source"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ # ts_col is MySQL TIMESTAMP → must still map to TIMESTAMP_TZ in non-UTC session.
+ # If the fix regresses, the field_type check will fail because the value will
+ # be read as a plain TIMESTAMP (LocalDateTime) shifted by the Seoul offset.
+ field_name = ts_col
+ field_type = timestamp_tz
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_to_assert.conf
new file mode 100644
index 000000000000..9cdec934bbfd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_timestamp_to_assert.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Verifies that MySQL TIMESTAMP (LTZ) is read as SeaTunnel TIMESTAMP_TZ (OFFSET_DATE_TIME_TYPE).
+# Covers the fix for https://github.com/apache/seatunnel/issues/10685
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:mysql://mysql_ts_e2e:3306/ts_test?useSSL=false&serverTimezone=UTC"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "Abc!@#135_seatunnel"
+ query = "SELECT id, ts_col FROM ts_source"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ # ts_col is MySQL TIMESTAMP → must map to SeaTunnel TIMESTAMP_TZ (LTZ)
+ field_name = ts_col
+ field_type = timestamp_tz
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresTimestampIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresTimestampIT.java
new file mode 100644
index 000000000000..b9f14c4d1dac
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresTimestampIT.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * E2E test verifying that PostgreSQL NTZ/LTZ timestamp types are correctly distinguished by the
+ * JDBC connector after the fix for https://github.com/apache/seatunnel/issues/10685.
+ *
+ *
+ * PostgreSQL {@code TIMESTAMP WITHOUT TIME ZONE} (NTZ) → SeaTunnel internal {@code TIMESTAMP}
+ * PostgreSQL {@code TIMESTAMP WITH TIME ZONE} (LTZ) → SeaTunnel internal {@code TIMESTAMP_TZ}
+ *
+ *
+ * The Assert sink's {@code field_type} check is used to validate the internal type mapping.
+ */
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason =
+ "Spark engine does not support TIMESTAMP_TZ (OffsetDateTime) natively; "
+ + "TIMESTAMP_TZ is serialized as a custom Decimal struct in Spark translation layer, "
+ + "which is incompatible with standard Sink connectors. "
+ + "Tested on Zeta and Flink engines only.")
+@Slf4j
+public class JdbcPostgresTimestampIT extends TestSuiteBase implements TestResource {
+
+ private static final String PG_IMAGE = "postgres:14-alpine";
+ private static final String PG_HOST = "pg_ts_e2e";
+ private static final String PG_DATABASE = "ts_test";
+ private static final String PG_USER = "postgres";
+ private static final String PG_PASSWORD = "postgres";
+
+ private static final String PG_DRIVER_URL =
+ "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+ private PostgreSQLContainer> pgContainer;
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult result =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && cd /tmp/seatunnel/plugins/Jdbc/lib"
+ + " && wget -q "
+ + PG_DRIVER_URL);
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "Failed to download PostgreSQL driver: " + result.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ pgContainer =
+ new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+ .withDatabaseName(PG_DATABASE)
+ .withUsername(PG_USER)
+ .withPassword(PG_PASSWORD)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PG_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+
+ Startables.deepStart(Stream.of(pgContainer)).join();
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(() -> initPgData());
+ log.info("PostgreSQL container started and test data initialised.");
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (pgContainer != null) {
+ pgContainer.close();
+ }
+ }
+
+ /**
+ * Verifies that PostgreSQL {@code TIMESTAMP WITHOUT TIME ZONE} (NTZ) columns are read as
+ * SeaTunnel {@code TIMESTAMP} (i.e. {@code LOCAL_DATE_TIME_TYPE}).
+ *
+ *
The Assert sink's {@code field_type = timestamp} assertion will fail if the connector
+ * incorrectly maps plain {@code TIMESTAMP} to {@code TIMESTAMP_TZ}.
+ */
+ @TestTemplate
+ public void testPgTimestampIsNtz(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult result = container.executeJob("/jdbc_pg_timestamp_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "PostgreSQL TIMESTAMP (NTZ) assertion failed:\n" + result.getStderr());
+ }
+
+ /**
+ * Verifies that PostgreSQL {@code TIMESTAMP WITH TIME ZONE} (LTZ) columns are read as SeaTunnel
+ * {@code TIMESTAMP_TZ} (i.e. {@code OFFSET_DATE_TIME_TYPE}).
+ *
+ *
The Assert sink's {@code field_type = timestamp_tz} assertion will fail if the connector
+ * incorrectly maps {@code TIMESTAMPTZ} to plain {@code TIMESTAMP}.
+ */
+ @TestTemplate
+ public void testPgTimestamptzIsLtz(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult result = container.executeJob("/jdbc_pg_timestamptz_to_assert.conf");
+ Assertions.assertEquals(
+ 0,
+ result.getExitCode(),
+ "PostgreSQL TIMESTAMPTZ (LTZ) assertion failed:\n" + result.getStderr());
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ private void initPgData() throws Exception {
+ String jdbcUrl =
+ String.format(
+ "jdbc:postgresql://%s:%d/%s",
+ pgContainer.getHost(), pgContainer.getFirstMappedPort(), PG_DATABASE);
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, PG_USER, PG_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE IF NOT EXISTS ts_source ("
+ + " id INT PRIMARY KEY,"
+ + " ts_col TIMESTAMP WITHOUT TIME ZONE,"
+ + " tstz_col TIMESTAMP WITH TIME ZONE"
+ + ")");
+ // ts_col: wall-clock value stored as-is (NTZ, no timezone conversion)
+ // tstz_col: value with explicit UTC offset stored in UTC internally (LTZ)
+ stmt.execute(
+ "INSERT INTO ts_source (id, ts_col, tstz_col) VALUES"
+ + " (1, '2026-01-01 00:00:00', '2026-01-01 00:00:00+00')");
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamp_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamp_to_assert.conf
new file mode 100644
index 000000000000..f3de0481c7a0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamp_to_assert.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Verifies that PostgreSQL TIMESTAMP WITHOUT TIME ZONE (NTZ) is read as SeaTunnel TIMESTAMP.
+# Covers the fix for https://github.com/apache/seatunnel/issues/10685
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://pg_ts_e2e:5432/ts_test?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+ query = "SELECT id, ts_col FROM ts_source"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ # ts_col is TIMESTAMP WITHOUT TIME ZONE → must map to SeaTunnel TIMESTAMP (NTZ)
+ field_name = ts_col
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2026-01-01T00:00:00"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamptz_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamptz_to_assert.conf
new file mode 100644
index 000000000000..802c798cc7aa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_pg_timestamptz_to_assert.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Verifies that PostgreSQL TIMESTAMP WITH TIME ZONE (LTZ) is read as SeaTunnel TIMESTAMP_TZ.
+# Covers the fix for https://github.com/apache/seatunnel/issues/10685
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://pg_ts_e2e:5432/ts_test?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+ query = "SELECT id, tstz_col FROM ts_source"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ # tstz_col is TIMESTAMP WITH TIME ZONE → must map to SeaTunnel TIMESTAMP_TZ (LTZ)
+ field_name = tstz_col
+ field_type = timestamp_tz
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
index f5b7522499b9..8d5d8a927f69 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
@@ -54,6 +54,7 @@
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
@@ -443,17 +444,21 @@ private List> query(String sql, Connection connection) {
while (resultSet.next()) {
ArrayList objects = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
- if (resultSet.getObject(i) instanceof Timestamp) {
- Timestamp timestamp = resultSet.getTimestamp(i);
- objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER));
- break;
+ Object obj = resultSet.getObject(i);
+ if (obj instanceof Timestamp) {
+ objects.add(
+ ((Timestamp) obj).toLocalDateTime().format(DATE_TIME_FORMATTER));
+ } else if (obj instanceof LocalDateTime) {
+ objects.add(((LocalDateTime) obj).format(DATE_TIME_FORMATTER));
+ } else if (obj instanceof OffsetDateTime) {
+ // TIMESTAMP_TZ (LTZ) → normalize to wall-clock string for comparison
+ objects.add(
+ ((OffsetDateTime) obj)
+ .toLocalDateTime()
+ .format(DATE_TIME_FORMATTER));
+ } else {
+ objects.add(obj);
}
- if (resultSet.getObject(i) instanceof LocalDateTime) {
- LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class);
- objects.add(localDateTime.format(DATE_TIME_FORMATTER));
- break;
- }
- objects.add(resultSet.getObject(i));
}
log.debug(String.format("Print query, sql: %s, data: %s", sql, objects));
result.add(objects);
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 749295a50349..5de7c116e4c3 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -67,6 +67,14 @@ public JsonSerializationSchema(SeaTunnelRowType rowType, String nullValue) {
this.charset = StandardCharsets.UTF_8;
}
+ public JsonSerializationSchema(SeaTunnelRowType rowType, boolean serializeTimestampTzAsLocal) {
+ this.rowType = rowType;
+ this.runtimeConverter =
+ new RowToJsonConverters(serializeTimestampTzAsLocal)
+ .createConverter(checkNotNull(rowType));
+ this.charset = StandardCharsets.UTF_8;
+ }
+
{
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 13a30442d172..5aaf0a49950d 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -53,6 +53,16 @@ public class RowToJsonConverters implements Serializable {
private String nullValue;
+ private final boolean serializeTimestampTzAsLocal;
+
+ public RowToJsonConverters() {
+ this.serializeTimestampTzAsLocal = false;
+ }
+
+ public RowToJsonConverters(boolean serializeTimestampTzAsLocal) {
+ this.serializeTimestampTzAsLocal = serializeTimestampTzAsLocal;
+ }
+
public RowToJsonConverter createConverter(SeaTunnelDataType> type) {
return wrapIntoNullableConverter(createNotNullConverter(type));
}
@@ -186,6 +196,17 @@ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
}
};
case TIMESTAMP_TZ:
+ if (serializeTimestampTzAsLocal) {
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory()
+ .textNode(
+ ISO_LOCAL_DATE_TIME.format(
+ ((OffsetDateTime) value).toLocalDateTime()));
+ }
+ };
+ }
return new RowToJsonConverter() {
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 08f7bd7eab67..61a137ef261a 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -175,6 +175,12 @@ private String convert(Object field, SeaTunnelDataType> fieldType, int level)
return TimeUtils.toString((LocalTime) field, timeFormatter);
case TIMESTAMP:
return DateTimeUtils.toString((LocalDateTime) field, dateTimeFormatter);
+ case TIMESTAMP_TZ:
+ return DateTimeUtils.toString(
+ ((java.time.OffsetDateTime) field)
+ .withOffsetSameInstant(java.time.ZoneOffset.UTC)
+ .toLocalDateTime(),
+ dateTimeFormatter);
case NULL:
return "";
case BYTES: