Skip to content

Commit 66ed84f

Browse files
Hisoka-XMrart
authored andcommitted
[FLINK-38888][transform] YAML Pipeline supports item subscription of ARRAY, MAP, and ROW (apache#4241)
1 parent 73b5b2d commit 66ed84f

8 files changed

Lines changed: 345 additions & 16 deletions

File tree

docs/content.zh/docs/core-concept/transform.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
226226
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. |
227227
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. |
228228

229+
## Struct Functions
230+
231+
| Function | Janino Code | Description |
232+
| -------- | ----------- | ----------- |
233+
| array[index] | itemAccess(array, index) | 返回数组中位置 `index` 的元素。索引从 1 开始(SQL 标准)。如果索引超出范围或数组为 NULL,则返回 NULL。 |
234+
| map[key] | itemAccess(map, key) | 返回 map 中与 `key` 关联的值。如果 key 不存在或 map 为 NULL,则返回 NULL。 |
235+
| row[index] | itemAccess(row, index) | 返回 row 中位置 `index` 的字段。索引从 1 开始。索引必须是常量(不能是计算表达式),因为返回类型必须在静态阶段确定。 |
236+
229237
# 示例
230238
## 添加计算列
231239
表达式可以用来生成新的列。例如,如果我们想基于表 `web_order` 在数据库 `mydb` 中添加两个计算列,我们可以定义一个转换规则如下:

docs/content/docs/core-concept/transform.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,16 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
227227
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. |
228228
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. |
229229

230+
## Struct Functions
231+
232+
Struct functions are used to access elements in ARRAY, MAP and ROW types using subscript syntax.
233+
234+
| Function | Janino Code | Description |
235+
| -------- | ----------- | ----------- |
236+
| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
237+
| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
238+
| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
239+
230240
# Example
231241
## Add computed columns
232242
Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:

flink-cdc-composer/src/test/resources/specs/nested.yaml

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()}
120120
DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()}
121121
- do: Integer Array Subscripting
122-
ignore: FLINK-38888
123122
projection: |-
124123
id_
125124
array_int_
@@ -133,7 +132,6 @@
133132
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
134133
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
135134
- do: String Array Subscripting
136-
ignore: FLINK-38888
137135
projection: |-
138136
id_
139137
array_string_
@@ -147,7 +145,6 @@
147145
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
148146
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
149147
- do: Array OOB Subscripting
150-
ignore: FLINK-38888
151148
projection: |-
152149
id_
153150
array_string_[0] AS negative_overflow
@@ -161,7 +158,6 @@
161158
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
162159
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
163160
- do: Array Subscripting With Sub Expression
164-
ignore: FLINK-38888
165161
projection: |-
166162
id_
167163
array_int_[1 + 1] AS int_key
@@ -175,7 +171,6 @@
175171
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
176172
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
177173
- do: Filter by Array-related Expression
178-
ignore: FLINK-38888
179174
projection: id_, array_string_
180175
filter: array_string_[3] = '五'
181176
primary-key: id_
@@ -184,7 +179,6 @@
184179
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
185180
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
186181
- do: Map Subscripting
187-
ignore: FLINK-38888
188182
projection: |-
189183
id_
190184
map_int_string_
@@ -198,7 +192,6 @@
198192
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
199193
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
200194
- do: Map Subscripting with absent key
201-
ignore: FLINK-38888
202195
projection: |-
203196
id_
204197
map_int_string_[233] AS map_value
@@ -211,7 +204,6 @@
211204
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
212205
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
213206
- do: Map Subscripting with complex expression
214-
ignore: FLINK-38888
215207
projection: |-
216208
id_
217209
map_int_string_[1 + 2] AS map_value
@@ -224,7 +216,6 @@
224216
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
225217
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
226218
- do: Map Subscripting with nested expressions
227-
ignore: FLINK-38888
228219
projection: |-
229220
id_
230221
map_string_array_string_[lower('ONE')] AS index_1
@@ -238,7 +229,6 @@
238229
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
239230
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
240231
- do: Map Subscripting With Absent Object
241-
ignore: FLINK-38888
242232
projection: |-
243233
id_
244234
map_string_array_string_['foo'] AS index_1
@@ -252,15 +242,13 @@
252242
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
253243
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
254244
- do: Filter by Map-related Expression
255-
ignore: FLINK-38888
256245
projection: id_, map_int_string_
257246
filter: map_int_string_[1] = 'one'
258247
primary-key: id_
259248
expect: |-
260249
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
261250
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
262251
- do: Record Subscripting With Index
263-
ignore: FLINK-38888
264252
projection: |-
265253
id_
266254
complex_row_
@@ -275,23 +263,20 @@
275263
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
276264
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
277265
- do: Record Subscripting With Invalid Index
278-
ignore: FLINK-38888
279266
projection: |-
280267
id_
281268
complex_row_
282269
complex_row_[0] AS bad_key
283270
primary-key: id_
284271
expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
285272
- do: Record Subscripting With Computed Index (Illegal, type must be statically determined)
286-
ignore: FLINK-38888
287273
projection: |-
288274
id_
289275
complex_row_
290276
complex_row_[1 + 1] AS bad_key
291277
primary-key: id_
292278
expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
293279
- do: Filter by Record-related Expression
294-
ignore: FLINK-38888
295280
projection: id_, complex_row_
296281
filter: complex_row_[1] = 'Derrida'
297282
primary-key: id_
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.functions.impl;
19+
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
/**
24+
* Built-in functions for collection and struct data types.
25+
*
26+
* <p>These functions support accessing elements from collections (ARRAY, MAP) and structured data
27+
* types (ROW).
28+
*/
29+
public class StructFunctions {
30+
31+
/**
32+
* Accesses an element from an ARRAY by index (1-based, SQL standard).
33+
*
34+
* <p>array[1] returns the first element.
35+
*
36+
* @param <T> the element type of the array
37+
* @param array the array to access
38+
* @param index the 1-based index
39+
* @return the element at the specified index, or null if index is out of bounds
40+
*/
41+
public static <T> T itemAccess(List<T> array, Integer index) {
42+
if (array == null || index == null) {
43+
return null;
44+
}
45+
// Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
46+
int zeroBasedIndex = index - 1;
47+
if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
48+
return null;
49+
}
50+
return array.get(zeroBasedIndex);
51+
}
52+
53+
/**
54+
* Accesses a value from a MAP by key.
55+
*
56+
* <p>map['key'] returns the value for 'key'.
57+
*
58+
* @param <K> the key type of the map
59+
* @param <V> the value type of the map
60+
* @param map the map to access
61+
* @param key the key to look up
62+
* @return the value for the specified key, or null if not found
63+
*/
64+
public static <K, V> V itemAccess(Map<K, V> map, K key) {
65+
if (map == null || key == null) {
66+
return null;
67+
}
68+
return map.get(key);
69+
}
70+
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
2626
import org.apache.flink.cdc.common.types.DataType;
2727
import org.apache.flink.cdc.common.types.DataTypeRoot;
28+
import org.apache.flink.cdc.common.utils.Preconditions;
2829
import org.apache.flink.cdc.common.utils.StringUtils;
2930
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
3031

