diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index b01274ad091..8dc5ceae0cb 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -172,27 +172,28 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且 ## 时间函数 -| 函数 | Janino 代码 | 描述 | -|------------------------------------------------------|------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| LOCALTIME | localtime() | 返回本地时区的当前 SQL 时间,返回类型为 TIME(0)。 | -| LOCALTIMESTAMP | localtimestamp() | 返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP(3)。 | -| CURRENT_TIME | currentTime() | 返回本地时区的当前 SQL 时间,是 LOCAL_TIME 的同义词。 | -| CURRENT_DATE | currentDate() | 返回本地时区的当前 SQL 日期。 | -| CURRENT_TIMESTAMP | currentTimestamp() | 返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP_LTZ(3)。 | -| NOW() | now() | 返回本地时区的当前 SQL 时间戳,是 CURRENT_TIMESTAMP 的同义词。 | -| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | 将时间戳转换为指定日期格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | -| DATE_FORMAT(date, string) | dateFormat(date, string) | 将给定日期转换为指定格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | -| DATE_FORMAT(time, string) | dateFormat(time, string) | 将给定时间转换为指定格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | -| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | 使用给定的模式和指定的时区将时间戳或日期时间值格式化为字符串。timezone 参数可以是时区 ID(例如 'UTC'、'Asia/Shanghai')或偏移量(如 '+08:00')。 | -| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | 返回 timepoint 加上 interval 后的时间戳。时间间隔的单位由第一个参数指定,应为以下值之一:SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。 | -| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | 返回 timepoint1 和 timepoint2 之间的(有符号)时间单位数。时间间隔的单位由第一个参数指定,应为以下值之一:SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。 | -| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | 将日期字符串 string1 按格式 string2(默认为 'yyyy-MM-dd')转换为日期。 | -| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | 将日期时间字符串 string1 按格式 string2(默认为 'yyyy-MM-dd HH:mm:ss')转换为时间戳,不带时区。 | -| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | 将日期时间字符串 string1 按格式 string2(默认为 'yyyy-MM-dd HH:mm:ss')转换为时间戳,带本地时区。 | -| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | 返回 numeric 参数的字符串格式表示(默认为 'yyyy-MM-dd HH:mm:ss')。numeric 是表示自 '1970-01-01 00:00:00' UTC 以来秒数的内部时间戳值,例如由 UNIX_TIMESTAMP() 函数产生的值。返回值以会话时区(在 TableConfig 中指定)表示。例如,如果在 UTC 时区,FROM_UNIXTIME(44) 返回 '1970-01-01 00:00:44',但如果在 'Asia/Tokyo' 时区则返回 '1970-01-01 09:00:44'。 | -| UNIX_TIMESTAMP() | unixTimestamp() | 获取当前 Unix 时间戳(秒)。此函数是非确定性的,意味着每条记录都会重新计算该值。 | -| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | 将日期时间字符串 string1 按格式 string2(如果未指定,默认为 yyyy-MM-dd HH:mm:ss)转换为 Unix 时间戳(秒),使用表配置中指定的时区。
如果日期时间字符串中指定了时区并以 UTC+X 格式(如 "yyyy-MM-dd HH:mm:ss.SSS X")解析,则此函数将使用日期时间字符串中指定的时区而不是表配置中的时区。如果无法解析日期时间字符串,将返回默认值 Long.MIN_VALUE(-9223372036854775808)。 | -| DATE_ADD(date, int) | dateAdd(date, int) | 将 N 天添加到给定日期,返回格式为 'yyyy-MM-dd' 的字符串。 | +| 函数 | Janino 代码 | 描述 | +|------------------------------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| LOCALTIME | localtime() | 返回本地时区的当前 SQL 时间,返回类型为 TIME(0)。 | +| LOCALTIMESTAMP | localtimestamp() | 返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP(3)。 | +| CURRENT_TIME | currentTime() | 返回本地时区的当前 SQL 时间,是 LOCAL_TIME 的同义词。 | +| CURRENT_DATE | currentDate() | 返回本地时区的当前 SQL 日期。 | +| CURRENT_TIMESTAMP | currentTimestamp() | 返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP_LTZ(3)。 | +| NOW() | now() | 返回本地时区的当前 SQL 时间戳,是 CURRENT_TIMESTAMP 的同义词。 | +| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | 将时间戳转换为指定日期格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | +| DATE_FORMAT(date, string) | dateFormat(date, string) | 将给定日期转换为指定格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | +| DATE_FORMAT(time, string) | dateFormat(time, string) | 将给定时间转换为指定格式字符串的值。格式字符串与 Java 的 SimpleDateFormat 兼容。 | +| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | 使用给定的模式和指定的时区将时间戳或日期时间值格式化为字符串。timezone 参数可以是时区 ID(例如 'UTC'、'Asia/Shanghai')或偏移量(如 '+08:00')。 | +| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | 返回 timepoint 加上 interval 后的时间戳。时间间隔的单位由第一个参数指定,应为以下值之一:SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。 | +| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | 返回 timepoint1 和 timepoint2 之间的(有符号)时间单位数。时间间隔的单位由第一个参数指定,应为以下值之一:SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。 | +| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | 将日期字符串 string1 按格式 string2(默认为 'yyyy-MM-dd')转换为日期。格式参数仅适用于该字符串重载。 | +| TO_DATE(timestamp) | toDate(timestamp) | 将 TIMESTAMP、带时区的 TIMESTAMP(TIMESTAMP_TZ)或 TIMESTAMP_LTZ 值按该时间值的日历日期转换为 DATE。该重载不支持格式模式;若要解析带格式的字符串,请使用 `TO_DATE(string1[, string2])`。 | +| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | 将日期时间字符串 string1 按格式 string2(默认为 'yyyy-MM-dd HH:mm:ss')转换为时间戳,不带时区。 | +| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | 将日期时间字符串 string1 按格式 string2(默认为 'yyyy-MM-dd HH:mm:ss')转换为时间戳,带本地时区。 | +| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | 返回 numeric 参数的字符串格式表示(默认为 'yyyy-MM-dd HH:mm:ss')。numeric 是表示自 '1970-01-01 00:00:00' UTC 以来秒数的内部时间戳值,例如由 UNIX_TIMESTAMP() 函数产生的值。返回值以会话时区(在 TableConfig 中指定)表示。例如,如果在 UTC 时区,FROM_UNIXTIME(44) 返回 '1970-01-01 00:00:44',但如果在 'Asia/Tokyo' 时区则返回 '1970-01-01 09:00:44'。 | +| UNIX_TIMESTAMP() | unixTimestamp() | 获取当前 Unix 时间戳(秒)。此函数是非确定性的,意味着每条记录都会重新计算该值。 | +| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | 将日期时间字符串 string1 按格式 string2(如果未指定,默认为 yyyy-MM-dd HH:mm:ss)转换为 Unix 时间戳(秒),使用表配置中指定的时区。
如果日期时间字符串中指定了时区并以 UTC+X 格式(如 "yyyy-MM-dd HH:mm:ss.SSS X")解析,则此函数将使用日期时间字符串中指定的时区而不是表配置中的时区。如果无法解析日期时间字符串,将返回默认值 Long.MIN_VALUE(-9223372036854775808)。 | +| DATE_ADD(date, int) | dateAdd(date, int) | 将 N 天添加到给定日期,返回格式为 'yyyy-MM-dd' 的字符串。 | ## 条件函数 diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 5df6dbcf97e..4a6d40b61f3 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -187,7 +187,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. | | TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | -| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | +| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. The format argument applies only to this string overload. | +| TO_DATE(timestamp) | toDate(timestamp) | Converts a TIMESTAMP, TIMESTAMP WITH TIME ZONE (TIMESTAMP_TZ), or TIMESTAMP_LTZ value to a DATE using that value's calendar date. A format pattern is not supported for this overload; use `TO_DATE(string1[, string2])` to parse formatted strings. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | | TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. | | FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. | diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java index 21e7ddc5d1b..0d8079e71c1 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java @@ -511,6 +511,7 @@ void runTransformSpecs(String group, String name, TestSpec spec) throws Exceptio "specs/logical.yaml", "specs/meta.yaml", "specs/nested.yaml", + "specs/regression.yaml", "specs/string.yaml", "specs/temporal.yaml" }; diff --git a/flink-cdc-composer/src/test/resources/specs/regression.yaml b/flink-cdc-composer/src/test/resources/specs/regression.yaml new file mode 100644 index 00000000000..3df082eebd3 --- /dev/null +++ b/flink-cdc-composer/src/test/resources/specs/regression.yaml @@ -0,0 +1,275 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP) + time-zone: Asia/Tokyo + projection: |- + id_ + timestamp_0_ + TO_DATE(timestamp_0_) AS comp_1 + TIMESTAMPADD(HOUR, 18, timestamp_0_) AS comp_2 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_0_)) AS comp_3 + timestamp_6_ + TO_DATE(timestamp_6_) AS comp_4 + TIMESTAMPADD(HOUR, 18, timestamp_6_) AS comp_5 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_6_)) AS comp_6 + timestamp_9_ + TO_DATE(timestamp_9_) AS comp_7 + TIMESTAMPADD(HOUR, 18, timestamp_9_) AS comp_8 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_9_)) AS comp_9 + filter: id_ > 0 + primary-key: id_ + # timestamp_0_ is: 1970-01-02 10:17:36, TO_DATE should yield 1970-01-02 + # after +18 hours: 1970-01-03 04:17:36, TO_DATE should yield 1970-01-03 + # + # timestamp_6_ is: 1970-01-03 17:09:27, TO_DATE should yield 1970-01-03 + # after +18 hours: 1970-01-04 11:09:27, TO_DATE should yield 1970-01-04 + # + # timestamp_9_ is: 1970-01-05 00:01:18, TO_DATE should yield 1970-01-05 + # after +18 hours: 1970-01-05 18:01:18, TO_DATE should yield 1970-01-05 + # + # None of them are affected by pipeline time-zone config. + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_0_` TIMESTAMP(0),`comp_1` DATE,`comp_2` TIMESTAMP(0),`comp_3` DATE,`timestamp_6_` TIMESTAMP(6),`comp_4` DATE,`comp_5` TIMESTAMP(3),`comp_6` DATE,`timestamp_9_` TIMESTAMP(9),`comp_7` DATE,`comp_8` TIMESTAMP(3),`comp_9` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], after=[], op=DELETE, meta=()} + +- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_TZ) + time-zone: America/Los_Angeles + projection: |- + id_ + timestamp_tz_0_ + TO_DATE(timestamp_tz_0_) AS comp_1 + timestamp_tz_6_ + TO_DATE(timestamp_tz_6_) AS comp_2 + timestamp_tz_9_ + TO_DATE(timestamp_tz_9_) AS comp_3 + filter: id_ > 0 + non-null: 'true' + primary-key: id_ + # timestamp_tz_0_ is: 1970-01-02 10:17:36.789123456 (Asia/Shanghai, UTC+8), TO_DATE should yield 1970-01-02 + # timestamp_tz_6_ is: 1970-01-03 17:09:27.891234561 (Europe/Berlin, UTC+1), TO_DATE should yield 1970-01-03 + # timestamp_tz_9_ is: 1970-01-05 00:01:18.912345612 (America/Puerto_Rico, UTC-4), TO_DATE should yield 1970-01-05 + # None of them are affected by pipeline time-zone config. + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_tz_0_` TIMESTAMP(0) WITH TIME ZONE,`comp_1` DATE,`timestamp_tz_6_` TIMESTAMP(6) WITH TIME ZONE,`comp_2` DATE,`timestamp_tz_9_` TIMESTAMP(9) WITH TIME ZONE,`comp_3` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456+08:00, 1970-01-02, 1970-01-03T17:09:27.891234561+01:00, 1970-01-03, 1970-01-05T00:01:18.912345612-04:00, 1970-01-05], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456+08:00, 1970-01-02, 1970-01-03T17:09:27.891234561+01:00, 1970-01-03, 1970-01-05T00:01:18.912345612-04:00, 1970-01-05], after=[], op=DELETE, meta=()} + +- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_LTZ) + time-zone: Australia/Sydney + projection: |- + id_ + timestamp_ltz_0_ + TO_DATE(timestamp_ltz_0_) AS comp_1 + TIMESTAMPADD(HOUR, 18, timestamp_ltz_0_) AS comp_2 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_0_)) AS comp_3 + timestamp_ltz_6_ + TO_DATE(timestamp_ltz_6_) AS comp_4 + TIMESTAMPADD(HOUR, 18, timestamp_ltz_6_) AS comp_5 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_6_)) AS comp_6 + timestamp_ltz_9_ + TO_DATE(timestamp_ltz_9_) AS comp_7 + TIMESTAMPADD(HOUR, 18, timestamp_ltz_9_) AS comp_8 + TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_9_)) AS comp_9 + filter: id_ > 0 + primary-key: id_ + # timestamp_0_ is: 1970-01-02 10:17:36 (UTC), TO_DATE should yield 1970-01-02 + # after +18 hours: 1970-01-03 04:17:36 (UTC), TO_DATE should yield 1970-01-03 + # + # timestamp_6_ is: 1970-01-03 17:09:27 (UTC), TO_DATE should yield 1970-01-03 + # after +18 hours: 1970-01-04 11:09:27 (UTC), TO_DATE should yield 1970-01-04 + # + # timestamp_9_ is: 1970-01-05 00:01:18 (UTC), TO_DATE should yield 1970-01-05 + # after +18 hours: 1970-01-05 18:01:18 (UTC), TO_DATE should yield 1970-01-05 + # + # None of them are affected by pipeline time-zone config. + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_ltz_0_` TIMESTAMP_LTZ(0),`comp_1` DATE,`comp_2` TIMESTAMP_LTZ(0),`comp_3` DATE,`timestamp_ltz_6_` TIMESTAMP_LTZ(6),`comp_4` DATE,`comp_5` TIMESTAMP_LTZ(3),`comp_6` DATE,`timestamp_ltz_9_` TIMESTAMP_LTZ(9),`comp_7` DATE,`comp_8` TIMESTAMP_LTZ(3),`comp_9` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], after=[], op=DELETE, meta=()} + +- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_LTZ, 24 hour offsets) + time-zone: Pacific/Tarawa + projection: |- + id_ + timestamp_ltz_9_ + TO_DATE(timestamp_ltz_9_) AS date_orig + TIMESTAMPADD(HOUR, -12, timestamp_ltz_9_) AS shift_m12 + TO_DATE(TIMESTAMPADD(HOUR, -12, timestamp_ltz_9_)) AS date_m12 + TIMESTAMPADD(HOUR, -11, timestamp_ltz_9_) AS shift_m11 + TO_DATE(TIMESTAMPADD(HOUR, -11, timestamp_ltz_9_)) AS date_m11 + TIMESTAMPADD(HOUR, -10, timestamp_ltz_9_) AS shift_m10 + TO_DATE(TIMESTAMPADD(HOUR, -10, timestamp_ltz_9_)) AS date_m10 + TIMESTAMPADD(HOUR, -9, timestamp_ltz_9_) AS shift_m9 + TO_DATE(TIMESTAMPADD(HOUR, -9, timestamp_ltz_9_)) AS date_m9 + TIMESTAMPADD(HOUR, -8, timestamp_ltz_9_) AS shift_m8 + TO_DATE(TIMESTAMPADD(HOUR, -8, timestamp_ltz_9_)) AS date_m8 + TIMESTAMPADD(HOUR, -7, timestamp_ltz_9_) AS shift_m7 + TO_DATE(TIMESTAMPADD(HOUR, -7, timestamp_ltz_9_)) AS date_m7 + TIMESTAMPADD(HOUR, -6, timestamp_ltz_9_) AS shift_m6 + TO_DATE(TIMESTAMPADD(HOUR, -6, timestamp_ltz_9_)) AS date_m6 + TIMESTAMPADD(HOUR, -5, timestamp_ltz_9_) AS shift_m5 + TO_DATE(TIMESTAMPADD(HOUR, -5, timestamp_ltz_9_)) AS date_m5 + TIMESTAMPADD(HOUR, -4, timestamp_ltz_9_) AS shift_m4 + TO_DATE(TIMESTAMPADD(HOUR, -4, timestamp_ltz_9_)) AS date_m4 + TIMESTAMPADD(HOUR, -3, timestamp_ltz_9_) AS shift_m3 + TO_DATE(TIMESTAMPADD(HOUR, -3, timestamp_ltz_9_)) AS date_m3 + TIMESTAMPADD(HOUR, -2, timestamp_ltz_9_) AS shift_m2 + TO_DATE(TIMESTAMPADD(HOUR, -2, timestamp_ltz_9_)) AS date_m2 + TIMESTAMPADD(HOUR, -1, timestamp_ltz_9_) AS shift_m1 + TO_DATE(TIMESTAMPADD(HOUR, -1, timestamp_ltz_9_)) AS date_m1 + TIMESTAMPADD(HOUR, 0, timestamp_ltz_9_) AS shift_p0 + TO_DATE(TIMESTAMPADD(HOUR, 0, timestamp_ltz_9_)) AS date_p0 + TIMESTAMPADD(HOUR, 1, timestamp_ltz_9_) AS shift_p1 + TO_DATE(TIMESTAMPADD(HOUR, 1, timestamp_ltz_9_)) AS date_p1 + TIMESTAMPADD(HOUR, 2, timestamp_ltz_9_) AS shift_p2 + TO_DATE(TIMESTAMPADD(HOUR, 2, timestamp_ltz_9_)) AS date_p2 + TIMESTAMPADD(HOUR, 3, timestamp_ltz_9_) AS shift_p3 + TO_DATE(TIMESTAMPADD(HOUR, 3, timestamp_ltz_9_)) AS date_p3 + TIMESTAMPADD(HOUR, 4, timestamp_ltz_9_) AS shift_p4 + TO_DATE(TIMESTAMPADD(HOUR, 4, timestamp_ltz_9_)) AS date_p4 + TIMESTAMPADD(HOUR, 5, timestamp_ltz_9_) AS shift_p5 + TO_DATE(TIMESTAMPADD(HOUR, 5, timestamp_ltz_9_)) AS date_p5 + TIMESTAMPADD(HOUR, 6, timestamp_ltz_9_) AS shift_p6 + TO_DATE(TIMESTAMPADD(HOUR, 6, timestamp_ltz_9_)) AS date_p6 + TIMESTAMPADD(HOUR, 7, timestamp_ltz_9_) AS shift_p7 + TO_DATE(TIMESTAMPADD(HOUR, 7, timestamp_ltz_9_)) AS date_p7 + TIMESTAMPADD(HOUR, 8, timestamp_ltz_9_) AS shift_p8 + TO_DATE(TIMESTAMPADD(HOUR, 8, timestamp_ltz_9_)) AS date_p8 + TIMESTAMPADD(HOUR, 9, timestamp_ltz_9_) AS shift_p9 + TO_DATE(TIMESTAMPADD(HOUR, 9, timestamp_ltz_9_)) AS date_p9 + TIMESTAMPADD(HOUR, 10, timestamp_ltz_9_) AS shift_p10 + TO_DATE(TIMESTAMPADD(HOUR, 10, timestamp_ltz_9_)) AS date_p10 + TIMESTAMPADD(HOUR, 11, timestamp_ltz_9_) AS shift_p11 + TO_DATE(TIMESTAMPADD(HOUR, 11, timestamp_ltz_9_)) AS date_p11 + filter: id_ > 0 + primary-key: id_ + # timestamp_ltz_9_ is: 1970-01-05T00:01:18.912345612 (UTC), epoch=345678912ms + # pipeline time-zone is Pacific/Tarawa (UTC+12). + # + # TIMESTAMPADD(HOUR, N, ...) always operates in UTC, producing 24 time points + # spanning all 24 UTC hours, straddling the midnight boundary: + # + # N=-12 → 1970-01-04T12:01:18.912 UTC (UTC+12 local: 1970-01-05T00:01:18) → TO_DATE = 1970-01-04 + # N=-11 → 1970-01-04T13:01:18.912 UTC (UTC+12 local: 1970-01-05T01:01:18) → TO_DATE = 1970-01-04 + # N=-10 → 1970-01-04T14:01:18.912 UTC (UTC+12 local: 1970-01-05T02:01:18) → TO_DATE = 1970-01-04 + # ... + # N= -1 → 1970-01-04T23:01:18.912 UTC (UTC+12 local: 1970-01-05T11:01:18) → TO_DATE = 1970-01-04 + # N= 0 → 1970-01-05T00:01:18.912 UTC (UTC+12 local: 1970-01-05T12:01:18) → TO_DATE = 1970-01-05 + # ... + # N= 11 → 1970-01-05T11:01:18.912 UTC (UTC+12 local: 1970-01-05T23:01:18) → TO_DATE = 1970-01-05 + # + # TO_DATE always uses UTC, so N=-12..-1 → 1970-01-04, N=0..+11 → 1970-01-05. + # A buggy implementation using pipeline time-zone (UTC+12) would return 1970-01-05 for all 24 entries. + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_ltz_9_` TIMESTAMP_LTZ(9),`date_orig` DATE,`shift_m12` TIMESTAMP_LTZ(3),`date_m12` DATE,`shift_m11` TIMESTAMP_LTZ(3),`date_m11` DATE,`shift_m10` TIMESTAMP_LTZ(3),`date_m10` DATE,`shift_m9` TIMESTAMP_LTZ(3),`date_m9` DATE,`shift_m8` TIMESTAMP_LTZ(3),`date_m8` DATE,`shift_m7` TIMESTAMP_LTZ(3),`date_m7` DATE,`shift_m6` TIMESTAMP_LTZ(3),`date_m6` DATE,`shift_m5` TIMESTAMP_LTZ(3),`date_m5` DATE,`shift_m4` TIMESTAMP_LTZ(3),`date_m4` DATE,`shift_m3` TIMESTAMP_LTZ(3),`date_m3` DATE,`shift_m2` TIMESTAMP_LTZ(3),`date_m2` DATE,`shift_m1` TIMESTAMP_LTZ(3),`date_m1` DATE,`shift_p0` TIMESTAMP_LTZ(3),`date_p0` DATE,`shift_p1` TIMESTAMP_LTZ(3),`date_p1` DATE,`shift_p2` TIMESTAMP_LTZ(3),`date_p2` DATE,`shift_p3` TIMESTAMP_LTZ(3),`date_p3` DATE,`shift_p4` TIMESTAMP_LTZ(3),`date_p4` DATE,`shift_p5` TIMESTAMP_LTZ(3),`date_p5` DATE,`shift_p6` TIMESTAMP_LTZ(3),`date_p6` DATE,`shift_p7` TIMESTAMP_LTZ(3),`date_p7` DATE,`shift_p8` TIMESTAMP_LTZ(3),`date_p8` DATE,`shift_p9` TIMESTAMP_LTZ(3),`date_p9` DATE,`shift_p10` TIMESTAMP_LTZ(3),`date_p10` DATE,`shift_p11` TIMESTAMP_LTZ(3),`date_p11` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-04T12:01:18.912, 1970-01-04, 1970-01-04T13:01:18.912, 1970-01-04, 1970-01-04T14:01:18.912, 1970-01-04, 1970-01-04T15:01:18.912, 1970-01-04, 1970-01-04T16:01:18.912, 1970-01-04, 1970-01-04T17:01:18.912, 1970-01-04, 1970-01-04T18:01:18.912, 1970-01-04, 1970-01-04T19:01:18.912, 1970-01-04, 1970-01-04T20:01:18.912, 1970-01-04, 1970-01-04T21:01:18.912, 1970-01-04, 1970-01-04T22:01:18.912, 1970-01-04, 1970-01-04T23:01:18.912, 1970-01-04, 1970-01-05T00:01:18.912, 1970-01-05, 1970-01-05T01:01:18.912, 1970-01-05, 1970-01-05T02:01:18.912, 1970-01-05, 1970-01-05T03:01:18.912, 1970-01-05, 1970-01-05T04:01:18.912, 1970-01-05, 1970-01-05T05:01:18.912, 1970-01-05, 1970-01-05T06:01:18.912, 1970-01-05, 1970-01-05T07:01:18.912, 1970-01-05, 1970-01-05T08:01:18.912, 1970-01-05, 1970-01-05T09:01:18.912, 1970-01-05, 1970-01-05T10:01:18.912, 1970-01-05, 1970-01-05T11:01:18.912, 1970-01-05], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-04T12:01:18.912, 1970-01-04, 1970-01-04T13:01:18.912, 1970-01-04, 1970-01-04T14:01:18.912, 1970-01-04, 1970-01-04T15:01:18.912, 1970-01-04, 1970-01-04T16:01:18.912, 1970-01-04, 1970-01-04T17:01:18.912, 1970-01-04, 1970-01-04T18:01:18.912, 1970-01-04, 1970-01-04T19:01:18.912, 1970-01-04, 1970-01-04T20:01:18.912, 1970-01-04, 1970-01-04T21:01:18.912, 1970-01-04, 1970-01-04T22:01:18.912, 1970-01-04, 1970-01-04T23:01:18.912, 1970-01-04, 1970-01-05T00:01:18.912, 1970-01-05, 1970-01-05T01:01:18.912, 1970-01-05, 1970-01-05T02:01:18.912, 1970-01-05, 1970-01-05T03:01:18.912, 1970-01-05, 1970-01-05T04:01:18.912, 1970-01-05, 1970-01-05T05:01:18.912, 1970-01-05, 1970-01-05T06:01:18.912, 1970-01-05, 1970-01-05T07:01:18.912, 1970-01-05, 1970-01-05T08:01:18.912, 1970-01-05, 1970-01-05T09:01:18.912, 1970-01-05, 1970-01-05T10:01:18.912, 1970-01-05, 1970-01-05T11:01:18.912, 1970-01-05], after=[], op=DELETE, meta=()} + +- do: TO_DATE chained with FROM_UNIXTIME (common CDC pattern) + projection: |- + id_ + TO_DATE(FROM_UNIXTIME(bigint_)) AS comp_1 + TO_DATE(FROM_UNIXTIME(bigint_, 'yyyy-MM-dd')) AS comp_2 + primary-key: id_ + # bigint_ = 5 for record1 => FROM_UNIXTIME(5) = "1970-01-01 00:00:05" + # TO_DATE("1970-01-01 00:00:05") = null (format mismatch: contains time, TO_DATE expects 'yyyy-MM-dd') + # FROM_UNIXTIME(5, 'yyyy-MM-dd') = "1970-01-01", TO_DATE("1970-01-01") = 1970-01-01 + # bigint_ = -5 for record2 => FROM_UNIXTIME(-5) = "1969-12-31 23:59:55" + # TO_DATE("1969-12-31 23:59:55") = null (same reason) + # FROM_UNIXTIME(-5, 'yyyy-MM-dd') = "1969-12-31", TO_DATE("1969-12-31") = 1969-12-31 + # NOTE: Customers should always use FROM_UNIXTIME(x, 'yyyy-MM-dd') when chaining with TO_DATE + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 1970-01-01], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 1970-01-01], after=[-1, null, 1969-12-31], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 1969-12-31], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} + +- do: TO_DATE chained with DATE_FORMAT (extract date from timestamp) + projection: |- + id_ + TO_DATE(DATE_FORMAT(timestamp_0_, 'yyyy-MM-dd')) AS comp_1 + TO_DATE(DATE_FORMAT(timestamp_0_, 'yyyy-MM-01')) AS comp_2 + primary-key: id_ + # timestamp_0_ record1 = 1970-01-02T10:17:36 + # DATE_FORMAT(..., 'yyyy-MM-dd') = "1970-01-02", TO_DATE = 1970-01-02 + # DATE_FORMAT(..., 'yyyy-MM-01') = "1970-01-01", TO_DATE = 1970-01-01 + # timestamp_0_ record2 = 1970-01-09T08:57:36 + # DATE_FORMAT(..., 'yyyy-MM-dd') = "1970-01-09", TO_DATE = 1970-01-09 + # DATE_FORMAT(..., 'yyyy-MM-01') = "1970-01-01", TO_DATE = 1970-01-01 + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02, 1970-01-01], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02, 1970-01-01], after=[-1, 1970-01-09, 1970-01-01], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1970-01-09, 1970-01-01], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} + +- do: TO_DATE with CASE WHEN null-guard pattern (common CDC pattern) + projection: |- + id_ + CASE WHEN timestamp_0_ IS NULL THEN TO_DATE('2000-01-01') ELSE TO_DATE(timestamp_0_) END AS comp_1 + CASE WHEN bigint_ IS NULL THEN TO_DATE('2000-01-01') ELSE TO_DATE(FROM_UNIXTIME(bigint_, 'yyyy-MM-dd')) END AS comp_2 + primary-key: id_ + # record1: timestamp_0_ not null => TO_DATE(1970-01-02T10:17:36) = 1970-01-02 + # bigint_=5, not null => TO_DATE(FROM_UNIXTIME(5,'yyyy-MM-dd')) = TO_DATE("1970-01-01") = 1970-01-01 + # record2: timestamp_0_ not null => TO_DATE(1970-01-09T08:57:36) = 1970-01-09 + # bigint_=-5, not null => TO_DATE(FROM_UNIXTIME(-5,'yyyy-MM-dd')) = TO_DATE("1969-12-31") = 1969-12-31 + # record3: both null => TO_DATE('2000-01-01') = 2000-01-01 + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02, 1970-01-01], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02, 1970-01-01], after=[-1, 1970-01-09, 1969-12-31], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1970-01-09, 1969-12-31], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, 2000-01-01, 2000-01-01], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, 2000-01-01, 2000-01-01], after=[], op=DELETE, meta=()} + +- do: TO_DATE with TO_TIMESTAMP_LTZ (convert epoch to date) + projection: |- + id_ + TO_DATE(TO_TIMESTAMP_LTZ(bigint_, 0)) AS comp_1 + primary-key: id_ + # bigint_ = 5 for record1 => TO_TIMESTAMP_LTZ(5, 0) = epoch second 5 = 1970-01-01T00:00:05 UTC + # TO_DATE => 1970-01-01 + # bigint_ = -5 for record2 => TO_TIMESTAMP_LTZ(-5, 0) = 1969-12-31T23:59:55 UTC + # TO_DATE => 1969-12-31 + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-01], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-01], after=[-1, 1969-12-31], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1969-12-31], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()} + +- do: TO_DATE with DATE_FORMAT yyyyMMdd pattern (CDC partition key) + projection: |- + id_ + DATE_FORMAT(timestamp_0_, 'yyyyMMdd') AS comp_1 + TO_DATE(DATE_FORMAT(timestamp_6_, 'yyyyMMdd'), 'yyyyMMdd') AS comp_2 + primary-key: id_ + # timestamp_0_ record1 = 1970-01-02T10:17:36 => DATE_FORMAT = "19700102" + # timestamp_6_ record1 = 1970-01-03T17:09:27 => DATE_FORMAT = "19700103", TO_DATE("19700103","yyyyMMdd") = 1970-01-03 + # timestamp_0_ record2 = 1970-01-09T08:57:36 => DATE_FORMAT = "19700109" + # timestamp_6_ record2 = 1970-01-10T15:49:27 => DATE_FORMAT = "19700110", TO_DATE("19700110","yyyyMMdd") = 1970-01-10 + expect: |- + CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` STRING,`comp_2` DATE}, primaryKeys=id_, options=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 19700102, 1970-01-03], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, 19700102, 1970-01-03], after=[-1, 19700109, 1970-01-10], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[-1, 19700109, 1970-01-10], after=[], op=DELETE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java index e8e0291b549..de31a81fcac 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java @@ -30,6 +30,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.TimeZone; @@ -152,6 +153,27 @@ public static LocalDate toDate(String str, String format) { return DateTimeUtils.parseDate(str, format); } + public static LocalDate toDate(LocalDateTime localDateTime) { + if (localDateTime == null) { + return null; + } + return localDateTime.toLocalDate(); + } + + public static LocalDate toDate(ZonedDateTime zonedDateTime) { + if (zonedDateTime == null) { + return null; + } + return zonedDateTime.toLocalDate(); + } + + public static LocalDate toDate(Instant instant) { + if (instant == null) { + return null; + } + return LocalDateTime.ofInstant(instant, ZoneId.of("UTC")).toLocalDate(); + } + public static LocalDateTime toTimestamp(String str, String timezone) { return toTimestamp(str, "yyyy-MM-dd HH:mm:ss", timezone); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 6044fbd5dd3..eef0755c112 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -285,8 +285,10 @@ public SqlSyntax getSyntax() { SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( + // Only "from string" mode supports specifying formatter. OperandTypes.family(SqlTypeFamily.STRING), - OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), + OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.TIMESTAMP)), SqlFunctionCategory.TIMEDATE); public static final SqlFunction TO_TIMESTAMP = new SqlFunction(