Skip to content

Commit 86424d3

Browse files
Hisoka-XThorneANN
authored andcommitted
[FLINK-37586][udf] Add support for options in user-defined functions and update related documentation (apache#4252)
1 parent 997eb15 commit 86424d3

11 files changed

Lines changed: 379 additions & 8 deletions

File tree

docs/content.zh/docs/core-concept/transform.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,51 @@ pipeline:
412412

413413
注意这里的 `classpath` 必须是全限定名,并且对应的 `jar` 文件必须包含在 Flink `/lib` 文件夹中,或者通过 `flink-cdc.sh --jar` 选项传递。
414414

415+
### UDF 配置选项
416+
417+
你可以通过添加 `options` 块来向 UDF 传递额外的配置选项。这些选项可以在 `open` 方法中通过 `UserDefinedFunctionContext.configuration()` 获取:
418+
419+
```yaml
420+
pipeline:
421+
user-defined-function:
422+
- name: query_redis
423+
classpath: com.example.flink.cdc.udf.RedisQueryFunction
424+
options:
425+
hostname: localhost
426+
port: "6379"
427+
cache.enabled: "true"
428+
```
429+
430+
在你的 UDF 实现中,可以通过定义 `ConfigOption` 实例来访问这些配置选项:
431+
432+
```java
433+
import org.apache.flink.cdc.common.configuration.ConfigOption;
434+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
435+
436+
public class RedisQueryFunction implements UserDefinedFunction {
437+
private static final ConfigOption<String> HOSTNAME =
438+
ConfigOptions.key("hostname").stringType().noDefaultValue();
439+
private static final ConfigOption<Integer> PORT =
440+
ConfigOptions.key("port").intType().defaultValue(6379);
441+
442+
private String hostname;
443+
private int port;
444+
445+
@Override
446+
public void open(UserDefinedFunctionContext context) throws Exception {
447+
hostname = context.configuration().get(HOSTNAME);
448+
port = context.configuration().get(PORT);
449+
// 在这里初始化你的连接...
450+
}
451+
452+
public Object eval(String key) {
453+
// 使用 hostname 和 port 查询 Redis...
454+
}
455+
}
456+
```
457+
458+
`options` 字段是可选的。如果未指定,将会传递一个空的配置给 UDF。
459+
415460
在正确注册后,UDF 可以在 `projection` 和 `filter` 表达式中使用,就像内置函数一样:
416461

417462
```yaml

docs/content/docs/core-concept/transform.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,51 @@ pipeline:
417417

418418
Notice that given classpath must be fully-qualified, and corresponding `jar` files must be included in Flink `/lib` folder, or be passed with `flink-cdc.sh --jar` option.
419419

420+
### UDF Options
421+
422+
You can pass extra options to UDFs by adding an `options` block. These options will be available in the `open` method through `UserDefinedFunctionContext.configuration()`:
423+
424+
```yaml
425+
pipeline:
426+
user-defined-function:
427+
- name: query_redis
428+
classpath: com.example.flink.cdc.udf.RedisQueryFunction
429+
options:
430+
hostname: localhost
431+
port: "6379"
432+
cache.enabled: "true"
433+
```
434+
435+
And in your UDF implementation, you can access these options by defining `ConfigOption` instances:
436+
437+
```java
438+
import org.apache.flink.cdc.common.configuration.ConfigOption;
439+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
440+
441+
public class RedisQueryFunction implements UserDefinedFunction {
442+
private static final ConfigOption<String> HOSTNAME =
443+
ConfigOptions.key("hostname").stringType().noDefaultValue();
444+
private static final ConfigOption<Integer> PORT =
445+
ConfigOptions.key("port").intType().defaultValue(6379);
446+
447+
private String hostname;
448+
private int port;
449+
450+
@Override
451+
public void open(UserDefinedFunctionContext context) throws Exception {
452+
hostname = context.configuration().get(HOSTNAME);
453+
port = context.configuration().get(PORT);
454+
// Initialize your connection here...
455+
}
456+
457+
public Object eval(String key) {
458+
// Query Redis using hostname and port...
459+
}
460+
}
461+
```
462+
463+
The `options` field is optional. If not specified, an empty configuration will be passed to the UDF.
464+
420465
After being correctly registered, UDFs could be used in both `projection` and `filter` expressions, just like built-in functions:
421466

422467
```yaml

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
9292
private static final String UDF_KEY = "user-defined-function";
9393
private static final String UDF_FUNCTION_NAME_KEY = "name";
9494
private static final String UDF_CLASSPATH_KEY = "classpath";
95+
private static final String UDF_OPTIONS_KEY = "options";
9596

9697
// Model related keys
9798
private static final String MODEL_NAME_KEY = "model-name";
@@ -295,7 +296,7 @@ private UdfDef toUdfDef(JsonNode udfNode) {
295296
"UDF",
296297
udfNode,
297298
Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
298-
Collections.emptyList());
299+
Collections.singletonList(UDF_OPTIONS_KEY));
299300

300301
String functionName =
301302
checkNotNull(
@@ -310,7 +311,15 @@ private UdfDef toUdfDef(JsonNode udfNode) {
310311
UDF_CLASSPATH_KEY)
311312
.asText();
312313

313-
return new UdfDef(functionName, classpath);
314+
Map<String, String> options =
315+
Optional.ofNullable(udfNode.get(UDF_OPTIONS_KEY))
316+
.map(
317+
node ->
318+
mapper.convertValue(
319+
node, new TypeReference<Map<String, String>>() {}))
320+
.orElse(null);
321+
322+
return new UdfDef(functionName, classpath, options);
314323
}
315324

