diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java index ccebe5a998..65fc8fe72d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java @@ -39,8 +39,8 @@ public class SinkConnectorConfig { // timeunit: ms, default 5000ms private int idleTimeout = 5000; - // maximum number of HTTP/1 connections a client will pool, default 5 - private int maxConnectionPoolSize = 5; + // maximum number of HTTP/1 connections a client will pool, default 50 + private int maxConnectionPoolSize = 50; // retry config private HttpRetryConfig retryConfig = new HttpRetryConfig(); @@ -48,6 +48,15 @@ public class SinkConnectorConfig { // webhook config private HttpWebhookConfig webhookConfig = new HttpWebhookConfig(); + private String deliveryStrategy = "ROUND_ROBIN"; + + private boolean skipDeliverException = false; + + // managed pipelining param, default true + private boolean isParallelized = true; + + private int parallelism = 2; + /** * Fill default values if absent (When there are multiple default values for a field) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java index 282f883332..2c091e321a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java @@ -44,12 +44,18 @@ public class SourceConnectorConfig { */ private int maxFormAttributeSize = 1024 * 1024; - // protocol, default Common + // max size of the queue, default 1000 + private int maxStorageSize = 1000; + + // batch size, default 10 + private int batchSize = 10; + + // protocol, default CloudEvent private String protocol = "Common"; // extra config, e.g. GitHub secret private Map extraConfig = new HashMap<>(); // data consistency enabled, default true - private boolean dataConsistencyEnabled = false; + private boolean dataConsistencyEnabled = true; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java index 9989552d1e..0564e58734 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java @@ -142,6 +142,7 @@ public synchronized List fetchRange(int start, int end, boolean removed) { count++; } return items; + } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java index 3df110f2e7..8e808ccc93 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.http.sink; +import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig; import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; @@ -32,6 +33,10 @@ import java.util.List; import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; import lombok.SneakyThrows; @@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink, ConnectorCreateService { @Getter private HttpSinkHandler sinkHandler; + private ThreadPoolExecutor executor; + + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(10000); + + private final AtomicBoolean isStart = new AtomicBoolean(true); + @Override public Class configClass() { return HttpSinkConfig.class; @@ -90,11 +101,30 @@ private void doInit() { } else { throw new IllegalArgumentException("Max retries must be greater than or equal to 0."); } + boolean isParallelized = this.httpSinkConfig.connectorConfig.isParallelized(); + int parallelism = isParallelized ? this.httpSinkConfig.connectorConfig.getParallelism() : 1; + executor = new ThreadPoolExecutor(parallelism, parallelism, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new EventMeshThreadFactory("http-sink-handler")); } @Override public void start() throws Exception { this.sinkHandler.start(); + for (int i = 0; i < this.httpSinkConfig.connectorConfig.getParallelism(); i++) { + executor.execute(() -> { + while (isStart.get()) { + ConnectRecord connectRecord = null; + try { + connectRecord = queue.poll(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (connectRecord != null) { + sinkHandler.handle(connectRecord); + } + } + }); + } } @Override @@ -114,7 +144,18 @@ public void onException(ConnectRecord record) { @Override public void stop() throws Exception { + isStart.set(false); + while (!queue.isEmpty()) { + ConnectRecord record = queue.poll(); + this.sinkHandler.handle(record); + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } this.sinkHandler.stop(); + log.info("All tasks completed, start shut down http sink connector"); } @Override @@ -125,8 +166,7 @@ public void put(List sinkRecords) { log.warn("ConnectRecord data is null, ignore."); continue; } - // Handle the ConnectRecord - this.sinkHandler.handle(sinkRecord); + queue.put(sinkRecord); } catch (Exception e) { log.error("Failed to sink message via HTTP. ", e); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java index 41a5087870..111ee6b3e9 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java @@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable { private LocalDateTime receivedTime; - private String httpRecordId; - private String recordId; private String retriedBy; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java index 28ba791127..9ef760617c 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java @@ -30,17 +30,26 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import lombok.Getter; + /** * AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler. */ public abstract class AbstractHttpSinkHandler implements HttpSinkHandler { + @Getter private final SinkConnectorConfig sinkConnectorConfig; + @Getter private final List urls; + private final HttpDeliveryStrategy deliveryStrategy; + + private int roundRobinIndex = 0; + protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { this.sinkConnectorConfig = sinkConnectorConfig; + this.deliveryStrategy = HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy()); // Initialize URLs String[] urlStrings = sinkConnectorConfig.getUrls(); this.urls = Arrays.stream(urlStrings) @@ -48,14 +57,6 @@ protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { .collect(Collectors.toList()); } - public SinkConnectorConfig getSinkConnectorConfig() { - return sinkConnectorConfig; - } - - public List getUrls() { - return urls; - } - /** * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. * @@ -65,23 +66,38 @@ public List getUrls() { public void handle(ConnectRecord record) { // build attributes Map attributes = new ConcurrentHashMap<>(); - attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size())); - - // send the record to all URLs - for (URI url : urls) { - // convert ConnectRecord to HttpConnectRecord - String type = String.format("%s.%s.%s", - this.sinkConnectorConfig.getConnectorName(), url.getScheme(), - this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); - HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - - // add AttemptEvent to the attributes - HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1); - attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent); - - // deliver the record - deliver(url, httpConnectRecord, attributes, record); + + switch (deliveryStrategy) { + case ROUND_ROBIN: + attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(1)); + URI url = urls.get(roundRobinIndex); + roundRobinIndex = (roundRobinIndex + 1) % urls.size(); + sendRecordToUrl(record, attributes, url); + break; + case BROADCAST: + for (URI broadcastUrl : urls) { + attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size())); + sendRecordToUrl(record, attributes, broadcastUrl); + } + break; + default: + throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy); } } + private void sendRecordToUrl(ConnectRecord record, Map attributes, URI url) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", + this.sinkConnectorConfig.getConnectorName(), url.getScheme(), + this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + + // add AttemptEvent to the attributes + HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1); + attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent); + + // deliver the record + deliver(url, httpConnectRecord, attributes, record); + } + } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java new file mode 100644 index 0000000000..2e770eb120 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handler; + +public enum HttpDeliveryStrategy { + ROUND_ROBIN, + BROADCAST +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java index 61bdc9f310..0b57cc06ef 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java @@ -93,7 +93,8 @@ private void doInitWebClient() { .setIdleTimeout(sinkConnectorConfig.getIdleTimeout()) .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout()) - .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize()); + .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize()) + .setPipelining(sinkConnectorConfig.isParallelized()); this.webClient = WebClient.create(vertx, options); } @@ -108,7 +109,7 @@ private void doInitWebClient() { */ @Override public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes, - ConnectRecord connectRecord) { + ConnectRecord connectRecord) { // create headers Map extensionMap = new HashMap<>(); Set extensionKeySet = httpConnectRecord.getExtensions().keySet(); @@ -203,6 +204,9 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map queue; - private int maxBatchSize; - - private long maxPollWaitTime; + private int batchSize; private Route route; @@ -94,11 +94,11 @@ public void init(ConnectorContext connectorContext) { private void doInit() { // init queue - this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize(); + this.queue = new LinkedBlockingQueue<>(maxQueueSize); - // init poll batch size and timeout - this.maxBatchSize = this.sourceConfig.getPollConfig().getMaxBatchSize(); - this.maxPollWaitTime = this.sourceConfig.getPollConfig().getMaxWaitTime(); + // init batch size + this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize(); // init protocol String protocolName = this.sourceConfig.getConnectorConfig().getProtocol(); @@ -136,14 +136,17 @@ public void start() { @Override public void commit(ConnectRecord record) { - if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) { - this.route.handler(ctx -> { - // Return 200 OK - ctx.response() + if (sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) { + log.debug("HttpSourceConnector commit record: {}", record.getRecordId()); + RoutingContext routingContext = (RoutingContext) record.getExtensionObj("routingContext"); + if (routingContext != null) { + routingContext.response() .putHeader("content-type", "application/json") .setStatusCode(HttpResponseStatus.OK.code()) - .end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}"); - }); + .end(CommonResponse.success().toJsonStr()); + } else { + log.error("Failed to commit the record, routingContext is null, recordId: {}", record.getRecordId()); + } } } @@ -185,13 +188,13 @@ public void stop() { @Override public List poll() { - // record current time long startTime = System.currentTimeMillis(); + long maxPollWaitTime = 5000; long remainingTime = maxPollWaitTime; // poll from queue - List connectRecords = new ArrayList<>(maxBatchSize); - for (int i = 0; i < maxBatchSize; i++) { + List connectRecords = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { try { Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (obj == null) { @@ -206,8 +209,9 @@ public List poll() { remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (Exception e) { log.error("Failed to poll from queue.", e); - break; + throw new RuntimeException(e); } + } return connectRecords; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java index a44ed0e90c..10158f6eba 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java @@ -57,7 +57,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) { /** * Handle the protocol message for CloudEvent. * - * @param route route + * @param route route * @param queue queue info */ @Override