Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. |
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. |

## Struct Functions

| Function | Janino Code | Description |
| -------- | ----------- | ----------- |
| array[index] | itemAccess(array, index) | 返回数组中位置 `index` 的元素。索引从 1 开始(SQL 标准)。如果索引超出范围或数组为 NULL,则返回 NULL。 |
| map[key] | itemAccess(map, key) | 返回 map 中与 `key` 关联的值。如果 key 不存在或 map 为 NULL,则返回 NULL。 |
| row[index] | itemAccess(row, index) | 返回 row 中位置 `index` 的字段。索引从 1 开始。索引必须是常量(不能是计算表达式),因为返回类型必须在静态阶段确定。 |

# 示例
## 添加计算列
表达式可以用来生成新的列。例如,如果我们想基于表 `web_order` 在数据库 `mydb` 中添加两个计算列,我们可以定义一个转换规则如下:
Expand Down
10 changes: 10 additions & 0 deletions docs/content/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKeys` is false. |
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDuplicateKeys` is false. |

## Struct Functions

Struct functions are used to access elements in ARRAY, MAP and ROW types using subscript syntax.

| Function | Janino Code | Description |
| -------- | ----------- | ----------- |
| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |

# Example
## Add computed columns
Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
Expand Down
15 changes: 0 additions & 15 deletions flink-cdc-composer/src/test/resources/specs/nested.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()}
- do: Integer Array Subscripting
ignore: FLINK-38888
projection: |-
id_
array_int_
Expand All @@ -133,7 +132,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: String Array Subscripting
ignore: FLINK-38888
projection: |-
id_
array_string_
Expand All @@ -147,7 +145,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array OOB Subscripting
ignore: FLINK-38888
projection: |-
id_
array_string_[0] AS negative_overflow
Expand All @@ -161,7 +158,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array Subscripting With Sub Expression
ignore: FLINK-38888
projection: |-
id_
array_int_[1 + 1] AS int_key
Expand All @@ -175,7 +171,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Array-related Expression
ignore: FLINK-38888
projection: id_, array_string_
filter: array_string_[3] = '五'
primary-key: id_
Expand All @@ -184,7 +179,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
- do: Map Subscripting
ignore: FLINK-38888
projection: |-
id_
map_int_string_
Expand All @@ -198,7 +192,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with absent key
ignore: FLINK-38888
projection: |-
id_
map_int_string_[233] AS map_value
Expand All @@ -211,7 +204,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with complex expression
ignore: FLINK-38888
projection: |-
id_
map_int_string_[1 + 2] AS map_value
Expand All @@ -224,7 +216,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with nested expressions
ignore: FLINK-38888
projection: |-
id_
map_string_array_string_[lower('ONE')] AS index_1
Expand All @@ -238,7 +229,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting With Absent Object
ignore: FLINK-38888
projection: |-
id_
map_string_array_string_['foo'] AS index_1
Expand All @@ -252,15 +242,13 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Map-related Expression
ignore: FLINK-38888
projection: id_, map_int_string_
filter: map_int_string_[1] = 'one'
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
- do: Record Subscripting With Index
ignore: FLINK-38888
projection: |-
id_
complex_row_
Expand All @@ -275,23 +263,20 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
- do: Record Subscripting With Invalid Index
ignore: FLINK-38888
projection: |-
id_
complex_row_
complex_row_[0] AS bad_key
primary-key: id_
expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Record Subscripting With Computed Index (Illegal, type must be statically determined)
ignore: FLINK-38888
projection: |-
id_
complex_row_
complex_row_[1 + 1] AS bad_key
primary-key: id_
expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Filter by Record-related Expression
ignore: FLINK-38888
projection: id_, complex_row_
filter: complex_row_[1] = 'Derrida'
primary-key: id_
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.functions.impl;

import java.util.List;
import java.util.Map;

/**
* Built-in functions for collection and struct data types.
*
* <p>These functions support accessing elements from collections (ARRAY, MAP) and structured data
* types (ROW).
*/
public class StructFunctions {

/**
* Accesses an element from an ARRAY by index (1-based, SQL standard).
*
* <p>array[1] returns the first element.
*
* @param <T> the element type of the array
* @param array the array to access
* @param index the 1-based index
* @return the element at the specified index, or null if index is out of bounds
*/
public static <T> T itemAccess(List<T> array, Integer index) {
if (array == null || index == null) {
return null;
}
// Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
int zeroBasedIndex = index - 1;
if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
return null;
}
return array.get(zeroBasedIndex);
}

/**
* Accesses a value from a MAP by key.
*
* <p>map['key'] returns the value for 'key'.
*
* @param <K> the key type of the map
* @param <V> the value type of the map
* @param map the map to access
* @param key the key to look up
* @return the value for the specified key, or null if not found
*/
public static <K, V> V itemAccess(Map<K, V> map, K key) {
if (map == null || key == null) {
return null;
}
return map.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;

Expand Down Expand Up @@ -89,7 +90,7 @@ public class JaninoCompiler {
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

private static final String[] BUILTIN_FUNCTION_MODULES = {
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal"
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct", "Temporal"
};

@VisibleForTesting
Expand Down Expand Up @@ -306,6 +307,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue(
return generateTimestampAddOperation(context, sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(context, sqlBasicCall, atoms);
case ITEM:
return generateItemAccessOperation(context, sqlBasicCall, atoms);
default:
throw new ParseException("Unrecognized expression: " + sqlBasicCall);
}
Expand Down Expand Up @@ -468,6 +471,43 @@ private static Java.Rvalue generateOtherOperation(
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}

private static Java.Rvalue generateItemAccessOperation(
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
Preconditions.checkArgument(
atoms.length == 2,
"Expecting item accessing call %s to have 2 operands, got %s actually",
sqlBasicCall,
List.of(atoms));
Java.Rvalue methodInvocation =
new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms);

// Deduce the return type and add a cast to ensure proper type conversion
DataType resultType =
TransformParser.deduceSubExpressionType(
context.columns,
sqlBasicCall,
context.udfDescriptors,
context.supportedMetadataColumns);

// Get the Java class for the result type and add a cast
// Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B")
Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
if (javaClass != null && javaClass != Object.class) {
String canonicalName = javaClass.getCanonicalName();
if (canonicalName != null) {
return new Java.Cast(
Location.NOWHERE,
new Java.ReferenceType(
Location.NOWHERE,
new Java.Annotation[0],
canonicalName.split("\\."),
null),
methodInvocation);
}
}
return methodInvocation;
}

private static Java.Rvalue generateOtherFunctionOperation(
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
String operationName = sqlBasicCall.getOperator().getName().toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;

// ---------------------
// Struct Functions
// ---------------------
// Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
Comment thread
Hisoka-X marked this conversation as resolved.

public static final SqlFunction AI_CHAT_PREDICT =
new SqlFunction(
"AI_CHAT_PREDICT",
Expand Down
Loading
Loading