Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2737,12 +2737,12 @@ void testTransformErrorMessage() {
.cause()
.isExactlyInstanceOf(FlinkRuntimeException.class)
.hasMessage(
"Failed to compile expression TransformExpressionKey{expression='"
"Failed to compile expression TransformExpressionKey{originalExpression='id1 > 0', expression='"
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
+ "greaterThan($0, 0)', argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}")
.cause()
.hasMessageContaining(
"Expression: "
"Compiled expression: "
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
+ "greaterThan($0, 0)")
.hasMessageContaining("Column name map: {$0 -> id1}")
Expand Down Expand Up @@ -2787,7 +2787,7 @@ void testTransformErrorMessage() {
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining(
"Failed to evaluate projection expression `castToInteger($0) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`")
"Failed to evaluate projection expression `CAST(`TB`.`name` AS INTEGER) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`")
.hasMessageContaining("Column name map: {$0 -> name}");

// Unsupported operations in filter rule
Expand All @@ -2808,7 +2808,10 @@ void testTransformErrorMessage() {
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining(
"Failed to evaluate filtering expression `greaterThan($0 + 1, 0)` for table `default_namespace.default_schema.mytable1`")
"Failed to evaluate filtering expression for table `default_namespace.default_schema.mytable1`.\n"
+ "\tOriginal expression: name + 1 > 0\n"
+ "\tCompiled expression: greaterThan($0 + 1, 0)\n"
+ "\tColumn name map: {$0 -> name}")
.hasMessageContaining("Column name map: {$0 -> name}")
Comment thread
yuxiqian marked this conversation as resolved.
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
Expand Down
2 changes: 1 addition & 1 deletion flink-cdc-composer/src/test/resources/specs/casting.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,4 @@
id_
CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11
primary-key: id_
expect-error: Failed to evaluate projection expression `castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table `foo.bar.baz`.
expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`.
2 changes: 1 addition & 1 deletion flink-cdc-composer/src/test/resources/specs/logical.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@
bool_ IS FALSE AS comp_8
bool_ IS NOT FALSE AS comp_9
primary-key: id_
expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.'
expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.'
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public DataType getDataType() {
return column.getType();
}

public String getExpression() {
return expression;
}

public String getScriptExpression() {
return scriptExpression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ public Object evaluate(Object[] rowData, TransformContext context) {
throw new RuntimeException(
String.format(
"Failed to evaluate projection expression `%s` for column `%s` in table `%s`.\n"
+ "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
projectionColumn.getScriptExpression(),
projectionColumn.getExpression(),
projectionColumn.getColumnName(),
tableInfo.getName(),
projectionColumn.getScriptExpression(),
projectionColumn.getColumnNameMapAsString()),
e);
}
Expand Down Expand Up @@ -168,6 +170,7 @@ private TransformExpressionKey generateTransformExpressionKey() {
paramTypes.add(Long.class);

return TransformExpressionKey.of(
projectionColumn.getExpression(),
JaninoCompiler.loadSystemFunction(scriptExpression),
argumentNames,
paramTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public static ExpressionEvaluator compileExpression(
} catch (CompileException e) {
throw new InvalidProgramException(
String.format(
"Expression cannot be compiled. This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map: {%s}",
"Expression cannot be compiled. This is a bug. Please file an issue.\n"
+ "\tOriginal expression: %s\n"
+ "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
key.getOriginalExpression(),
key.getExpression(),
TransformException.prettyPrintColumnNameMap(
key.getColumnNameMap())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.cdc.runtime.operators.transform;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,7 +31,8 @@
* <p>A transform expression key contains:
*
* <ul>
* <li>expression: a string for the transformation expression.
* <li>originalExpression: a string for the original transformation expression input by users.
* <li>expression: a string for the compiled transformation expression.
* <li>argumentNames: a list for the argument names in expression.
* <li>argumentClasses: a list for the argument classes in expression.
* <li>returnClass: a class for the return class in expression
Expand All @@ -39,25 +42,33 @@
*/
public class TransformExpressionKey implements Serializable {
private static final long serialVersionUID = 1L;
@Nullable private final String originalExpression;
private final String expression;
private final List<String> argumentNames;
private final List<Class<?>> argumentClasses;
private final Class<?> returnClass;
private final Map<String, String> columnNameMap;

private TransformExpressionKey(
@Nullable String originalExpression,
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
this.originalExpression = originalExpression;
this.expression = expression;
this.argumentNames = argumentNames;
this.argumentClasses = argumentClasses;
this.returnClass = returnClass;
this.columnNameMap = columnNameMap;
}

@Nullable
public String getOriginalExpression() {
return originalExpression;
}

public String getExpression() {
return expression;
}
Expand All @@ -79,13 +90,19 @@ public Map<String, String> getColumnNameMap() {
}

public static TransformExpressionKey of(
@Nullable String originalExpression,
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
return new TransformExpressionKey(
expression, argumentNames, argumentClasses, returnClass, columnNameMap);
originalExpression,
expression,
argumentNames,
argumentClasses,
returnClass,
columnNameMap);
}

@Override
Expand All @@ -97,7 +114,8 @@ public boolean equals(Object o) {
return false;
}
TransformExpressionKey that = (TransformExpressionKey) o;
return expression.equals(that.expression)
return Objects.equals(originalExpression, that.originalExpression)
&& expression.equals(that.expression)
&& argumentNames.equals(that.argumentNames)
&& argumentClasses.equals(that.argumentClasses)
&& returnClass.equals(that.returnClass)
Expand All @@ -106,13 +124,22 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(expression, argumentNames, argumentClasses, returnClass, columnNameMap);
return Objects.hash(
originalExpression,
expression,
argumentNames,
argumentClasses,
returnClass,
columnNameMap);
}

@Override
public String toString() {
return "TransformExpressionKey{"
+ "expression='"
+ "originalExpression='"
+ originalExpression
+ '\''
+ ", expression='"
+ expression
+ '\''
+ ", argumentNames="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ public boolean test(Object[] preRow, Object[] postRow, TransformContext context)
} catch (InvocationTargetException e) {
throw new RuntimeException(
String.format(
"Failed to evaluate filtering expression `%s` for table `%s`.\n"
"Failed to evaluate filtering expression for table `%s`.\n"
+ "\tOriginal expression: %s\n"
+ "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
transformFilter.getScriptExpression(),
tableInfo.getName(),
transformFilter.getExpression(),
transformFilter.getScriptExpression(),
transformFilter.getColumnNameMapAsString()),
e);
}
Expand Down Expand Up @@ -206,6 +209,7 @@ private TransformExpressionKey generateTransformExpressionKey() {
args.f1.add(Long.class);

return TransformExpressionKey.of(
transformFilter.getExpression(),
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
args.f0,
args.f1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2767,7 +2767,12 @@ void testCastErrorTransform() throws Exception {
+ "\tcolumns={`col1` STRING NOT NULL,`castInt` INT,`castBoolean` BOOLEAN,`castTinyint` TINYINT,`castSmallint` SMALLINT,`castBigint` BIGINT,`castFloat` FLOAT,`castDouble` DOUBLE,`castChar` STRING,`castVarchar` STRING,`castDecimal` DECIMAL(4, 2),`castTimestamp` TIMESTAMP(3)}, primaryKeys=col1, options=().")
.cause()
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Failed to evaluate projection expression `CAST(`TB`.`castFloat` AS TIMESTAMP(3))` for column `castTimestamp` in table `my_company.my_branch.data_cast`.\n"
+ "\tCompiled expression: castToTimestamp($0, __time_zone__)\n"
+ "\tColumn name map: {$0 -> castFloat}")
.hasRootCauseMessage("Unable to parse given string as timestamp: 1.0");

transformFunctionEventEventOperatorTestHarness.close();
}

Expand Down
Loading