diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle new file mode 100644 index 0000000000..b2c45b620f --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + api project(":eventmesh-openconnect:eventmesh-openconnect-java") + implementation project(":eventmesh-common") + implementation 'com.theokanning.openai-gpt3-java:service:0.18.2' + implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0' + implementation 'io.vertx:vertx-web:4.4.6' + + testImplementation "org.apache.httpcomponents:httpclient" + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties new file mode 100644 index 0000000000..715bad3de4 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +pluginType=connector +pluginName=chatgpt \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java new file mode 100644 index 0000000000..7d162920d7 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.config; + +import org.apache.eventmesh.openconnect.api.config.Config; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class ChatGPTServerConfig extends Config { + + private boolean sourceEnable; + + private boolean sinkEnable; +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java new file mode 100644 index 0000000000..ca104fe562 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.server; + +import org.apache.eventmesh.connector.chatgpt.config.ChatGPTServerConfig; +import org.apache.eventmesh.connector.chatgpt.source.connector.ChatGPTSourceConnector; +import org.apache.eventmesh.openconnect.Application; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +public class ChatGPTConnectServer { + + public static void main(String[] args) throws Exception { + ChatGPTServerConfig serverConfig = ConfigUtil.parse(ChatGPTServerConfig.class, "server-config.yml"); + + if (serverConfig.isSourceEnable()) { + Application chatGPTSourceApp = new Application(); + chatGPTSourceApp.run(ChatGPTSourceConnector.class); + } + + if (serverConfig.isSinkEnable()) { + // TODO support sink connector + } + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java new file mode 100644 index 0000000000..9596866910 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.config; + +import org.apache.eventmesh.openconnect.api.config.SourceConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class ChatGPTSourceConfig extends SourceConfig { + + public ChatGPTSourceConnectorConfig connectorConfig; + + public OpenaiProxyConfig openaiProxyConfig; + + public OpenaiConfig openaiConfig; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java new file mode 100644 index 0000000000..316fb5f241 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.config; + +import lombok.Data; + +@Data +public class ChatGPTSourceConnectorConfig { + + private String connectorName = "chatgptSource"; + + private String path = "/chatgpt"; + + private int port = 3756; + + private int idleTimeout; + + private boolean proxyEnable = false; + + private String parsePromptFileName = "prompt"; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java new file mode 100644 index 0000000000..51858a709a --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.config; + + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class OpenaiConfig { + + private String token; + private String model = "gpt-3.5-turbo"; + private long timeout; + private Double temperature; + private Integer maxTokens; + private Boolean logprob; + private Double topLogprobs; + private Map logitBias; + private Double frequencyPenalty; + private Double presencePenalty; + private String user = "eventMesh"; + private List stop; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java new file mode 100644 index 0000000000..14dd69f350 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.config; + +import lombok.Data; + +@Data +public class OpenaiProxyConfig { + + private String host; + + private int port; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java new file mode 100644 index 0000000000..a947bc135d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.connector; + +import org.apache.eventmesh.common.ThreadPoolFactory; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType; +import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler; +import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.CloudEventUtil; + +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.cloudevents.CloudEvent; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ChatGPTSourceConnector implements Source { + + private static final int DEFAULT_BATCH_SIZE = 10; + + private ChatGPTSourceConfig sourceConfig; + private BlockingQueue queue; + private HttpServer server; + private final ExecutorService chatgptSourceExecutorService = + ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, + "ChatGPTSourceThread"); + + private OpenaiManager openaiManager; + private String parsePromptTemplateStr; + private ChatHandler chatHandler; + private ParseHandler parseHandler; + private static final int DEFAULT_TIMEOUT = 0; + + private static final String APPLICATION_JSON = "application/json"; + private static final String TEXT_PLAIN = "text/plain"; + + + @Override + public Class configClass() { + return ChatGPTSourceConfig.class; + } + + @Override + public void init(Config config) { + this.sourceConfig = (ChatGPTSourceConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) { + SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; + this.sourceConfig = (ChatGPTSourceConfig) sourceConnectorContext.getSourceConfig(); + doInit(); + } + + public void initParsePrompt() { + String parsePromptFileName = sourceConfig.getConnectorConfig().getParsePromptFileName(); + URL resource = Thread.currentThread().getContextClassLoader().getResource(parsePromptFileName); + if (resource == null) { + log.warn("cannot find prompt file {} in resources", parsePromptFileName); + return; + } + String filePath = resource.getPath(); + try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { + StringBuilder builder = new StringBuilder(); + String line; + while ((line = br.readLine()) != null) { + if (!line.startsWith("#") && StringUtils.isNotBlank(line)) { + builder.append(line).append("\n"); + } + } + this.parsePromptTemplateStr = builder.toString(); + } catch (IOException e) { + throw new IllegalStateException("Unable to read file", e); + } + } + + + @SuppressWarnings("checkstyle:WhitespaceAround") + private void doInit() { + initParsePrompt(); + this.openaiManager = new OpenaiManager(sourceConfig); + this.chatHandler = new ChatHandler(this.openaiManager); + if (StringUtils.isNotEmpty(parsePromptTemplateStr)) { + this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr); + } + this.queue = new LinkedBlockingQueue<>(1024); + final Vertx vertx = Vertx.vertx(); + final Router router = Router.router(vertx); + router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> { + try { + RequestBody body = ctx.body(); + ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class); + validateRequestDTO(bodyObject); + handleRequest(bodyObject, ctx); + } catch (Exception e) { + handleError(e, ctx); + } + }); + if (sourceConfig.connectorConfig.getIdleTimeout() < 0) { + log.warn("idleTimeout must be >= 0, your config value is {}, idleTimeout will be reset {}", sourceConfig.connectorConfig.getIdleTimeout(), + DEFAULT_TIMEOUT); + sourceConfig.connectorConfig.setIdleTimeout(DEFAULT_TIMEOUT); + } + this.server = vertx.createHttpServer(new HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort()) + .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router); + } + + + private void validateRequestDTO(ChatGPTRequestDTO bodyObject) { + if (StringUtils.isBlank(bodyObject.getText())) { + throw new IllegalArgumentException("Attributes 'text' cannot be null"); + } + } + + private void handleRequest(ChatGPTRequestDTO bodyObject, RoutingContext ctx) { + chatgptSourceExecutorService.execute(() -> { + try { + ChatGPTRequestType chatgptRequestType = ChatGPTRequestType.valueOf(bodyObject.getRequestType()); + CloudEvent cloudEvent = invokeHandler(chatgptRequestType, bodyObject); + queue.add(cloudEvent); + log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); + ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + } catch (IllegalArgumentException e) { + log.error("[ChatGPTSourceConnector] the request type is illegal: {}", e.getMessage(), e); + ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()) + .setStatusMessage(String.format("request type '%s' is not supported", bodyObject.getRequestType())).end(); + } catch (Exception e) { + log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e); + ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end(); + } + }); + } + + private CloudEvent invokeHandler(ChatGPTRequestType chatgptRequestType, ChatGPTRequestDTO bodyObject) { + switch (chatgptRequestType) { + case CHAT: + if (StringUtils.isBlank(bodyObject.getDataContentType())) { + bodyObject.setDataContentType(TEXT_PLAIN); + } + return chatHandler.invoke(bodyObject); + case PARSE: + if (StringUtils.isBlank(parsePromptTemplateStr)) { + throw new IllegalStateException( + "the request type of PARSE must be configured with the correct parsePromptFileName in source-config.yml"); + } + if (StringUtils.isBlank(bodyObject.getFields())) { + throw new IllegalStateException("Attributes 'fields' cannot be null in PARSE"); + } + if (StringUtils.isBlank(bodyObject.getDataContentType())) { + bodyObject.setDataContentType(APPLICATION_JSON); + } + return parseHandler.invoke(bodyObject); + default: + throw new IllegalStateException("the request type is illegal"); + } + } + + private void handleError(Exception e, RoutingContext ctx) { + log.error("[ChatGPTSourceConnector] Malformed request.", e); + ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); + } + + @Override + public void start() { + Throwable t = this.server.listen().cause(); + if (t != null) { + throw new EventMeshException("failed to start Vertx server", t); + } + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return this.sourceConfig.getConnectorConfig().getConnectorName(); + } + + @Override + public void stop() { + Throwable t = this.server.close().cause(); + if (t != null) { + throw new EventMeshException("failed to stop Vertx server", t); + } + } + + @Override + public List poll() { + List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); + for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { + try { + CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + if (event == null) { + break; + } + connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + } catch (InterruptedException e) { + break; + } + } + return connectRecords; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java new file mode 100644 index 0000000000..a203a24e5a --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.dto; + +import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType; + +import java.time.ZonedDateTime; +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ChatGPTRequestDTO { + + private String requestType = ChatGPTRequestType.CHAT.name(); + + private String source = "/"; + + private String subject = "chatGPT"; + + @JsonProperty("datacontenttype") + private String dataContentType; + + private String type = "cloudevents"; + + private String text; + + private String fields; + + @JsonInclude + private String id = UUID.randomUUID().toString(); + + @JsonInclude + private String time = ZonedDateTime.now().toOffsetDateTime().toString(); + + public String getFields() { + return fields.replace(";", "\n"); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java new file mode 100644 index 0000000000..9930525651 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.enums; + + +public enum ChatGPTRequestType { + + CHAT, PARSE; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java new file mode 100644 index 0000000000..6d79a0559f --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.handlers; + + +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.completion.chat.ChatMessageRole; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ChatHandler { + + private final OpenaiManager openaiManager; + + public ChatHandler(OpenaiManager openaiManager) { + this.openaiManager = openaiManager; + } + + public CloudEvent invoke(ChatGPTRequestDTO event) { + return genGptConnectRecord(event); + } + + private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) { + List chatMessages = new ArrayList<>(); + chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText())); + ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages); + String chatResult = openaiManager.getResult(req); + + return CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create(event.getSource())) + .withType(event.getType()) + .withTime(ZonedDateTime.now().toOffsetDateTime()) + .withData(chatResult.getBytes()) + .withSubject(event.getSubject()) + .withDataContentType(event.getDataContentType()) + .build(); + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java new file mode 100644 index 0000000000..aad3d384cf --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.handlers; + + +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.cloudevents.CloudEvent; +import io.cloudevents.jackson.JsonFormat; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.completion.chat.ChatMessageRole; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ParseHandler { + + private final OpenaiManager openaiManager; + + private final String promptTemplate; + + private static final JsonFormat jsonFormat = new JsonFormat(false, true); + + + public ParseHandler(OpenaiManager openaiManager, String promptTemplate) { + this.openaiManager = openaiManager; + this.promptTemplate = promptTemplate; + } + + @SuppressWarnings("checkstyle:WhitespaceAfter") + public CloudEvent invoke(ChatGPTRequestDTO event) { + Map map = convertToMap(event); + + StringSubstitutor substitute = new StringSubstitutor(map); + String finalPrompt = substitute.replace(promptTemplate); + List chatMessages = new ArrayList<>(); + chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), finalPrompt)); + ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages); + String chatResult = openaiManager.getResult(req); + chatResult = StringUtils.removeFirst(chatResult, "```json"); + chatResult = StringUtils.removeEnd(chatResult, "```"); + CloudEvent cloudEvent; + try { + cloudEvent = jsonFormat.deserialize(chatResult.getBytes(Constants.DEFAULT_CHARSET)); + } catch (Exception e) { + throw new IllegalStateException("cloudEvent parse fail, please check your parse prompt file content", e); + } + return cloudEvent; + } + + public Map convertToMap(Object obj) { + Map map = new HashMap<>(); + Class clazz = obj.getClass(); + Field[] fields = clazz.getDeclaredFields(); + for (Field field : fields) { + if (field.isSynthetic()) { + continue; + } + if (Map.class.isAssignableFrom(field.getType()) || List.class.isAssignableFrom(field.getType())) { + continue; + } + try { + String key = field.getName(); + if (field.isAnnotationPresent(JsonProperty.class)) { + JsonProperty annotation = field.getAnnotation(JsonProperty.class); + key = annotation.value(); + } + Method getter = getGetter(field, clazz); + map.put(key, String.valueOf(getter.invoke(obj))); + } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new IllegalStateException("convert to Map is fail", e); + } + } + + return map; + } + + public Method getGetter(Field field, Class clazz) throws NoSuchMethodException { + boolean isBooleanField = boolean.class.isAssignableFrom(field.getType()) || Boolean.class.isAssignableFrom(field.getType()); + String handledFieldName = upperFirst(field.getName()); + String methodName; + if (isBooleanField) { + methodName = "is" + handledFieldName; + } else { + methodName = "get" + handledFieldName; + } + return clazz.getDeclaredMethod(methodName); + } + + public String upperFirst(String str) { + if (null == str) { + return null; + } + if (!str.isEmpty()) { + char firstChar = str.charAt(0); + if (Character.isLowerCase(firstChar)) { + return Character.toUpperCase(firstChar) + StringUtils.substring(str, 1); + } + } + return str; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java new file mode 100644 index 0000000000..fda5216bbf --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.managers; + +import static com.theokanning.openai.service.OpenAiService.defaultClient; +import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper; +import static com.theokanning.openai.service.OpenAiService.defaultRetrofit; + +import org.apache.eventmesh.common.utils.AssertUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig; + +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.time.Duration; +import java.util.List; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.theokanning.openai.client.OpenAiApi; +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.service.OpenAiService; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import okhttp3.OkHttpClient; +import retrofit2.Retrofit; + + +@Slf4j +public class OpenaiManager { + + @Getter + private OpenAiService openAiService; + + private String chatCompletionRequestTemplateStr; + + private static final int DEFAULT_TIMEOUT = 0; + + public OpenaiManager(ChatGPTSourceConfig sourceConfig) { + initOpenAi(sourceConfig); + } + + public String getResult(ChatCompletionRequest req) { + StringBuilder gptData = new StringBuilder(); + try { + openAiService.createChatCompletion(req).getChoices() + .forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent())); + } catch (Exception e) { + log.error("Failed to generate GPT connection record: {}", e.getMessage()); + } + return gptData.toString(); + } + + public ChatCompletionRequest newChatCompletionRequest(List chatMessages) { + ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class); + request.setMessages(chatMessages); + return request; + } + + private void initOpenAi(ChatGPTSourceConfig sourceConfig) { + OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); + if (openaiConfig.getTimeout() < 0) { + log.warn("openaiTimeout must be >= 0, your config value is {}, openaiTimeout will be reset {}", openaiConfig.getTimeout(), + DEFAULT_TIMEOUT); + openaiConfig.setTimeout(DEFAULT_TIMEOUT); + } + boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable(); + if (proxyEnable) { + OpenaiProxyConfig chatgptProxyConfig = sourceConfig.openaiProxyConfig; + if (chatgptProxyConfig.getHost() == null) { + throw new IllegalStateException("chatgpt proxy config 'host' cannot be null"); + } + ObjectMapper mapper = defaultObjectMapper(); + Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort())); + OkHttpClient client = + defaultClient(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())).newBuilder().proxy(proxy).build(); + Retrofit retrofit = defaultRetrofit(client, mapper); + OpenAiApi api = retrofit.create(OpenAiApi.class); + this.openAiService = new OpenAiService(api); + } else { + this.openAiService = new OpenAiService(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())); + } + ChatCompletionRequestBuilder builder = ChatCompletionRequest.builder().model(openaiConfig.getModel()); + AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null"); + builder = builder.model(openaiConfig.getModel()); + if (openaiConfig.getUser() != null) { + builder = builder.user(openaiConfig.getUser()); + } + if (openaiConfig.getPresencePenalty() != null) { + builder = builder.presencePenalty(openaiConfig.getPresencePenalty()); + } + if (openaiConfig.getFrequencyPenalty() != null) { + builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty()); + } + if (openaiConfig.getMaxTokens() != null) { + builder = builder.maxTokens(openaiConfig.getMaxTokens()); + } + if (openaiConfig.getTemperature() != null) { + builder = builder.temperature(openaiConfig.getTemperature()); + } + if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) { + builder = builder.logitBias(openaiConfig.getLogitBias()); + } + if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) { + builder = builder.stop(openaiConfig.getStop()); + } + this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(builder.build()); + } + + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt new file mode 100644 index 0000000000..e10ecc331d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +You are an AI assistant named CloudEventsConverter. avoid escape characters . +Your task is to construct a JSON object in CloudEvents format. Based on the field name and field description in the 'data' field of the CloudEvents formatted JSON object, convert the input text provided by the user into the content of the 'data' field, which must comply with the specifications of the content of the 'datacontenttype' field. +The role is : + - If the 'datacontenttype' field content is 'application/json', then the' data 'field content should be a JSON object, + - else If the 'datacontenttype' field content is not 'application/json' and is 'application/xml', then the' data 'field content should be a string in XML format and the outermost of XML format is , inside is the XML generated by you based on field info; + - else the 'datacontenttype' field content is not 'application/json' and 'application/xml', then the' data 'field content is string of the 'text' field content; +Except for the content of the data field, all other values should be set to and cannot be modified. Finally, return to me the JSON object in CloudEvents format that you constructed + +The following text is the field name and field description in the 'data' field of the CloudEvents-formatted JSON object, extract the following information: + +${fields} + + +text: ${text} + +The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```": +```json +{ + "specversion": string, Set to "1.0" + "type": string, Set to ${type} + "source": string, Set to ${source} + "subject": string, Set to ${subject} + "id": string, Set to ${id} + "time": string, Set to ${time} + "datacontenttype": string, Set to ${datacontenttype} + "data": object or string +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml new file mode 100644 index 0000000000..0cd7b5b5ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml new file mode 100644 index 0000000000..b194e99ecb --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: chatgptSource + appId: 5032 + userName: chatgptSourceUser + passWord: chatgptPassWord +connectorConfig: + connectorName: chatgptSource + path: /chatgpt + port: 3756 + idleTimeout: 0 + proxyEnable: false + parsePromptFileName: prompt + +# https://platform.openai.com/docs/api-reference/chat/create +openaiConfig: + token: + model: gpt-3.5-turbo + timeout: 0 + temperature: 1 + maxTokens: + frequencyPenalty: 0 + presencePenalty: 0 + user: eventMesh + stop: [] + logitBias: {} + +openaiProxyConfig: + host: 127.0.0.1 + port: 7890 + diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java new file mode 100644 index 0000000000..8347fdcbb1 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.connector; + +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConnectorConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class ChatGPTSourceConnectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger("ChatGPTSourceConnectorTest"); + + private ChatGPTSourceConnector connector; + private ChatGPTSourceConnectorConfig config; + private CloseableHttpClient httpClient; + private String uri; + private final String expectedMessage = "Hello, can you tell me a story."; + + private final String expectedParseMessage = "User 13356288979 from Tianjin store placed an order with order number 11221122"; + + + public boolean checkOpenAi() throws Exception { + ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) ConfigUtil.parse(connector.configClass()); + OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); + if (StringUtils.isBlank(openaiConfig.getToken())) { + return false; + } + return true; + } + + @BeforeEach + void setUp() throws Exception { + connector = new ChatGPTSourceConnector(); + if (!checkOpenAi()) { + LOGGER.error("please set openai token in the config"); + return; + } + ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) ConfigUtil.parse(connector.configClass()); + config = sourceConfig.getConnectorConfig(); + connector.init(sourceConfig); + connector.start(); + + uri = new URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString(); + + httpClient = HttpClients.createDefault(); + } + + @Test + void testPoll() throws Exception { + ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) ConfigUtil.parse(connector.configClass()); + OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); + if (StringUtils.isBlank(openaiConfig.getToken())) { + LOGGER.error("please set openai token in the config"); + return; + } + + final int batchSize = 10; + + for (int i = 0; i < batchSize; i++) { + HttpResponse resp = mockStructuredChatRequest(); + Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); + } + + List res = connector.poll(); + Assertions.assertEquals(batchSize, res.size()); + + for (int i = 0; i < batchSize; i++) { + HttpResponse resp = mockStructuredParseRequest(); + Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); + } + + List res1 = connector.poll(); + Assertions.assertEquals(batchSize, res1.size()); + } + + + HttpResponse mockStructuredChatRequest() throws Exception { + TestEvent event = new TestEvent(); + event.type = "com.example.someevent"; + event.source = "/mycontext"; + event.subject = "test"; + event.datacontenttype = "text/plain"; + event.text = expectedMessage; + event.requestType = "CHAT"; + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); + + return httpClient.execute(httpPost); + } + + + HttpResponse mockStructuredParseRequest() throws Exception { + TestEvent event = new TestEvent(); + event.type = "com.example.someevent"; + event.source = "/mycontext"; + event.subject = "test"; + event.datacontenttype = "application/json"; + event.text = expectedParseMessage; + event.requestType = "PARSE"; + event.fields = "orderNo:this is order number;address:this is a address;phone:this is phone number"; + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); + return httpClient.execute(httpPost); + } + + @AfterEach + void tearDown() throws Exception { + if (!checkOpenAi()) { + return; + } + if (connector != null) { + connector.stop(); + } + if (httpClient != null) { + httpClient.close(); + } + } + + class TestEvent { + + public String requestType; + public String type; + public String source; + public String subject; + public String datacontenttype; + public String text; + public String fields; + } +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml new file mode 100644 index 0000000000..0cd7b5b5ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml new file mode 100644 index 0000000000..47f25edbb2 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: chatgptSource + appId: 5032 + userName: chatgptSourceUser + passWord: chatgptPassWord +connectorConfig: + connectorName: chatgptSource + path: /chatgpt + port: 3756 + idleTimeout: 0 + proxyEnable: true + parsePromptFileName: prompt + +# https://platform.openai.com/docs/api-reference/chat/create +openaiConfig: + token: + model: gpt-3.5-turbo + timeout: 0 + temperature: 1 + maxTokens: + frequencyPenalty: 0 + presencePenalty: 0 + user: eventMesh + stop: [] + logitBias: { + + } + +openaiProxyConfig: + host: 127.0.0.1 + port: 7890 diff --git a/settings.gradle b/settings.gradle index 645e6fb365..6a8e27bf98 100644 --- a/settings.gradle +++ b/settings.gradle @@ -77,6 +77,7 @@ include 'eventmesh-connectors:eventmesh-connector-wecom' include 'eventmesh-connectors:eventmesh-connector-slack' include 'eventmesh-connectors:eventmesh-connector-wechat' include 'eventmesh-connectors:eventmesh-connector-http' +include 'eventmesh-connectors:eventmesh-connector-chatgpt' include 'eventmesh-storage-plugin:eventmesh-storage-api' include 'eventmesh-storage-plugin:eventmesh-storage-standalone' diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index 05c99f3d37..73feb5fd3b 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -1,6 +1,7 @@ FastInfoset-1.2.15.jar ST4-4.3.4.jar accessors-smart-2.4.7.jar +adapter-rxjava2-2.9.0.jar alibabacloud-gateway-spi-0.0.1.jar amqp-client-5.16.0.jar animal-sniffer-annotations-1.19.jar @@ -12,6 +13,7 @@ antlr4-4.13.0.jar antlr4-runtime-4.13.0.jar aopalliance-1.0.jar apache-client-2.20.29.jar +api-0.18.2.jar arns-2.20.29.jar asm-9.1.jar asm-9.2.jar @@ -43,7 +45,9 @@ byte-buddy-1.11.0.jar byte-buddy-1.12.18.jar cache-api-1.1.1.jar checker-qual-3.12.0.jar +classgraph-4.8.21.jar classmate-1.5.1.jar +client-0.18.2.jar cloudevents-api-2.4.2.jar cloudevents-core-2.4.2.jar cloudevents-http-vertx-2.3.0.jar @@ -63,6 +67,7 @@ commons-logging-1.2.jar commons-text-1.9.jar commons-validator-1.7.jar consul-api-1.4.5.jar +converter-jackson-2.9.0.jar credentials-java-0.2.4.jar crt-core-2.20.29.jar curator-client-5.4.0.jar @@ -155,6 +160,7 @@ json-path-2.7.0.jar json-smart-2.4.7.jar json-utils-2.20.29.jar jsr305-3.0.2.jar +jtokkit-0.5.1.jar kafka-clients-3.0.0.jar listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar log4j-api-2.22.1.jar @@ -162,6 +168,7 @@ log4j-core-2.22.1.jar log4j-slf4j2-impl-2.22.1.jar lz4-java-1.7.1.jar lz4-java-1.8.0.jar +mbknor-jackson-jsonschema_2.12-1.0.34.jar metrics-annotation-4.1.0.jar metrics-core-4.1.0.jar metrics-healthchecks-4.1.0.jar @@ -306,6 +313,7 @@ reactor-core-3.4.13.jar redisson-3.17.3.jar regions-2.20.29.jar relaxngDatatype-20020414.jar +retrofit-2.9.0.jar rocketmq-acl-4.9.5.jar rocketmq-broker-4.9.5.jar rocketmq-client-4.9.5.jar @@ -317,9 +325,12 @@ rocketmq-remoting-4.9.5.jar rocketmq-srvutil-4.9.5.jar rocketmq-store-4.9.5.jar rocketmq-tools-4.9.5.jar +rxjava-2.0.0.jar rxjava-3.0.12.jar s3-2.20.29.jar +scala-library-2.12.8.jar sdk-core-2.20.29.jar +service-0.18.2.jar simpleclient-0.12.0.jar simpleclient_tracer_common-0.12.0.jar simpleclient_tracer_otel-0.12.0.jar @@ -352,6 +363,7 @@ tomcat-embed-el-9.0.56.jar txw2-2.3.1.jar utils-2.20.29.jar validation-api-1.1.0.Final.jar +validation-api-2.0.1.Final.jar vertx-auth-common-4.4.6.jar vertx-bridge-common-4.4.6.jar vertx-core-4.4.6.jar