Skip to content

Commit 3dde255

Browse files
authored
[FLINK-37466][table-api-java] Make DEFAULT explicit from Table API to planner
This closes #26290.
1 parent 0d0c185 commit 3dde255

File tree

8 files changed

+104
-30
lines changed

8 files changed

+104
-30
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java

+24-15
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.typeinfo.TypeInformation;
2323
import org.apache.flink.api.common.typeutils.CompositeType;
24-
import org.apache.flink.table.api.DataTypes;
2524
import org.apache.flink.table.api.TableException;
2625
import org.apache.flink.table.api.ValidationException;
2726
import org.apache.flink.table.catalog.DataTypeFactory;
@@ -51,7 +50,6 @@
5150
import org.apache.flink.table.types.inference.TypeStrategies;
5251
import org.apache.flink.table.types.logical.LogicalType;
5352
import org.apache.flink.table.types.utils.DataTypeUtils;
54-
import org.apache.flink.util.Preconditions;
5553

5654
import javax.annotation.Nullable;
5755

@@ -68,6 +66,7 @@
6866

6967
import static java.util.Collections.singletonList;
7068
import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunction;
69+
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
7170
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
7271
import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
7372
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasLegacyTypes;
@@ -302,8 +301,18 @@ private UnresolvedCallExpression executeAssignment(
302301
declaredArgs.forEach(
303302
declaredArg -> {
304303
if (declaredArg.isOptional()) {
304+
// All optional arguments have a type.
305+
// This is checked in StaticArgument.
306+
final DataType dataType =
307+
declaredArg
308+
.getDataType()
309+
.orElseThrow(IllegalStateException::new);
305310
namedArgs.putIfAbsent(
306-
declaredArg.getName(), valueLiteral(null, DataTypes.NULL()));
311+
declaredArg.getName(),
312+
CallExpression.permanent(
313+
BuiltInFunctionDefinitions.DEFAULT,
314+
List.of(),
315+
dataType));
307316
}
308317
});
309318

