diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 5b56057cec4..7f5ba9f58f6 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -395,6 +395,51 @@ pipeline: 注意这里的 `classpath` 必须是全限定名,并且对应的 `jar` 文件必须包含在 Flink `/lib` 文件夹中,或者通过 `flink-cdc.sh --jar` 选项传递。 +### UDF 配置选项 + +你可以通过添加 `options` 块来向 UDF 传递额外的配置选项。这些选项可以在 `open` 方法中通过 `UserDefinedFunctionContext.configuration()` 获取: + +```yaml +pipeline: + user-defined-function: + - name: query_redis + classpath: com.example.flink.cdc.udf.RedisQueryFunction + options: + hostname: localhost + port: "6379" + cache.enabled: "true" +``` + +在你的 UDF 实现中,可以通过定义 `ConfigOption` 实例来访问这些配置选项: + +```java +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; + +public class RedisQueryFunction implements UserDefinedFunction { + private static final ConfigOption<String> HOSTNAME = + ConfigOptions.key("hostname").stringType().noDefaultValue(); + private static final ConfigOption<Integer> PORT = + ConfigOptions.key("port").intType().defaultValue(6379); + + private String hostname; + private int port; + + @Override + public void open(UserDefinedFunctionContext context) throws Exception { + hostname = context.configuration().get(HOSTNAME); + port = context.configuration().get(PORT); + // 在这里初始化你的连接... + } + + public Object eval(String key) { + // 使用 hostname 和 port 查询 Redis... 
+ } +} +``` + +`options` 字段是可选的。如果未指定,将会传递一个空的配置给 UDF。 + 在正确注册后,UDF 可以在 `projection` 和 `filter` 表达式中使用,就像内置函数一样: ```yaml diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 024401782fc..9841a37b8e1 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -400,6 +400,51 @@ pipeline: Notice that given classpath must be fully-qualified, and corresponding `jar` files must be included in Flink `/lib` folder, or be passed with `flink-cdc.sh --jar` option. +### UDF Options + +You can pass extra options to UDFs by adding an `options` block. These options will be available in the `open` method through `UserDefinedFunctionContext.configuration()`: + +```yaml +pipeline: + user-defined-function: + - name: query_redis + classpath: com.example.flink.cdc.udf.RedisQueryFunction + options: + hostname: localhost + port: "6379" + cache.enabled: "true" +``` + +In your UDF implementation, you can access these options by defining `ConfigOption` instances: + +```java +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; + +public class RedisQueryFunction implements UserDefinedFunction { + private static final ConfigOption<String> HOSTNAME = + ConfigOptions.key("hostname").stringType().noDefaultValue(); + private static final ConfigOption<Integer> PORT = + ConfigOptions.key("port").intType().defaultValue(6379); + + private String hostname; + private int port; + + @Override + public void open(UserDefinedFunctionContext context) throws Exception { + hostname = context.configuration().get(HOSTNAME); + port = context.configuration().get(PORT); + // Initialize your connection here... + } + + public Object eval(String key) { + // Query Redis using hostname and port... + } +} +``` + +The `options` field is optional. If not specified, an empty configuration will be passed to the UDF. 
+ After being correctly registered, UDFs could be used in both `projection` and `filter` expressions, just like built-in functions: ```yaml diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index efb4e79d26b..79886ea08f4 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -92,6 +92,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String UDF_KEY = "user-defined-function"; private static final String UDF_FUNCTION_NAME_KEY = "name"; private static final String UDF_CLASSPATH_KEY = "classpath"; + private static final String UDF_OPTIONS_KEY = "options"; // Model related keys private static final String MODEL_NAME_KEY = "model-name"; @@ -295,7 +296,7 @@ private UdfDef toUdfDef(JsonNode udfNode) { "UDF", udfNode, Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY), - Collections.emptyList()); + Collections.singletonList(UDF_OPTIONS_KEY)); String functionName = checkNotNull( @@ -310,7 +311,15 @@ private UdfDef toUdfDef(JsonNode udfNode) { UDF_CLASSPATH_KEY) .asText(); - return new UdfDef(functionName, classpath); + Map options = + Optional.ofNullable(udfNode.get(UDF_OPTIONS_KEY)) + .map( + node -> + mapper.convertValue( + node, new TypeReference>() {})) + .orElse(null); + + return new UdfDef(functionName, classpath, options); } private TransformDef toTransformDef(JsonNode transformNode) { diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index bf4c377ee2c..bbfafcf8e69 100644 --- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -197,6 +197,15 @@ void testUdfDefinition() throws Exception { assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf); } + @Test + void testUdfDefinitionWithOptions() throws Exception { + URL resource = + Resources.getResource("definitions/pipeline-definition-with-udf-options.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); + assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions); + } + @Test void testSchemaEvolutionTypesConfiguration() throws Exception { testSchemaEvolutionTypesParsing( @@ -669,4 +678,42 @@ void testParsingFullDefinitionFromString() throws Exception { ImmutableMap.builder() .put("parallelism", "1") .build())); + + private final PipelineDef pipelineDefWithUdfOptions = + new PipelineDef( + new SourceDef("values", null, new Configuration()), + new SinkDef( + "values", + null, + new Configuration(), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "mydb.web_order", + "*, query_redis(id) as redis_value", + "id > 0", + null, + null, + null, + null, + null)), + Collections.singletonList( + new UdfDef( + "query_redis", + "org.apache.flink.cdc.udf.examples.java.RedisQueryFunction", + ImmutableMap.<String, String>builder() + .put("hostname", "localhost") + .put("port", "6379") + .put("cache.enabled", "true") + .build())), + Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put("parallelism", "1") + .build())); } diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml new file mode 
100644 index 00000000000..6a985cbf41c --- /dev/null +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml @@ -0,0 +1,36 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +source: + type: values + +sink: + type: values + +transform: + - source-table: mydb.web_order + projection: "*, query_redis(id) as redis_value" + filter: id > 0 + +pipeline: + parallelism: 1 + user-defined-function: + - name: query_redis + classpath: org.apache.flink.cdc.udf.examples.java.RedisQueryFunction + options: + hostname: localhost + port: "6379" + cache.enabled: "true" diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java index 6dbc580fb38..0486619ec85 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.composer.definition; +import java.util.Collections; +import java.util.Map; import java.util.Objects; /** @@ -27,15 +29,22 @@ *
 <ul> * <li>name: Static method name of user-defined functions. * <li>classpath: Fully-qualified class path of package containing given function. + * <li>options: Configuration options for the user-defined function. * </ul>
*/ public class UdfDef { private final String name; private final String classpath; + private final Map options; public UdfDef(String name, String classpath) { + this(name, classpath, Collections.emptyMap()); + } + + public UdfDef(String name, String classpath, Map options) { this.name = name; this.classpath = classpath; + this.options = options != null ? options : Collections.emptyMap(); } public String getName() { @@ -46,6 +55,10 @@ public String getClasspath() { return classpath; } + public Map getOptions() { + return options; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -56,16 +69,27 @@ public boolean equals(Object o) { } UdfDef udfDef = (UdfDef) o; - return Objects.equals(name, udfDef.name) && Objects.equals(classpath, udfDef.classpath); + return Objects.equals(name, udfDef.name) + && Objects.equals(classpath, udfDef.classpath) + && Objects.equals(options, udfDef.options); } @Override public int hashCode() { - return Objects.hash(name, classpath); + return Objects.hash(name, classpath, options); } @Override public String toString() { - return "UdfDef{" + "name='" + name + '\'' + ", classpath='" + classpath + '\'' + '}'; + return "UdfDef{" + + "name='" + + name + + '\'' + + ", classpath='" + + classpath + + '\'' + + ", options=" + + options + + '}'; } } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index 4cc7a0b2423..a938d7ab05e 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -30,7 +30,6 @@ import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import java.util.HashMap; import java.util.List; import java.util.Map; 
import java.util.stream.Collectors; @@ -133,6 +132,6 @@ private Tuple3> modelToUDFTuple(ModelDef mod } private Tuple3> udfDefToUDFTuple(UdfDef udf) { - return Tuple3.of(udf.getName(), udf.getClasspath(), new HashMap<>()); + return Tuple3.of(udf.getName(), udf.getClasspath(), udf.getOptions()); } } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index 5daf370af62..9d5a8b22e23 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -547,6 +547,79 @@ void testComplicatedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws .contains("[ LifecycleFunction ] closed. Called 6 times."); } + @ParameterizedTest + @MethodSource("testParams") + void testConfigurableUdf(ValuesDataSink.SinkApi sinkApi, String language) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*, greet(col1) as greeting", + null, + "col1", + null, + "key1=value1", + "", + null); + + // Setup UDF with options + UdfDef udfDef = + new UdfDef( + 
"greet", + String.format( + "org.apache.flink.cdc.udf.examples.%s.ConfigurableFunctionClass", + language), + ImmutableMap.of("greeting", "Hi", "suffix", "~")); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList(transformDef), + Collections.singletonList(udfDef), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`greeting` STRING}, primaryKeys=col1, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Hi 1~], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Hi 2~], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Hi 3~], op=INSERT, meta=({op_ts=3})}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", + "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Hi 1~], after=[], op=DELETE, meta=({op_ts=4})}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Hi 2~], after=[2, x, Hi 2~], op=UPDATE, meta=({op_ts=5})}"); + } + // -------------------------- // Flink-compatible UDF tests // -------------------------- diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml index 293f76cd455..ff7339a4c51 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml @@ -79,5 +79,5 @@ steps: language: clojure error: | Unexpected key `language` in YAML UDF block. - Allowed keys in this context are: [name, classpath] + Allowed keys in this context are: [name, classpath, options] Note: option language: "clojure" is unexpected. It was silently ignored in previous versions, and probably should be removed. diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java new file mode 100644 index 00000000000..1ce7309b726 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; +import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; + +/** Example UDF that wraps its input with the "greeting" and "suffix" options read in open(). */ +public class ConfigurableFunctionClass implements UserDefinedFunction { + + private static final ConfigOption<String> GREETING = + ConfigOptions.key("greeting").stringType().defaultValue("Hello"); + + private static final ConfigOption<String> SUFFIX = + ConfigOptions.key("suffix").stringType().defaultValue("!"); + + private String greeting; + private String suffix; + + public String eval(String value) { + return greeting + " " + value + suffix; + } + + @Override + public void open(UserDefinedFunctionContext context) throws Exception { + greeting = context.configuration().get(GREETING); + suffix = context.configuration().get(SUFFIX); + } + + @Override + public void close() throws Exception {} +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala new file mode 100644 index 00000000000..6405ef715ff --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala + +import org.apache.flink.cdc.common.configuration.ConfigOptions +import org.apache.flink.cdc.common.udf.{UserDefinedFunction, UserDefinedFunctionContext} + +/** Example UDF that decorates its input with a greeting and a suffix taken from its options. */ +class ConfigurableFunctionClass extends UserDefinedFunction { + + private var greeting: String = "Hello" + private var suffix: String = "!" + + /** Concatenates the greeting, a single space, the given value and the suffix. */ + def eval(value: String): String = s"$greeting $value$suffix" + + /** Loads the "greeting" and "suffix" options from the UDF configuration. */ + override def open(context: UserDefinedFunctionContext): Unit = { + greeting = context.configuration().get(ConfigurableFunctionClass.GREETING) + suffix = context.configuration().get(ConfigurableFunctionClass.SUFFIX) + } + + override def close(): Unit = {} +} + +object ConfigurableFunctionClass { + private val GREETING = ConfigOptions.key("greeting").stringType().defaultValue("Hello") + private val SUFFIX = ConfigOptions.key("suffix").stringType().defaultValue("!") +}