Skip to content

Commit 5776a3d

Browse files
Hisoka-XThorneANN
authored andcommitted
[FLINK-38996][transform] Enhance error messages for projection and filtering expressions (apache#4243)
1 parent 44801d8 commit 5776a3d

9 files changed

Lines changed: 65 additions & 15 deletions

File tree

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2737,12 +2737,12 @@ void testTransformErrorMessage() {
27372737
.cause()
27382738
.isExactlyInstanceOf(FlinkRuntimeException.class)
27392739
.hasMessage(
2740-
"Failed to compile expression TransformExpressionKey{expression='"
2740+
"Failed to compile expression TransformExpressionKey{originalExpression='id1 > 0', expression='"
27412741
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
27422742
+ "greaterThan($0, 0)', argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}")
27432743
.cause()
27442744
.hasMessageContaining(
2745-
"Expression: "
2745+
"Compiled expression: "
27462746
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
27472747
+ "greaterThan($0, 0)")
27482748
.hasMessageContaining("Column name map: {$0 -> id1}")
@@ -2787,7 +2787,7 @@ void testTransformErrorMessage() {
27872787
.cause()
27882788
.isExactlyInstanceOf(RuntimeException.class)
27892789
.hasMessageContaining(
2790-
"Failed to evaluate projection expression `castToInteger($0) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`")
2790+
"Failed to evaluate projection expression `CAST(`TB`.`name` AS INTEGER) + 1` for column `new_name` in table `default_namespace.default_schema.mytable1`")
27912791
.hasMessageContaining("Column name map: {$0 -> name}");
27922792

27932793
// Unsupported operations in filter rule
@@ -2808,7 +2808,10 @@ void testTransformErrorMessage() {
28082808
.cause()
28092809
.isExactlyInstanceOf(RuntimeException.class)
28102810
.hasMessageContaining(
2811-
"Failed to evaluate filtering expression `greaterThan($0 + 1, 0)` for table `default_namespace.default_schema.mytable1`")
2811+
"Failed to evaluate filtering expression for table `default_namespace.default_schema.mytable1`.\n"
2812+
+ "\tOriginal expression: name + 1 > 0\n"
2813+
+ "\tCompiled expression: greaterThan($0 + 1, 0)\n"
2814+
+ "\tColumn name map: {$0 -> name}")
28122815
.hasMessageContaining("Column name map: {$0 -> name}")
28132816
.rootCause()
28142817
.isExactlyInstanceOf(RuntimeException.class)

flink-cdc-composer/src/test/resources/specs/casting.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,4 @@
268268
id_
269269
CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11
270270
primary-key: id_
271-
expect-error: Failed to evaluate projection expression `castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table `foo.bar.baz`.
271+
expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`.

flink-cdc-composer/src/test/resources/specs/logical.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,4 @@
4545
bool_ IS FALSE AS comp_8
4646
bool_ IS NOT FALSE AS comp_9
4747
primary-key: id_
48-
expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.'
48+
expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.'

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ public DataType getDataType() {
8787
return column.getType();
8888
}
8989

90+
public String getExpression() {
91+
return expression;
92+
}
93+
9094
public String getScriptExpression() {
9195
return scriptExpression;
9296
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,12 @@ public Object evaluate(Object[] rowData, TransformContext context) {
8989
throw new RuntimeException(
9090
String.format(
9191
"Failed to evaluate projection expression `%s` for column `%s` in table `%s`.\n"
92+
+ "\tCompiled expression: %s\n"
9293
+ "\tColumn name map: {%s}",
93-
projectionColumn.getScriptExpression(),
94+
projectionColumn.getExpression(),
9495
projectionColumn.getColumnName(),
9596
tableInfo.getName(),
97+
projectionColumn.getScriptExpression(),
9698
projectionColumn.getColumnNameMapAsString()),
9799
e);
98100
}
@@ -168,6 +170,7 @@ private TransformExpressionKey generateTransformExpressionKey() {
168170
paramTypes.add(Long.class);
169171

170172
return TransformExpressionKey.of(
173+
projectionColumn.getExpression(),
171174
JaninoCompiler.loadSystemFunction(scriptExpression),
172175
argumentNames,
173176
paramTypes,

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ public static ExpressionEvaluator compileExpression(
7777
} catch (CompileException e) {
7878
throw new InvalidProgramException(
7979
String.format(
80-
"Expression cannot be compiled. This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map: {%s}",
80+
"Expression cannot be compiled. This is a bug. Please file an issue.\n"
81+
+ "\tOriginal expression: %s\n"
82+
+ "\tCompiled expression: %s\n"
83+
+ "\tColumn name map: {%s}",
84+
key.getOriginalExpression(),
8185
key.getExpression(),
8286
TransformException.prettyPrintColumnNameMap(
8387
key.getColumnNameMap())),

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.runtime.operators.transform;
1919

20+
import javax.annotation.Nullable;
21+
2022
import java.io.Serializable;
2123
import java.util.Collections;
2224
import java.util.List;
@@ -29,7 +31,8 @@
2931
* <p>A transform expression key contains:
3032
*
3133
* <ul>
32-
* <li>expression: a string for the transformation expression.
34+
* <li>originalExpression: a string for the original transformation expression input by users.
35+
* <li>expression: a string for the compiled transformation expression.
3336
* <li>argumentNames: a list for the argument names in expression.
3437
* <li>argumentClasses: a list for the argument classes in expression.
3538
* <li>returnClass: a class for the return class in expression
@@ -39,25 +42,33 @@
3942
*/
4043
public class TransformExpressionKey implements Serializable {
4144
private static final long serialVersionUID = 1L;
45+
@Nullable private final String originalExpression;
4246
private final String expression;
4347
private final List<String> argumentNames;
4448
private final List<Class<?>> argumentClasses;
4549
private final Class<?> returnClass;
4650
private final Map<String, String> columnNameMap;
4751

4852
private TransformExpressionKey(
53+
@Nullable String originalExpression,
4954
String expression,
5055
List<String> argumentNames,
5156
List<Class<?>> argumentClasses,
5257
Class<?> returnClass,
5358
Map<String, String> columnNameMap) {
59+
this.originalExpression = originalExpression;
5460
this.expression = expression;
5561
this.argumentNames = argumentNames;
5662
this.argumentClasses = argumentClasses;
5763
this.returnClass = returnClass;
5864
this.columnNameMap = columnNameMap;
5965
}
6066

67+
@Nullable
68+
public String getOriginalExpression() {
69+
return originalExpression;
70+
}
71+
6172
public String getExpression() {
6273
return expression;
6374
}
@@ -79,13 +90,19 @@ public Map<String, String> getColumnNameMap() {
7990
}
8091

8192
public static TransformExpressionKey of(
93+
@Nullable String originalExpression,
8294
String expression,
8395
List<String> argumentNames,
8496
List<Class<?>> argumentClasses,
8597
Class<?> returnClass,
8698
Map<String, String> columnNameMap) {
8799
return new TransformExpressionKey(
88-
expression, argumentNames, argumentClasses, returnClass, columnNameMap);
100+
originalExpression,
101+
expression,
102+
argumentNames,
103+
argumentClasses,
104+
returnClass,
105+
columnNameMap);
89106
}
90107

91108
@Override
@@ -97,7 +114,8 @@ public boolean equals(Object o) {
97114
return false;
98115
}
99116
TransformExpressionKey that = (TransformExpressionKey) o;
100-
return expression.equals(that.expression)
117+
return Objects.equals(originalExpression, that.originalExpression)
118+
&& expression.equals(that.expression)
101119
&& argumentNames.equals(that.argumentNames)
102120
&& argumentClasses.equals(that.argumentClasses)
103121
&& returnClass.equals(that.returnClass)
@@ -106,13 +124,22 @@ public boolean equals(Object o) {
106124

107125
@Override
108126
public int hashCode() {
109-
return Objects.hash(expression, argumentNames, argumentClasses, returnClass, columnNameMap);
127+
return Objects.hash(
128+
originalExpression,
129+
expression,
130+
argumentNames,
131+
argumentClasses,
132+
returnClass,
133+
columnNameMap);
110134
}
111135

112136
@Override
113137
public String toString() {
114138
return "TransformExpressionKey{"
115-
+ "expression='"
139+
+ "originalExpression='"
140+
+ originalExpression
141+
+ '\''
142+
+ ", expression='"
116143
+ expression
117144
+ '\''
118145
+ ", argumentNames="

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,13 @@ public boolean test(Object[] preRow, Object[] postRow, TransformContext context)
112112
} catch (InvocationTargetException e) {
113113
throw new RuntimeException(
114114
String.format(
115-
"Failed to evaluate filtering expression `%s` for table `%s`.\n"
115+
"Failed to evaluate filtering expression for table `%s`.\n"
116+
+ "\tOriginal expression: %s\n"
117+
+ "\tCompiled expression: %s\n"
116118
+ "\tColumn name map: {%s}",
117-
transformFilter.getScriptExpression(),
118119
tableInfo.getName(),
120+
transformFilter.getExpression(),
121+
transformFilter.getScriptExpression(),
119122
transformFilter.getColumnNameMapAsString()),
120123
e);
121124
}
@@ -206,6 +209,7 @@ private TransformExpressionKey generateTransformExpressionKey() {
206209
args.f1.add(Long.class);
207210

208211
return TransformExpressionKey.of(
212+
transformFilter.getExpression(),
209213
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
210214
args.f0,
211215
args.f1,

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2767,7 +2767,12 @@ void testCastErrorTransform() throws Exception {
27672767
+ "\tcolumns={`col1` STRING NOT NULL,`castInt` INT,`castBoolean` BOOLEAN,`castTinyint` TINYINT,`castSmallint` SMALLINT,`castBigint` BIGINT,`castFloat` FLOAT,`castDouble` DOUBLE,`castChar` STRING,`castVarchar` STRING,`castDecimal` DECIMAL(4, 2),`castTimestamp` TIMESTAMP(3)}, primaryKeys=col1, options=().")
27682768
.cause()
27692769
.hasRootCauseInstanceOf(IllegalArgumentException.class)
2770+
.hasMessageContaining(
2771+
"Failed to evaluate projection expression `CAST(`TB`.`castFloat` AS TIMESTAMP(3))` for column `castTimestamp` in table `my_company.my_branch.data_cast`.\n"
2772+
+ "\tCompiled expression: castToTimestamp($0, __time_zone__)\n"
2773+
+ "\tColumn name map: {$0 -> castFloat}")
27702774
.hasRootCauseMessage("Unable to parse given string as timestamp: 1.0");
2775+
27712776
transformFunctionEventEventOperatorTestHarness.close();
27722777
}
27732778

0 commit comments

Comments
 (0)