@@ -490,32 +499,32 @@ public boolean isArgumentLiteral(int pos) {
490499

491500
@Override
492501
public boolean isArgumentNull(int pos) {
493-
Preconditions.checkArgument(
494-
isArgumentLiteral(pos), "Argument at position %s is not a literal.", pos);
495502
final ResolvedExpression arg = getArgument(pos);
496-
// special case for type literals in Table API only
497-
if (arg instanceof TypeLiteralExpression) {
498-
return false;
503+
if (isFunction(arg, BuiltInFunctionDefinitions.DEFAULT)) {
504+
return true;
505+
}
506+
if (arg instanceof ValueLiteralExpression) {
507+
final ValueLiteralExpression literal = (ValueLiteralExpression) arg;
508+
return literal.isNull();
499509
}
500-
final ValueLiteralExpression literal = (ValueLiteralExpression) getArgument(pos);
501-
return literal.isNull();
510+
return false;
502511
}
503512

504513
@Override
505514
@SuppressWarnings("unchecked")
506515
public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
507-
Preconditions.checkArgument(
508-
isArgumentLiteral(pos), "Argument at position %s is not a literal.", pos);
509516
final ResolvedExpression arg = getArgument(pos);
510-
// special case for type literals in Table API only
511517
if (arg instanceof TypeLiteralExpression) {
512518
if (!DataType.class.isAssignableFrom(clazz)) {
513519
return Optional.empty();
514520
}
515521
return Optional.of((T) arg.getOutputDataType());
516522
}
517-
final ValueLiteralExpression literal = (ValueLiteralExpression) getArgument(pos);
518-
return literal.getValueAs(clazz);
523+
if (arg instanceof ValueLiteralExpression) {
524+
final ValueLiteralExpression literal = (ValueLiteralExpression) arg;
525+
return literal.getValueAs(clazz);
526+
}
527+
return Optional.empty();
519528
}
520529

521530
@Override

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -2901,13 +2901,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
29012901
BuiltInFunctionDefinition.newBuilder()
29022902
.name("ASSIGNMENT")
29032903
.kind(OTHER)
2904-
.inputTypeStrategy(
2905-
sequence(
2906-
and(
2907-
InputTypeStrategies.LITERAL,
2908-
logical(LogicalTypeFamily.CHARACTER_STRING)),
2909-
or(OUTPUT_IF_NULL, InputTypeStrategies.ANY)))
2910-
.outputTypeStrategy(TypeStrategies.argument(1))
2904+
.outputTypeStrategy(TypeStrategies.MISSING)
2905+
.build();
2906+
2907+
public static final BuiltInFunctionDefinition DEFAULT =
2908+
BuiltInFunctionDefinition.newBuilder()
2909+
.name("DEFAULT")
2910+
.callSyntax(SqlCallSyntax.NO_PARENTHESIS)
2911+
.kind(SCALAR)
2912+
.inputTypeStrategy(NO_ARGS)
2913+
.outputTypeStrategy(TypeStrategies.MISSING)
29112914
.build();
29122915

29132916
public static final BuiltInFunctionDefinition STREAM_RECORD_TIMESTAMP =

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ private void checkOptionalType() {
261261
}
262262
// e.g. for untyped table arguments
263263
if (dataType == null) {
264-
return;
264+
throw new ValidationException("Untyped table arguments must not be optional.");
265265
}
266266

267267
final LogicalType type = dataType.getLogicalType();

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ private static Stream<TestSpec> functionSpecs() {
667667
StaticArgument.table(
668668
"rowTable",
669669
Row.class,
670-
true,
670+
false,
671671
EnumSet.of(StaticArgumentTrait.TABLE_AS_ROW)))
672672
.expectStaticArgument(StaticArgument.scalar("s", DataTypes.STRING(), true))
673673
.expectState("s1", TypeStrategies.explicit(MyFirstState.TYPE))
@@ -693,7 +693,7 @@ private static Stream<TestSpec> functionSpecs() {
693693
StaticArgument.table(
694694
"rowTable",
695695
Row.class,
696-
true,
696+
false,
697697
EnumSet.of(StaticArgumentTrait.TABLE_AS_ROW)))
698698
.expectStaticArgument(StaticArgument.scalar("s", DataTypes.STRING(), true))
699699
.expectState("s1", TypeStrategies.explicit(MyFirstState.TYPE))
@@ -2328,8 +2328,7 @@ public void eval(
23282328
@ArgumentHint(name = "i") Integer i,
23292329
@ArgumentHint(
23302330
value = {ArgumentTrait.TABLE_AS_ROW},
2331-
name = "rowTable",
2332-
isOptional = true)
2331+
name = "rowTable")
23332332
Row t2,
23342333
@ArgumentHint(isOptional = true, name = "s") String s) {}
23352334
}
@@ -2347,8 +2346,7 @@ public void eval(
23472346
@ArgumentHint(name = "i", type = @DataTypeHint("INT")),
23482347
@ArgumentHint(
23492348
name = "rowTable",
2350-
value = {ArgumentTrait.TABLE_AS_ROW},
2351-
isOptional = true),
2349+
value = {ArgumentTrait.TABLE_AS_ROW}),
23522350
@ArgumentHint(name = "s", isOptional = true, type = @DataTypeHint("STRING"))
23532351
},
23542352
output = @DataTypeHint("ROW<b BOOLEAN>"))

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class CustomizedConverters {
5959
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_QUERY, new JsonQueryConverter());
6060
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_OBJECT, new JsonObjectConverter());
6161
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_ARRAY, new JsonArrayConverter());
62+
CONVERTERS.put(BuiltInFunctionDefinitions.DEFAULT, new DefaultConverter());
6263
CONVERTERS.put(InternalFunctionDefinitions.THROW_EXCEPTION, new ThrowExceptionConverter());
6364
}
6465

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.expressions.converter.converters;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.expressions.CallExpression;
23+
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
24+
import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
25+
import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
26+
import org.apache.flink.table.types.logical.LogicalType;
27+
28+
import org.apache.calcite.rel.type.RelDataType;
29+
import org.apache.calcite.rex.RexNode;
30+
31+
/** Conversion for {@link BuiltInFunctionDefinitions#DEFAULT}. */
32+
@Internal
33+
class DefaultConverter extends CustomizedConverter {
34+
35+
@Override
36+
public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
37+
final LogicalType outputType = call.getOutputDataType().getLogicalType();
38+
final RelDataType relDataType =
39+
context.getTypeFactory().createFieldTypeFromLogicalType(outputType);
40+
return context.getRelBuilder()
41+
.getRexBuilder()
42+
.makeCall(new SqlDefaultArgOperator(relDataType));
43+
}
44+
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java

+8
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,14 @@ public void eval(Context ctx, @ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row
534534
}
535535
}
536536

537+
/** Testing function. */
538+
public static class OptionalFunction extends TestProcessTableFunctionBase {
539+
public void eval(
540+
Context ctx, @ArgumentHint(value = TABLE_AS_ROW, isOptional = true) Row r) {
541+
collectObjects(r);
542+
}
543+
}
544+
537545
// --------------------------------------------------------------------------------------------
538546
// Helpers
539547
// --------------------------------------------------------------------------------------------

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,12 @@ private static Stream<ErrorSpec> errorSpecs() {
342342
"SELECT * FROM f(r => TABLE t_watermarked, i => 1, on_time => DESCRIPTOR(ts, INVALID))",
343343
"Invalid time attribute declaration. Each column in the `on_time` argument must "
344344
+ "reference at least one column in one of the table arguments. "
345-
+ "Unknown references: [INVALID]"));
345+
+ "Unknown references: [INVALID]"),
346+
ErrorSpec.of(
347+
"invalid optional table argument",
348+
OptionalUntypedTable.class,
349+
"SELECT * FROM f()",
350+
"Untyped table arguments must not be optional."));
346351
}
347352

348353
/** Testing function. */
@@ -392,6 +397,12 @@ public void eval(
392397
@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES, PASS_COLUMNS_THROUGH}) Row r) {}
393398
}
394399

400+
/** Testing function. */
401+
public static class OptionalUntypedTable extends ProcessTableFunction<String> {
402+
@SuppressWarnings("unused")
403+
public void eval(@ArgumentHint(value = TABLE_AS_ROW, isOptional = true) Row r) {}
404+
}
405+
395406
private static class ErrorSpec {
396407
private final String description;
397408
private final Class<? extends UserDefinedFunction> functionClass;

0 commit comments

Comments
 (0)