316325
private TransformDef toTransformDef(JsonNode transformNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,15 @@ void testUdfDefinition() throws Exception {
197197
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
198198
}
199199

200+
@Test
201+
void testUdfDefinitionWithOptions() throws Exception {
202+
URL resource =
203+
Resources.getResource("definitions/pipeline-definition-with-udf-options.yaml");
204+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
205+
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
206+
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
207+
}
208+
200209
@Test
201210
void testSchemaEvolutionTypesConfiguration() throws Exception {
202211
testSchemaEvolutionTypesParsing(
@@ -669,4 +678,42 @@ void testParsingFullDefinitionFromString() throws Exception {
669678
ImmutableMap.<String, String>builder()
670679
.put("parallelism", "1")
671680
.build()));
681+
682+
private final PipelineDef pipelineDefWithUdfOptions =
683+
new PipelineDef(
684+
new SourceDef("values", null, new Configuration()),
685+
new SinkDef(
686+
"values",
687+
null,
688+
new Configuration(),
689+
ImmutableSet.of(
690+
DROP_COLUMN,
691+
ALTER_COLUMN_TYPE,
692+
ADD_COLUMN,
693+
CREATE_TABLE,
694+
RENAME_COLUMN)),
695+
Collections.emptyList(),
696+
Collections.singletonList(
697+
new TransformDef(
698+
"mydb.web_order",
699+
"*, query_redis(id) as redis_value",
700+
"id > 0",
701+
null,
702+
null,
703+
null,
704+
null,
705+
null)),
706+
Collections.singletonList(
707+
new UdfDef(
708+
"query_redis",
709+
"org.apache.flink.cdc.udf.examples.java.RedisQueryFunction",
710+
ImmutableMap.<String, String>builder()
711+
.put("hostname", "localhost")
712+
.put("port", "6379")
713+
.put("cache.enabled", "true")
714+
.build())),
715+
Configuration.fromMap(
716+
ImmutableMap.<String, String>builder()
717+
.put("parallelism", "1")
718+
.build()));
672719
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
################################################################################
17+
source:
18+
type: values
19+
20+
sink:
21+
type: values
22+
23+
transform:
24+
- source-table: mydb.web_order
25+
projection: "*, query_redis(id) as redis_value"
26+
filter: id > 0
27+
28+
pipeline:
29+
parallelism: 1
30+
user-defined-function:
31+
- name: query_redis
32+
classpath: org.apache.flink.cdc.udf.examples.java.RedisQueryFunction
33+
options:
34+
hostname: localhost
35+
port: "6379"
36+
cache.enabled: "true"

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.composer.definition;
1919

20+
import java.util.Collections;
21+
import java.util.Map;
2022
import java.util.Objects;
2123

2224
/**
@@ -27,15 +29,22 @@
2729
* <ul>
2830
* <li>name: Static method name of user-defined functions.
2931
* <li>classpath: Fully-qualified class path of package containing given function.
32+
* <li>options: Configuration options for the user-defined function.
3033
* </ul>
3134
*/
3235
public class UdfDef {
3336
private final String name;
3437
private final String classpath;
38+
private final Map<String, String> options;
3539

3640
public UdfDef(String name, String classpath) {
41+
this(name, classpath, Collections.emptyMap());
42+
}
43+
44+
public UdfDef(String name, String classpath, Map<String, String> options) {
3745
this.name = name;
3846
this.classpath = classpath;
47+
this.options = options != null ? options : Collections.emptyMap();
3948
}
4049

4150
public String getName() {
@@ -46,6 +55,10 @@ public String getClasspath() {
4655
return classpath;
4756
}
4857

58+
public Map<String, String> getOptions() {
59+
return options;
60+
}
61+
4962
@Override
5063
public boolean equals(Object o) {
5164
if (this == o) {
@@ -56,16 +69,27 @@ public boolean equals(Object o) {
5669
}
5770

5871
UdfDef udfDef = (UdfDef) o;
59-
return Objects.equals(name, udfDef.name) && Objects.equals(classpath, udfDef.classpath);
72+
return Objects.equals(name, udfDef.name)
73+
&& Objects.equals(classpath, udfDef.classpath)
74+
&& Objects.equals(options, udfDef.options);
6075
}
6176

6277
@Override
6378
public int hashCode() {
64-
return Objects.hash(name, classpath);
79+
return Objects.hash(name, classpath, options);
6580
}
6681

6782
@Override
6883
public String toString() {
69-
return "UdfDef{" + "name='" + name + '\'' + ", classpath='" + classpath + '\'' + '}';
84+
return "UdfDef{"
85+
+ "name='"
86+
+ name
87+
+ '\''
88+
+ ", classpath='"
89+
+ classpath
90+
+ '\''
91+
+ ", options="
92+
+ options
93+
+ '}';
7094
}
7195
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
3131
import org.apache.flink.streaming.api.datastream.DataStream;
3232

33-
import java.util.HashMap;
3433
import java.util.List;
3534
import java.util.Map;
3635
import java.util.stream.Collectors;
@@ -133,6 +132,6 @@ private Tuple3<String, String, Map<String, String>> modelToUDFTuple(ModelDef mod
133132
}
134133

135134
private Tuple3<String, String, Map<String, String>> udfDefToUDFTuple(UdfDef udf) {
136-
return Tuple3.of(udf.getName(), udf.getClasspath(), new HashMap<>());
135+
return Tuple3.of(udf.getName(), udf.getClasspath(), udf.getOptions());
137136
}
138137
}

0 commit comments

Comments
 (0)