From 0086aa39f9227571c2ab506b91c90033299968d3 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 2 Feb 2026 15:21:14 +0800 Subject: [PATCH 1/3] [FLINK-38888][feature] YAML Pipeline supports item subscription of complex types --- .../src/test/resources/specs/nested.yaml | 15 -- .../functions/impl/StructFunctions.java | 96 ++++++++++++ .../cdc/runtime/parser/JaninoCompiler.java | 35 ++++- .../cdc/runtime/parser/TransformParser.java | 2 +- .../metadata/TransformSqlOperatorTable.java | 6 + .../functions/impl/StructFunctionsTest.java | 143 ++++++++++++++++++ .../runtime/parser/TransformParserTest.java | 45 ++++++ 7 files changed, 325 insertions(+), 17 deletions(-) create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml b/flink-cdc-composer/src/test/resources/specs/nested.yaml index f230cc6ca1f..f61ec8f7915 100644 --- a/flink-cdc-composer/src/test/resources/specs/nested.yaml +++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml @@ -119,7 +119,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()} - do: Integer Array Subscripting - ignore: FLINK-38888 projection: |- id_ array_int_ @@ -133,7 +132,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: String Array Subscripting - ignore: FLINK-38888 projection: |- id_ array_string_ @@ -147,7 +145,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Array OOB Subscripting - ignore: FLINK-38888 projection: |- id_ array_string_[0] AS negative_overflow @@ -161,7 +158,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Array Subscripting With Sub Expression - ignore: FLINK-38888 projection: |- id_ array_int_[1 + 1] AS int_key @@ -175,7 +171,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Filter by Array-related Expression - ignore: FLINK-38888 projection: id_, array_string_ filter: array_string_[3] = '五' primary-key: id_ @@ -184,7 +179,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()} - do: Map Subscripting - ignore: FLINK-38888 projection: |- id_ map_int_string_ @@ -198,7 +192,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Map Subscripting with absent key - ignore: FLINK-38888 projection: |- id_ map_int_string_[233] AS map_value @@ -211,7 +204,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()} - do: Map Subscripting with complex expression - ignore: FLINK-38888 projection: |- id_ map_int_string_[1 + 2] AS map_value @@ -224,7 +216,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()} - do: Map Subscripting with nested expressions - ignore: FLINK-38888 projection: |- id_ map_string_array_string_[lower('ONE')] AS index_1 @@ -238,7 +229,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Map Subscripting With Absent Object - ignore: FLINK-38888 projection: |- id_ map_string_array_string_['foo'] AS index_1 @@ -252,7 +242,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()} - do: Filter by Map-related Expression - ignore: FLINK-38888 projection: id_, map_int_string_ filter: map_int_string_[1] = 'one' primary-key: id_ @@ -260,7 +249,6 @@ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP}, primaryKeys=id_, options=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()} - do: Record Subscripting With Index - ignore: FLINK-38888 projection: |- id_ complex_row_ @@ -275,7 +263,6 @@ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()} - do: Record Subscripting With Invalid Index - ignore: FLINK-38888 projection: |- id_ complex_row_ @@ -283,7 +270,6 @@ primary-key: id_ expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)' - do: Record Subscripting With Computed Index (Illegal, type must be statically determined) - ignore: FLINK-38888 projection: |- id_ complex_row_ @@ -291,7 +277,6 @@ primary-key: id_ expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)' - do: Filter by Record-related Expression - ignore: FLINK-38888 projection: id_, complex_row_ filter: complex_row_[1] = 'Derrida' primary-key: id_ diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java new file mode 100644 index 00000000000..0f3202440c4 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.functions.impl; + +import java.util.List; +import java.util.Map; + +/** Struct built-in functions. */ +public class StructFunctions { + + /** + * Accesses an element from an ARRAY or MAP by index or key. + * + *

For ARRAY: Uses 1-based index (SQL standard). array[1] returns the first element. + * + *

For MAP: Uses key to access the value. map['key'] returns the value for 'key'. + */ + public static Object itemAccess(Object collection, Object indexOrKey) { + if (collection == null || indexOrKey == null) { + return null; + } + + Object result; + if (collection instanceof List) { + result = arrayElement((List) collection, indexOrKey); + } else if (collection instanceof Map) { + result = mapValue((Map) collection, indexOrKey); + } else { + throw new IllegalArgumentException( + "itemAccess only supports List or Map, but got: " + + collection.getClass().getName()); + } + return result; + } + + /** + * Gets an element from an Object array by index (1-based, SQL standard). This overload handles + * arrays that have been converted from ArrayData to Object[] by DataTypeConverter. + * + * @param array the Object array to access + * @param index the 1-based index + * @return the element at the specified index, or null if index is out of bounds + */ + public static Object arrayElement(List array, Object index) { + if (array == null || index == null) { + return null; + } + + int idx; + if (index instanceof Number) { + idx = ((Number) index).intValue(); + } else { + idx = Integer.parseInt(index.toString()); + } + + // Convert 1-based index to 0-based (SQL standard uses 1-based indexing) + int zeroBasedIndex = idx - 1; + + // Check bounds + if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) { + return null; + } + + return array.get(zeroBasedIndex); + } + + /** + * Gets a value from a Map by key. + * + * @param map the Map to access + * @param key the key to look up + * @return the value for the specified key, or null if not found + */ + public static Object mapValue(Map map, Object key) { + if (map == null || key == null) { + return null; + } + + return map.get(key); + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 4345331ac7e..6c2849c8ac7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -89,7 +89,7 @@ public class JaninoCompiler { public static final String DEFAULT_TIME_ZONE = "__time_zone__"; private static final String[] BUILTIN_FUNCTION_MODULES = { - "Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal" + "Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct", "Temporal" }; @VisibleForTesting @@ -306,6 +306,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue( return generateTimestampAddOperation(context, sqlBasicCall, atoms); case OTHER: return generateOtherOperation(context, sqlBasicCall, atoms); + case ITEM: + return generateItemAccessOperation(context, sqlBasicCall, atoms); default: throw new ParseException("Unrecognized expression: " + sqlBasicCall); } @@ -468,6 +470,37 @@ private static Java.Rvalue generateOtherOperation( throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } + private static Java.Rvalue generateItemAccessOperation( + Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + if (atoms.length != 2) { + throw new ParseException("Unrecognized item access expression: " + sqlBasicCall); + } + Java.Rvalue methodInvocation = + new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms); + + // Deduce the return type and add a cast to ensure proper type conversion + DataType resultType = + TransformParser.deduceSubExpressionType( + context.columns, + sqlBasicCall, + context.udfDescriptors, + context.supportedMetadataColumns); + + // Get the Java class for the result type and add a cast + Class javaClass = JavaClassConverter.toJavaClass(resultType); + if (javaClass != null && javaClass != Object.class) { + return new Java.Cast( + Location.NOWHERE, + new Java.ReferenceType( + Location.NOWHERE, + new Java.Annotation[0], + javaClass.getName().split("\\."), + null), + methodInvocation); + } + return methodInvocation; + } + private static Java.Rvalue generateOtherFunctionOperation( Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { String operationName = sqlBasicCall.getOperator().getName().toUpperCase(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 5ee7153013c..841878b24e0 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -93,7 +93,7 @@ public class TransformParser { private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class); private static final String DEFAULT_SCHEMA = "default_schema"; - private static final String DEFAULT_TABLE = "TB"; + public static final String DEFAULT_TABLE = "TB"; private static final String MAPPED_COLUMN_NAME_PREFIX = "$"; private static final String MAPPED_SINGLE_COLUMN_NAME = MAPPED_COLUMN_NAME_PREFIX + "0"; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 468566b8200..ad2820d0f66 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -398,6 +398,12 @@ public SqlSyntax getSyntax() { // -------------- public static final SqlFunction CAST = SqlStdOperatorTable.CAST; + // --------------------- + // Collection Functions + // --------------------- + // Supports array[index] and map[key] syntax + public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM; + public static final SqlFunction AI_CHAT_PREDICT = new SqlFunction( "AI_CHAT_PREDICT", diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java new file mode 100644 index 00000000000..0505bd7e620 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.functions.impl; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link StructFunctions}. */ +class StructFunctionsTest { + + @Test + void testItemAccessWithList() { + List list = Arrays.asList("one", "two", "three"); + + // SQL uses 1-based indexing + assertThat(StructFunctions.itemAccess(list, 1)).isEqualTo("one"); + assertThat(StructFunctions.itemAccess(list, 2)).isEqualTo("two"); + assertThat(StructFunctions.itemAccess(list, 3)).isEqualTo("three"); + } + + @Test + void testItemAccessWithMap() { + Map map = new HashMap<>(); + map.put("a", 1); + map.put("b", 2); + map.put("c", 3); + + assertThat(StructFunctions.itemAccess(map, "a")).isEqualTo(1); + assertThat(StructFunctions.itemAccess(map, "b")).isEqualTo(2); + assertThat(StructFunctions.itemAccess(map, "c")).isEqualTo(3); + } + + @Test + void testItemAccessWithNull() { + List list = Arrays.asList("one", "two", "three"); + Map map = new HashMap<>(); + map.put("a", 1); + + // Null collection returns null + assertThat(StructFunctions.itemAccess(null, 1)).isNull(); + assertThat(StructFunctions.itemAccess(null, "a")).isNull(); + + // Null index/key returns null + assertThat(StructFunctions.itemAccess(list, null)).isNull(); + assertThat(StructFunctions.itemAccess(map, null)).isNull(); + } + + @Test + void testItemAccessWithInvalidType() { + assertThatThrownBy(() -> StructFunctions.itemAccess("not a collection", 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("itemAccess only supports List or Map"); + } + + @Test + void testArrayElementWithNormalIndex() { + List array = Arrays.asList(10, 20, 30, 40, 50); + + // SQL uses 1-based indexing + assertThat(StructFunctions.arrayElement(array, 1)).isEqualTo(10); + assertThat(StructFunctions.arrayElement(array, 3)).isEqualTo(30); + assertThat(StructFunctions.arrayElement(array, 5)).isEqualTo(50); + } + + @Test + void testArrayElementWithOutOfBoundsIndex() { + List array = Arrays.asList(10, 20, 30); + + // Out of bounds returns null (not throw exception) + assertThat(StructFunctions.arrayElement(array, 0)).isNull(); // Index 0 is invalid in SQL + assertThat(StructFunctions.arrayElement(array, 4)).isNull(); + assertThat(StructFunctions.arrayElement(array, -1)).isNull(); + assertThat(StructFunctions.arrayElement(array, 100)).isNull(); + } + + @Test + void testArrayElementWithNull() { + List array = Arrays.asList(10, 20, 30); + + assertThat(StructFunctions.arrayElement(null, 1)).isNull(); + assertThat(StructFunctions.arrayElement(array, null)).isNull(); + } + + @Test + void testArrayElementWithStringIndex() { + List array = Arrays.asList(10, 20, 30); + + // Index can be string that can be parsed to integer + assertThat(StructFunctions.arrayElement(array, "1")).isEqualTo(10); + assertThat(StructFunctions.arrayElement(array, "2")).isEqualTo(20); + } + + @Test + void testMapValue() { + Map map = new HashMap<>(); + map.put(1, "one"); + map.put(2, "two"); + map.put(3, "three"); + + assertThat(StructFunctions.mapValue(map, 1)).isEqualTo("one"); + assertThat(StructFunctions.mapValue(map, 2)).isEqualTo("two"); + assertThat(StructFunctions.mapValue(map, 3)).isEqualTo("three"); + } + + @Test + void testMapValueWithAbsentKey() { + Map map = new HashMap<>(); + map.put(1, "one"); + + assertThat(StructFunctions.mapValue(map, 999)).isNull(); + } + + @Test + void testMapValueWithNull() { + Map map = new HashMap<>(); + map.put(1, "one"); + + assertThat(StructFunctions.mapValue(null, 1)).isNull(); + assertThat(StructFunctions.mapValue(map, null)).isNull(); + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 3f1d026862b..77dccd63fb6 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -391,6 +391,39 @@ void testTranslateFilterToJaninoExpression() { testFilterExpression("try_parse_json(jsonStr)", "tryParseJson(jsonStr)"); } + @Test + public void testTranslateItemAccessToJaninoExpression() { + // Test collection access functions (ARRAY, MAP) with proper column schema + List columns = + List.of( + Column.physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING())), + Column.physicalColumn( + "m", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())), + Column.physicalColumn("idx", DataTypes.INT()), + Column.physicalColumn("k", DataTypes.STRING())); + + // Array access: array[index] - index is 1-based (SQL standard) + // Result type is String (from ARRAY), so cast is added + testFilterExpressionWithColumns("arr[1]", "(java.lang.String) itemAccess(arr, 1)", columns); + testFilterExpressionWithColumns("arr[2]", "(java.lang.String) itemAccess(arr, 2)", columns); + testFilterExpressionWithColumns( + "arr[idx]", "(java.lang.String) itemAccess(arr, idx)", columns); + // Map access: map[key] + // Result type is Integer (from MAP), so cast is added + testFilterExpressionWithColumns( + "m['key']", "(java.lang.Integer) itemAccess(m, \"key\")", columns); + testFilterExpressionWithColumns("m[k]", "(java.lang.Integer) itemAccess(m, k)", columns); + // Nested access with comparisons + testFilterExpressionWithColumns( + "arr[1] = 'value'", + "valueEquals((java.lang.String) itemAccess(arr, 1), \"value\")", + columns); + testFilterExpressionWithColumns( + "m['key'] > 10", + "greaterThan((java.lang.Integer) itemAccess(m, \"key\"), 10)", + columns); + } + @Test public void testTranslateFilterToJaninoExpressionError() { Assertions.assertThatThrownBy( @@ -818,6 +851,18 @@ private void testFilterExpression(String expression, String expressionExpect) { Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); } + private void testFilterExpressionWithColumns( + String expression, String expressionExpect, List columns) { + String janinoExpression = + TransformParser.translateFilterExpressionToJaninoExpression( + expression, + columns, + Collections.emptyList(), + new SupportedMetadataColumn[0], + Collections.emptyMap()); + Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect); + } + private void testFilterExpressionWithUdf(String expression, String expressionExpect) { testFilterExpressionWithUdf( expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap()); From e280cd4d364461bab10bf4d256cd19f11d4d2b47 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 2 Feb 2026 15:27:49 +0800 Subject: [PATCH 2/3] [FLINK-38888][feature] YAML Pipeline supports item subscription of complex types --- .../org/apache/flink/cdc/runtime/parser/TransformParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 841878b24e0..5ee7153013c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -93,7 +93,7 @@ public class TransformParser { private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class); private static final String DEFAULT_SCHEMA = "default_schema"; - public static final String DEFAULT_TABLE = "TB"; + private static final String DEFAULT_TABLE = "TB"; private static final String MAPPED_COLUMN_NAME_PREFIX = "$"; private static final String MAPPED_SINGLE_COLUMN_NAME = MAPPED_COLUMN_NAME_PREFIX + "0"; From 6067cbc2a232be71e5ca17a32ddb4d4a2d1b2c82 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 2 Feb 2026 21:30:59 +0800 Subject: [PATCH 3/3] [FLINK-38888][feature] YAML Pipeline supports item subscription of complex types --- .../content.zh/docs/core-concept/transform.md | 8 + docs/content/docs/core-concept/transform.md | 10 + .../functions/impl/StructFunctions.java | 64 ++--- .../cdc/runtime/parser/JaninoCompiler.java | 29 ++- .../metadata/TransformSqlOperatorTable.java | 4 +- .../functions/impl/StructFunctionsTest.java | 233 ++++++++++-------- .../runtime/parser/TransformParserTest.java | 5 + 7 files changed, 187 insertions(+), 166 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index aec40d70abb..5f278e798da 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -226,6 +226,14 @@ You can use `CAST( AS )` syntax to convert any valid expression `
If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. | | TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function.

If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. | +## Struct Functions + +| Function | Janino Code | Description | +| -------- | ----------- | ----------- | +| array[index] | itemAccess(array, index) | 返回数组中位置 `index` 的元素。索引从 1 开始(SQL 标准)。如果索引超出范围或数组为 NULL,则返回 NULL。 | +| map[key] | itemAccess(map, key) | 返回 map 中与 `key` 关联的值。如果 key 不存在或 map 为 NULL,则返回 NULL。 | +| row[index] | itemAccess(row, index) | 返回 row 中位置 `index` 的字段。索引从 1 开始。索引必须是常量(不能是计算表达式),因为返回类型必须在静态阶段确定。 | + # 示例 ## 添加计算列 表达式可以用来生成新的列。例如,如果我们想基于表 `web_order` 在数据库 `mydb` 中添加两个计算列,我们可以定义一个转换规则如下: diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 584c3606df0..d0a4901b444 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -227,6 +227,16 @@ You can use `CAST( AS )` syntax to convert any valid expression `
If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. | | TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function.

If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. | +## Struct Functions + +Struct functions are used to access elements in ARRAY, MAP and ROW types using subscript syntax. + +| Function | Janino Code | Description | +| -------- | ----------- | ----------- | +| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. | +| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. | +| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. | + # Example ## Add computed columns Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows: diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java index 0f3202440c4..603930cc950 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java @@ -20,77 +20,51 @@ import java.util.List; import java.util.Map; -/** Struct built-in functions. */ +/** + * Built-in functions for collection and struct data types. + * + *

These functions support accessing elements from collections (ARRAY, MAP) and structured data + * types (ROW). + */ public class StructFunctions { /** - * Accesses an element from an ARRAY or MAP by index or key. - * - *

For ARRAY: Uses 1-based index (SQL standard). array[1] returns the first element. + * Accesses an element from an ARRAY by index (1-based, SQL standard). * - *

For MAP: Uses key to access the value. map['key'] returns the value for 'key'. - */ - public static Object itemAccess(Object collection, Object indexOrKey) { - if (collection == null || indexOrKey == null) { - return null; - } - - Object result; - if (collection instanceof List) { - result = arrayElement((List) collection, indexOrKey); - } else if (collection instanceof Map) { - result = mapValue((Map) collection, indexOrKey); - } else { - throw new IllegalArgumentException( - "itemAccess only supports List or Map, but got: " - + collection.getClass().getName()); - } - return result; - } - - /** - * Gets an element from an Object array by index (1-based, SQL standard). This overload handles - * arrays that have been converted from ArrayData to Object[] by DataTypeConverter. + *

array[1] returns the first element. * - * @param array the Object array to access + * @param the element type of the array + * @param array the array to access * @param index the 1-based index * @return the element at the specified index, or null if index is out of bounds */ - public static Object arrayElement(List array, Object index) { + public static T itemAccess(List array, Integer index) { if (array == null || index == null) { return null; } - - int idx; - if (index instanceof Number) { - idx = ((Number) index).intValue(); - } else { - idx = Integer.parseInt(index.toString()); - } - // Convert 1-based index to 0-based (SQL standard uses 1-based indexing) - int zeroBasedIndex = idx - 1; - - // Check bounds + int zeroBasedIndex = index - 1; if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) { return null; } - return array.get(zeroBasedIndex); } /** - * Gets a value from a Map by key. + * Accesses a value from a MAP by key. * - * @param map the Map to access + *

map['key'] returns the value for 'key'. + * + * @param the key type of the map + * @param the value type of the map + * @param map the map to access * @param key the key to look up * @return the value for the specified key, or null if not found */ - public static Object mapValue(Map map, Object key) { + public static V itemAccess(Map map, K key) { if (map == null || key == null) { return null; } - return map.get(key); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 6c2849c8ac7..fb506925529 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; @@ -472,9 +473,11 @@ private static Java.Rvalue generateOtherOperation( private static Java.Rvalue generateItemAccessOperation( Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { - if (atoms.length != 2) { - throw new ParseException("Unrecognized item access expression: " + sqlBasicCall); - } + Preconditions.checkArgument( + atoms.length == 2, + "Expecting item accessing call %s to have 2 operands, got %s actually", + sqlBasicCall, + List.of(atoms)); Java.Rvalue methodInvocation = new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms); @@ -487,16 +490,20 @@ private static Java.Rvalue generateItemAccessOperation( context.supportedMetadataColumns); // Get the Java class for the result type and add a cast + // Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B") Class javaClass = JavaClassConverter.toJavaClass(resultType); if (javaClass != null && javaClass != Object.class) { - return new Java.Cast( - Location.NOWHERE, - new Java.ReferenceType( - Location.NOWHERE, - new Java.Annotation[0], - javaClass.getName().split("\\."), - null), - methodInvocation); + String canonicalName = javaClass.getCanonicalName(); + if (canonicalName != null) { + return new Java.Cast( + Location.NOWHERE, + new Java.ReferenceType( + Location.NOWHERE, + new Java.Annotation[0], + canonicalName.split("\\."), + null), + methodInvocation); + } } return methodInvocation; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index ad2820d0f66..7d0f66362bd 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -399,9 +399,9 @@ public SqlSyntax getSyntax() { public static final SqlFunction CAST = SqlStdOperatorTable.CAST; // --------------------- - // Collection Functions + // Struct Functions // --------------------- - // Supports array[index] and map[key] syntax + // Supports accessing elements of ARRAY[index], ROW[index] and MAP[key] public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM; public static final SqlFunction AI_CHAT_PREDICT = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java index 0505bd7e620..1ac03cc2b67 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java @@ -17,127 +17,144 @@ package org.apache.flink.cdc.runtime.functions.impl; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for {@link StructFunctions}. */ class StructFunctionsTest { - @Test - void testItemAccessWithList() { - List list = Arrays.asList("one", "two", "three"); - - // SQL uses 1-based indexing - assertThat(StructFunctions.itemAccess(list, 1)).isEqualTo("one"); - assertThat(StructFunctions.itemAccess(list, 2)).isEqualTo("two"); - assertThat(StructFunctions.itemAccess(list, 3)).isEqualTo("three"); - } - - @Test - void testItemAccessWithMap() { - Map map = new HashMap<>(); - map.put("a", 1); - map.put("b", 2); - map.put("c", 3); - - assertThat(StructFunctions.itemAccess(map, "a")).isEqualTo(1); - assertThat(StructFunctions.itemAccess(map, "b")).isEqualTo(2); - assertThat(StructFunctions.itemAccess(map, "c")).isEqualTo(3); - } - - @Test - void testItemAccessWithNull() { - List list = Arrays.asList("one", "two", "three"); - Map map = new HashMap<>(); - map.put("a", 1); - - // Null collection returns null - assertThat(StructFunctions.itemAccess(null, 1)).isNull(); - assertThat(StructFunctions.itemAccess(null, "a")).isNull(); - - // Null index/key returns null - assertThat(StructFunctions.itemAccess(list, null)).isNull(); - assertThat(StructFunctions.itemAccess(map, null)).isNull(); + // ======================================== + // List (ARRAY) Access Tests + // ======================================== + @Nested + class ListAccessTests { + + @Test + void testNormalAccess() { + List list = Arrays.asList("one", "two", "three"); + + // SQL uses 1-based indexing + assertThat(StructFunctions.itemAccess(list, 1)).isEqualTo("one"); + assertThat(StructFunctions.itemAccess(list, 2)).isEqualTo("two"); + assertThat(StructFunctions.itemAccess(list, 3)).isEqualTo("three"); + } + + @Test + void testOutOfBoundsAccess() { + List list = Arrays.asList(10, 20, 30); + + // Index 0 is invalid in SQL (1-based indexing) + assertThat(StructFunctions.itemAccess(list, 0)).isNull(); + // Negative index + assertThat(StructFunctions.itemAccess(list, -1)).isNull(); + // Index beyond size + assertThat(StructFunctions.itemAccess(list, 4)).isNull(); + assertThat(StructFunctions.itemAccess(list, 100)).isNull(); + } + + @Test + void testNullHandling() { + List list = Arrays.asList("one", "two", "three"); + + // Null list returns null + assertThat(StructFunctions.itemAccess((List) null, 1)).isNull(); + // Null index returns null + assertThat(StructFunctions.itemAccess(list, null)).isNull(); + } + + @Test + void testEmptyList() { + List emptyList = Collections.emptyList(); + + assertThat(StructFunctions.itemAccess(emptyList, 1)).isNull(); + } + + @Test + void testListWithNullElement() { + List listWithNull = new ArrayList<>(); + listWithNull.add("first"); + listWithNull.add(null); + listWithNull.add("third"); + + assertThat(StructFunctions.itemAccess(listWithNull, 1)).isEqualTo("first"); + assertThat(StructFunctions.itemAccess(listWithNull, 2)).isNull(); + assertThat(StructFunctions.itemAccess(listWithNull, 3)).isEqualTo("third"); + } } - @Test - void testItemAccessWithInvalidType() { - assertThatThrownBy(() -> StructFunctions.itemAccess("not a collection", 1)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("itemAccess only supports List or Map"); - } - - @Test - void testArrayElementWithNormalIndex() { - List array = Arrays.asList(10, 20, 30, 40, 50); - - // SQL uses 1-based indexing - assertThat(StructFunctions.arrayElement(array, 1)).isEqualTo(10); - assertThat(StructFunctions.arrayElement(array, 3)).isEqualTo(30); - assertThat(StructFunctions.arrayElement(array, 5)).isEqualTo(50); - } - - @Test - void testArrayElementWithOutOfBoundsIndex() { - List array = Arrays.asList(10, 20, 30); - - // Out of bounds returns null (not throw exception) - assertThat(StructFunctions.arrayElement(array, 0)).isNull(); // Index 0 is invalid in SQL - assertThat(StructFunctions.arrayElement(array, 4)).isNull(); - assertThat(StructFunctions.arrayElement(array, -1)).isNull(); - assertThat(StructFunctions.arrayElement(array, 100)).isNull(); - } - - @Test - void testArrayElementWithNull() { - List array = Arrays.asList(10, 20, 30); - - assertThat(StructFunctions.arrayElement(null, 1)).isNull(); - assertThat(StructFunctions.arrayElement(array, null)).isNull(); - } - - @Test - void testArrayElementWithStringIndex() { - List array = Arrays.asList(10, 20, 30); - - // Index can be string that can be parsed to integer - assertThat(StructFunctions.arrayElement(array, "1")).isEqualTo(10); - assertThat(StructFunctions.arrayElement(array, "2")).isEqualTo(20); - } - - @Test - void testMapValue() { - Map map = new HashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - - assertThat(StructFunctions.mapValue(map, 1)).isEqualTo("one"); - assertThat(StructFunctions.mapValue(map, 2)).isEqualTo("two"); - assertThat(StructFunctions.mapValue(map, 3)).isEqualTo("three"); - } - - @Test - void testMapValueWithAbsentKey() { - Map map = new HashMap<>(); - map.put(1, "one"); - - assertThat(StructFunctions.mapValue(map, 999)).isNull(); - } - - @Test - void testMapValueWithNull() { - Map map = new HashMap<>(); - map.put(1, "one"); - - assertThat(StructFunctions.mapValue(null, 1)).isNull(); - assertThat(StructFunctions.mapValue(map, null)).isNull(); + // ======================================== + // Map Access Tests + // ======================================== + @Nested + class MapAccessTests { + + @Test + void testNormalAccessWithStringKey() { + Map map = new HashMap<>(); + map.put("a", 1); + map.put("b", 2); + map.put("c", 3); + + assertThat(StructFunctions.itemAccess(map, "a")).isEqualTo(1); + assertThat(StructFunctions.itemAccess(map, "b")).isEqualTo(2); + assertThat(StructFunctions.itemAccess(map, "c")).isEqualTo(3); + } + + @Test + void testNormalAccessWithIntegerKey() { + Map map = new HashMap<>(); + map.put(1, "one"); + map.put(2, "two"); + map.put(3, "three"); + + assertThat(StructFunctions.itemAccess(map, 1)).isEqualTo("one"); + assertThat(StructFunctions.itemAccess(map, 2)).isEqualTo("two"); + assertThat(StructFunctions.itemAccess(map, 3)).isEqualTo("three"); + } + + @Test + void testMissingKey() { + Map map = new HashMap<>(); + map.put("exists", 1); + + assertThat(StructFunctions.itemAccess(map, "nonexistent")).isNull(); + } + + @Test + void testNullHandling() { + Map map = new HashMap<>(); + map.put("a", 1); + + // Null map returns null + assertThat(StructFunctions.itemAccess((Map) null, "a")).isNull(); + // Null key returns null + assertThat(StructFunctions.itemAccess(map, null)).isNull(); + } + + @Test + void testEmptyMap() { + Map emptyMap = Collections.emptyMap(); + + assertThat(StructFunctions.itemAccess(emptyMap, "any")).isNull(); + } + + @Test + void testMapWithNullValue() { + Map mapWithNullValue = new HashMap<>(); + mapWithNullValue.put("key1", "value1"); + mapWithNullValue.put("key2", null); + + assertThat(StructFunctions.itemAccess(mapWithNullValue, "key1")).isEqualTo("value1"); + assertThat(StructFunctions.itemAccess(mapWithNullValue, "key2")).isNull(); + } } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 77dccd63fb6..a45dbde2ac6 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -422,6 +422,11 @@ public void testTranslateItemAccessToJaninoExpression() { "m['key'] > 10", "greaterThan((java.lang.Integer) itemAccess(m, \"key\"), 10)", columns); + + List binaryArrayColumns = + List.of(Column.physicalColumn("binArr", DataTypes.ARRAY(DataTypes.BINARY(16)))); + testFilterExpressionWithColumns( + "binArr[1]", "(byte[]) itemAccess(binArr, 1)", binaryArrayColumns); } @Test