6 changes: 6 additions & 0 deletions .gitignore
@@ -23,6 +23,12 @@ logs.zip
*.iml
.idea/*

# Eclipse / M2E files
.classpath
.factorypath
.project
.settings/

.DS_Store

metastore_db/
12 changes: 12 additions & 0 deletions docs/en/introduction/concepts/incompatible-changes.md
@@ -5,6 +5,18 @@ You need to check this document before you upgrade to the related version.

## dev

### JDBC Connector

- **Breaking Change: Mapping of timezone-aware timestamp columns to `TIMESTAMP_TZ` type**
- **Affected component**: `seatunnel-connectors-v2/connector-jdbc`, `seatunnel-connectors-v2/connector-iceberg`, `seatunnel-connectors-v2/connector-cdc-base`, `seatunnel-connectors-v2/connector-cdc-tidb`, `seatunnel-connectors-v2/connector-starrocks`, `seatunnel-connectors-v2/connector-hudi`, `seatunnel-connectors-v2/connector-snowflake` (via JDBC dialect)
- **Description**: Previously, JDBC sources mapped both timezone-naive (e.g., MySQL `DATETIME`) and timezone-aware (e.g., MySQL `TIMESTAMP`) timestamp columns to SeaTunnel's internal `TIMESTAMP` type. Now, timezone-aware columns like MySQL `TIMESTAMP`, PostgreSQL `timestamptz`, Oracle `TIMESTAMP WITH LOCAL TIME ZONE`, SQL Server `datetimeoffset`, Snowflake `TIMESTAMP_LTZ/TZ`, and others are explicitly mapped to `TIMESTAMP_TZ`. This ensures that timezone semantics are accurately preserved when writing to formats like Iceberg, where `TIMESTAMP` is saved as `timestamp` (without timezone) and `TIMESTAMP_TZ` is saved as `timestamptz` (with timezone).
- **Impact**: If your downstream Sink relies on receiving `TIMESTAMP` types and does not support `TIMESTAMP_TZ` natively, you may encounter type mismatch errors. For Iceberg users, this means columns previously written as `timestamp` (without timezone) may now be written as `timestamptz` (with timezone), changing the table schema. You may need to cast the column in a SQL Transform or update your Sink configuration. (#10685)
- **Connector-specific behavior changes**:
- **Snowflake**: `TIMESTAMP_LTZ` and `TIMESTAMP_TZ` columns are now mapped to `OFFSET_DATE_TIME_TYPE` (`TIMESTAMP_TZ`) instead of `LOCAL_DATE_TIME_TYPE`. This affects both Source and Sink paths for Snowflake.
- **StarRocks**: `TIMESTAMP_TZ` values written to StarRocks Sink are stored as `DATETIME` (wall-clock only, timezone offset is dropped) due to StarRocks not having a native timezone-aware datetime type.
- **Hudi**: `TIMESTAMP_TZ` is now mapped to Avro `timestampMillis` (UTC epoch). Existing Hudi tables written with the old schema may need to be re-created if schema evolution is not supported.
- **CDC (Debezium-based, TiDB)**: CDC connectors now correctly handle `TIMESTAMP_TZ` type in the Debezium deserialization layer. Previously, `TIMESTAMP_TZ` was unsupported and would throw `UnsupportedOperationException`. Users who were previously unable to use timezone-aware columns in CDC pipelines can now do so.
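The semantic difference behind this mapping can be illustrated with plain `java.time` types (a standalone sketch, not connector code; the class name is ours): a `TIMESTAMP_TZ` value carries an offset and identifies one instant, while a sink without a timezone-aware type keeps only the wall-clock part.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TzMappingDemo {
    public static void main(String[] args) {
        // A timezone-aware value, as SeaTunnel's TIMESTAMP_TZ would carry it
        OffsetDateTime tz = OffsetDateTime.parse("2024-01-01T12:00:00+08:00");

        // Normalized to UTC the instant is preserved (Iceberg timestamptz semantics)
        OffsetDateTime utc = tz.withOffsetSameInstant(ZoneOffset.UTC);
        System.out.println(utc); // 2024-01-01T04:00Z

        // A sink without a timezone-aware type (e.g. StarRocks DATETIME) keeps
        // only the wall-clock part; the +08:00 offset is dropped
        LocalDateTime wallClock = tz.toLocalDateTime();
        System.out.println(wallClock); // 2024-01-01T12:00
    }
}
```

Note that the two printed values name different instants; that loss is exactly the StarRocks behavior change called out above.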

### API Changes

- **Breaking Change: Engine REST table metrics key format**
12 changes: 12 additions & 0 deletions docs/zh/introduction/concepts/incompatible-changes.md
@@ -4,6 +4,18 @@

## dev

### JDBC Connector

- **破坏性变更:带时区的时间戳列映射为 `TIMESTAMP_TZ` 类型**
- **影响范围**:`seatunnel-connectors-v2/connector-jdbc`、`seatunnel-connectors-v2/connector-iceberg`、`seatunnel-connectors-v2/connector-cdc-base`、`seatunnel-connectors-v2/connector-cdc-tidb`、`seatunnel-connectors-v2/connector-starrocks`、`seatunnel-connectors-v2/connector-hudi`、`seatunnel-connectors-v2/connector-snowflake`(通过 JDBC 方言)
- **变更说明**:以前,JDBC Source 将无时区(如 MySQL `DATETIME`)和带时区(如 MySQL `TIMESTAMP`)的时间戳列都映射为 SeaTunnel 内部的 `TIMESTAMP` 类型。现在,带时区的列(如 MySQL `TIMESTAMP`、PostgreSQL `timestamptz`、Oracle `TIMESTAMP WITH LOCAL TIME ZONE`、SQL Server `datetimeoffset`、Snowflake `TIMESTAMP_LTZ/TZ` 等)被显式映射为 `TIMESTAMP_TZ`。这确保了在写入 Iceberg 等格式时,时区语义得到准确保留(在 Iceberg 中 `TIMESTAMP` 存为无时区的 `timestamp`,`TIMESTAMP_TZ` 存为带时区的 `timestamptz`)。
- **影响**：如果您的下游 Sink 依赖接收 `TIMESTAMP` 类型且不支持 `TIMESTAMP_TZ`，您可能会遇到类型不匹配错误。对于 Iceberg 用户，这意味着以前作为 `timestamp`（无时区）写入的列现在可能会作为 `timestamptz`（带时区）写入，从而改变表结构。您可能需要在 SQL Transform 中转换该列或更新您的 Sink 配置。(#10685)
- **各连接器具体行为变更**:
- **Snowflake**:`TIMESTAMP_LTZ` 和 `TIMESTAMP_TZ` 列现在映射为 `OFFSET_DATE_TIME_TYPE`(`TIMESTAMP_TZ`),而不是原来的 `LOCAL_DATE_TIME_TYPE`。这同时影响 Snowflake 的 Source 和 Sink 路径。
- **StarRocks**:写入 StarRocks Sink 的 `TIMESTAMP_TZ` 值以 `DATETIME`(仅保留时钟时间,时区偏移量丢失)形式存储,这是由于 StarRocks 不支持原生带时区的日期时间类型。
- **Hudi**:`TIMESTAMP_TZ` 现在映射为 Avro `timestampMillis`(UTC 纪元时间)。如果 Hudi 表不支持 Schema Evolution,以旧 Schema 写入的现有表可能需要重新创建。
- **CDC(基于 Debezium,TiDB)**:CDC 连接器现在可以正确处理 Debezium 反序列化层中的 `TIMESTAMP_TZ` 类型。以前,`TIMESTAMP_TZ` 不受支持,会抛出 `UnsupportedOperationException`。现在,在 CDC 管道中使用带时区列的用户可以正常使用。

### API 变更

- **破坏性变更:Engine REST 表级指标 key 格式变化**
@@ -165,6 +165,8 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return convertToTime();
case TIMESTAMP:
return convertToTimestamp(serverTimeZone);
case TIMESTAMP_TZ:
return convertToTimestampTz(serverTimeZone);
case FLOAT:
return wrapNumericConverter(convertToFloat());
case DOUBLE:
@@ -392,6 +394,122 @@ public Object convert(Object dbzObj, Schema schema) {
};
}

/**
* Flexible fallback formatter that covers non-ISO variants emitted by Debezium:
*
* <ul>
* <li>Space separator instead of 'T' (MySQL in certain schema-history modes): {@code
* 2024-01-01 12:00:00+08:00}
* <li>Hour-only offset (PostgreSQL short form): {@code 2024-01-01T12:00:00+08}
* <li>Both: {@code 2024-01-01 12:00:00+08}
* </ul>
*/
private static final java.time.format.DateTimeFormatter FLEXIBLE_OFFSET_FORMATTER =
new java.time.format.DateTimeFormatterBuilder()
.parseLenient()
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral('T')
.optionalEnd()
.optionalStart()
.appendLiteral(' ')
.optionalEnd()
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_TIME)
.appendPattern("[XXX][XX][X]")
.toFormatter();
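A standalone sketch of the builder above (class and method names are ours for the demo): the paired `optionalStart()`/`optionalEnd()` sections accept either `'T'` or a space between date and time, and the `[XXX][XX][X]` offset patterns accept full, compact, and hour-only numeric offsets with one formatter.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

public class FlexibleFormatterDemo {
    // Same construction as FLEXIBLE_OFFSET_FORMATTER above
    private static final DateTimeFormatter FMT =
            new DateTimeFormatterBuilder()
                    .parseLenient()
                    .append(DateTimeFormatter.ISO_LOCAL_DATE)
                    .optionalStart().appendLiteral('T').optionalEnd() // 'T' separator
                    .optionalStart().appendLiteral(' ').optionalEnd() // or a space
                    .append(DateTimeFormatter.ISO_LOCAL_TIME)
                    .appendPattern("[XXX][XX][X]") // +08:00, +0800, or +08
                    .toFormatter();

    public static void main(String[] args) {
        // Space separator plus hour-only offset (the hardest variant)...
        OffsetDateTime a = OffsetDateTime.parse("2024-01-01 12:00:00+08", FMT);
        // ...and strict ISO form resolve to the same instant
        OffsetDateTime b = OffsetDateTime.parse("2024-01-01T12:00:00+08:00", FMT);
        System.out.println(a.isEqual(b));
    }
}
```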

private static DebeziumDeserializationConverter convertToTimestampTz(ZoneId serverTimeZone) {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
return parseOffsetDateTimeFromString((String) dbzObj, serverTimeZone);
}
java.time.LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return localDateTime
.atZone(serverTimeZone)
.toOffsetDateTime()
.withOffsetSameInstant(java.time.ZoneOffset.UTC);
}
};
}

/**
* Parses a Debezium-emitted timestamp string into an {@link java.time.OffsetDateTime} using a
* fallback chain that tolerates the variety of formats Debezium may produce:
*
* <ol>
* <li>{@link java.time.OffsetDateTime#parse} — strict ISO-8601 with numeric offset, e.g.
* {@code 2024-01-01T12:00:00+08:00}
* <li>{@link java.time.ZonedDateTime#parse} — IANA zone-region id, e.g. {@code
* 2024-01-01T12:00:00 Asia/Shanghai} (the space before the zone id is normalized to 'T'
* first)
* <li>{@link #FLEXIBLE_OFFSET_FORMATTER} — space date/time separator or short offset, e.g.
* {@code 2024-01-01 12:00:00+08:00} or {@code 2024-01-01T12:00:00+08}
* <li>{@link Instant#parse} — UTC epoch literal, e.g. {@code 2024-01-01T12:00:00Z}
* </ol>
*
* If all attempts fail, an {@link IllegalArgumentException} is thrown with the raw value
* included so that the failing CDC task carries enough context for diagnosis.
*/
@VisibleForTesting
static java.time.OffsetDateTime parseOffsetDateTimeFromString(
String str, ZoneId serverTimeZone) {
// 1. Strict ISO-8601 with numeric offset: 2024-01-01T12:00:00+08:00 / Z
try {
return java.time.OffsetDateTime.parse(str)
.withOffsetSameInstant(java.time.ZoneOffset.UTC);
} catch (java.time.format.DateTimeParseException ignored) {
// fall through
}

// 2. IANA zone-region id: 2024-01-01T12:00:00 Asia/Shanghai
// Debezium separates the zone id with a space; replace the *last* space before a
// letter-only token so that the date/time part is unaffected.
try {
String normalized = str.replaceFirst(" ([A-Za-z])", "[$1").replace(" ", "/") + "]";
// Build a formatter that accepts [...] as a zone-region
java.time.format.DateTimeFormatter zoneRegionFmt =
new java.time.format.DateTimeFormatterBuilder()
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.appendLiteral('[')
.parseCaseSensitive()
.appendZoneRegionId()
.appendLiteral(']')
.toFormatter();
return java.time.ZonedDateTime.parse(normalized, zoneRegionFmt)
.toOffsetDateTime()
.withOffsetSameInstant(java.time.ZoneOffset.UTC);
} catch (Exception ignored) {
// fall through
}

// 3. Space separator or short offset: 2024-01-01 12:00:00+08:00 / +08
try {
return java.time.OffsetDateTime.parse(str, FLEXIBLE_OFFSET_FORMATTER)
.withOffsetSameInstant(java.time.ZoneOffset.UTC);
} catch (java.time.format.DateTimeParseException ignored) {
// fall through
}

// 4. UTC epoch literal: 2024-01-01T12:00:00Z
try {
return Instant.parse(str).atOffset(java.time.ZoneOffset.UTC);
} catch (java.time.format.DateTimeParseException ignored) {
// fall through
}

throw new IllegalArgumentException(
"Unable to parse OffsetDateTime from CDC TIMESTAMP_TZ value: '"
+ str
+ "'. Supported formats: ISO-8601 with numeric offset, IANA zone-region"
+ " id, space-separated date/time, short-form hour-only offset, UTC"
+ " epoch literal.");
}

private static DebeziumDeserializationConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;
@@ -37,7 +37,10 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -206,4 +209,57 @@ void testGeographyStringConversion() throws Exception {
Assertions.assertTrue(fieldValue instanceof String);
Assertions.assertEquals("0102FF", fieldValue);
}

/**
* Verifies the fallback chain in {@code parseOffsetDateTimeFromString} against the four
* Debezium timestamp formats called out in the review:
*
* <ol>
* <li>Standard ISO-8601 with numeric offset (baseline)
* <li>Oracle TIMESTAMP WITH LOCAL TIME ZONE — IANA zone-region id
* <li>MySQL TIMESTAMP in certain schema-history modes — space date/time separator
* <li>PostgreSQL timestamptz — short-form hour-only offset
* </ol>
*
* All four variants must parse to the same UTC instant.
*/
@Test
void testParseOffsetDateTimeFromStringFallbackChain() {
ZoneId serverTz = ZoneId.of("Asia/Shanghai");
// Expected UTC instant: 2024-01-01 04:00:00Z (2024-01-01 12:00:00+08:00)
OffsetDateTime expected = Instant.parse("2024-01-01T04:00:00Z").atOffset(ZoneOffset.UTC);

// 1. ISO-8601 with numeric offset — handled by OffsetDateTime.parse
OffsetDateTime r1 =
SeaTunnelRowDebeziumDeserializationConverters.parseOffsetDateTimeFromString(
"2024-01-01T12:00:00+08:00", serverTz);
Assertions.assertEquals(expected, r1, "ISO-8601 numeric offset failed");

// 2. IANA zone-region id — Oracle TIMESTAMP WITH LOCAL TIME ZONE
OffsetDateTime r2 =
SeaTunnelRowDebeziumDeserializationConverters.parseOffsetDateTimeFromString(
"2024-01-01T12:00:00 Asia/Shanghai", serverTz);
Assertions.assertEquals(expected, r2, "IANA zone-region id failed");

// 3. Space separator — MySQL in certain schema-history modes
OffsetDateTime r3 =
SeaTunnelRowDebeziumDeserializationConverters.parseOffsetDateTimeFromString(
"2024-01-01 12:00:00+08:00", serverTz);
Assertions.assertEquals(expected, r3, "Space date/time separator failed");

// 4. Short-form hour-only offset — PostgreSQL timestamptz
OffsetDateTime r4 =
SeaTunnelRowDebeziumDeserializationConverters.parseOffsetDateTimeFromString(
"2024-01-01T12:00:00+08", serverTz);
Assertions.assertEquals(expected, r4, "Short-form hour-only offset failed");
}

@Test
void testParseOffsetDateTimeFromStringThrowsOnUnknownFormat() {
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
SeaTunnelRowDebeziumDeserializationConverters.parseOffsetDateTimeFromString(
"not-a-timestamp", ZoneId.systemDefault()));
}
}
@@ -221,7 +221,7 @@ public void testParseDDLForAddColumn() {
addEvent3.get(12),
"col13".toUpperCase(),
"timestamp with time zone(6)",
"TIMESTAMP",
"TIMESTAMP_TZ",
null,
6,
false,
@@ -37,6 +37,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

@Slf4j
public class DefaultDataConverter implements DataConverter<Object[]> {
@@ -101,6 +103,11 @@ public SeaTunnelRow convert(Object[] values, TiTableInfo tableInfo, SeaTunnelRow
case TIMESTAMP:
fields[fieldIndex] = convertToTimestamp(value, dataType);
break;
case TIMESTAMP_TZ:
// TiDB TIMESTAMP is stored as UTC (LTZ).
// Convert to OffsetDateTime with UTC offset to preserve timezone semantics.
fields[fieldIndex] = convertToOffsetDateTime(value, dataType);
break;
case BYTES:
fields[fieldIndex] = convertToBinary(value);
break;
@@ -259,6 +266,45 @@ private static Object convertToTime(Object value) {
return TemporalConversions.toLocalTime(value);
}

private static Object convertToOffsetDateTime(
Object value, org.tikv.common.types.DataType dataType) {
if (value instanceof Timestamp) {
// TiDB TIMESTAMP is stored in UTC; convert to OffsetDateTime with UTC zone.
Instant instant = ((Timestamp) value).toInstant();
return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
}
if (value instanceof Long) {
// TiDB may emit TIMESTAMP as epoch milliseconds in some snapshot modes.
return OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) value), ZoneOffset.UTC);
}
if (value instanceof LocalDateTime) {
// LocalDateTime without explicit zone — treat as UTC wall-clock value.
return ((LocalDateTime) value).atOffset(ZoneOffset.UTC);
}
if (value instanceof String) {
// String representation from TiDB CDC — attempt ISO-8601 parse.
try {
return OffsetDateTime.parse((String) value);
} catch (java.time.format.DateTimeParseException e) {
throw new IllegalArgumentException(
"Unable to convert TIMESTAMP_TZ from String value: '"
+ value
+ "' for TiDB dataType: "
+ dataType,
e);
}
}
// Unknown type — fail fast with enough context for diagnosis instead of silently
// returning the raw value which would cause a ClassCastException downstream.
throw new IllegalArgumentException(
"Unsupported value type for TIMESTAMP_TZ conversion: "
+ value.getClass().getName()
+ ", value='"
+ value
+ "', TiDB dataType="
+ dataType);
}
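A quick check of the epoch-millis branch above (the literal is a hypothetical sample value, not taken from TiDB output): converting through `Instant.ofEpochMilli` pins the result to a UTC offset, matching what `convertToOffsetDateTime` returns for `Long` inputs.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class EpochToOffsetDemo {
    public static void main(String[] args) {
        // Hypothetical epoch-milliseconds value: 2024-01-01T04:00:00Z
        long epochMillis = 1704081600000L;
        OffsetDateTime odt =
                OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        System.out.println(odt); // 2024-01-01T04:00Z
    }
}
```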

private static Object convertToTimestamp(
Object value, org.tikv.common.types.DataType dataType) {
switch (dataType.getType()) {
@@ -353,6 +353,12 @@ protected BasicTypeDefine sampleReconvert(
builder.columnType(String.format("%s(%s)", DORIS_VARCHAR, 8));
builder.dataType(DORIS_VARCHAR);
break;
case TIMESTAMP_TZ:
// Doris has no timezone-aware datetime type; store as DATETIME (wall-clock value)
builder.columnType(String.format("%s(%s)", DORIS_DATETIME, MAX_DATETIME_SCALE));
builder.dataType(DORIS_DATETIME);
builder.scale(MAX_DATETIME_SCALE);
break;
case ARRAY:
SeaTunnelDataType<?> dataType = column.getDataType();
SeaTunnelDataType elementType = null;
@@ -426,6 +432,7 @@ private void reconvertBuildArrayInternal(
builder.dataType(DORIS_DATEV2_ARRAY);
break;
case TIMESTAMP:
case TIMESTAMP_TZ:
builder.columnType(DORIS_DATETIMEV2_ARRAY);
builder.dataType(DORIS_DATETIMEV2_ARRAY);
break;
@@ -103,6 +103,7 @@ public BasicTypeDefine reconvert(Column column) {
builder.dataType(DORIS_DATEV2);
break;
case TIMESTAMP:
case TIMESTAMP_TZ:
if (column.getScale() != null
&& column.getScale() > 0
&& column.getScale() <= MAX_DATETIME_SCALE) {
@@ -227,6 +227,7 @@ public BasicTypeDefine reconvert(Column column) {
builder.dataType(DORIS_DATE);
break;
case TIMESTAMP:
case TIMESTAMP_TZ:
if (column.getScale() != null
&& column.getScale() >= 0
&& column.getScale() <= MAX_DATETIME_SCALE) {