From 512112335dadd62cd7d261404b4b8117a38a0d03 Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 4 Sep 2024 22:42:45 +0800 Subject: [PATCH 1/4] feat: Unified function module --- .../build.gradle | 5 - .../eventmesh-function-api}/build.gradle | 5 - .../function/api/AbstractFunctionChain.java | 76 +++++ .../eventmesh/function/api/Function.java | 43 +++ .../eventmesh-function-filter/build.gradle | 21 ++ .../function}/filter/PatternEntry.java | 4 +- .../condition/AnythingButCondition.java | 2 +- .../function}/filter/condition/Condition.java | 2 +- .../filter/condition/ConditionsBuilder.java | 2 +- .../filter/condition/ExistsCondition.java | 2 +- .../filter/condition/NumericCondition.java | 2 +- .../filter/condition/PrefixCondition.java | 2 +- .../filter/condition/SpecifiedCondition.java | 2 +- .../filter/condition/SuffixCondition.java | 2 +- .../function}/filter/pattern/Pattern.java | 25 +- .../filter/patternbuild/PatternBuilder.java | 10 +- .../function}/filter/PatternTest.java | 6 +- .../build.gradle | 16 +- .../transformer/ConstantTransformer.java | 2 +- .../function}/transformer/JsonPathParser.java | 2 +- .../transformer/OriginalTransformer.java | 3 +- .../function}/transformer/Template.java | 2 +- .../transformer/TemplateTransformer.java | 2 +- .../transformer/TransformException.java | 2 +- .../function/transformer/Transformer.java | 44 +++ .../transformer/TransformerBuilder.java | 2 +- .../transformer/TransformerParam.java | 2 +- .../transformer/TransformerType.java | 2 +- .../function}/transformer/Variable.java | 2 +- .../function}/transformer/TransformTest.java | 2 +- eventmesh-runtime-v2/build.gradle | 3 + .../runtime/function/FunctionRuntime.java | 277 ++++++++++++++++++ .../function/FunctionRuntimeConfig.java | 35 +++ .../function/FunctionRuntimeFactory.java | 2 +- .../runtime/function/StringFunctionChain.java | 38 +++ .../src/main/resources/function.yaml | 21 ++ eventmesh-runtime/build.gradle | 5 +- .../eventmesh/runtime/boot/FilterEngine.java | 4 +- .../runtime/boot/TransformerEngine.java | 6 +- .../processor/SendAsyncEventProcessor.java | 4 +- .../http/push/AsyncHTTPPushRequest.java | 4 +- settings.gradle | 7 +- 42 files changed, 623 insertions(+), 77 deletions(-) rename {eventmesh-filter => eventmesh-function}/build.gradle (92%) rename {eventmesh-transformer => eventmesh-function/eventmesh-function-api}/build.gradle (92%) create mode 100644 eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java create mode 100644 eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java create mode 100644 eventmesh-function/eventmesh-function-filter/build.gradle rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/PatternEntry.java (94%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/AnythingButCondition.java (97%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/Condition.java (94%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/ConditionsBuilder.java (97%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/ExistsCondition.java (95%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/NumericCondition.java (97%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/PrefixCondition.java (95%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/SpecifiedCondition.java (95%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/condition/SuffixCondition.java (95%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/pattern/Pattern.java (76%) rename {eventmesh-filter/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function}/filter/patternbuild/PatternBuilder.java (94%) rename {eventmesh-filter/src/test/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function}/filter/PatternTest.java (96%) rename eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java => eventmesh-function/eventmesh-function-transformer/build.gradle (70%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/ConstantTransformer.java (95%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/JsonPathParser.java (98%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/OriginalTransformer.java (94%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/Template.java (96%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/TemplateTransformer.java (96%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/TransformException.java (95%) create mode 100644 eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/TransformerBuilder.java (97%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/TransformerParam.java (97%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/TransformerType.java (97%) rename {eventmesh-transformer/src/main/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function}/transformer/Variable.java (96%) rename {eventmesh-transformer/src/test/java/org/apache/eventmesh => eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function}/transformer/TransformTest.java (99%) create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java create mode 100644 eventmesh-runtime-v2/src/main/resources/function.yaml diff --git a/eventmesh-filter/build.gradle b/eventmesh-function/build.gradle similarity index 92% rename from eventmesh-filter/build.gradle rename to eventmesh-function/build.gradle index ba88591b41..2944f98194 100644 --- a/eventmesh-filter/build.gradle +++ b/eventmesh-function/build.gradle @@ -14,8 +14,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - -dependencies { - implementation project(":eventmesh-common") -} diff --git a/eventmesh-transformer/build.gradle b/eventmesh-function/eventmesh-function-api/build.gradle similarity index 92% rename from eventmesh-transformer/build.gradle rename to eventmesh-function/eventmesh-function-api/build.gradle index ba88591b41..2944f98194 100644 --- a/eventmesh-transformer/build.gradle +++ b/eventmesh-function/eventmesh-function-api/build.gradle @@ -14,8 +14,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - -dependencies { - implementation project(":eventmesh-common") -} diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java new file mode 100644 index 0000000000..5501d0973d --- /dev/null +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.api; + +import java.util.ArrayList; +import java.util.List; + +/** + * AbstractFunctionChain is an abstract class that implements the {@link Function} interface and provides a framework + * for chaining multiple {@link Function} instances that operate on inputs of type {@code T} and produce outputs of type + * {@code R}. This class can be extended to create specific function chains with customized behavior for different + * data types. + * + *

The primary purpose of this class is to allow the sequential execution of functions, where the output of one + * function is passed as the input to the next function in the chain. The chain can be dynamically modified by adding + * functions either at the beginning or the end of the chain.

+ * + * @param the type of the input to the function + * @param the type of the result of the function + */ +public abstract class AbstractFunctionChain implements Function { + + protected final List> functions; + + /** + * Default constructor that initializes an empty function chain. + */ + public AbstractFunctionChain() { + this.functions = new ArrayList<>(); + } + + /** + * Constructor that initializes the function chain with a given list of functions. The functions will be executed + * in the order they are provided when the {@link #apply(Object)} method is called. + * + * @param functions the initial list of functions to be added to the chain + */ + public AbstractFunctionChain(List> functions) { + this.functions = functions; + } + + /** + * Adds a {@link Function} to the beginning of the chain. The function will be executed first when the + * {@link #apply(Object)} method is called. + * + * @param function the function to be added to the beginning of the chain + */ + public void addFirst(Function function) { + functions.add(0, function); + } + + /** + * Adds a {@link Function} to the end of the chain. The function will be executed in sequence after all previously + * added functions when the {@link #apply(Object)} method is called. + * + * @param function the function to be added to the end of the chain + */ + public void addLast(Function function) { + functions.add(function); + } +} \ No newline at end of file diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java new file mode 100644 index 0000000000..db657f72ee --- /dev/null +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.api; + +/** + * EventMesh Interface for a function that accepts one argument and produces a result. This is a functional interface whose functional method is + * {@link #apply(Object)}. + * + *

This interface is similar to {@link java.util.function.Function}, + * but it is specifically designed for use within the EventMesh. It allows defining custom functions to process data or events in the EventMesh. The + * main use case is to encapsulate operations that can be passed around and applied to data or event messages in the EventMesh processing + * pipeline.

+ * + * @param the type of the input to the function + * @param the type of the result of the function + */ +public interface Function { + + /** + * Applies this function to the given argument within the context of the EventMesh module. This method encapsulates the logic for processing the + * input data and producing a result, which can be used in the EventMesh event processing pipeline. + * + * @param t the function argument, representing the input data or event to be processed + * @return the function result, representing the processed output + */ + R apply(T t); + +} \ No newline at end of file diff --git a/eventmesh-function/eventmesh-function-filter/build.gradle b/eventmesh-function/eventmesh-function-filter/build.gradle new file mode 100644 index 0000000000..21e28d7baf --- /dev/null +++ b/eventmesh-function/eventmesh-function-filter/build.gradle @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + implementation project(":eventmesh-common") + implementation project(":eventmesh-function:eventmesh-function-api") +} \ No newline at end of file diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java similarity index 94% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java index 5a2493a371..acc2d5f073 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.eventmesh.filter; +package org.apache.eventmesh.function.filter; -import org.apache.eventmesh.filter.condition.Condition; +import org.apache.eventmesh.function.filter.condition.Condition; import java.util.ArrayList; import java.util.List; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java index 2d58136a70..d4f209225e 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import java.util.ArrayList; import java.util.Iterator; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java similarity index 94% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java index fbb4276c7b..9890d5e0d3 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java index 4e207663aa..961be85e5b 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java index 53c15bb297..c085ba6585 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java index 5eb5374c7c..40eb16a75e 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import java.util.ArrayList; import java.util.List; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java index 633ed1fb02..ff5d0313ce 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java index f9cc3fb5db..9eefb6b641 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java index 805df0ee17..090df24834 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java similarity index 76% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java index 8abb306b84..51315e0ed1 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.pattern; +package org.apache.eventmesh.function.filter.pattern; import org.apache.eventmesh.common.utils.JsonPathUtils; -import org.apache.eventmesh.filter.PatternEntry; +import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.filter.PatternEntry; import org.apache.commons.lang3.StringUtils; @@ -29,12 +30,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.jayway.jsonpath.PathNotFoundException; -public class Pattern { - private List requiredFieldList = new ArrayList<>(); - private List dataList = new ArrayList<>(); +public class Pattern implements Function { - private String content; + private final List requiredFieldList = new ArrayList<>(); + private final List dataList = new ArrayList<>(); public void addRequiredFieldList(PatternEntry patternEntry) { this.requiredFieldList.add(patternEntry); @@ -45,19 +45,22 @@ public void addDataList(PatternEntry patternEntry) { } public boolean filter(String content) { - this.content = content; - // this.jsonNode = JacksonUtils.STRING_TO_JSONNODE(content); + return matchRequiredFieldList(content, requiredFieldList) && matchRequiredFieldList(content, dataList); + } - return matchRequiredFieldList(requiredFieldList) && matchRequiredFieldList(dataList); + @Override + public String apply(String content) { + // filter content + return filter(content) ? content : null; } - private boolean matchRequiredFieldList(List dataList) { + private boolean matchRequiredFieldList(String content, List dataList) { for (final PatternEntry patternEntry : dataList) { JsonNode jsonElement = null; try { // content:filter - String matchRes = JsonPathUtils.matchJsonPathValue(this.content, patternEntry.getPatternPath()); + String matchRes = JsonPathUtils.matchJsonPathValue(content, patternEntry.getPatternPath()); if (StringUtils.isNoneBlank(matchRes)) { jsonElement = JsonPathUtils.parseStrict(matchRes); diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java similarity index 94% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java index 5f9a71d262..de8e2fd27f 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.patternbuild; +package org.apache.eventmesh.function.filter.patternbuild; import org.apache.eventmesh.common.exception.JsonException; -import org.apache.eventmesh.filter.PatternEntry; -import org.apache.eventmesh.filter.condition.Condition; -import org.apache.eventmesh.filter.condition.ConditionsBuilder; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.PatternEntry; +import org.apache.eventmesh.function.filter.condition.Condition; +import org.apache.eventmesh.function.filter.condition.ConditionsBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; import java.util.ArrayDeque; import java.util.Iterator; diff --git a/eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java similarity index 96% rename from eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java rename to eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java index 207992b0c1..b674a62395 100644 --- a/eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java +++ b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.eventmesh.filter; +package org.apache.eventmesh.function.filter; -import org.apache.eventmesh.filter.pattern.Pattern; -import org.apache.eventmesh.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java b/eventmesh-function/eventmesh-function-transformer/build.gradle similarity index 70% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java rename to eventmesh-function/eventmesh-function-transformer/build.gradle index 8239dfcb6e..6939bbd483 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java +++ b/eventmesh-function/eventmesh-function-transformer/build.gradle @@ -15,18 +15,8 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; - -import com.fasterxml.jackson.core.JsonProcessingException; - -/** - * EventMesh transformer interface, specified transformer implementation includes: - * 1. Constant - * 2. Original - * 3. Template - */ -public interface Transformer { - - String transform(String json) throws JsonProcessingException; +dependencies { + implementation project(":eventmesh-common") + implementation project(":eventmesh-function:eventmesh-function-api") } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java similarity index 95% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java index dd7c20aace..ae77f149f7 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class ConstantTransformer implements Transformer { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java similarity index 98% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java index a0ebde12d2..6b66221c15 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import org.apache.eventmesh.common.utils.JsonPathUtils; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java similarity index 94% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java index 61aa059d59..59ce0350eb 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; class OriginalTransformer implements Transformer { @@ -23,4 +23,5 @@ class OriginalTransformer implements Transformer { public String transform(String json) { return json; } + } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java index 19c3b5cec3..29d975c371 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import org.apache.commons.text.StringSubstitutor; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java index bc9907ff48..69cee68269 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import java.util.List; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java similarity index 95% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java index 1b11a29d80..aeb827fc88 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; /** * Transform exception diff --git a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java new file mode 100644 index 0000000000..2d3dfc6578 --- /dev/null +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.transformer; + +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.function.api.Function; + +import com.fasterxml.jackson.core.JsonProcessingException; + +/** + * EventMesh transformer interface, specified transformer implementation includes: + * 1. Constant + * 2. Original + * 3. Template + */ +public interface Transformer extends Function { + + String transform(String json) throws JsonProcessingException; + + @Override + default String apply(String content) { + try { + return transform(content); + } catch (JsonProcessingException e) { + throw new EventMeshException("Failed to transform content", e); + } + } + +} diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java similarity index 97% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java index e7277af73c..4eb0a818e6 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class TransformerBuilder { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java similarity index 97% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java index d747d7be4c..915111e01d 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class TransformerParam { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java similarity index 97% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java index 2dc7809478..969c49ce80 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import java.util.Objects; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java index c9259d335c..aee80e1454 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class Variable { diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java similarity index 99% rename from eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java rename to eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java index a55cde0baf..5fefd6b8bb 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/eventmesh-runtime-v2/build.gradle b/eventmesh-runtime-v2/build.gradle index 04b460ade3..74b9759b10 100644 --- a/eventmesh-runtime-v2/build.gradle +++ b/eventmesh-runtime-v2/build.gradle @@ -36,6 +36,9 @@ dependencies { implementation project(":eventmesh-common") implementation project(":eventmesh-connectors:eventmesh-connector-canal") implementation project(":eventmesh-connectors:eventmesh-connector-http") + implementation project(":eventmesh-function:eventmesh-function-api") + implementation project(":eventmesh-function:eventmesh-function-filter") + implementation project(":eventmesh-function:eventmesh-function-transformer") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-registry:eventmesh-registry-api") diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java index 66ba0a0c3d..e3a22f8d39 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java @@ -17,22 +17,299 @@ package org.apache.eventmesh.runtime.function; +import org.apache.eventmesh.common.ThreadPoolFactory; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.config.connector.SinkConfig; +import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.job.JobType; +import org.apache.eventmesh.function.api.AbstractFunctionChain; +import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.transformer.Transformer; +import org.apache.eventmesh.function.transformer.TransformerBuilder; +import org.apache.eventmesh.function.transformer.TransformerParam; +import org.apache.eventmesh.function.transformer.TransformerType; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; import org.apache.eventmesh.runtime.Runtime; +import org.apache.eventmesh.runtime.RuntimeInstanceConfig; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class FunctionRuntime implements Runtime { + private final RuntimeInstanceConfig runtimeInstanceConfig; + + private final LinkedBlockingQueue queue; + + private FunctionRuntimeConfig functionRuntimeConfig; + + private AbstractFunctionChain functionChain; + + private Sink sinkConnector; + + private Source sourceConnector; + + private final ExecutorService sourceService = ThreadPoolFactory.createSingleExecutor("eventMesh-sourceService"); + + private final ExecutorService sinkService = ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService"); + + + private volatile boolean isRunning = false; + + private volatile boolean isFailed = false; + + + public FunctionRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { + this.runtimeInstanceConfig = runtimeInstanceConfig; + this.queue = new LinkedBlockingQueue<>(1000); + } + + @Override public void init() throws Exception { + // load function runtime config from local file + this.functionRuntimeConfig = ConfigService.getInstance().buildConfigInstance(FunctionRuntimeConfig.class); + + // TODO init admin service + + // TODO get remote config from admin service and update local config + + // init connector service + initConnectorService(); + } + + private void initConnectorService() throws Exception { + final JobType jobType = (JobType) functionRuntimeConfig.getRuntimeConfig().get("jobType"); + + // create sink connector + ConnectorCreateService sinkConnectorCreateService = + ConnectorPluginFactory.createConnector(functionRuntimeConfig.getSinkConnectorType() + "-Sink"); + this.sinkConnector = (Sink) sinkConnectorCreateService.create(); + + // parse sink config and init sink connector + SinkConfig sinkConfig = (SinkConfig) ConfigUtil.parse(functionRuntimeConfig.getSinkConnectorConfig(), sinkConnector.configClass()); + SinkConnectorContext sinkConnectorContext = new SinkConnectorContext(); + sinkConnectorContext.setSinkConfig(sinkConfig); + sinkConnectorContext.setRuntimeConfig(functionRuntimeConfig.getRuntimeConfig()); + sinkConnectorContext.setJobType(jobType); + sinkConnector.init(sinkConnectorContext); + + // create source connector + ConnectorCreateService sourceConnectorCreateService = + ConnectorPluginFactory.createConnector(functionRuntimeConfig.getSourceConnectorType() + "-Source"); + this.sourceConnector = (Source) sourceConnectorCreateService.create(); + // parse source config and init source connector + SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(functionRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass()); + SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); + sourceConnectorContext.setSourceConfig(sourceConfig); + sourceConnectorContext.setRuntimeConfig(functionRuntimeConfig.getRuntimeConfig()); + sourceConnectorContext.setJobType(jobType); +// sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); +// if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { +// sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); +// } + sourceConnector.init(sourceConnectorContext); } @Override public void start() throws Exception { + // build function chain + this.functionChain = buildFunctionChain(functionRuntimeConfig.getFunctionConfigs()); + // start sink service + sinkService.execute(() -> { + try { + startSinkConnector(); + } catch (Exception e) { + isFailed = true; + log.error("Sink Connector [{}] failed to start.", sinkConnector.name(), e); + try { + this.stop(); + } catch (Exception ex) { + log.error("Failed to stop after exception", ex); + } + throw new RuntimeException(e); + } + }); + + // start source service + sourceService.execute(() -> { + try { + startSourceConnector(); + } catch (Exception e) { + isFailed = true; + log.error("Source Connector [{}] failed to start.", sourceConnector.name(), e); + try { + this.stop(); + } catch (Exception ex) { + log.error("Failed to stop after exception", ex); + } + throw new RuntimeException(e); + } + }); } + private StringFunctionChain buildFunctionChain(List> functionConfigs) { + StringFunctionChain functionChain = new StringFunctionChain(); + for (Map functionConfig : functionConfigs) { + + String functionType = (String) functionConfig.getOrDefault("functionType", ""); + if (functionType.isEmpty()) { + throw new IllegalArgumentException("'functionType' is required for function"); + } + + // build function based on functionType + Function function; + switch (functionType) { + case "filter": + function = buildFilter(functionConfig); + break; + case "transformer": + function = buildTransformer(functionConfig); + break; + default: + throw new IllegalArgumentException( + "Invalid functionType: '" + functionType + "'. Supported functionType: 'filter', 'transformer'"); + } + // add function to functionChain + functionChain.addLast(function); + } + + return functionChain; + } + + private Pattern buildFilter(Map functionConfig) { + // get condition from attributes + String condition = (String) functionConfig.get("condition"); + if (condition == null) { + throw new IllegalArgumentException("'condition' is required for filter function"); + } + return PatternBuilder.build(condition); + } + + private Transformer buildTransformer(Map functionConfig) { + // get transformerType from attributes + TransformerType transformerType = TransformerType.getItem((String) functionConfig.getOrDefault("transformerType", "")); + if (transformerType == null) { + throw new IllegalArgumentException( + "Invalid transformerType: '" + functionConfig.get("transformerType") + + "'. Supported transformerType: 'CONSTANT', 'TEMPLATE', 'ORIGINAL'"); + } + + // build transformer + TransformerParam transformerParam = new TransformerParam(); + transformerParam.setTransformerType(transformerType); + + String value = (String) functionConfig.getOrDefault("value", ""); + String template = (String) functionConfig.getOrDefault("template", ""); + + switch (transformerType) { + case CONSTANT: + // check value + if (value.isEmpty()) { + throw new IllegalArgumentException("'value' is required for constant transformer"); + } + transformerParam.setValue(value); + break; + case TEMPLATE: + // check value and template + if (value.isEmpty() || template.isEmpty()) { + throw new IllegalArgumentException("'value' and 'template' are required for template transformer"); + } + transformerParam.setValue(value); + transformerParam.setTemplate(template); + break; + default: + // ORIGINAL doesn't need value and template + break; + } + + return TransformerBuilder.buildTransformer(transformerParam); + } + + + private void startSinkConnector() throws Exception { + // start sink connector + this.sinkConnector.start(); + + // try to get data from queue and send it. + while (this.isRunning) { + ConnectRecord connectRecord = null; + try { + connectRecord = queue.poll(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Failed to poll data from queue.", e); + Thread.currentThread().interrupt(); + } + + // send data if not null + if (connectRecord != null) { + sinkConnector.put(Collections.singletonList(connectRecord)); + } + } + } + + private void startSourceConnector() throws Exception { + // start source connector + this.sourceConnector.start(); + + // try to get data from source connector and handle it. + while (this.isRunning) { + List connectorRecordList = sourceConnector.poll(); + + // handle data + if (connectorRecordList != null && !connectorRecordList.isEmpty()) { + for (ConnectRecord connectRecord : connectorRecordList) { + if (connectRecord == null || connectRecord.getData() == null) { + // If data is null, just put it into queue. + this.queue.put(connectRecord); + } else { + // Apply function chain to data + String data = functionChain.apply((String) connectRecord.getData()); + if (data != null) { + connectRecord.setData(data); + this.queue.put(connectRecord); + } + } + } + } + } + } + + @Override public void stop() throws Exception { + log.info("FunctionRuntime is stopping..."); + +// if (isFailed) { +// reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL); +// } else { +// reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.COMPLETE); +// } + isRunning = false; + sinkConnector.stop(); + sourceConnector.stop(); + sinkService.shutdown(); + sourceService.shutdown(); + log.info("FunctionRuntime stopped."); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java index 40aec65e99..4d57c83e82 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java @@ -17,5 +17,40 @@ package org.apache.eventmesh.runtime.function; +import org.apache.eventmesh.common.config.Config; + +import java.util.List; +import java.util.Map; + + +import lombok.Data; + +@Data +@Config(path = "classPath://function.yaml") public class FunctionRuntimeConfig { + + private String functionRuntimeInstanceId; + + private String taskID; + + private String jobID; + + private String region; + + private Map runtimeConfig; + + private String sourceConnectorType; + + private String sourceConnectorDesc; + + private Map sourceConnectorConfig; + + private String sinkConnectorType; + + private String sinkConnectorDesc; + + private Map sinkConnectorConfig; + + private List> functionConfigs; + } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java index 3ba91986cb..40346e272f 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java @@ -30,7 +30,7 @@ public void init() throws Exception { @Override public Runtime createRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { - return null; + return new FunctionRuntime(runtimeInstanceConfig); } @Override diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java new file mode 100644 index 0000000000..8a4e0564cc --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.function; + +import org.apache.eventmesh.function.api.AbstractFunctionChain; +import org.apache.eventmesh.function.api.Function; + +/** + * ConnectRecord Function Chain. + */ +public class StringFunctionChain extends AbstractFunctionChain { + + @Override + public String apply(String content) { + for (Function function : functions) { + if (content == null) { + break; + } + content = function.apply(content); + } + return content; + } +} diff --git a/eventmesh-runtime-v2/src/main/resources/function.yaml b/eventmesh-runtime-v2/src/main/resources/function.yaml new file mode 100644 index 0000000000..eae2b063ec --- /dev/null +++ b/eventmesh-runtime-v2/src/main/resources/function.yaml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +taskID: c6233632-ab9a-4aba-904f-9d22fba6aa74 +jobID: 8190fe5b-1f9b-4815-8983-2467e76edbf0 +region: region1 + diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 95924faad4..b016e18bfe 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -36,8 +36,10 @@ dependencies { implementation "commons-validator:commons-validator" implementation project(":eventmesh-common") - implementation project(":eventmesh-filter") implementation project(":eventmesh-spi") + implementation project(":eventmesh-function:eventmesh-function-api") + implementation project(":eventmesh-function:eventmesh-function-filter") + implementation project(":eventmesh-function:eventmesh-function-transformer") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") implementation project(":eventmesh-storage-plugin:eventmesh-storage-standalone") implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq") @@ -45,7 +47,6 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-token") - implementation project(":eventmesh-transformer") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java index bf6eb9dadc..14677dc690 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java @@ -19,8 +19,8 @@ import org.apache.eventmesh.api.meta.MetaServiceListener; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.filter.pattern.Pattern; -import org.apache.eventmesh.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java index 551bcb2799..1d2f8ca30c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java @@ -19,14 +19,14 @@ import org.apache.eventmesh.api.meta.MetaServiceListener; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.function.transformer.Transformer; +import org.apache.eventmesh.function.transformer.TransformerBuilder; +import org.apache.eventmesh.function.transformer.TransformerParam; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager; import org.apache.eventmesh.runtime.meta.MetaStorage; -import org.apache.eventmesh.transformer.Transformer; -import org.apache.eventmesh.transformer.TransformerBuilder; -import org.apache.eventmesh.transformer.TransformerParam; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index b30238a28c..0e41d827ab 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -31,7 +31,8 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.transformer.Transformer; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.acl.Acl; @@ -44,7 +45,6 @@ import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; -import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index be95971536..69506ede8a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -30,14 +30,14 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.transformer.Transformer; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.WebhookUtil; -import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; diff --git a/settings.gradle b/settings.gradle index e9346bc461..b263dbbe50 100644 --- a/settings.gradle +++ b/settings.gradle @@ -47,8 +47,6 @@ include 'eventmesh-common' include 'eventmesh-starter' include 'eventmesh-examples' include 'eventmesh-spi' -include 'eventmesh-filter' -include 'eventmesh-transformer' include 'eventmesh-openconnect:eventmesh-openconnect-java' include 'eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api' @@ -133,3 +131,8 @@ include 'eventmesh-registry' include 'eventmesh-registry:eventmesh-registry-api' include 'eventmesh-registry:eventmesh-registry-nacos' +include 'eventmesh-function' +include 'eventmesh-function:eventmesh-function-api' +include 'eventmesh-function:eventmesh-function-filter' +include 'eventmesh-function:eventmesh-function-transformer' + From 45f6a8f501af92e3112b7b604ebc058b0c33aaae Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 11 Sep 2024 23:14:44 +0800 Subject: [PATCH 2/4] feat: update something --- ...va => AbstractEventMeshFunctionChain.java} | 24 +++---- .../{Function.java => EventMeshFunction.java} | 2 +- .../function/filter/pattern/Pattern.java | 4 +- .../filter/patternbuild/PatternBuilder.java | 26 ++++++-- .../function/transformer/Transformer.java | 4 +- .../runtime/function/FunctionRuntime.java | 64 +++++++++++-------- ...java => StringEventMeshFunctionChain.java} | 8 +-- 7 files changed, 77 insertions(+), 55 deletions(-) rename eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/{AbstractFunctionChain.java => AbstractEventMeshFunctionChain.java} (69%) rename eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/{Function.java => EventMeshFunction.java} (97%) rename eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/{StringFunctionChain.java => StringEventMeshFunctionChain.java} (78%) diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java similarity index 69% rename from eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java rename to eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java index 5501d0973d..8cbb0f9381 100644 --- a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractFunctionChain.java +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java @@ -21,8 +21,8 @@ import java.util.List; /** - * AbstractFunctionChain is an abstract class that implements the {@link Function} interface and provides a framework - * for chaining multiple {@link Function} instances that operate on inputs of type {@code T} and produce outputs of type + * AbstractEventMeshFunctionChain is an abstract class that implements the {@link EventMeshFunction} interface and provides a framework + * for chaining multiple {@link EventMeshFunction} instances that operate on inputs of type {@code T} and produce outputs of type * {@code R}. This class can be extended to create specific function chains with customized behavior for different * data types. * @@ -33,14 +33,14 @@ * @param the type of the input to the function * @param the type of the result of the function */ -public abstract class AbstractFunctionChain implements Function { +public abstract class AbstractEventMeshFunctionChain implements EventMeshFunction { - protected final List> functions; + protected final List> functions; /** * Default constructor that initializes an empty function chain. */ - public AbstractFunctionChain() { + public AbstractEventMeshFunctionChain() { this.functions = new ArrayList<>(); } @@ -50,27 +50,27 @@ public AbstractFunctionChain() { * * @param functions the initial list of functions to be added to the chain */ - public AbstractFunctionChain(List> functions) { + public AbstractEventMeshFunctionChain(List> functions) { this.functions = functions; } /** - * Adds a {@link Function} to the beginning of the chain. The function will be executed first when the + * Adds a {@link EventMeshFunction} to the beginning of the chain. The function will be executed first when the * {@link #apply(Object)} method is called. * * @param function the function to be added to the beginning of the chain */ - public void addFirst(Function function) { - functions.add(0, function); + public void addFirst(EventMeshFunction function) { + this.functions.add(0, function); } /** - * Adds a {@link Function} to the end of the chain. The function will be executed in sequence after all previously + * Adds a {@link EventMeshFunction} to the end of the chain. The function will be executed in sequence after all previously * added functions when the {@link #apply(Object)} method is called. * * @param function the function to be added to the end of the chain */ - public void addLast(Function function) { - functions.add(function); + public void addLast(EventMeshFunction function) { + this.functions.add(function); } } \ No newline at end of file diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java similarity index 97% rename from eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java rename to eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java index db657f72ee..973f097ae0 100644 --- a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/Function.java +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java @@ -29,7 +29,7 @@ * @param the type of the input to the function * @param the type of the result of the function */ -public interface Function { +public interface EventMeshFunction { /** * Applies this function to the given argument within the context of the EventMesh module. This method encapsulates the logic for processing the diff --git a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java index 51315e0ed1..955d9f59ef 100644 --- a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java @@ -18,7 +18,7 @@ package org.apache.eventmesh.function.filter.pattern; import org.apache.eventmesh.common.utils.JsonPathUtils; -import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.api.EventMeshFunction; import org.apache.eventmesh.function.filter.PatternEntry; import org.apache.commons.lang3.StringUtils; @@ -31,7 +31,7 @@ import com.jayway.jsonpath.PathNotFoundException; -public class Pattern implements Function { +public class Pattern implements EventMeshFunction { private final List requiredFieldList = new ArrayList<>(); private final List dataList = new ArrayList<>(); diff --git a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java index de8e2fd27f..939d1b6d6f 100644 --- a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java @@ -38,19 +38,33 @@ public class PatternBuilder { private static final ObjectMapper mapper = new ObjectMapper(); - public static Pattern build(String jsonStr) { - Pattern pattern = new Pattern(); - JsonNode jsonNode = null; + public static Pattern build(String jsonStr) { try { - jsonNode = mapper.readTree(jsonStr); + JsonNode jsonNode = mapper.readTree(jsonStr); + if (jsonNode.isEmpty() || !jsonNode.isObject()) { + return null; + } + return build(jsonNode); } catch (Exception e) { throw new JsonException("INVALID_JSON_STRING", e); } + } - if (jsonNode.isEmpty() || !jsonNode.isObject()) { - return null; + public static Pattern build(Map conditionMap) { + try { + JsonNode jsonNode = mapper.valueToTree(conditionMap); + if (jsonNode.isEmpty() || !jsonNode.isObject()) { + return null; + } + return build(jsonNode); + } catch (Exception e) { + throw new JsonException("INVALID_JSON_STRING", e); } + } + + public static Pattern build(JsonNode jsonNode) { + Pattern pattern = new Pattern(); // iter all json data Iterator> iterator = jsonNode.fields(); diff --git a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java index 2d3dfc6578..be0e815808 100644 --- a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java @@ -18,7 +18,7 @@ package org.apache.eventmesh.function.transformer; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.api.EventMeshFunction; import com.fasterxml.jackson.core.JsonProcessingException; @@ -28,7 +28,7 @@ * 2. Original * 3. Template */ -public interface Transformer extends Function { +public interface Transformer extends EventMeshFunction { String transform(String json) throws JsonProcessingException; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java index e3a22f8d39..70f2eed124 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java @@ -21,10 +21,9 @@ import org.apache.eventmesh.common.config.ConfigService; import org.apache.eventmesh.common.config.connector.SinkConfig; import org.apache.eventmesh.common.config.connector.SourceConfig; -import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.job.JobType; -import org.apache.eventmesh.function.api.AbstractFunctionChain; -import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.api.AbstractEventMeshFunctionChain; +import org.apache.eventmesh.function.api.EventMeshFunction; import org.apache.eventmesh.function.filter.pattern.Pattern; import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; import org.apache.eventmesh.function.transformer.Transformer; @@ -42,6 +41,8 @@ import org.apache.eventmesh.runtime.Runtime; import org.apache.eventmesh.runtime.RuntimeInstanceConfig; +import org.apache.commons.lang3.StringUtils; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -49,6 +50,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -60,7 +64,7 @@ public class FunctionRuntime implements Runtime { private FunctionRuntimeConfig functionRuntimeConfig; - private AbstractFunctionChain functionChain; + private AbstractEventMeshFunctionChain functionChain; private Sink sinkConnector; @@ -70,7 +74,6 @@ public class FunctionRuntime implements Runtime { private final ExecutorService sinkService = ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService"); - private volatile boolean isRunning = false; private volatile boolean isFailed = false; @@ -122,10 +125,7 @@ private void initConnectorService() throws Exception { sourceConnectorContext.setSourceConfig(sourceConfig); sourceConnectorContext.setRuntimeConfig(functionRuntimeConfig.getRuntimeConfig()); sourceConnectorContext.setJobType(jobType); -// sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); -// if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { -// sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); -// } + sourceConnector.init(sourceConnectorContext); } @@ -167,17 +167,18 @@ public void start() throws Exception { }); } - private StringFunctionChain buildFunctionChain(List> functionConfigs) { - StringFunctionChain functionChain = new StringFunctionChain(); - for (Map functionConfig : functionConfigs) { + private StringEventMeshFunctionChain buildFunctionChain(List> functionConfigs) { + StringEventMeshFunctionChain functionChain = new StringEventMeshFunctionChain(); - String functionType = (String) functionConfig.getOrDefault("functionType", ""); - if (functionType.isEmpty()) { + // build function chain + for (Map functionConfig : functionConfigs) { + String functionType = String.valueOf(functionConfig.getOrDefault("functionType", "")); + if (StringUtils.isEmpty(functionType)) { throw new IllegalArgumentException("'functionType' is required for function"); } // build function based on functionType - Function function; + EventMeshFunction function; switch (functionType) { case "filter": function = buildFilter(functionConfig); @@ -189,6 +190,7 @@ private StringFunctionChain buildFunctionChain(List> functio throw new IllegalArgumentException( "Invalid functionType: '" + functionType + "'. Supported functionType: 'filter', 'transformer'"); } + // add function to functionChain functionChain.addLast(function); } @@ -196,18 +198,29 @@ private StringFunctionChain buildFunctionChain(List> functio return functionChain; } + + @SuppressWarnings("unchecked") private Pattern buildFilter(Map functionConfig) { // get condition from attributes - String condition = (String) functionConfig.get("condition"); + Object condition = functionConfig.get("condition"); + if (condition == null) { throw new IllegalArgumentException("'condition' is required for filter function"); } - return PatternBuilder.build(condition); + + if (condition instanceof String) { + return PatternBuilder.build(String.valueOf(condition)); + } else if (condition instanceof Map) { + return PatternBuilder.build((Map) condition); + } else { + throw new IllegalArgumentException("Invalid condition"); + } } private Transformer buildTransformer(Map functionConfig) { // get transformerType from attributes - TransformerType transformerType = TransformerType.getItem((String) functionConfig.getOrDefault("transformerType", "")); + String transformerTypeStr = String.valueOf(functionConfig.getOrDefault("transformerType", "")).toUpperCase(); + TransformerType transformerType = TransformerType.getItem(transformerTypeStr); if (transformerType == null) { throw new IllegalArgumentException( "Invalid transformerType: '" + functionConfig.get("transformerType") @@ -218,20 +231,20 @@ private Transformer buildTransformer(Map functionConfig) { TransformerParam transformerParam = new TransformerParam(); transformerParam.setTransformerType(transformerType); - String value = (String) functionConfig.getOrDefault("value", ""); - String template = (String) functionConfig.getOrDefault("template", ""); + String value = String.valueOf(functionConfig.getOrDefault("value", "")); + String template = String.valueOf(functionConfig.getOrDefault("template", "")); switch (transformerType) { case CONSTANT: // check value - if (value.isEmpty()) { + if (StringUtils.isEmpty(value)) { throw new IllegalArgumentException("'value' is required for constant transformer"); } transformerParam.setValue(value); break; case TEMPLATE: // check value and template - if (value.isEmpty() || template.isEmpty()) { + if (StringUtils.isAnyEmpty(value, template)) { throw new IllegalArgumentException("'value' and 'template' are required for template transformer"); } transformerParam.setValue(value); @@ -299,17 +312,12 @@ private void startSourceConnector() throws Exception { public void stop() throws Exception { log.info("FunctionRuntime is stopping..."); -// if (isFailed) { -// reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL); -// } else { -// reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.COMPLETE); -// } - isRunning = false; sinkConnector.stop(); sourceConnector.stop(); sinkService.shutdown(); sourceService.shutdown(); + log.info("FunctionRuntime stopped."); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java similarity index 78% rename from eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java rename to eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java index 8a4e0564cc..0035999ecb 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringFunctionChain.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java @@ -17,17 +17,17 @@ package org.apache.eventmesh.runtime.function; -import org.apache.eventmesh.function.api.AbstractFunctionChain; -import org.apache.eventmesh.function.api.Function; +import org.apache.eventmesh.function.api.AbstractEventMeshFunctionChain; +import org.apache.eventmesh.function.api.EventMeshFunction; /** * ConnectRecord Function Chain. */ -public class StringFunctionChain extends AbstractFunctionChain { +public class StringEventMeshFunctionChain extends AbstractEventMeshFunctionChain { @Override public String apply(String content) { - for (Function function : functions) { + for (EventMeshFunction function : functions) { if (content == null) { break; } From c07d686787463810af9a6c47715ee8e7f73e4339 Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 18 Sep 2024 23:28:35 +0800 Subject: [PATCH 3/4] feat: update FunctionRuntime --- .../remote/response/FetchJobResponse.java | 3 + .../filter/patternbuild/PatternBuilder.java | 2 +- .../function/filter/PatternTest.java | 21 ++ .../function/transformer/JsonPathParser.java | 13 + .../transformer/TransformerBuilder.java | 20 +- .../function/transformer/TransformTest.java | 18 ++ .../runtime/function/FunctionRuntime.java | 229 +++++++++++++++--- 7 files changed, 276 insertions(+), 30 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 95d2d157e0..92056f5ed3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -25,6 +25,7 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; +import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; @@ -39,6 +40,8 @@ public class FetchJobResponse extends BaseRemoteResponse { private JobConnectorConfig connectorConfig; + private List> functionConfigs; + private List position; private TaskState state; diff --git a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java index 939d1b6d6f..60193a4efa 100644 --- a/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java @@ -59,7 +59,7 @@ public static Pattern build(Map conditionMap) { } return build(jsonNode); } catch (Exception e) { - throw new JsonException("INVALID_JSON_STRING", e); + throw new JsonException("INVALID_MAP", e); } } diff --git a/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java index b674a62395..bc0aeff4ea 100644 --- a/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java +++ b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java @@ -20,6 +20,11 @@ import org.apache.eventmesh.function.filter.pattern.Pattern; import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -144,4 +149,20 @@ public void testAnythingButFilter() { Assertions.assertEquals(false, res); } + @Test + public void testPrefixFilterMap() { + // Create the inner Map representing {prefix=eventmesh.} + Map innerMap = new HashMap<>(); + innerMap.put("prefix", "eventmesh."); + // Create a List representing [{prefix=eventmesh.}] + List> sourceList = Collections.singletonList(innerMap); + // Create the condition representing {source=[{prefix=eventmesh.}]} + Map condition = new HashMap<>(); + condition.put("source", sourceList); + + Pattern pattern = PatternBuilder.build(condition); + Boolean res = pattern.filter(event); + Assertions.assertEquals(true, res); + } + } diff --git a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java index 6b66221c15..c578310dc4 100644 --- a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java @@ -35,6 +35,19 @@ public List getVariablesList() { return variablesList; } + /** + * parser input jsonpath map into variable list + * + * @param jsonPathMap jsonpath map + */ + public JsonPathParser(Map jsonPathMap) { + for (Map.Entry entry : jsonPathMap.entrySet()) { + String name = entry.getKey(); + String value = entry.getValue(); + variablesList.add(new Variable(name, value)); + } + } + /** * parser input jsonpath string into variable list * diff --git a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java index 4eb0a818e6..916f1ef7bc 100644 --- a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.function.transformer; +import java.util.Map; + public class TransformerBuilder { public static Transformer buildTransformer(TransformerParam transformerParam) { @@ -32,9 +34,23 @@ public static Transformer buildTransformer(TransformerParam transformerParam) { } } - public static Transformer buildTemplateTransFormer(String jsonContent, String template) { - JsonPathParser jsonPathParser = new JsonPathParser(jsonContent); + /** + * build template transformer + * @param jsonContent json content, support string and map, other type will throw IllegalArgumentException + * @param template template string + * @return transformer + */ + @SuppressWarnings("unchecked") + public static Transformer buildTemplateTransFormer(Object jsonContent, String template) { Template templateEntry = new Template(template); + JsonPathParser jsonPathParser; + if (jsonContent instanceof String) { + jsonPathParser = new JsonPathParser((String) jsonContent); + } else if (jsonContent instanceof Map) { + jsonPathParser = new JsonPathParser((Map) jsonContent); + } else { + throw new TransformException("invalid json content"); + } return new TemplateTransformer(jsonPathParser, templateEntry); } diff --git a/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java index 5fefd6b8bb..f9a444e8f9 100644 --- a/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java +++ b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java @@ -17,6 +17,9 @@ package org.apache.eventmesh.function.transformer; +import java.util.Collections; +import java.util.Map; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -139,4 +142,19 @@ public void testTemplateTransFormerWithConstant() throws JsonProcessingException output); } + @Test + public void testTemplateTransFormerWithStringValueMap() throws JsonProcessingException { + Map content = Collections.singletonMap("data-name", "$.data.name"); + + String template = "Transformers test:data name is ${data-name}"; + Transformer transform = TransformerBuilder.buildTemplateTransFormer(content, template); + String output = transform.transform(EVENT); + Assertions.assertEquals("Transformers test:data name is test-transformer", output); + + Transformer transformer1 = TransformerBuilder.buildTemplateTransFormer(content, template); + String output1 = transformer1.transform(EVENT); + Assertions.assertEquals("Transformers test:data name is test-transformer", output1); + + } + } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java index 70f2eed124..8a827bbcdc 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java @@ -21,14 +21,26 @@ import org.apache.eventmesh.common.config.ConfigService; import org.apache.eventmesh.common.config.connector.SinkConfig; import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.JobType; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.function.api.AbstractEventMeshFunctionChain; import org.apache.eventmesh.function.api.EventMeshFunction; import org.apache.eventmesh.function.filter.pattern.Pattern; import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; import org.apache.eventmesh.function.transformer.Transformer; import org.apache.eventmesh.function.transformer.TransformerBuilder; -import org.apache.eventmesh.function.transformer.TransformerParam; import org.apache.eventmesh.function.transformer.TransformerType; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; @@ -46,12 +58,20 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Random; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; import lombok.extern.slf4j.Slf4j; @@ -60,6 +80,16 @@ public class FunctionRuntime implements Runtime { private final RuntimeInstanceConfig runtimeInstanceConfig; + private ManagedChannel channel; + + private AdminServiceStub adminServiceStub; + + private AdminServiceBlockingStub adminServiceBlockingStub; + + StreamObserver responseObserver; + + StreamObserver requestObserver; + private final LinkedBlockingQueue queue; private FunctionRuntimeConfig functionRuntimeConfig; @@ -74,10 +104,14 @@ public class FunctionRuntime implements Runtime { private final ExecutorService sinkService = ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService"); + private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(); + private volatile boolean isRunning = false; private volatile boolean isFailed = false; + private String adminServerAddr; + public FunctionRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; @@ -90,14 +124,106 @@ public void init() throws Exception { // load function runtime config from local file this.functionRuntimeConfig = ConfigService.getInstance().buildConfigInstance(FunctionRuntimeConfig.class); - // TODO init admin service + // init admin service + initAdminService(); - // TODO get remote config from admin service and update local config + // get remote config from admin service and update local config + getAndUpdateRemoteConfig(); // init connector service initConnectorService(); + + // report status to admin server + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.INIT); + } + + private void initAdminService() { + adminServerAddr = getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr()); + // create gRPC channel + channel = ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build(); + + adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady(); + + adminServiceBlockingStub = AdminServiceGrpc.newBlockingStub(channel).withWaitForReady(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.info("runtime receive message: {} ", response); + } + + @Override + public void onError(Throwable t) { + log.error("runtime receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("runtime finished receive message and completed"); + } + }; + + requestObserver = adminServiceStub.invokeBiStream(responseObserver); + } + + private String getRandomAdminServerAddr(String adminServerAddrList) { + String[] addresses = adminServerAddrList.split(";"); + if (addresses.length == 0) { + throw new IllegalArgumentException("Admin server address list is empty"); + } + Random random = new Random(); + int randomIndex = random.nextInt(addresses.length); + return addresses[randomIndex]; } + private void getAndUpdateRemoteConfig() { + String jobId = functionRuntimeConfig.getJobID(); + FetchJobRequest jobRequest = new FetchJobRequest(); + jobRequest.setJobID(jobId); + + Metadata metadata = Metadata.newBuilder().setType(FetchJobRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(jobRequest)))).build()) + .build(); + Payload response = adminServiceBlockingStub.invoke(request); + FetchJobResponse jobResponse = null; + if (response.getMetadata().getType().equals(FetchJobResponse.class.getSimpleName())) { + jobResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchJobResponse.class); + } + + if (jobResponse == null || jobResponse.getErrorCode() != ErrorCode.SUCCESS) { + if (jobResponse != null) { + log.error("Failed to get remote config from admin server. ErrorCode: {}, Response: {}", + jobResponse.getErrorCode(), jobResponse); + } else { + log.error("Failed to get remote config from admin server. "); + } + isFailed = true; + try { + stop(); + } catch (Exception e) { + log.error("Failed to stop after exception", e); + } + throw new RuntimeException("Failed to get remote config from admin server."); + } + + // update local config + // source + functionRuntimeConfig.setSourceConnectorType(jobResponse.getTransportType().getSrc().getName()); + functionRuntimeConfig.setSourceConnectorDesc(jobResponse.getConnectorConfig().getSourceConnectorDesc()); + functionRuntimeConfig.setSourceConnectorConfig(jobResponse.getConnectorConfig().getSourceConnectorConfig()); + + // sink + functionRuntimeConfig.setSinkConnectorType(jobResponse.getTransportType().getDst().getName()); + functionRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc()); + functionRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig()); + + // function + functionRuntimeConfig.setFunctionConfigs(jobResponse.getFunctionConfigs()); + } + + private void initConnectorService() throws Exception { final JobType jobType = (JobType) functionRuntimeConfig.getRuntimeConfig().get("jobType"); @@ -129,13 +255,48 @@ private void initConnectorService() throws Exception { sourceConnector.init(sourceConnectorContext); } + private void reportJobRequest(String jobId, JobState jobState) throws InterruptedException { + ReportJobRequest reportJobRequest = new ReportJobRequest(); + reportJobRequest.setJobID(jobId); + reportJobRequest.setState(jobState); + Metadata metadata = Metadata.newBuilder() + .setType(ReportJobRequest.class.getSimpleName()) + .build(); + Payload payload = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest)))) + .build()) + .build(); + requestObserver.onNext(payload); + } + + @Override public void start() throws Exception { + this.isRunning = true; + // build function chain this.functionChain = buildFunctionChain(functionRuntimeConfig.getFunctionConfigs()); + // start heart beat + this.heartBeatExecutor.scheduleAtFixedRate(() -> { + + ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest(); + heartBeat.setAddress(IPUtils.getLocalAddress()); + heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis())); + heartBeat.setJobID(functionRuntimeConfig.getJobID()); + + Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build()) + .build(); + + requestObserver.onNext(request); + }, 5, 5, TimeUnit.SECONDS); + // start sink service - sinkService.execute(() -> { + this.sinkService.execute(() -> { try { startSinkConnector(); } catch (Exception e) { @@ -151,7 +312,7 @@ public void start() throws Exception { }); // start source service - sourceService.execute(() -> { + this.sourceService.execute(() -> { try { startSourceConnector(); } catch (Exception e) { @@ -165,6 +326,8 @@ public void start() throws Exception { throw new RuntimeException(e); } }); + + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.RUNNING); } private StringEventMeshFunctionChain buildFunctionChain(List> functionConfigs) { @@ -203,11 +366,9 @@ private StringEventMeshFunctionChain buildFunctionChain(List private Pattern buildFilter(Map functionConfig) { // get condition from attributes Object condition = functionConfig.get("condition"); - if (condition == null) { throw new IllegalArgumentException("'condition' is required for filter function"); } - if (condition instanceof String) { return PatternBuilder.build(String.valueOf(condition)); } else if (condition instanceof Map) { @@ -219,43 +380,44 @@ private Pattern buildFilter(Map functionConfig) { private Transformer buildTransformer(Map functionConfig) { // get transformerType from attributes - String transformerTypeStr = String.valueOf(functionConfig.getOrDefault("transformerType", "")).toUpperCase(); + String transformerTypeStr = String.valueOf(functionConfig.getOrDefault("transformerType", "")).toLowerCase(); TransformerType transformerType = TransformerType.getItem(transformerTypeStr); if (transformerType == null) { throw new IllegalArgumentException( - "Invalid transformerType: '" + functionConfig.get("transformerType") - + "'. Supported transformerType: 'CONSTANT', 'TEMPLATE', 'ORIGINAL'"); + "Invalid transformerType: '" + transformerTypeStr + + "'. Supported transformerType: 'constant', 'template', 'original' (case insensitive)"); } // build transformer - TransformerParam transformerParam = new TransformerParam(); - transformerParam.setTransformerType(transformerType); - - String value = String.valueOf(functionConfig.getOrDefault("value", "")); - String template = String.valueOf(functionConfig.getOrDefault("template", "")); + Transformer transformer = null; switch (transformerType) { case CONSTANT: // check value - if (StringUtils.isEmpty(value)) { - throw new IllegalArgumentException("'value' is required for constant transformer"); + String content = String.valueOf(functionConfig.getOrDefault("content", "")); + if (StringUtils.isEmpty(content)) { + throw new IllegalArgumentException("'content' is required for constant transformer"); } - transformerParam.setValue(value); + transformer = TransformerBuilder.buildConstantTransformer(content); break; case TEMPLATE: // check value and template - if (StringUtils.isAnyEmpty(value, template)) { - throw new IllegalArgumentException("'value' and 'template' are required for template transformer"); + Object valueMap = functionConfig.get("valueMap"); + String template = String.valueOf(functionConfig.getOrDefault("template", "")); + if (valueMap == null || StringUtils.isEmpty(template)) { + throw new IllegalArgumentException("'valueMap' and 'template' are required for template transformer"); } - transformerParam.setValue(value); - transformerParam.setTemplate(template); + transformer = TransformerBuilder.buildTemplateTransFormer(valueMap, template); break; - default: - // ORIGINAL doesn't need value and template + case ORIGINAL: + // ORIGINAL transformer does not need any parameter break; + default: + throw new IllegalArgumentException( + "Invalid transformerType: '" + transformerType + "', supported transformerType: 'CONSTANT', 'TEMPLATE', 'ORIGINAL'"); } - return TransformerBuilder.buildTransformer(transformerParam); + return transformer; } @@ -313,10 +475,23 @@ public void stop() throws Exception { log.info("FunctionRuntime is stopping..."); isRunning = false; + + if (isFailed) { + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.FAIL); + } else { + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.COMPLETE); + } + sinkConnector.stop(); sourceConnector.stop(); sinkService.shutdown(); sourceService.shutdown(); + heartBeatExecutor.shutdown(); + + requestObserver.onCompleted(); + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } log.info("FunctionRuntime stopped."); } From 63679c96281ecc69ee0e7fcc0489fa7a13f56b5f Mon Sep 17 00:00:00 2001 From: zaki Date: Tue, 24 Sep 2024 22:06:29 +0800 Subject: [PATCH 4/4] feat: update FunctionRuntime --- .../common/remote/response/FetchJobResponse.java | 3 --- .../eventmesh/runtime/function/FunctionRuntime.java | 11 ++++++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 92056f5ed3..95d2d157e0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -25,7 +25,6 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; -import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; @@ -40,8 +39,6 @@ public class FetchJobResponse extends BaseRemoteResponse { private JobConnectorConfig connectorConfig; - private List> functionConfigs; - private List position; private TaskState state; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java index 8a827bbcdc..4a68001909 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java @@ -219,8 +219,8 @@ private void getAndUpdateRemoteConfig() { functionRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc()); functionRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig()); - // function - functionRuntimeConfig.setFunctionConfigs(jobResponse.getFunctionConfigs()); + // TODO: update functionConfigs + } @@ -255,7 +255,7 @@ private void initConnectorService() throws Exception { sourceConnector.init(sourceConnectorContext); } - private void reportJobRequest(String jobId, JobState jobState) throws InterruptedException { + private void reportJobRequest(String jobId, JobState jobState) { ReportJobRequest reportJobRequest = new ReportJobRequest(); reportJobRequest.setJobID(jobId); reportJobRequest.setState(jobState); @@ -460,8 +460,13 @@ private void startSourceConnector() throws Exception { // Apply function chain to data String data = functionChain.apply((String) connectRecord.getData()); if (data != null) { + if (log.isDebugEnabled()) { + log.debug("Function chain applied. Original data: {}, Transformed data: {}", connectRecord.getData(), data); + } connectRecord.setData(data); this.queue.put(connectRecord); + } else if (log.isDebugEnabled()) { + log.debug("Data filtered out by function chain. Original data: {}", connectRecord.getData()); } } }