@@ -89,7 +90,7 @@ public class JaninoCompiler {
8990
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
9091

9192
private static final String[] BUILTIN_FUNCTION_MODULES = {
92-
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal"
93+
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct", "Temporal"
9394
};
9495

9596
@VisibleForTesting
@@ -306,6 +307,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue(
306307
return generateTimestampAddOperation(context, sqlBasicCall, atoms);
307308
case OTHER:
308309
return generateOtherOperation(context, sqlBasicCall, atoms);
310+
case ITEM:
311+
return generateItemAccessOperation(context, sqlBasicCall, atoms);
309312
default:
310313
throw new ParseException("Unrecognized expression: " + sqlBasicCall);
311314
}
@@ -468,6 +471,43 @@ private static Java.Rvalue generateOtherOperation(
468471
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
469472
}
470473

474+
private static Java.Rvalue generateItemAccessOperation(
475+
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
476+
Preconditions.checkArgument(
477+
atoms.length == 2,
478+
"Expecting item accessing call %s to have 2 operands, got %s actually",
479+
sqlBasicCall,
480+
List.of(atoms));
481+
Java.Rvalue methodInvocation =
482+
new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms);
483+
484+
// Deduce the return type and add a cast to ensure proper type conversion
485+
DataType resultType =
486+
TransformParser.deduceSubExpressionType(
487+
context.columns,
488+
sqlBasicCall,
489+
context.udfDescriptors,
490+
context.supportedMetadataColumns);
491+
492+
// Get the Java class for the result type and add a cast
493+
// Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B")
494+
Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
495+
if (javaClass != null && javaClass != Object.class) {
496+
String canonicalName = javaClass.getCanonicalName();
497+
if (canonicalName != null) {
498+
return new Java.Cast(
499+
Location.NOWHERE,
500+
new Java.ReferenceType(
501+
Location.NOWHERE,
502+
new Java.Annotation[0],
503+
canonicalName.split("\\."),
504+
null),
505+
methodInvocation);
506+
}
507+
}
508+
return methodInvocation;
509+
}
510+
471511
private static Java.Rvalue generateOtherFunctionOperation(
472512
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
473513
String operationName = sqlBasicCall.getOperator().getName().toUpperCase();

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
398398
// --------------
399399
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
400400

401+
// ---------------------
402+
// Struct Functions
403+
// ---------------------
404+
// Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
405+
public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
406+
401407
public static final SqlFunction AI_CHAT_PREDICT =
402408
new SqlFunction(
403409
"AI_CHAT_PREDICT",

0 commit comments

Comments
 (0)