diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index a10392775f0..c81b1ba463b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -2737,12 +2737,12 @@ void testTransformErrorMessage() { .cause() .isExactlyInstanceOf(FlinkRuntimeException.class) .hasMessage( - "Failed to compile expression TransformExpressionKey{expression='" + "Failed to compile expression TransformExpressionKey{originalExpression='id1 > 0', expression='" + JaninoCompiler.LOAD_MODULES_EXPRESSION + "greaterThan($0, 0)', argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}") .cause() .hasMessageContaining( - "Expression: " + "Compiled expression: " + JaninoCompiler.LOAD_MODULES_EXPRESSION + "greaterThan($0, 0)") .hasMessageContaining("Column name map: {$0 -> id1}") @@ -2787,7 +2787,7 @@ void testTransformErrorMessage() { .cause() .isExactlyInstanceOf(RuntimeException.class) .hasMessageContaining( - "Failed to evaluate projection expression `castToInteger($0) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`") + "Failed to evaluate projection expression `CAST(`TB`.`name` AS INTEGER) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`") .hasMessageContaining("Column name map: {$0 -> name}"); // Unsupported operations in filter rule @@ -2808,7 +2808,10 @@ void testTransformErrorMessage() { .cause() .isExactlyInstanceOf(RuntimeException.class) .hasMessageContaining( - "Failed to evaluate filtering expression `greaterThan($0 + 1, 0)` for table `default_namespace.default_schema.mytable1`") + "Failed to evaluate filtering expression for table `default_namespace.default_schema.mytable1`.\n" + + "\tOriginal expression: name + 1 > 0\n" + + "\tCompiled expression: greaterThan($0 + 1, 0)\n" + + "\tColumn name map: {$0 -> name}") .hasMessageContaining("Column name map: {$0 -> name}") .rootCause() .isExactlyInstanceOf(RuntimeException.class) diff --git a/flink-cdc-composer/src/test/resources/specs/casting.yaml b/flink-cdc-composer/src/test/resources/specs/casting.yaml index 84a68e788c5..f5ce0e802a5 100644 --- a/flink-cdc-composer/src/test/resources/specs/casting.yaml +++ b/flink-cdc-composer/src/test/resources/specs/casting.yaml @@ -268,4 +268,4 @@ id_ CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11 primary-key: id_ - expect-error: Failed to evaluate projection expression `castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table `foo.bar.baz`. + expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`. diff --git a/flink-cdc-composer/src/test/resources/specs/logical.yaml b/flink-cdc-composer/src/test/resources/specs/logical.yaml index cc825f7a69c..4f09aa82301 100644 --- a/flink-cdc-composer/src/test/resources/specs/logical.yaml +++ b/flink-cdc-composer/src/test/resources/specs/logical.yaml @@ -45,4 +45,4 @@ bool_ IS FALSE AS comp_8 bool_ IS NOT FALSE AS comp_9 primary-key: id_ - expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.' + expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.' diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java index 5c3b17a4396..fdbaf267851 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java @@ -87,6 +87,10 @@ public DataType getDataType() { return column.getType(); } + public String getExpression() { + return expression; + } + public String getScriptExpression() { return scriptExpression; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index cf62cc85584..f2cfc425621 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -89,10 +89,12 @@ public Object evaluate(Object[] rowData, TransformContext context) { throw new RuntimeException( String.format( "Failed to evaluate projection expression `%s` for column `%s` in table `%s`.\n" + + "\tCompiled expression: %s\n" + "\tColumn name map: {%s}", - projectionColumn.getScriptExpression(), + projectionColumn.getExpression(), projectionColumn.getColumnName(), tableInfo.getName(), + projectionColumn.getScriptExpression(), projectionColumn.getColumnNameMapAsString()), e); } @@ -168,6 +170,7 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(Long.class); return TransformExpressionKey.of( + projectionColumn.getExpression(), JaninoCompiler.loadSystemFunction(scriptExpression), argumentNames, paramTypes, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java index 4c6cb71e1f6..bc496faf92a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java @@ -77,7 +77,11 @@ public static ExpressionEvaluator compileExpression( } catch (CompileException e) { throw new InvalidProgramException( String.format( - "Expression cannot be compiled. This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map: {%s}", + "Expression cannot be compiled. This is a bug. Please file an issue.\n" + + "\tOriginal expression: %s\n" + + "\tCompiled expression: %s\n" + + "\tColumn name map: {%s}", + key.getOriginalExpression(), key.getExpression(), TransformException.prettyPrintColumnNameMap( key.getColumnNameMap())), diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java index 6ece097fc9c..75cb13b665c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.operators.transform; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Collections; import java.util.List; @@ -29,7 +31,8 @@ *

A transform expression key contains: * *