Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions flink-cdc-composer/src/test/resources/specs/nested.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()}
- do: Integer Array Subscripting
ignore: FLINK-38888
projection: |-
id_
array_int_
Expand All @@ -133,7 +132,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: String Array Subscripting
ignore: FLINK-38888
projection: |-
id_
array_string_
Expand All @@ -147,7 +145,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array OOB Subscripting
ignore: FLINK-38888
projection: |-
id_
array_string_[0] AS negative_overflow
Expand All @@ -161,7 +158,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array Subscripting With Sub Expression
ignore: FLINK-38888
projection: |-
id_
array_int_[1 + 1] AS int_key
Expand All @@ -175,7 +171,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Array-related Expression
ignore: FLINK-38888
projection: id_, array_string_
filter: array_string_[3] = '五'
primary-key: id_
Expand All @@ -184,7 +179,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
- do: Map Subscripting
ignore: FLINK-38888
projection: |-
id_
map_int_string_
Expand All @@ -198,7 +192,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with absent key
ignore: FLINK-38888
projection: |-
id_
map_int_string_[233] AS map_value
Expand All @@ -211,7 +204,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with complex expression
ignore: FLINK-38888
projection: |-
id_
map_int_string_[1 + 2] AS map_value
Expand All @@ -224,7 +216,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with nested expressions
ignore: FLINK-38888
projection: |-
id_
map_string_array_string_[lower('ONE')] AS index_1
Expand All @@ -238,7 +229,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting With Absent Object
ignore: FLINK-38888
projection: |-
id_
map_string_array_string_['foo'] AS index_1
Expand All @@ -252,15 +242,13 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Map-related Expression
ignore: FLINK-38888
projection: id_, map_int_string_
filter: map_int_string_[1] = 'one'
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
- do: Record Subscripting With Index
ignore: FLINK-38888
projection: |-
id_
complex_row_
Expand All @@ -275,23 +263,20 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
- do: Record Subscripting With Invalid Index
ignore: FLINK-38888
projection: |-
id_
complex_row_
complex_row_[0] AS bad_key
primary-key: id_
expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Record Subscripting With Computed Index (Illegal, type must be statically determined)
ignore: FLINK-38888
projection: |-
id_
complex_row_
complex_row_[1 + 1] AS bad_key
primary-key: id_
expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Filter by Record-related Expression
ignore: FLINK-38888
projection: id_, complex_row_
filter: complex_row_[1] = 'Derrida'
primary-key: id_
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.functions.impl;

import java.util.List;
import java.util.Map;

/** Struct built-in functions. */
public class StructFunctions {

/**
* Accesses an element from an ARRAY or MAP by index or key.
*
* <p>For ARRAY: Uses 1-based index (SQL standard). array[1] returns the first element.
*
* <p>For MAP: Uses key to access the value. map['key'] returns the value for 'key'.
*/
public static Object itemAccess(Object collection, Object indexOrKey) {
if (collection == null || indexOrKey == null) {
return null;
}

Object result;
if (collection instanceof List) {
result = arrayElement((List) collection, indexOrKey);
} else if (collection instanceof Map) {
result = mapValue((Map<?, ?>) collection, indexOrKey);
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
} else {
throw new IllegalArgumentException(
"itemAccess only supports List or Map, but got: "
+ collection.getClass().getName());
}
return result;
}

/**
* Gets an element from an Object array by index (1-based, SQL standard). This overload handles
* arrays that have been converted from ArrayData to Object[] by DataTypeConverter.
*
* @param array the Object array to access
* @param index the 1-based index
* @return the element at the specified index, or null if index is out of bounds
*/
public static Object arrayElement(List array, Object index) {
if (array == null || index == null) {
return null;
}

int idx;
if (index instanceof Number) {
idx = ((Number) index).intValue();
} else {
idx = Integer.parseInt(index.toString());
}

// Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
int zeroBasedIndex = idx - 1;

// Check bounds
if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
return null;
}

return array.get(zeroBasedIndex);
}

/**
* Gets a value from a Map by key.
*
* @param map the Map to access
* @param key the key to look up
* @return the value for the specified key, or null if not found
*/
public static Object mapValue(Map<?, ?> map, Object key) {
Comment thread
yuxiqian marked this conversation as resolved.
Outdated
if (map == null || key == null) {
return null;
}

return map.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class JaninoCompiler {
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

private static final String[] BUILTIN_FUNCTION_MODULES = {
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal"
"Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct", "Temporal"
};

@VisibleForTesting
Expand Down Expand Up @@ -306,6 +306,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue(
return generateTimestampAddOperation(context, sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(context, sqlBasicCall, atoms);
case ITEM:
return generateItemAccessOperation(context, sqlBasicCall, atoms);
default:
throw new ParseException("Unrecognized expression: " + sqlBasicCall);
}
Expand Down Expand Up @@ -468,6 +470,37 @@ private static Java.Rvalue generateOtherOperation(
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}

private static Java.Rvalue generateItemAccessOperation(
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 2) {
throw new ParseException("Unrecognized item access expression: " + sqlBasicCall);
}
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
Java.Rvalue methodInvocation =
new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms);

// Deduce the return type and add a cast to ensure proper type conversion
DataType resultType =
TransformParser.deduceSubExpressionType(
context.columns,
sqlBasicCall,
context.udfDescriptors,
context.supportedMetadataColumns);

// Get the Java class for the result type and add a cast
Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
if (javaClass != null && javaClass != Object.class) {
return new Java.Cast(
Location.NOWHERE,
new Java.ReferenceType(
Location.NOWHERE,
new Java.Annotation[0],
javaClass.getName().split("\\."),
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
null),
methodInvocation);
}
return methodInvocation;
}

private static Java.Rvalue generateOtherFunctionOperation(
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
String operationName = sqlBasicCall.getOperator().getName().toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
public class TransformParser {
private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class);
private static final String DEFAULT_SCHEMA = "default_schema";
private static final String DEFAULT_TABLE = "TB";
public static final String DEFAULT_TABLE = "TB";
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
private static final String MAPPED_COLUMN_NAME_PREFIX = "$";
private static final String MAPPED_SINGLE_COLUMN_NAME = MAPPED_COLUMN_NAME_PREFIX + "0";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;

// ---------------------
// Collection Functions
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
// ---------------------
// Supports array[index] and map[key] syntax
Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
Comment thread
Hisoka-X marked this conversation as resolved.

public static final SqlFunction AI_CHAT_PREDICT =
new SqlFunction(
"AI_CHAT_PREDICT",
Expand Down
Loading
Loading