From 43873097636aeded748cf4d88a2d0592226c3436 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 29 Jan 2026 15:45:33 +0800 Subject: [PATCH 1/4] [FLINK-38996][transform] Enhance error messages for projection and filtering expressions --- .../src/test/resources/specs/casting.yaml | 2 +- .../src/test/resources/specs/logical.yaml | 2 +- .../operators/transform/ProjectionColumn.java | 4 ++ .../transform/ProjectionColumnProcessor.java | 5 ++- .../TransformExpressionCompiler.java | 6 ++- .../transform/TransformExpressionKey.java | 37 ++++++++++++++++--- .../transform/TransformFilterProcessor.java | 8 +++- .../transform/PostTransformOperatorTest.java | 4 ++ 8 files changed, 57 insertions(+), 11 deletions(-) diff --git a/flink-cdc-composer/src/test/resources/specs/casting.yaml b/flink-cdc-composer/src/test/resources/specs/casting.yaml index 84a68e788c5..f5ce0e802a5 100644 --- a/flink-cdc-composer/src/test/resources/specs/casting.yaml +++ b/flink-cdc-composer/src/test/resources/specs/casting.yaml @@ -268,4 +268,4 @@ id_ CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11 primary-key: id_ - expect-error: Failed to evaluate projection expression `castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table `foo.bar.baz`. + expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`. diff --git a/flink-cdc-composer/src/test/resources/specs/logical.yaml b/flink-cdc-composer/src/test/resources/specs/logical.yaml index cc825f7a69c..4f09aa82301 100644 --- a/flink-cdc-composer/src/test/resources/specs/logical.yaml +++ b/flink-cdc-composer/src/test/resources/specs/logical.yaml @@ -45,4 +45,4 @@ bool_ IS FALSE AS comp_8 bool_ IS NOT FALSE AS comp_9 primary-key: id_ - expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.' + expect-error: 'java.lang.RuntimeException: Failed to evaluate projection expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.' diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java index 5c3b17a4396..fdbaf267851 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java @@ -87,6 +87,10 @@ public DataType getDataType() { return column.getType(); } + public String getExpression() { + return expression; + } + public String getScriptExpression() { return scriptExpression; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 8db8efc3126..03db01be5e6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -89,10 +89,12 @@ public Object evaluate(Object[] rowData, TransformContext context) { throw new RuntimeException( String.format( "Failed to evaluate projection expression `%s` for column `%s` in table `%s`.\n" + + "\tCompiled expression: %s\n" + "\tColumn name map: {%s}", - projectionColumn.getScriptExpression(), + projectionColumn.getExpression(), projectionColumn.getColumnName(), tableInfo.getName(), + projectionColumn.getScriptExpression(), projectionColumn.getColumnNameMapAsString()), e); } @@ -168,6 +170,7 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(Long.class); return TransformExpressionKey.of( + projectionColumn.getExpression(), JaninoCompiler.loadSystemFunction(scriptExpression), argumentNames, paramTypes, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java index 4c6cb71e1f6..bc496faf92a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java @@ -77,7 +77,11 @@ public static ExpressionEvaluator compileExpression( } catch (CompileException e) { throw new InvalidProgramException( String.format( - "Expression cannot be compiled. This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map: {%s}", + "Expression cannot be compiled. This is a bug. Please file an issue.\n" + + "\tOriginal expression: %s\n" + + "\tCompiled expression: %s\n" + + "\tColumn name map: {%s}", + key.getOriginalExpression(), key.getExpression(), TransformException.prettyPrintColumnNameMap( key.getColumnNameMap())), diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java index 6ece097fc9c..75cb13b665c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.operators.transform; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Collections; import java.util.List; @@ -29,7 +31,8 @@ *

A transform expression key contains: * *