From 3096b17a4efc5909561f6104bf2d39d96b530c50 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 11 Apr 2025 16:03:01 +0800 Subject: [PATCH 1/5] [Feature][transform-v2] sql transform support multi_if function --- docs/en/transform-v2/sql-functions.md | 10 ++ docs/zh/transform-v2/sql-functions.md | 9 ++ .../seatunnel/e2e/transform/TestSQLIT.java | 19 +--- .../sql_transform/func_multi_if.conf | 100 ++++++++++++++++++ .../transform/sql/zeta/ZetaSQLFunction.java | 55 +++++++++- .../transform/sql/zeta/ZetaSQLType.java | 29 +++++ 6 files changed, 204 insertions(+), 18 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf diff --git a/docs/en/transform-v2/sql-functions.md b/docs/en/transform-v2/sql-functions.md index b406d0d3d3a..d33b6de8c79 100644 --- a/docs/en/transform-v2/sql-functions.md +++ b/docs/en/transform-v2/sql-functions.md @@ -959,6 +959,16 @@ Example: NULLIF(A, B) + +### MULTI_IF +```MULTI_IF(condition1, value1, condition2, value2, ... conditionN, valueN, bValue)``` + +Returns the value paired with the first condition that evaluates to true. If no condition is true, returns the last argument (`bValue`) as the default. + +Example: + +MULTI_IF(A > 1, 'A', B > 1, 'B', C > 1, 'C', 'D') + ### CASE WHEN ``` diff --git a/docs/zh/transform-v2/sql-functions.md b/docs/zh/transform-v2/sql-functions.md index 02e3dc8e7df..9bc6752b2f9 100644 --- a/docs/zh/transform-v2/sql-functions.md +++ b/docs/zh/transform-v2/sql-functions.md @@ -952,6 +952,15 @@ IFNULL(A, B) NULLIF(A, B) +### MULTI_IF +```MULTI_IF(condition1, value1, condition2, value2, ... 
conditionN, valueN, bValue)``` + +返回第一个满足相应条件的值。如果所有条件均为假,则返回最后一个值。 + +示例: + +MULTI_IF(A > 1, 'A', B > 1, 'B', C > 1, 'C', 'D') + ### CASE WHEN ``` diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java index d47a5c99726..ce5cfbaec4e 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java @@ -64,23 +64,8 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr container.executeJob("/sql_transform/explode_transform.conf"); Assertions.assertEquals(0, execResultBySql.getExitCode()); - Container.ExecResult execResultBySqlWithoutOuter = - container.executeJob("/sql_transform/explode_transform_without_outer.conf"); - Assertions.assertEquals(0, execResultBySqlWithoutOuter.getExitCode()); - - Container.ExecResult execResultBySqlWithOuter = - container.executeJob("/sql_transform/explode_transform_with_outer.conf"); - Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode()); - - Container.ExecResult arraySql = container.executeJob("/sql_transform/func_array.conf"); - Assertions.assertEquals(0, arraySql.getExitCode()); - - Container.ExecResult splitSql = container.executeJob("/sql_transform/func_split.conf"); - Assertions.assertEquals(0, splitSql.getExitCode()); - - Container.ExecResult maxMinSql = - container.executeJob("/sql_transform/func_array_max_min.conf"); - Assertions.assertEquals(0, maxMinSql.getExitCode()); + Container.ExecResult multiIfSql = container.executeJob("/sql_transform/func_multi_if.conf"); + Assertions.assertEquals(0, 
multiIfSql.getExitCode()); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf new file mode 100644 index 00000000000..33009c2b0b5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf @@ -0,0 +1,100 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 10000 +} + +source { + FakeSource { + plugin_output = "fake" + schema = { + fields { + id = "int" + age = "int" + score = "double" + name = "string" + } + } + rows = [ + {fields = [1, 15, 85.5, "Alice"], kind = INSERT} + ] + } +} + +transform { + Sql { + plugin_input = "fake" + plugin_output = "fake1" + query = """ + SELECT + id, + age, + score, + name, + MULTI_IF(age < 18, 'Minor', age < 30, 'Young Adult', age < 40, 'Adult', 'Senior') as age_category, + MULTI_IF(score >= 90, 'A', score >= 80, 'B', score >= 70, 'C', score >= 60, 'D', 'F') as grade + FROM fake + """ + } +} + +sink { + Assert { + plugin_input = "fake1" + rules = { + row_rules = [ + { + rule_type = "MIN_ROW" + rule_value = 1 + }, + { + rule_type = "MAX_ROW" + rule_value = 1 + } + ], + field_rules = [ + { + field_name = "id" + field_type = "int" + field_value = [ + {equals_to = 1} + ] + }, + { + field_name = "age_category" + field_type = "string" + field_value = [ + {equals_to = "Minor"} + ] + }, + { + field_name = "grade" + field_type = "string" + field_value = [ + {equals_to = "B"} + ] + } + ] + } + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java index 4787ada1a48..19f252daea7 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java @@ -190,6 +190,7 @@ public class ZetaSQLFunction { public static final String COALESCE = "COALESCE"; public static final String IFNULL = "IFNULL"; public static final String NULLIF = "NULLIF"; + public static final String MULTI_IF = "MULTI_IF"; public static final String 
UUID = "UUID"; @@ -313,6 +314,14 @@ public Object computeForValue(Expression expression, Object[] inputFields) { } if (expression instanceof Function) { Function function = (Function) expression; + String functionName = function.getName(); + + // Special handling for MULTI_IF to properly evaluate comparison expressions + if (MULTI_IF.equalsIgnoreCase(functionName)) { + return multiIfFunction(function, inputFields); + } + + // Standard handling for other functions ExpressionList expressionList = (ExpressionList) function.getParameters(); List functionArgs = new ArrayList<>(); @@ -321,7 +330,7 @@ public Object computeForValue(Expression expression, Object[] inputFields) { functionArgs.add(computeForValue(funcArgExpression, inputFields)); } } - return executeFunctionExpr(function.getName(), functionArgs); + return executeFunctionExpr(functionName, functionArgs); } if (expression instanceof TimeKeyExpression) { return executeTimeKeyExpr(((TimeKeyExpression) expression).getStringValue()); @@ -559,6 +568,7 @@ public Object executeFunctionExpr(String functionName, List args) { return SystemFunction.ifnull(args); case NULLIF: return SystemFunction.nullif(args); + // MULTI_IF is now handled directly in computeForValue case ARRAY: return ArrayFunction.array(args); case ARRAY_MAX: @@ -897,4 +907,47 @@ public SeaTunnelRowType lateralViewMapping( } return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); } + + private Object multiIfFunction(Function function, Object[] inputFields) { + ExpressionList expressionList = + (ExpressionList) function.getParameters(); + if (expressionList == null + || expressionList.getExpressions() == null + || expressionList.getExpressions().isEmpty()) { + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + "MULTI_IF function requires parameters"); + } + + List expressions = expressionList.getExpressions(); + if (expressions.size() < 3 || expressions.size() % 2 == 0) { + throw new TransformException( + 
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format( + "MULTI_IF function requires at least 3 arguments and an odd number of arguments: %s", + function)); + } + + // Process pairs of condition-result with special handling for comparison expressions + for (int i = 0; i < expressions.size() - 1; i += 2) { + Expression conditionExpr = expressions.get(i); + Object conditionResult; + + // Special handling for comparison expressions + if (conditionExpr instanceof BinaryExpression + && zetaSQLFilter.isConditionExpr(conditionExpr)) { + conditionResult = zetaSQLFilter.executeFilter(conditionExpr, inputFields); + } else { + conditionResult = computeForValue(conditionExpr, inputFields); + } + + if (conditionResult instanceof Boolean && (Boolean) conditionResult) { + // Condition is true, evaluate and return the corresponding result + return computeForValue(expressions.get(i + 1), inputFields); + } + } + + // No condition was true, evaluate and return the default value (last argument) + return computeForValue(expressions.get(expressions.size() - 1), inputFields); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java index 5d5a26bf40b..56beb9d75d8 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java @@ -486,6 +486,35 @@ private SeaTunnelDataType getFunctionType(Function function) { case ZetaSQLFunction.IFNULL: // Result has the same type as first argument return getExpressionType(function.getParameters().getExpressions().get(0)); + case ZetaSQLFunction.MULTI_IF: + ExpressionList multiIfExpressionList = function.getParameters(); + if (multiIfExpressionList == null) { + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + 
"MULTI_IF function requires parameters"); + } + + List multiIfExpressions = multiIfExpressionList.getExpressions(); + if (multiIfExpressions == null || multiIfExpressions.isEmpty()) { + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + "MULTI_IF function requires parameters"); + } + + if (multiIfExpressions.size() < 3 || multiIfExpressions.size() % 2 == 0) { + throw new TransformException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format( + "MULTI_IF function requires at least 3 arguments and an odd number of arguments")); + } + + List> resultTypes = new ArrayList<>(); + for (int i = 1; i < multiIfExpressions.size() - 1; i += 2) { + resultTypes.add(getExpressionType(multiIfExpressions.get(i))); + } + resultTypes.add( + getExpressionType(multiIfExpressions.get(multiIfExpressions.size() - 1))); + return getMaxType(resultTypes); case ZetaSQLFunction.MOD: // Result has the same type as second argument return getExpressionType(function.getParameters().getExpressions().get(1)); From 696f73ca8c9dd95941d39eb699555dd6e2a3257e Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 11 Apr 2025 16:05:48 +0800 Subject: [PATCH 2/5] [Feature][transform-v2] sql transform support multi_if function --- .../seatunnel/e2e/transform/TestSQLIT.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java index ce5cfbaec4e..cb588d0aef7 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java @@ -64,6 +64,24 @@ 
public void testSQLTransform(TestContainer container) throws IOException, Interr container.executeJob("/sql_transform/explode_transform.conf"); Assertions.assertEquals(0, execResultBySql.getExitCode()); + Container.ExecResult execResultBySqlWithoutOuter = + container.executeJob("/sql_transform/explode_transform_without_outer.conf"); + Assertions.assertEquals(0, execResultBySqlWithoutOuter.getExitCode()); + + Container.ExecResult execResultBySqlWithOuter = + container.executeJob("/sql_transform/explode_transform_with_outer.conf"); + Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode()); + + Container.ExecResult arraySql = container.executeJob("/sql_transform/func_array.conf"); + Assertions.assertEquals(0, arraySql.getExitCode()); + + Container.ExecResult splitSql = container.executeJob("/sql_transform/func_split.conf"); + Assertions.assertEquals(0, splitSql.getExitCode()); + + Container.ExecResult maxMinSql = + container.executeJob("/sql_transform/func_array_max_min.conf"); + Assertions.assertEquals(0, maxMinSql.getExitCode()); + Container.ExecResult multiIfSql = container.executeJob("/sql_transform/func_multi_if.conf"); Assertions.assertEquals(0, multiIfSql.getExitCode()); } From e452320fb4057298bbe535a4137e5ad21ee70958 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 11 Apr 2025 16:30:37 +0800 Subject: [PATCH 3/5] [Feature][transform-v2] sql transform support multi_if function --- .../org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java index 19f252daea7..664a72e3bbe 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java @@ -568,7 +568,6 
@@ public Object executeFunctionExpr(String functionName, List args) { return SystemFunction.ifnull(args); case NULLIF: return SystemFunction.nullif(args); - // MULTI_IF is now handled directly in computeForValue case ARRAY: return ArrayFunction.array(args); case ARRAY_MAX: From a159aac49616125dcaf9a7fbcd22a78d5e46979d Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Mon, 14 Apr 2025 09:44:09 +0800 Subject: [PATCH 4/5] [Feature][transform-v2] sql transform support multi_if function --- .../test/resources/sql_transform/func_multi_if.conf | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf index 33009c2b0b5..20a9e33fefd 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf @@ -52,7 +52,8 @@ transform { score, name, MULTI_IF(age < 18, 'Minor', age < 30, 'Young Adult', age < 40, 'Adult', 'Senior') as age_category, - MULTI_IF(score >= 90, 'A', score >= 80, 'B', score >= 70, 'C', score >= 60, 'D', 'F') as grade + MULTI_IF(score >= 90, 'A', score >= 80, 'B', score >= 70, 'C', score >= 60, 'D', 'F') as grade, + MULTI_IF(score >= 90, 'excellent', 'pass') as grade_category, FROM fake """ } @@ -93,6 +94,13 @@ sink { field_value = [ {equals_to = "B"} ] + }, + { + field_name = "grade_category" + field_type = "string" + field_value = [ + {equals_to = "pass"} + ] } ] } From 470cbb7cfad28b5b8301795977c67e5c304b79cd Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Mon, 14 Apr 2025 14:25:07 +0800 Subject: [PATCH 5/5] [Feature][transform-v2] sql transform support multi_if function --- 
.../src/test/resources/sql_transform/func_multi_if.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf index 20a9e33fefd..a32987de689 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_multi_if.conf @@ -53,7 +53,7 @@ transform { name, MULTI_IF(age < 18, 'Minor', age < 30, 'Young Adult', age < 40, 'Adult', 'Senior') as age_category, MULTI_IF(score >= 90, 'A', score >= 80, 'B', score >= 70, 'C', score >= 60, 'D', 'F') as grade, - MULTI_IF(score >= 90, 'excellent', 'pass') as grade_category, + MULTI_IF(score >= 90, 'excellent', 'pass') as grade_category FROM fake """ }