Skip to content

Commit 85c4a12

Browse files
committed
[hotfix] revert unnecessary change
1 parent 4a57308 commit 85c4a12

1 file changed

Lines changed: 108 additions & 0 deletions

File tree

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.operators.transform;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypes;
22+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
23+
import org.apache.flink.cdc.runtime.model.OpenAIEmbeddingModel;
24+
import org.apache.flink.table.functions.ScalarFunction;
25+
26+
import com.fasterxml.jackson.core.JsonProcessingException;
27+
import org.junit.jupiter.api.Test;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
32+
/** Testcases for {@link UserDefinedFunctionDescriptor}. */
33+
class UserDefinedFunctionDescriptorTest {
34+
35+
/** This is a plain Flink CDC UDF. */
36+
public static class CdcUdf implements UserDefinedFunction {}
37+
38+
/** This is a Flink CDC UDF with type hint. */
39+
public static class CdcUdfWithTypeHint implements UserDefinedFunction {
40+
@Override
41+
public DataType getReturnType() {
42+
return DataTypes.TIMESTAMP_LTZ(9);
43+
}
44+
}
45+
46+
/** This is a Flink ScalarFunction. */
47+
public static class FlinkUdf extends ScalarFunction {}
48+
49+
/** This is not a valid UDF class. */
50+
public static class NotUDF {}
51+
52+
@Test
53+
void testUserDefinedFunctionDescriptor() throws JsonProcessingException {
54+
55+
assertThat(new UserDefinedFunctionDescriptor("cdc_udf", CdcUdf.class.getName()))
56+
.extracting("name", "className", "classpath", "returnTypeHint", "isCdcPipelineUdf")
57+
.containsExactly(
58+
"cdc_udf",
59+
"UserDefinedFunctionDescriptorTest$CdcUdf",
60+
"org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$CdcUdf",
61+
null,
62+
true);
63+
64+
assertThat(
65+
new UserDefinedFunctionDescriptor(
66+
"cdc_udf_with_type_hint", CdcUdfWithTypeHint.class.getName()))
67+
.extracting("name", "className", "classpath", "returnTypeHint", "isCdcPipelineUdf")
68+
.containsExactly(
69+
"cdc_udf_with_type_hint",
70+
"UserDefinedFunctionDescriptorTest$CdcUdfWithTypeHint",
71+
"org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$CdcUdfWithTypeHint",
72+
DataTypes.TIMESTAMP_LTZ(9),
73+
true);
74+
75+
assertThat(new UserDefinedFunctionDescriptor("flink_udf", FlinkUdf.class.getName()))
76+
.extracting("name", "className", "classpath", "returnTypeHint", "isCdcPipelineUdf")
77+
.containsExactly(
78+
"flink_udf",
79+
"UserDefinedFunctionDescriptorTest$FlinkUdf",
80+
"org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$FlinkUdf",
81+
null,
82+
false);
83+
84+
assertThatThrownBy(
85+
() -> new UserDefinedFunctionDescriptor("not_udf", NotUDF.class.getName()))
86+
.isExactlyInstanceOf(IllegalArgumentException.class)
87+
.hasMessage(
88+
"Failed to detect UDF class class org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$NotUDF "
89+
+ "since it never implements interface org.apache.flink.cdc.common.udf.UserDefinedFunction or "
90+
+ "extends Flink class org.apache.flink.table.functions.ScalarFunction.");
91+
92+
assertThatThrownBy(
93+
() ->
94+
new UserDefinedFunctionDescriptor(
95+
"not_even_exist", "not.a.valid.class.path"))
96+
.isExactlyInstanceOf(IllegalArgumentException.class)
97+
.hasMessage("Failed to instantiate UDF not_even_exist@not.a.valid.class.path");
98+
String name = "GET_EMBEDDING";
99+
assertThat(new UserDefinedFunctionDescriptor(name, OpenAIEmbeddingModel.class.getName()))
100+
.extracting("name", "className", "classpath", "returnTypeHint", "isCdcPipelineUdf")
101+
.containsExactly(
102+
"GET_EMBEDDING",
103+
"OpenAIEmbeddingModel",
104+
"org.apache.flink.cdc.runtime.model.OpenAIEmbeddingModel",
105+
DataTypes.ARRAY(DataTypes.FLOAT()),
106+
true);
107+
}
108+
}

0 commit comments

Comments
 (0)