diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 3a378cc8612..1d0e619c715 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -142,18 +142,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且 ## 数学函数 -| Function | Janino Code | Description | -|---------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. | -| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. | -| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. | -| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. | -| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. | -| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. | -| CEIL(numeric)
CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. | -| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. | -| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. | -| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. | +| Function | Janino Code | Description | +|------------------------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. | +| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. | +| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. | +| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. | +| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. | +| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. | +| CEIL(numeric)
CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. | +| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. | +| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. | +| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. | ## 字符串函数 @@ -180,13 +180,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且 | CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). | | NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. | | DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts given date to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts given time to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. | | TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | +| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. | | FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. | | UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. | | UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned. | +| DATE_ADD(date, int) | dateAdd(date, int) | Adds N days to the given date and returns a string in 'yyyy-MM-dd' format. | ## 条件函数 diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 18efeccf4dc..4c3ea60d8b3 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -112,91 +112,96 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ ## Comparison Functions -| Function | Janino Code | Description | -|----------------------|----------------------------------------------|-----------------------------------------------------------------| -| value1 = value2 | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. | -| value1 <> value2 | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. | -| value1 > value2 | greaterThan(value1, value2) | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. | -| value1 >= value2 | greaterThanOrEqual(value1, value2) | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. | -| value1 < value2 | lessThan(value1, value2) | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. | -| value1 <= value2 | lessThanOrEqual(value1, value2) | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. | -| value IS NULL | null == value | Returns TRUE if value is NULL. | -| value IS NOT NULL | null != value | Returns TRUE if value is not NULL. | -| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. | -| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. | -| string1 LIKE string2 | like(string1, string2) | Returns TRUE if string1 matches pattern string2. | -| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2. | -| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). | -| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …). | +| Function | Janino Code | Description | +|--------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------------------------| +| value1 = value2 | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. | +| value1 <> value2 | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. | +| value1 > value2 | greaterThan(value1, value2) | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. | +| value1 >= value2 | greaterThanOrEqual(value1, value2) | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. | +| value1 < value2 | lessThan(value1, value2) | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. | +| value1 <= value2 | lessThanOrEqual(value1, value2) | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. | +| value IS NULL | null == value | Returns TRUE if value is NULL. | +| value IS NOT NULL | null != value | Returns TRUE if value is not NULL. | +| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. | +| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. | +| string1 LIKE string2 | like(string1, string2) | Returns TRUE if string1 matches pattern string2. | +| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2. | +| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). | +| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …). | ## Logical Functions -| Function | Janino Code | Description | -|----------------------|-----------------------------|-----------------------------------------------------------------| -| boolean1 OR boolean2 | boolean1 || boolean2 | Returns TRUE if BOOLEAN1 is TRUE or BOOLEAN2 is TRUE. | -| boolean1 AND boolean2 | boolean1 && boolean2 | Returns TRUE if BOOLEAN1 and BOOLEAN2 are both TRUE. | -| NOT boolean | !boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | -| boolean IS FALSE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | -| boolean IS NOT FALSE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. | -| boolean IS TRUE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. | -| boolean IS NOT TRUE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | +| Function | Janino Code | Description | +|-----------------------|--------------------------------|---------------------------------------------------------------------| +| boolean1 OR boolean2 | boolean1 || boolean2 | Returns TRUE if BOOLEAN1 is TRUE or BOOLEAN2 is TRUE. | +| boolean1 AND boolean2 | boolean1 && boolean2 | Returns TRUE if BOOLEAN1 and BOOLEAN2 are both TRUE. | +| NOT boolean | !boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | +| boolean IS FALSE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | +| boolean IS NOT FALSE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. | +| boolean IS TRUE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. | +| boolean IS NOT TRUE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. | ## Arithmetic Functions -| Function | Janino Code | Description | -|------------------------------------|-----------------------------|-----------------------------------------------------------------| -| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. | -| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. | -| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. | -| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. | -| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. | -| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. | -| CEIL(numeric)
CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. | -| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. | -| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. | -| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. | +| Function | Janino Code | Description | +|------------------------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. | +| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. | +| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. | +| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. | +| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. | +| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. | +| CEIL(numeric)
CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. | +| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. | +| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. | +| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. | ## String Functions -| Function | Janino Code | Description | -| -------------------- | ------------------------ |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | -| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | -| UPPER(string) | upper(string) | Returns string in uppercase. | -| LOWER(string) | lower(string) | Returns string in lowercase. | -| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | -| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | -| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | -| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | -| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | +| Function | Janino Code | Description | +|--------------------------------------------------|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | +| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | +| UPPER(string) | upper(string) | Returns string in uppercase. | +| LOWER(string) | lower(string) | Returns string in lowercase. | +| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | +| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | +| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | ## Temporal Functions -| Function | Janino Code | Description | -| -------------------- | ------------------------ | ------------------------------------------------- | -| LOCALTIME | localtime() | Returns the current SQL time in the local time zone, the return type is TIME(0). | -| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). | -| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. | -| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. | -| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). | -| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. | -| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. | -| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | -| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | -| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | -| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | -| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. | -| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. | -| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.| +| Function | Janino Code | Description | +|------------------------------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| LOCALTIME | localtime() | Returns the current SQL time in the local time zone, the return type is TIME(0). | +| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). | +| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. | +| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. | +| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). | +| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. | +| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts given date to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts given time to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. | +| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. | +| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | +| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | +| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | +| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | +| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. | +| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. | +| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. | +| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned. | +| DATE_ADD(date, int) | dateAdd(date, int) | Add N days to given date data. | ## Conditional Functions -| Function | Janino Code | Description | -| -------------------- | ------------------------ | ------------------------------------------------- | -| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first time value is contained in (valueX_1, valueX_2, …). When no value matches, returns result_z if it is provided and returns NULL otherwise. | -| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first conditionX is met. When no condition is met, returns result_z if it is provided and returns NULL otherwise. | -| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the first argument that is not NULL.If all arguments are NULL, it returns NULL as well. The return type is the least restrictive, common type of all of its arguments. The return type is nullable if all arguments are nullable as well. | -| IF(condition, true_value, false_value) | condition ? true_value : false_value | Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 > 3, 5, 3) returns 5. | +| Function | Janino Code | Description | +|-----------------------------------------------------------------------------------------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first time value is contained in (valueX_1, valueX_2, …). When no value matches, returns result_z if it is provided and returns NULL otherwise. | +| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first conditionX is met. When no condition is met, returns result_z if it is provided and returns NULL otherwise. | +| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the first argument that is not NULL.If all arguments are NULL, it returns NULL as well. The return type is the least restrictive, common type of all of its arguments. The return type is nullable if all arguments are nullable as well. | +| IF(condition, true_value, false_value) | condition ? true_value : false_value | Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 > 3, 5, 3) returns 5. | ## Casting Functions diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index c81b1ba463b..e4da2fca351 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -51,7 +51,6 @@ import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper; import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions; import org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException; -import org.apache.flink.cdc.runtime.parser.JaninoCompiler; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -2737,14 +2736,9 @@ void testTransformErrorMessage() { .cause() .isExactlyInstanceOf(FlinkRuntimeException.class) .hasMessage( - "Failed to compile expression TransformExpressionKey{originalExpression='id1 > 0', expression='" - + JaninoCompiler.LOAD_MODULES_EXPRESSION - + "greaterThan($0, 0)', argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}") + "Failed to compile expression TransformExpressionKey{originalExpression='id1 > 0', compiledExpression='greaterThan($0, 0)', argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}") .cause() - .hasMessageContaining( - "Compiled expression: " - + JaninoCompiler.LOAD_MODULES_EXPRESSION - + "greaterThan($0, 0)") + .hasMessageContaining("Compiled expression: greaterThan($0, 0)") .hasMessageContaining("Column name map: {$0 -> id1}") .rootCause() .isExactlyInstanceOf(CompileException.class) @@ -2812,7 +2806,6 @@ void testTransformErrorMessage() { + "\tOriginal expression: name + 1 > 0\n" + "\tCompiled expression: greaterThan($0 + 1, 0)\n" + "\tColumn name map: {$0 -> name}") - .hasMessageContaining("Column name map: {$0 -> name}") .rootCause() .isExactlyInstanceOf(RuntimeException.class) .hasMessage( diff --git a/flink-cdc-composer/src/test/resources/specs/comparison.yaml b/flink-cdc-composer/src/test/resources/specs/comparison.yaml index 99c6d6d2d7d..c022f078064 100644 --- a/flink-cdc-composer/src/test/resources/specs/comparison.yaml +++ b/flink-cdc-composer/src/test/resources/specs/comparison.yaml @@ -166,7 +166,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, true, false, true, false], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, true, false, true, false], after=[], op=DELETE, meta=()} - do: Between Op - ignore: FLINK-38906 projection: |- id_ tinyint_ BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1 @@ -189,7 +188,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false, false, false, false, false, false, false, false, false, false], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false, false, false, false, false, false, false, false, false], after=[], op=DELETE, meta=()} - do: Not Between Op - ignore: FLINK-38906 projection: |- id_ tinyint_ NOT BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1 @@ -238,7 +236,6 @@ projection: id_, char_ NOT LIKE 'A.*' AS comp_1, varchar_ NOT LIKE '.*rro' AS comp_2, string_ NOT LIKE 'From [A-Z] to [A-Z] is Lie' AS comp_3 primary-key: id_ - do: In Op - ignore: FLINK-38906 projection: |- id_ tinyint_ IN (CAST(2 AS TINYINT)) AS comp_1 @@ -261,7 +258,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false, false, false, false, false, false, false, false, false, false], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false, false, false, false, false, false, false, false, false], after=[], op=DELETE, meta=()} - do: Not In Op - ignore: FLINK-38906 projection: |- id_ tinyint_ NOT IN (CAST(2 AS TINYINT)) AS comp_1 diff --git a/flink-cdc-composer/src/test/resources/specs/decimal.yaml b/flink-cdc-composer/src/test/resources/specs/decimal.yaml index e12ddb0256a..f1418dd32c5 100644 --- a/flink-cdc-composer/src/test/resources/specs/decimal.yaml +++ b/flink-cdc-composer/src/test/resources/specs/decimal.yaml @@ -14,7 +14,6 @@ ################################################################################ - do: Add Op - ignore: FLINK-38906 projection: |- id_ decimal_10_0_ + CAST(1 AS DECIMAL(1, 0)) AS comp_1 @@ -27,7 +26,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567891, null], after=[-1, -9876543209, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543209, null], after=[], op=DELETE, meta=()} - do: Subtract Op - ignore: FLINK-38906 projection: |- id_ decimal_10_0_ - CAST(1 AS DECIMAL(1, 0)) AS comp_1 @@ -40,7 +38,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567889, null], after=[-1, -9876543211, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543211, null], after=[], op=DELETE, meta=()} - do: Multiply Op - ignore: FLINK-38906 projection: |- id_ decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1 @@ -53,7 +50,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[1, 2469135780, null], after=[-1, -19753086420, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -19753086420, null], after=[], op=DELETE, meta=()} - do: Divide Op - ignore: FLINK-38906 projection: |- id_ decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1 @@ -66,7 +62,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[1, 617.283945, 61728394506172839.45], after=[-1, -4938.271605, null], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, -4938.271605, null], after=[], op=DELETE, meta=()} - do: Abs Op - ignore: FLINK-38906 projection: |- id_ ABS(decimal_10_0_) AS comp_1 @@ -80,7 +75,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Ceil Op - ignore: FLINK-38906 projection: |- id_ CEIL(decimal_10_0_) AS comp_1 @@ -94,7 +88,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Floor Op - ignore: FLINK-38906 projection: |- id_ FLOOR(decimal_10_0_) AS comp_1 @@ -108,7 +101,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Round Op - ignore: FLINK-38906 projection: |- id_ ROUND(decimal_10_0_, 1) AS comp_1 diff --git a/flink-cdc-composer/src/test/resources/specs/meta.yaml b/flink-cdc-composer/src/test/resources/specs/meta.yaml index e8c893bf5dc..df6c2f85739 100644 --- a/flink-cdc-composer/src/test/resources/specs/meta.yaml +++ b/flink-cdc-composer/src/test/resources/specs/meta.yaml @@ -47,15 +47,3 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, -1, false, -2, -3, -4, -5, -7.7, -88.88, -9876543210, -987654321098765432.10, 爱丽丝, 疯帽子, 天地玄黄宇宙洪荒, 5LiA5LqM5LiJ5Zub5LqU, 5YWt5LiD5YWr5Lmd5Y2B, 5ZC+6Lyp44Gv54yr44Gn44GC44KL, 1970-01-09T08:57:36.789723456, 1970-01-10T15:49:27.891834561, 1970-01-11T22:41:18.912945612, 1970-01-09T08:57:36.789723456+08:00, 1970-01-10T15:49:27.891834561+01:00, 1970-01-11T22:41:18.912945612-04:00, 1970-01-09T08:57:36.789723456, 1970-01-10T15:49:27.891834561, 1970-01-11T22:41:18.912945612, 2001-01-01, 12:34:45, 23:45:07, 02:30:05, [2, 3, 5, 7, 11, 13, 17, 19], [二, san, 五, qi, 十一], {1 -> yi, 2 -> er, 3 -> san}, {二 -> [E, R], 三 -> [S, A, N], 一 -> [Y, I]}, {name: STRING -> Derrida, length: INT -> 7}, [{"k":1},"hello",{"k":2}]], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[+I, 0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, 0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()} -- do: Downcase Post Converter - ignore: FLINK-38887 - projection: id_, 'UPCASE' AS UPCASE, 'downcase' AS downcase, 'MiXeD cAsE' AS MiXeD_cAsE - primary-key: id_ - converters: FIELD_NAME_LOWER_CASE - expect: |- - CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`upcase` STRING,`downcase` STRING,`mixed_case` STRING}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, UPCASE, downcase, MiXeD cAsE], op=INSERT, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, UPCASE, downcase, MiXeD cAsE], after=[-1, UPCASE, downcase, MiXeD cAsE], op=UPDATE, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[-1, UPCASE, downcase, MiXeD cAsE], after=[], op=DELETE, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, UPCASE, downcase, MiXeD cAsE], op=INSERT, meta=()} - DataChangeEvent{tableId=foo.bar.baz, before=[0, UPCASE, downcase, MiXeD cAsE], after=[], op=DELETE, meta=()} diff --git a/flink-cdc-composer/src/test/resources/specs/temporal.yaml b/flink-cdc-composer/src/test/resources/specs/temporal.yaml index 3e485e6594f..3e612529631 100644 --- a/flink-cdc-composer/src/test/resources/specs/temporal.yaml +++ b/flink-cdc-composer/src/test/resources/specs/temporal.yaml @@ -78,7 +78,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: To Date Function - ignore: FLINK-38906 projection: |- id_ TO_DATE('2025-01-05') AS comp_1 @@ -117,7 +116,6 @@ TO_TIMESTAMP('2024 !! 02 !! 14 11 !! 45 !! 49', 'yyyy//MM//dd') AS comp expect-error: 'Unparseable date: "2024 !! 02 !! 14 11 !! 45 !! 49"' - do: Format DateType and TimeType - ignore: FLINK-38906 projection: |- id_, date_, time_0_, time_6_ DATE_FORMAT(date_, 'yyyy->MM->dd') AS date_fmt_1 @@ -134,7 +132,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: From UnixTime Function - ignore: FLINK-38906 projection: |- FROM_UNIXTIME(bigint_) AS col_1 FROM_UNIXTIME(bigint_, 'yyyy/MM/dd HH;mm;ss') AS col_2 @@ -146,7 +143,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[], op=DELETE, meta=()} - do: From UnixTime Function with Implicit Conversion - ignore: FLINK-38906 projection: |- FROM_UNIXTIME(int_) AS col_1 FROM_UNIXTIME(int_, 'yyyy/MM/dd HH;mm;ss') AS col_2 @@ -158,7 +154,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[], op=DELETE, meta=()} - do: To Timestamp LTZ Function - ignore: FLINK-38906 projection: |- TO_TIMESTAMP_LTZ(bigint_) AS col_1 TO_TIMESTAMP_LTZ(bigint_, 0) AS col_2 @@ -174,7 +169,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, 1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP(0) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_0_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_0_, 'America/Los_Angeles') AS col_2 @@ -190,7 +184,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP(6) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_6_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_6_, 'America/Los_Angeles') AS col_2 @@ -206,7 +199,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP(9) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_9_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_9_, 'America/Los_Angeles') AS col_2 @@ -222,7 +214,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP_LTZ(0) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_ltz_0_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_ltz_0_, 'America/Los_Angeles') AS col_2 @@ -238,7 +229,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP_LTZ(6) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_ltz_6_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_ltz_6_, 'America/Los_Angeles') AS col_2 @@ -254,7 +244,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: Formatting TIMESTAMP_LTZ(9) with Timezone - ignore: FLINK-38906 projection: |- DATE_FORMAT_TZ(timestamp_ltz_9_, 'Asia/Shanghai') AS col_1 DATE_FORMAT_TZ(timestamp_ltz_9_, 'America/Los_Angeles') AS col_2 @@ -270,7 +259,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, null], after=[], op=DELETE, meta=()} - do: DATE_ADD Function - ignore: FLINK-38906 projection: |- DATE_ADD(timestamp_0_, 17) AS col_1 DATE_ADD(timestamp_6_, 17) AS col_2 diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java index be2df0e1f36..41605ca8b55 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java @@ -78,6 +78,13 @@ public static boolean betweenAsymmetric(String value, String minValue, String ma return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0; } + public static boolean betweenAsymmetric(Byte value, byte minValue, byte maxValue) { + if (value == null) { + return false; + } + return value >= minValue && value <= maxValue; + } + public static boolean betweenAsymmetric(Short value, short minValue, short maxValue) { if (value == null) { return false; @@ -125,6 +132,10 @@ public static boolean notBetweenAsymmetric(String value, String minValue, String return !betweenAsymmetric(value, minValue, maxValue); } + public static boolean notBetweenAsymmetric(Byte value, byte minValue, byte maxValue) { + return !betweenAsymmetric(value, minValue, maxValue); + } + public static boolean notBetweenAsymmetric(Short value, short minValue, short maxValue) { return !betweenAsymmetric(value, minValue, maxValue); } @@ -151,37 +162,45 @@ public static boolean notBetweenAsymmetric( } public static boolean in(String value, String... str) { - return Arrays.stream(str).anyMatch(item -> value.equals(item)); + return Arrays.asList(str).contains(value); + } + + public static boolean in(Byte value, Byte... values) { + return Arrays.asList(values).contains(value); } public static boolean in(Short value, Short... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean in(Integer value, Integer... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean in(Long value, Long... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean in(Float value, Float... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean in(Double value, Double... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean in(BigDecimal value, BigDecimal... values) { - return Arrays.stream(values).anyMatch(item -> value.equals(item)); + return Arrays.asList(values).contains(value); } public static boolean notIn(String value, String... values) { return !in(value, values); } + public static boolean notIn(Byte value, Byte... values) { + return !in(value, values); + } + public static boolean notIn(Short value, Short... values) { return !in(value, values); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 4adc1f2a92d..ae0b48d57d1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -487,7 +487,7 @@ private List createTransformers() { new PostTransformer( selectors, TransformProjection.of(projection).orElse(null), - TransformFilter.of(filterExpression, udfDescriptors).orElse(null), + TransformFilter.of(filterExpression).orElse(null), PostTransformConverters.of(rule.getPostTransformConverter()) .orElse(null), rule.getSupportedMetadataColumns()); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 61af9e370c6..ff6cc512f44 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -116,7 +116,7 @@ public void setup( new PreTransformer( selectors, TransformProjection.of(projection).orElse(null), - TransformFilter.of(filter, udfDescriptors).orElse(null))); + TransformFilter.of(filter).orElse(null))); schemaMetadataTransformers.add( new Tuple2<>( selectors, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index f2cfc425621..db5c37ca258 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -131,7 +131,6 @@ private TransformExpressionKey generateTransformExpressionKey() { List> paramTypes = new ArrayList<>(); List columns = tableInfo.getPreTransformedSchema().getColumns(); - String scriptExpression = projectionColumn.getScriptExpression(); Map columnNameMap = projectionColumn.getColumnNameMap(); LinkedHashSet originalColumnNames = new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); @@ -171,7 +170,7 @@ private TransformExpressionKey generateTransformExpressionKey() { return TransformExpressionKey.of( projectionColumn.getExpression(), - JaninoCompiler.loadSystemFunction(scriptExpression), + projectionColumn.getScriptExpression(), argumentNames, paramTypes, JavaClassConverter.toJavaClass(projectionColumn.getDataType()), diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java index bc496faf92a..1085a7df82d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java @@ -26,6 +26,8 @@ import org.codehaus.commons.compiler.CompileException; import org.codehaus.janino.ExpressionEvaluator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -36,6 +38,8 @@ */ public class TransformExpressionCompiler { + private static final Logger LOG = LoggerFactory.getLogger(TransformExpressionCompiler.class); + static final Cache COMPILED_EXPRESSION_CACHE = CacheBuilder.newBuilder().softValues().build(); @@ -71,9 +75,17 @@ public static ExpressionEvaluator compileExpression( // Result type expressionEvaluator.setExpressionType(key.getReturnClass()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Going to evaluate expression: {}", key.getFullExpression()); + LOG.debug(" - Argument names: {}", argumentNames); + LOG.debug(" - Argument types: {}", argumentClasses); + LOG.debug(" - Returns: {}", key.getReturnClass()); + } + try { // Compile - expressionEvaluator.cook(key.getExpression()); + expressionEvaluator.cook(key.getFullExpression()); } catch (CompileException e) { throw new InvalidProgramException( String.format( @@ -82,7 +94,7 @@ public static ExpressionEvaluator compileExpression( + "\tCompiled expression: %s\n" + "\tColumn name map: {%s}", key.getOriginalExpression(), - key.getExpression(), + key.getCompiledExpression(), TransformException.prettyPrintColumnNameMap( key.getColumnNameMap())), e); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java index 75cb13b665c..5a15662e551 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.cdc.runtime.parser.JaninoCompiler; + import javax.annotation.Nullable; import java.io.Serializable; @@ -43,7 +45,7 @@ public class TransformExpressionKey implements Serializable { private static final long serialVersionUID = 1L; @Nullable private final String originalExpression; - private final String expression; + private final String compiledExpression; private final List argumentNames; private final List> argumentClasses; private final Class returnClass; @@ -51,13 +53,13 @@ public class TransformExpressionKey implements Serializable { private TransformExpressionKey( @Nullable String originalExpression, - String expression, + String compiledExpression, List argumentNames, List> argumentClasses, Class returnClass, Map columnNameMap) { this.originalExpression = originalExpression; - this.expression = expression; + this.compiledExpression = compiledExpression; this.argumentNames = argumentNames; this.argumentClasses = argumentClasses; this.returnClass = returnClass; @@ -69,8 +71,12 @@ public String getOriginalExpression() { return originalExpression; } - public String getExpression() { - return expression; + public String getCompiledExpression() { + return compiledExpression; + } + + public String getFullExpression() { + return JaninoCompiler.loadSystemFunction(compiledExpression); } public List getArgumentNames() { @@ -91,14 +97,14 @@ public Map getColumnNameMap() { public static TransformExpressionKey of( @Nullable String originalExpression, - String expression, + String compiledExpression, List argumentNames, List> argumentClasses, Class returnClass, Map columnNameMap) { return new TransformExpressionKey( originalExpression, - expression, + compiledExpression, argumentNames, argumentClasses, returnClass, @@ -115,7 +121,7 @@ public boolean equals(Object o) { } TransformExpressionKey that = (TransformExpressionKey) o; return Objects.equals(originalExpression, that.originalExpression) - && expression.equals(that.expression) + && compiledExpression.equals(that.compiledExpression) && argumentNames.equals(that.argumentNames) && argumentClasses.equals(that.argumentClasses) && returnClass.equals(that.returnClass) @@ -126,7 +132,7 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash( originalExpression, - expression, + compiledExpression, argumentNames, argumentClasses, returnClass, @@ -139,8 +145,8 @@ public String toString() { + "originalExpression='" + originalExpression + '\'' - + ", expression='" - + expression + + ", compiledExpression='" + + compiledExpression + '\'' + ", argumentNames=" + argumentNames diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java index 3feaa08223e..a9ea00dd072 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java @@ -41,17 +41,12 @@ public class TransformFilter implements Serializable { private static final long serialVersionUID = 1L; private final String expression; - private final String scriptExpression; private final List columnNames; private final Map columnNameMap; public TransformFilter( - String expression, - String scriptExpression, - List columnNames, - Map columnNameMap) { + String expression, List columnNames, Map columnNameMap) { this.expression = expression; - this.scriptExpression = scriptExpression; this.columnNames = columnNames; this.columnNameMap = columnNameMap; } @@ -60,10 +55,6 @@ public String getExpression() { return expression; } - public String getScriptExpression() { - return scriptExpression; - } - public List getColumnNames() { return columnNames; } @@ -76,19 +67,13 @@ public String getColumnNameMapAsString() { return TransformException.prettyPrintColumnNameMap(getColumnNameMap()); } - public static Optional of( - String filterExpression, List udfDescriptors) { + public static Optional of(String filterExpression) { if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) { return Optional.empty(); } List columnNames = TransformParser.parseFilterColumnNameList(filterExpression); Map columnNameMap = TransformParser.generateColumnNameMap(columnNames); - String scriptExpression = - TransformParser.translateFilterExpressionToJaninoExpression( - filterExpression, udfDescriptors, columnNameMap); - return Optional.of( - new TransformFilter( - filterExpression, scriptExpression, columnNames, columnNameMap)); + return Optional.of(new TransformFilter(filterExpression, columnNames, columnNameMap)); } public boolean isValid() { @@ -101,9 +86,6 @@ public String toString() { + "expression='" + expression + '\'' - + ", scriptExpression='" - + scriptExpression - + '\'' + ", columnNames=" + columnNames + ", columnNameMap=" diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 299266d1beb..77dc5cfe6ef 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; +import org.apache.flink.cdc.runtime.parser.TransformParser; import org.codehaus.janino.ExpressionEvaluator; @@ -69,7 +70,13 @@ protected TransformFilterProcessor( this.transformExpressionKey = null; this.expressionEvaluator = null; } else { - this.transformExpressionKey = generateTransformExpressionKey(); + this.transformExpressionKey = + generateTransformExpressionKey( + tableInfo.getPreTransformedSchema().getColumns(), + udfDescriptors, + supportedMetadataColumns + .values() + .toArray(new SupportedMetadataColumn[0])); this.expressionEvaluator = TransformExpressionCompiler.compileExpression( transformExpressionKey, udfDescriptors); @@ -117,8 +124,12 @@ public boolean test(Object[] preRow, Object[] postRow, TransformContext context) + "\tCompiled expression: %s\n" + "\tColumn name map: {%s}", tableInfo.getName(), - transformFilter.getExpression(), - transformFilter.getScriptExpression(), + transformExpressionKey != null + ? transformExpressionKey.getOriginalExpression() + : "", + transformExpressionKey != null + ? transformExpressionKey.getCompiledExpression() + : "", transformFilter.getColumnNameMapAsString()), e); } @@ -200,7 +211,10 @@ private Object[] generateParams(Object[] preRow, Object[] postRow, TransformCont return params.toArray(); } - private TransformExpressionKey generateTransformExpressionKey() { + private TransformExpressionKey generateTransformExpressionKey( + List columns, + List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns) { Tuple2, List>> args = generateArguments(true); args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE); @@ -208,9 +222,17 @@ private TransformExpressionKey generateTransformExpressionKey() { args.f0.add(JaninoCompiler.DEFAULT_EPOCH_TIME); args.f1.add(Long.class); + String scriptExpression = + TransformParser.translateFilterExpressionToJaninoExpression( + transformFilter.getExpression(), + columns, + udfDescriptors, + supportedMetadataColumns, + transformFilter.getColumnNameMap()); + return TransformExpressionKey.of( transformFilter.getExpression(), - JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()), + scriptExpression, args.f0, args.f1, Boolean.class, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index b7fcfa0ea6b..4345331ac7e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -21,6 +21,10 @@ import org.apache.flink.api.common.io.ParseException; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.converter.JavaClassConverter; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; @@ -78,7 +82,8 @@ public class JaninoCompiler { "TIMESTAMPADD", "TIMESTAMPDIFF", "TIMESTAMP_DIFF", - "DATE_FORMAT"); + "DATE_FORMAT", + "DATE_ADD"); public static final String DEFAULT_EPOCH_TIME = "__epoch_time__"; public static final String DEFAULT_TIME_ZONE = "__time_zone__"; @@ -121,54 +126,47 @@ public static ExpressionEvaluator compileExpression( } } - public static String translateSqlNodeToJaninoExpression( - SqlNode transform, - List udfDescriptors, - Map columnNameMap) { - Java.Rvalue rvalue = - translateSqlNodeToJaninoRvalue(transform, udfDescriptors, columnNameMap); + public static String translateSqlNodeToJaninoExpression(Context context, SqlNode transform) { + Java.Rvalue rvalue = translateSqlNodeToJaninoRvalue(context, transform); if (rvalue != null) { return rvalue.toString(); } return ""; } - public static Java.Rvalue translateSqlNodeToJaninoRvalue( - SqlNode transform, - List udfDescriptors, - Map columnNameMap) { + public static Java.Rvalue translateSqlNodeToJaninoRvalue(Context context, SqlNode transform) { if (transform instanceof SqlIdentifier) { - return translateSqlIdentifier((SqlIdentifier) transform, columnNameMap); + return translateSqlIdentifier(context, (SqlIdentifier) transform); } else if (transform instanceof SqlBasicCall) { - return translateSqlBasicCall((SqlBasicCall) transform, udfDescriptors, columnNameMap); + return translateSqlBasicCall(context, (SqlBasicCall) transform); } else if (transform instanceof SqlCase) { - return translateSqlCase((SqlCase) transform, udfDescriptors, columnNameMap); + return translateSqlCase(context, (SqlCase) transform); } else if (transform instanceof SqlLiteral) { - return translateSqlSqlLiteral((SqlLiteral) transform); + return translateSqlSqlLiteral(context, (SqlLiteral) transform); } return null; } private static Java.Rvalue translateSqlIdentifier( - SqlIdentifier sqlIdentifier, Map columnNameMap) { + Context context, SqlIdentifier sqlIdentifier) { String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1); if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) { - return generateTimezoneFreeTemporalFunctionOperation(columnName); + return generateTimezoneFreeTemporalFunctionOperation(context, columnName); } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) { - return generateTimezoneRequiredTemporalFunctionOperation(columnName); + return generateTimezoneRequiredTemporalFunctionOperation(context, columnName); } else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase())) { - return generateTimezoneFreeTemporalConversionFunctionOperation(columnName); + return generateTimezoneFreeTemporalConversionFunctionOperation(context, columnName); } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains( columnName.toUpperCase())) { - return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName); + return generateTimezoneRequiredTemporalConversionFunctionOperation(context, columnName); } else { return new Java.AmbiguousName( Location.NOWHERE, - new String[] {columnNameMap.getOrDefault(columnName, columnName)}); + new String[] {context.columnNameMap.getOrDefault(columnName, columnName)}); } } - private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) { + private static Java.Rvalue translateSqlSqlLiteral(Context context, SqlLiteral sqlLiteral) { if (sqlLiteral.getValue() == null) { return new Java.NullLiteral(Location.NOWHERE); } @@ -190,14 +188,11 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) { return new Java.AmbiguousName(Location.NOWHERE, new String[] {value.toString()}); } - private static Java.Rvalue translateSqlBasicCall( - SqlBasicCall sqlBasicCall, - List udfDescriptors, - Map columnNameMap) { + private static Java.Rvalue translateSqlBasicCall(Context context, SqlBasicCall sqlBasicCall) { List operandList = sqlBasicCall.getOperandList(); List atoms = new ArrayList<>(); for (SqlNode sqlNode : operandList) { - translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors, columnNameMap); + translateSqlNodeToAtoms(context, sqlNode, atoms); } if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains( sqlBasicCall.getOperator().getName().toUpperCase())) { @@ -210,27 +205,22 @@ private static Java.Rvalue translateSqlBasicCall( sqlBasicCall.getOperator().getName().toUpperCase())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); } - return sqlBasicCallToJaninoRvalue( - sqlBasicCall, atoms.toArray(new Java.Rvalue[0]), udfDescriptors); + return sqlBasicCallToJaninoRvalue(context, sqlBasicCall, atoms.toArray(new Java.Rvalue[0])); } - private static Java.Rvalue translateSqlCase( - SqlCase sqlCase, - List udfDescriptors, - Map columnNameMap) { + private static Java.Rvalue translateSqlCase(Context context, SqlCase sqlCase) { SqlNodeList whenOperands = sqlCase.getWhenOperands(); SqlNodeList thenOperands = sqlCase.getThenOperands(); SqlNode elseOperand = sqlCase.getElseOperand(); List whenAtoms = new ArrayList<>(); for (SqlNode sqlNode : whenOperands) { - translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors, columnNameMap); + translateSqlNodeToAtoms(context, sqlNode, whenAtoms); } List thenAtoms = new ArrayList<>(); for (SqlNode sqlNode : thenOperands) { - translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors, columnNameMap); + translateSqlNodeToAtoms(context, sqlNode, thenAtoms); } - Java.Rvalue elseAtoms = - translateSqlNodeToJaninoRvalue(elseOperand, udfDescriptors, columnNameMap); + Java.Rvalue elseAtoms = translateSqlNodeToJaninoRvalue(context, elseOperand); Java.Rvalue sqlCaseRvalueTemp = elseAtoms; for (int i = whenAtoms.size() - 1; i >= 0; i--) { sqlCaseRvalueTemp = @@ -244,50 +234,46 @@ private static Java.Rvalue translateSqlCase( } private static void translateSqlNodeToAtoms( - SqlNode sqlNode, - List atoms, - List udfDescriptors, - Map columnNameMap) { + Context context, SqlNode sqlNode, List atoms) { if (sqlNode instanceof SqlIdentifier) { - atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode, columnNameMap)); + atoms.add(translateSqlIdentifier(context, (SqlIdentifier) sqlNode)); } else if (sqlNode instanceof SqlLiteral) { - atoms.add(translateSqlSqlLiteral((SqlLiteral) sqlNode)); + atoms.add(translateSqlSqlLiteral(context, (SqlLiteral) sqlNode)); } else if (sqlNode instanceof SqlBasicCall) { - atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode, udfDescriptors, columnNameMap)); + atoms.add(translateSqlBasicCall(context, (SqlBasicCall) sqlNode)); } else if (sqlNode instanceof SqlNodeList) { for (SqlNode node : (SqlNodeList) sqlNode) { - translateSqlNodeToAtoms(node, atoms, udfDescriptors, columnNameMap); + translateSqlNodeToAtoms(context, node, atoms); } } else if (sqlNode instanceof SqlCase) { - atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors, columnNameMap)); + atoms.add(translateSqlCase(context, (SqlCase) sqlNode)); } } private static Java.Rvalue sqlBasicCallToJaninoRvalue( - SqlBasicCall sqlBasicCall, - Java.Rvalue[] atoms, - List udfDescriptors) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { switch (sqlBasicCall.getKind()) { case AND: - return generateBinaryOperation(sqlBasicCall, atoms, "&&"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "&&"); case OR: - return generateBinaryOperation(sqlBasicCall, atoms, "||"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "||"); case NOT: - return generateUnaryOperation("!", atoms[0]); + return generateUnaryOperation(context, "!", atoms[0]); case EQUALS: - return generateEqualsOperation(sqlBasicCall, atoms); + return generateEqualsOperation(context, sqlBasicCall, atoms); case NOT_EQUALS: - return generateUnaryOperation("!", generateEqualsOperation(sqlBasicCall, atoms)); + return generateUnaryOperation( + context, "!", generateEqualsOperation(context, sqlBasicCall, atoms)); case IS_NULL: - return generateUnaryOperation("null == ", atoms[0]); + return generateUnaryOperation(context, "null == ", atoms[0]); case IS_NOT_NULL: - return generateUnaryOperation("null != ", atoms[0]); + return generateUnaryOperation(context, "null != ", atoms[0]); case IS_FALSE: case IS_NOT_TRUE: - return generateUnaryOperation("false == ", atoms[0]); + return generateUnaryOperation(context, "false == ", atoms[0]); case IS_TRUE: case IS_NOT_FALSE: - return generateUnaryOperation("true == ", atoms[0]); + return generateUnaryOperation(context, "true == ", atoms[0]); case BETWEEN: case IN: case NOT_IN: @@ -296,49 +282,69 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue( case FLOOR: case TRIM: case OTHER_FUNCTION: - return generateOtherFunctionOperation(sqlBasicCall, atoms, udfDescriptors); + return generateOtherFunctionOperation(context, sqlBasicCall, atoms); case PLUS: - return generateBinaryOperation(sqlBasicCall, atoms, "+"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "+"); case MINUS: - return generateBinaryOperation(sqlBasicCall, atoms, "-"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "-"); case TIMES: - return generateBinaryOperation(sqlBasicCall, atoms, "*"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "*"); case DIVIDE: - return generateBinaryOperation(sqlBasicCall, atoms, "/"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "/"); case MOD: - return generateBinaryOperation(sqlBasicCall, atoms, "%"); + return generateBinaryOperation(context, sqlBasicCall, atoms, "%"); case LESS_THAN: case GREATER_THAN: case LESS_THAN_OR_EQUAL: case GREATER_THAN_OR_EQUAL: - return generateCompareOperation(sqlBasicCall, atoms); + return generateCompareOperation(context, sqlBasicCall, atoms); case CAST: - return generateCastOperation(sqlBasicCall, atoms); + return generateCastOperation(context, sqlBasicCall, atoms); case TIMESTAMP_DIFF: - return generateTimestampDiffOperation(sqlBasicCall, atoms); + return generateTimestampDiffOperation(context, sqlBasicCall, atoms); case TIMESTAMP_ADD: - return generateTimestampAddOperation(sqlBasicCall, atoms); + return generateTimestampAddOperation(context, sqlBasicCall, atoms); case OTHER: - return generateOtherOperation(sqlBasicCall, atoms); + return generateOtherOperation(context, sqlBasicCall, atoms); default: - throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); + throw new ParseException("Unrecognized expression: " + sqlBasicCall); } } - private static Java.Rvalue generateUnaryOperation(String operator, Java.Rvalue atom) { + private static Java.Rvalue generateUnaryOperation( + Context context, String operator, Java.Rvalue atom) { return new Java.UnaryOperation(Location.NOWHERE, operator, atom); } + private static final Map decimalArithmeticHandlers = + Map.of( + "+", "plus", + "-", "minus", + "*", "times", + "/", "divides"); + private static Java.Rvalue generateBinaryOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, String operator) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, String operator) { if (atoms.length != 2) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } + if (decimalArithmeticHandlers.containsKey(operator)) { + String handler = decimalArithmeticHandlers.get(operator); + DataType resultType = + TransformParser.deduceSubExpressionType( + context.columns, + sqlBasicCall, + context.udfDescriptors, + context.supportedMetadataColumns); + if (resultType.is(DataTypeRoot.DECIMAL)) { + return new Java.MethodInvocation(Location.NOWHERE, null, handler, atoms); + } + } return new Java.BinaryOperation(Location.NOWHERE, atoms[0], operator, atoms[1]); } private static Java.Rvalue generateEqualsOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (atoms.length != 2) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } @@ -347,17 +353,17 @@ private static Java.Rvalue generateEqualsOperation( } private static Java.Rvalue generateCastOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (atoms.length != 1) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } List operandList = sqlBasicCall.getOperandList(); SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1); - return generateTypeConvertMethod(sqlDataTypeSpec, atoms); + return generateTypeConvertMethod(context, sqlDataTypeSpec, atoms); } private static Java.Rvalue generateCompareOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (atoms.length != 2) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } @@ -385,7 +391,7 @@ private static Java.Rvalue generateCompareOperation( } private static Java.Rvalue generateTimestampDiffOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (atoms.length != 4) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } @@ -417,7 +423,7 @@ private static Java.Rvalue generateTimestampDiffOperation( } private static Java.Rvalue generateTimestampAddOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (atoms.length != 4) { throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } @@ -448,13 +454,13 @@ private static Java.Rvalue generateTimestampAddOperation( timestampDiffFunctionParam.toArray(new Java.Rvalue[0])); } - private static Java.Rvalue generateCharLengthOperation(Java.Rvalue[] atoms) { + private static Java.Rvalue generateCharLengthOperation(Context context, Java.Rvalue[] atoms) { return new Java.MethodInvocation( Location.NOWHERE, null, StringUtils.convertToCamelCase("CHAR_LENGTH"), atoms); } private static Java.Rvalue generateOtherOperation( - SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (sqlBasicCall.getOperator().getName().equals("||")) { return new Java.MethodInvocation( Location.NOWHERE, null, StringUtils.convertToCamelCase("CONCAT"), atoms); @@ -463,9 +469,7 @@ private static Java.Rvalue generateOtherOperation( } private static Java.Rvalue generateOtherFunctionOperation( - SqlBasicCall sqlBasicCall, - Java.Rvalue[] atoms, - List udfDescriptors) { + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { String operationName = sqlBasicCall.getOperator().getName().toUpperCase(); if (operationName.equals("IF")) { if (atoms.length == 3) { @@ -476,7 +480,7 @@ private static Java.Rvalue generateOtherFunctionOperation( } } else { Optional udfFunctionOptional = - udfDescriptors.stream() + context.udfDescriptors.stream() .filter(e -> e.getName().equalsIgnoreCase(operationName)) .findFirst(); return udfFunctionOptional @@ -498,7 +502,8 @@ private static Java.Rvalue generateOtherFunctionOperation( } } - private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String operationName) { + private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation( + Context context, String operationName) { return new Java.MethodInvocation( Location.NOWHERE, null, @@ -509,7 +514,7 @@ private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String } private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation( - String operationName) { + Context context, String operationName) { List timestampFunctionParam = new ArrayList<>(); timestampFunctionParam.add( new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); @@ -523,7 +528,7 @@ private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation( } private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperation( - String operationName) { + Context context, String operationName) { return new Java.MethodInvocation( Location.NOWHERE, null, @@ -532,7 +537,7 @@ private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperati } private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOperation( - String operationName) { + Context context, String operationName) { return new Java.MethodInvocation( Location.NOWHERE, null, @@ -543,7 +548,7 @@ private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOpe } private static Java.Rvalue generateTypeConvertMethod( - SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) { + Context context, SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) { switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) { case "BOOLEAN": return new Java.MethodInvocation(Location.NOWHERE, null, "castToBoolean", atoms); @@ -614,4 +619,39 @@ private static String generateInvokeExpression(UserDefinedFunctionDescriptor udf return String.format("__instanceOf%s.eval", udfFunction.getClassName()); } } + + /** Contextual information for {@link JaninoCompiler}. */ + public static class Context { + + // Upstream physical columns + public final List columns; + + // Mangled column name map to $1, $2... + public final Map columnNameMap; + + // User defined function signatures + public final List udfDescriptors; + + // Readable metadata columns + public final SupportedMetadataColumn[] supportedMetadataColumns; + + private Context( + List columns, + Map columnNameMap, + List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns) { + this.columns = columns; + this.columnNameMap = columnNameMap; + this.udfDescriptors = udfDescriptors; + this.supportedMetadataColumns = supportedMetadataColumns; + } + + public static Context of( + List columns, + Map columnNameMap, + List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns) { + return new Context(columns, columnNameMap, udfDescriptors, supportedMetadataColumns); + } + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 3369c482d17..5ee7153013c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -348,7 +348,12 @@ public static List generateProjectionColumns( relDataType), exprNode.toString(), JaninoCompiler.translateSqlNodeToJaninoExpression( - exprNode, udfDescriptors, columnNameMap), + JaninoCompiler.Context.of( + columns, + columnNameMap, + udfDescriptors, + supportedMetadataColumns), + exprNode), originalColumnNames, columnNameMap); } @@ -423,7 +428,9 @@ public static ProjectionColumn resolveProjectionColumnFromIdentifier( public static String translateFilterExpressionToJaninoExpression( String filterExpression, + List columns, List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns, Map columnNameMap) { if (isNullOrWhitespaceOnly(filterExpression)) { return ""; @@ -434,7 +441,9 @@ public static String translateFilterExpressionToJaninoExpression( } SqlNode where = sqlSelect.getWhere(); return JaninoCompiler.translateSqlNodeToJaninoExpression( - where, udfDescriptors, columnNameMap); + JaninoCompiler.Context.of( + columns, columnNameMap, udfDescriptors, supportedMetadataColumns), + where); } public static List parseComputedColumnNames( @@ -597,4 +606,37 @@ public static Map generateColumnNameMap(List originalCol } return columnNameMap; } + + public static DataType deduceSubExpressionType( + List columns, + SqlNode subExpression, + List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns) { + SqlSelect sqlSelect = + new SqlSelect( + SqlParserPos.QUOTED_ZERO, + SqlNodeList.EMPTY, + SqlNodeList.of(subExpression), + new SqlIdentifier(DEFAULT_TABLE, SqlParserPos.QUOTED_ZERO), + null, + null, + null, + null, + null, + null, + null, + null); + RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns); + RelDataType[] relDataTypes = + relNode.getRowType().getFieldList().stream() + .map(RelDataTypeField::getType) + .toArray(RelDataType[]::new); + Preconditions.checkArgument( + relDataTypes.length == 1, + "RelDataType %s should be unary from SqlNode %s", + relDataTypes, + sqlSelect); + RelDataType expressionType = relDataTypes[0]; + return CalciteDataTypeConverter.convertCalciteRelDataTypeToDataType(expressionType); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 106b5052455..468566b8200 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -241,6 +241,8 @@ public SqlSyntax getSyntax() { InferTypes.RETURN_TYPE, OperandTypes.or( OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.DATE, SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.TIME, SqlTypeFamily.STRING), OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), SqlFunctionCategory.TIMEDATE); public static final SqlFunction TIMESTAMP_DIFF = @@ -298,6 +300,50 @@ public SqlSyntax getSyntax() { OperandTypes.family(SqlTypeFamily.CHARACTER), OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), SqlFunctionCategory.TIMEDATE); + public static final SqlFunction TO_TIMESTAMP_LTZ = + new SqlFunction( + "TO_TIMESTAMP_LTZ", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3), + SqlTypeTransforms.FORCE_NULLABLE), + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.CHARACTER), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER), + OperandTypes.family( + SqlTypeFamily.CHARACTER, + SqlTypeFamily.CHARACTER, + SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); + + public static final SqlFunction DATE_FORMAT_TZ = + new SqlFunction( + "DATE_FORMAT_TZ", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER), + OperandTypes.family( + SqlTypeFamily.TIMESTAMP, + SqlTypeFamily.CHARACTER, + SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); + + public static final SqlFunction DATE_ADD = + new SqlFunction( + "DATE_ADD", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.DATE, SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER)), + SqlFunctionCategory.TIMEDATE); // --------------------- // Conditional Functions diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 94acbffaf4e..3f1d026862b 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -398,6 +398,8 @@ public void testTranslateFilterToJaninoExpressionError() { TransformParser.translateFilterExpressionToJaninoExpression( "TIMESTAMPDIFF(SECONDS, dt1, dt2)", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap()); }) .isExactlyInstanceOf(ParseException.class) @@ -407,6 +409,8 @@ public void testTranslateFilterToJaninoExpressionError() { TransformParser.translateFilterExpressionToJaninoExpression( "TIMESTAMPDIFF(QUARTER, dt1, dt2)", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap()); }) .isExactlyInstanceOf(ParseException.class) @@ -417,6 +421,8 @@ public void testTranslateFilterToJaninoExpressionError() { TransformParser.translateFilterExpressionToJaninoExpression( "TIMESTAMPADD(SECONDS, dt1, dt2)", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap()); }) .isExactlyInstanceOf(ParseException.class) @@ -426,6 +432,8 @@ public void testTranslateFilterToJaninoExpressionError() { TransformParser.translateFilterExpressionToJaninoExpression( "TIMESTAMPADD(QUARTER, dt1, dt2)", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap()); }) .isExactlyInstanceOf(ParseException.class) @@ -622,6 +630,12 @@ void testTranslateUdfFilterToJaninoExpression() { @Test public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() { + List columns = + List.of( + Column.physicalColumn("a", DataTypes.INT()), + Column.physicalColumn("b", DataTypes.INT()), + Column.physicalColumn("a-b", DataTypes.INT())); + Map columnNameMap = new HashMap<>(); columnNameMap.put("a", "$0"); columnNameMap.put("b", "$1"); @@ -630,42 +644,52 @@ public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() { testFilterExpressionWithUdf( "format(upper(a))", "__instanceOfFormatFunctionClass.eval(upper($0))", + columns, columnNameMap); testFilterExpressionWithUdf( "format(lower(b))", "__instanceOfFormatFunctionClass.eval(lower($1))", + columns, columnNameMap); testFilterExpressionWithUdf( "format(concat(a,b))", "__instanceOfFormatFunctionClass.eval(concat($0, $1))", + columns, columnNameMap); testFilterExpressionWithUdf( "format(SUBSTR(`a-b`,1))", "__instanceOfFormatFunctionClass.eval(substr($2, 1))", + columns, columnNameMap); testFilterExpressionWithUdf( "typeof(`a-b` like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(like($2, \"^[a-zA-Z]\"))", + columns, columnNameMap); testFilterExpressionWithUdf( "typeof(`a-b` not like '^[a-zA-Z]')", "__instanceOfTypeOfFunctionClass.eval(notLike($2, \"^[a-zA-Z]\"))", + columns, columnNameMap); testFilterExpressionWithUdf( "typeof(a-b-`a-b`)", "__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)", + columns, columnNameMap); testFilterExpressionWithUdf( "typeof(a-b-2)", "__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)", + columns, columnNameMap); testFilterExpressionWithUdf( "addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND format('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")", + columns, columnNameMap); testFilterExpressionWithUdf( "ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''", "greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")", + columns, columnNameMap); } @@ -687,6 +711,8 @@ void testLargeNumericalLiterals() { TransformParser.translateFilterExpressionToJaninoExpression( "id > 9223372036854775808", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap())) .isExactlyInstanceOf(CalciteContextException.class) .hasMessageContaining("Numeric literal '9223372036854775808' out of range"); @@ -696,6 +722,8 @@ void testLargeNumericalLiterals() { TransformParser.translateFilterExpressionToJaninoExpression( "id < -9223372036854775809", Collections.emptyList(), + Collections.emptyList(), + new SupportedMetadataColumn[0], Collections.emptyMap())) .isExactlyInstanceOf(CalciteContextException.class) .hasMessageContaining("Numeric literal '-9223372036854775809' out of range"); @@ -776,22 +804,34 @@ void testParsingExpressionWithUnicodeLiterals() { } } + private static final List DUMMY_COLUMNS = + List.of(Column.physicalColumn("id", DataTypes.INT())); + private void testFilterExpression(String expression, String expressionExpect) { String janinoExpression = TransformParser.translateFilterExpressionToJaninoExpression( - expression, Collections.emptyList(), Collections.emptyMap()); + expression, + DUMMY_COLUMNS, + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap()); Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); } private void testFilterExpressionWithUdf(String expression, String expressionExpect) { - testFilterExpressionWithUdf(expression, expressionExpect, Collections.emptyMap()); + testFilterExpressionWithUdf( + expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap()); } private void testFilterExpressionWithUdf( - String expression, String expressionExpect, Map columnNameMap) { + String expression, + String expressionExpect, + List columns, + Map columnNameMap) { String janinoExpression = TransformParser.translateFilterExpressionToJaninoExpression( expression, + columns, Arrays.asList( new UserDefinedFunctionDescriptor( "format", @@ -802,6 +842,7 @@ private void testFilterExpressionWithUdf( new UserDefinedFunctionDescriptor( "typeof", "org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")), + new SupportedMetadataColumn[0], columnNameMap); Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); }