From ead510baa524a68a0229fa705fda6f6688635724 Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 14 Aug 2024 15:12:41 +0800 Subject: [PATCH 1/4] feat: Support CallBack for ConnectRecord --- .../common/SynchronizedCircularFifoQueue.java | 3 +- .../http/sink/config/HttpRetryConfig.java | 4 +- .../http/sink/data/HttpRetryEvent.java | 67 ++++++++++ .../sink/handle/CommonHttpSinkHandler.java | 121 ++++++++++++------ .../http/sink/handle/HttpSinkHandler.java | 12 +- .../sink/handle/RetryHttpSinkHandler.java | 71 ++++------ .../sink/handle/WebhookHttpSinkHandler.java | 69 +++++++--- .../http/sink/HttpSinkConnectorTest.java | 2 +- 8 files changed, 233 insertions(+), 116 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java index 439a9f3d78..0564e58734 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java @@ -120,9 +120,10 @@ public synchronized int size() { */ public synchronized List fetchRange(int start, int end, boolean removed) { - if (start < 0 || end > this.size() || start > end) { + if (start < 0 || start > end) { throw new IllegalArgumentException("Invalid range"); } + end = Math.min(end, this.size()); Iterator iterator = this.iterator(); List items = new ArrayList<>(end - start); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java index 0bceac7d47..08c3a323e7 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java @@ -24,8 +24,8 @@ public class HttpRetryConfig { // maximum number of retries, default 2, minimum 0 private int maxRetries = 2; - // retry interval, default 2000ms - private int interval = 2000; + // retry interval, default 1000ms + private int interval = 1000; // Default value is false, indicating that only requests with network-level errors will be retried. // If set to true, all failed requests will be retried, including network-level errors and non-2xx responses. diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java new file mode 100644 index 0000000000..1666ede576 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +public class HttpRetryEvent { + + public static final String NAME = "http-retry-event"; + + private String parentId; + + private int maxRetries; + + private int currentRetries; + + private Throwable lastException; + + public void increaseCurrentRetries() { + this.currentRetries++; + } + + public boolean isMaxRetriesReached() { + return this.currentRetries >= this.maxRetries; + } + + public String getLimitedExceptionMessage() { + return getLimitedExceptionMessage(256); + } + + + public String getLimitedExceptionMessage(int maxLimit) { + if (lastException == null) { + return ""; + } + String message = lastException.getMessage(); + if (message == null) { + return ""; + } + if (message.length() > maxLimit) { + return message.substring(0, maxLimit); + } + return message; + } + + public void clearException() { + this.lastException = null; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java index 4bc365a139..8b5a45f948 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java @@ -20,6 +20,7 @@ import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.util.HttpUtils; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; @@ -27,6 +28,7 @@ import java.net.URI; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -113,19 +115,50 @@ public void handle(ConnectRecord record) { // convert ConnectRecord to HttpConnectRecord String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common"); HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - // get timestamp and offset - Long timestamp = httpConnectRecord.getData().getTimestamp(); - Map offset = null; - try { - // May throw NullPointerException. - offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); - } catch (NullPointerException e) { - // ignore null pointer exception - } - final Map finalOffset = offset; - Future> responseFuture = deliver(url, httpConnectRecord); - responseFuture.onSuccess(res -> { + deliver(url, httpConnectRecord, Collections.emptyMap()); + } + } + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified + * URL using the WebClient. + * + * @param url URI to which the HttpConnectRecord should be sent + * @param httpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to be used in processing + * @return processing chain + */ + @Override + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) { + // create headers + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); + + // get timestamp and offset + Long timestamp = httpConnectRecord.getData().getTimestamp(); + Map offset = null; + try { + // May throw NullPointerException. + offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); + } catch (NullPointerException e) { + // ignore null pointer exception + } + final Map finalOffset = offset; + + // send the request + return this.webClient.post(url.getPath()) + .host(url.getHost()) + .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) + .putHeaders(headers) + .ssl(Objects.equals(url.getScheme(), "https")) + .sendJson(httpConnectRecord) + .onSuccess(res -> { log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset); + + Exception e = null; + // log the response if (HttpUtils.is2xxSuccessful(res.statusCode())) { if (log.isDebugEnabled()) { @@ -135,7 +168,6 @@ public void handle(ConnectRecord record) { log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, finalOffset); } - record.getCallback().onSuccess(convertToSendResult(record)); } else { if (log.isDebugEnabled()) { log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", @@ -144,16 +176,49 @@ public void handle(ConnectRecord record) { log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, finalOffset); } - record.getCallback() - .onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode()))); + + e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode()); } + + // try callback + tryCallback(httpConnectRecord.getData(), e, attributes); }).onFailure(err -> { log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err); - record.getCallback().onException(buildSendExceptionContext(record, err)); + + // try callback + tryCallback(httpConnectRecord.getData(), err, attributes); }); + } + + /** + * Tries to call the callback based on the result of the request. + * + * @param record the ConnectRecord to process + * @param e the exception thrown during the request, may be null + * @param attributes additional attributes to be used in processing + */ + private void tryCallback(ConnectRecord record, Throwable e, Map attributes) { + // update the retry event + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); + if (retryEvent != null) { + retryEvent.setLastException(e); + } + + // check if the callback is null + if (record.getCallback() == null) { + return; + } + + // If no exception, call onSuccess directly + if (e == null) { + record.getCallback().onSuccess(convertToSendResult(record)); + } else if (retryEvent == null || retryEvent.isMaxRetriesReached()) { + // Determine if you need to continue retrying + record.getCallback().onException(buildSendExceptionContext(record, e)); } } + private SendResult convertToSendResult(ConnectRecord record) { SendResult result = new SendResult(); result.setMessageId(record.getRecordId()); @@ -174,30 +239,6 @@ private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Thr } - /** - * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified - * URL using the WebClient. - * - * @param url URI to which the HttpConnectRecord should be sent - * @param httpConnectRecord HttpConnectRecord to process - * @return processing chain - */ - @Override - public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { - // create headers - MultiMap headers = HttpHeaders.headers() - .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") - .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); - // send the request - return this.webClient.post(url.getPath()) - .host(url.getHost()) - .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) - .putHeaders(headers) - .ssl(Objects.equals(url.getScheme(), "https")) - .sendJson(httpConnectRecord); - } - - /** * Cleans up and releases resources used by the HTTP/HTTPS handler. */ diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java index 09fd66a762..3ced8b0653 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.net.URI; +import java.util.Map; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -32,14 +33,14 @@ * *

Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface. * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)}, - * {@link #deliver(URI, HttpConnectRecord)}, and {@link #stop()} methods.

+ * {@link #deliver(URI, HttpConnectRecord, Map)}, and {@link #stop()} methods.

* *

Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently. * The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a - * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL - * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.

+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map)} method processes HttpConnectRecord on specified + * URL while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.

* - *

It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method + *

It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map)} method * to prevent message loss or processing interruptions.

*/ public interface HttpSinkHandler { @@ -62,9 +63,10 @@ public interface HttpSinkHandler { * * @param url URI to which the HttpConnectRecord should be sent * @param httpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to be used in processing * @return processing chain */ - Future> deliver(URI url, HttpConnectRecord httpConnectRecord); + Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes); /** * Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed. diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java index bc2a536107..7f4220e413 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java @@ -20,8 +20,7 @@ import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig; import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; -import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; -import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.util.HttpUtils; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; @@ -30,7 +29,9 @@ import java.time.Duration; import java.time.LocalDateTime; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -43,7 +44,6 @@ import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import dev.failsafe.RetryPolicyBuilder; -import dev.failsafe.event.ExecutionEvent; @Slf4j @@ -106,8 +106,17 @@ public void handle(ConnectRecord record) { this.connectorConfig.getConnectorName(), url.getScheme(), this.connectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + + // build the retry event + HttpRetryEvent retryEvent = new HttpRetryEvent(); + retryEvent.setMaxRetries(connectorConfig.getRetryConfig().getMaxRetries()); + + // add the retry event to the attributes + HashMap attributes = new HashMap<>(); + attributes.put(HttpRetryEvent.NAME, retryEvent); + // handle the HttpConnectRecord - deliver(url, httpConnectRecord); + deliver(url, httpConnectRecord, attributes); } } @@ -118,34 +127,27 @@ public void handle(ConnectRecord record) { * * @param url URI to which the HttpConnectRecord should be sent * @param httpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to pass to the processing chain * @return processing chain */ @Override - public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) { // Only webhook mode needs to use the UUID to identify the request String id = httpConnectRecord.getUuid(); // Build the retry policy RetryPolicy> retryPolicy = retryPolicyBuilder - .onSuccess(event -> { - if (connectorConfig.getWebhookConfig().isActivate()) { - // convert the result to an HttpExportRecord - HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id); - // add the data to the queue - ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord); - } - }) .onRetry(event -> { if (log.isDebugEnabled()) { - log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, event.getAttemptCount(), httpConnectRecord); + log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord); } else { log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount()); } - if (connectorConfig.getWebhookConfig().isActivate()) { - HttpExportRecord exportRecord = - covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id); - ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord); - } + // update the retry event + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); + retryEvent.increaseCurrentRetries(); + retryEvent.setParentId(id); + // update the HttpConnectRecord httpConnectRecord.setTime(LocalDateTime.now().toString()); httpConnectRecord.setUuid(UUID.randomUUID().toString()); @@ -157,44 +159,15 @@ public Future> deliver(URI url, HttpConnectRecord httpConne } else { log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException()); } - if (connectorConfig.getWebhookConfig().isActivate()) { - HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id); - ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord); - } }).build(); // Handle the HttpConnectRecord with retry Failsafe.with(retryPolicy) - .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord).toCompletionStage()); + .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes).toCompletionStage()); return null; } - /** - * Converts the ExecutionCompletedEvent to an HttpExportRecord. - * - * @param httpConnectRecord HttpConnectRecord - * @param event ExecutionEvent - * @param response the response of the request, may be null - * @param e the exception thrown during the request, may be null - * @param url the URL the request was sent to - * @param id UUID - * @return the converted HttpExportRecord - */ - private HttpExportRecord covertToExportRecord(HttpConnectRecord httpConnectRecord, ExecutionEvent event, HttpResponse response, - Throwable e, URI url, String id) { - - HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder() - .url(url.toString()) - .code(response != null ? response.statusCode() : -1) - .message(response != null ? response.statusMessage() : e.getMessage()) - .receivedTime(LocalDateTime.now()) - .uuid(httpConnectRecord.getUuid()) - .retriedBy(event.getAttemptCount() > 1 ? id : null) - .retryNum(event.getAttemptCount() - 1).build(); - - return new HttpExportRecord(httpExportMetadata, response == null ? null : response.bodyAsString()); - } /** * Cleans up and releases resources used by the HTTP/HTTPS handler. diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java index 4e64126a9d..5e09520d5d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java @@ -25,13 +25,16 @@ import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage; +import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang3.StringUtils; import java.net.URI; import java.time.LocalDateTime; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -214,7 +217,7 @@ public void handle(ConnectRecord record) { String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook"); HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); // handle the HttpConnectRecord - deliver(url, httpConnectRecord); + deliver(url, httpConnectRecord, Collections.emptyMap()); } } @@ -225,30 +228,27 @@ public void handle(ConnectRecord record) { * * @param url URI to which the HttpConnectRecord should be sent * @param httpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to be used in processing * @return processing chain */ @Override - public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) { // send the request - Future> responseFuture = super.deliver(url, httpConnectRecord); + Future> responseFuture = super.deliver(url, httpConnectRecord, attributes); // store the received data return responseFuture.onComplete(arr -> { - // If open retry, return directly and handled by RetryHttpSinkHandler - if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) { - return; + // get tryEvent from attributes + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); + + HttpResponse response = null; + if (arr.succeeded()) { + response = arr.result(); + } else if (retryEvent != null) { + retryEvent.setLastException(arr.cause()); } - // create ExportMetadataBuilder - HttpResponse response = arr.succeeded() ? arr.result() : null; - - HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder() - .url(url.toString()) - .code(response != null ? response.statusCode() : -1) - .message(response != null ? response.statusMessage() : arr.cause().getMessage()) - .receivedTime(LocalDateTime.now()) - .retriedBy(null) - .uuid(httpConnectRecord.getUuid()) - .retryNum(0) - .build(); + + // create ExportMetadata + HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent); // create ExportRecord HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null); @@ -257,6 +257,39 @@ public Future> deliver(URI url, HttpConnectRecord httpConne }); } + /** + * Builds the HttpExportMetadata object based on the response, HttpConnectRecord, and HttpRetryEvent. + * + * @param url the URI to which the HttpConnectRecord was sent + * @param response the response received from the URI + * @param httpConnectRecord the HttpConnectRecord that was sent + * @param retryEvent the HttpRetryEvent associated with the request + * @return the HttpExportMetadata object + */ + private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord, + HttpRetryEvent retryEvent) { + + + String msg = null; + // order of precedence: lastException > response > null + if (retryEvent != null && retryEvent.getLastException() != null) { + msg = retryEvent.getLimitedExceptionMessage(); + retryEvent.clearException(); + } else if (response != null) { + msg = response.statusMessage(); + } + + return HttpExportMetadata.builder() + .url(url.toString()) + .code(response != null ? response.statusCode() : -1) + .message(msg) + .receivedTime(LocalDateTime.now()) + .retriedBy(retryEvent != null ? retryEvent.getParentId() : null) + .uuid(httpConnectRecord.getUuid()) + .retryNum(retryEvent != null ? retryEvent.getCurrentRetries() : 0) + .build(); + } + /** * Cleans up and releases resources used by the HTTP/HTTPS handler. diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java index 3e724627c0..7ddba511c4 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java @@ -86,7 +86,7 @@ void before() throws Exception { JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString()); return HttpResponse.response() .withContentType(MediaType.APPLICATION_JSON) - .withStatusCode(200) + .withStatusCode(HttpStatus.SC_OK) .withBody(new JSONObject() .fluentPut("code", 0) .fluentPut("message", "success") From 5b32bd76c241603770ac2de6e7e337932f7fbd1a Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 14 Aug 2024 15:21:03 +0800 Subject: [PATCH 2/4] doc: Improve some documentation --- .../http/sink/data/HttpRetryEvent.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java index 1666ede576..0a09f713c9 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java @@ -34,19 +34,34 @@ public class HttpRetryEvent { private Throwable lastException; + /** + * Increase the current retries by 1 + */ public void increaseCurrentRetries() { this.currentRetries++; } + /** + * Check if the current retries is greater than or equal to the max retries + * @return true if the current retries is greater than or equal to the max retries + */ public boolean isMaxRetriesReached() { return this.currentRetries >= this.maxRetries; } + /** + * Get the limited exception message with the default limit of 256 + * @return the limited exception message + */ public String getLimitedExceptionMessage() { return getLimitedExceptionMessage(256); } - + /** + * Get the limited exception message with the specified limit + * @param maxLimit the maximum limit of the exception message + * @return the limited exception message + */ public String getLimitedExceptionMessage(int maxLimit) { if (lastException == null) { return ""; @@ -61,6 +76,9 @@ public String getLimitedExceptionMessage(int maxLimit) { return message; } + /** + * Clear the last exception + */ public void clearException() { this.lastException = null; } From f3dd4f36c5d0232546e6ee7543b368c97288f83d Mon Sep 17 00:00:00 2001 From: zaki Date: Wed, 14 Aug 2024 21:47:10 +0800 Subject: [PATCH 3/4] feat: Support for multi-server data callbacks --- .../http/sink/HttpSinkConnector.java | 10 +- .../http/sink/data/HttpRetryEvent.java | 13 +- .../sink/data/MultiHttpRequestContext.java | 64 ++++++++ .../sink/handler/AbstractHttpSinkHandler.java | 88 +++++++++++ .../{handle => handler}/HttpSinkHandler.java | 2 +- .../impl}/CommonHttpSinkHandler.java | 141 ++++++++++-------- .../impl/HttpSinkHandlerRetryWrapper.java} | 87 +++-------- .../impl}/WebhookHttpSinkHandler.java | 33 +--- 8 files changed, 269 insertions(+), 169 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java rename eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/{handle => handler}/HttpSinkHandler.java (98%) rename eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/{handle => handler/impl}/CommonHttpSinkHandler.java (66%) rename eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/{handle/RetryHttpSinkHandler.java => handler/impl/HttpSinkHandlerRetryWrapper.java} (64%) rename eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/{handle => handler/impl}/WebhookHttpSinkHandler.java (90%) diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java index 8a14756372..9b6038bdea 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -20,10 +20,10 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; -import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler; -import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler; -import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler; -import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper; +import org.apache.eventmesh.connector.http.sink.handler.impl.WebhookHttpSinkHandler; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; @@ -86,7 +86,7 @@ private void doInit() { this.sinkHandler = nonRetryHandler; } else if (maxRetries > 0) { // Wrap the sink handler with a retry handler - this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler); + this.sinkHandler = new HttpSinkHandlerRetryWrapper(this.httpSinkConfig.connectorConfig, nonRetryHandler); } else { throw new IllegalArgumentException("Max retries must be greater than or equal to 0."); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java index 0a09f713c9..4b229f9839 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java @@ -18,13 +18,14 @@ package org.apache.eventmesh.connector.http.sink.data; import lombok.Data; -import lombok.ToString; +/** + * Single HTTP retry event + */ @Data -@ToString public class HttpRetryEvent { - public static final String NAME = "http-retry-event"; + public static final String PREFIX = "http-retry-event-"; private String parentId; @@ -76,10 +77,4 @@ public String getLimitedExceptionMessage(int maxLimit) { return message; } - /** - * Clear the last exception - */ - public void clearException() { - this.lastException = null; - } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java new file mode 100644 index 0000000000..67ab943818 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Multi HTTP request context + */ +public class MultiHttpRequestContext { + + public static final String NAME = "multi-http-request-context"; + + /** + * The remaining requests to be processed. + */ + private final AtomicInteger remainingRequests; + + /** + * The last failed event. + * If there are no retries or retries are not enabled, it will be null. + * If retries occur but still fail, it will be logged, and only the last one will be retained. + */ + private HttpRetryEvent lastFailedEvent; + + public MultiHttpRequestContext(int remainingEvents) { + this.remainingRequests = new AtomicInteger(remainingEvents); + } + + /** + * Decrement the remaining requests by 1. + */ + public void decrementRemainingRequests() { + remainingRequests.decrementAndGet(); + } + + public int getRemainingRequests() { + return remainingRequests.get(); + } + + public HttpRetryEvent getLastFailedEvent() { + return lastFailedEvent; + } + + public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) { + this.lastFailedEvent = lastFailedEvent; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java new file mode 100644 index 0000000000..0989db4aff --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handler; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; +import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler. + */ +public abstract class AbstractHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig sinkConnectorConfig; + + private final List urls; + + protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + this.sinkConnectorConfig = sinkConnectorConfig; + // Initialize URLs + String[] urlStrings = sinkConnectorConfig.getUrls(); + this.urls = Arrays.stream(urlStrings) + .map(URI::create) + .collect(Collectors.toList()); + } + + public SinkConnectorConfig getSinkConnectorConfig() { + return sinkConnectorConfig; + } + + public List getUrls() { + return urls; + } + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + @Override + public void handle(ConnectRecord record) { + // build attributes + Map attributes = new ConcurrentHashMap<>(); + attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size())); + + // send the record to all URLs + for (URI url : urls) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", + this.sinkConnectorConfig.getConnectorName(), url.getScheme(), + this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + + // add retry event to attributes + HttpRetryEvent retryEvent = new HttpRetryEvent(); + retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries()); + attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid(), retryEvent); + + // deliver the record + deliver(url, httpConnectRecord, attributes); + } + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java similarity index 98% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java index 3ced8b0653..1731809ab9 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.handle; +package org.apache.eventmesh.connector.http.sink.handler; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java similarity index 66% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java index 8b5a45f948..811386f52c 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java @@ -15,25 +15,23 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.handle; +package org.apache.eventmesh.connector.http.sink.handler.impl; import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; +import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; +import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; import org.apache.eventmesh.connector.http.util.HttpUtils; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.net.URI; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import io.netty.handler.codec.http.HttpHeaderNames; import io.vertx.core.Future; @@ -62,22 +60,13 @@ */ @Slf4j @Getter -public class CommonHttpSinkHandler implements HttpSinkHandler { - - private final SinkConnectorConfig connectorConfig; - - private final List urls; +public class CommonHttpSinkHandler extends AbstractHttpSinkHandler { private WebClient webClient; public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { - this.connectorConfig = sinkConnectorConfig; - // Initialize URLs - String[] urlStrings = sinkConnectorConfig.getUrls(); - this.urls = Arrays.stream(urlStrings) - .map(URI::create) - .collect(Collectors.toList()); + super(sinkConnectorConfig); } /** @@ -93,33 +82,18 @@ public void start() { * Initializes the WebClient with the provided configuration options. */ private void doInitWebClient() { + SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig(); final Vertx vertx = Vertx.vertx(); WebClientOptions options = new WebClientOptions() - .setKeepAlive(this.connectorConfig.isKeepAlive()) - .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) - .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setKeepAlive(sinkConnectorConfig.isKeepAlive()) + .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(sinkConnectorConfig.getIdleTimeout()) .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) - .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) - .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); + .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout()) + .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize()); this.webClient = WebClient.create(vertx, options); } - /** - * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. - * - * @param record the ConnectRecord to process - */ - @Override - public void handle(ConnectRecord record) { - for (URI url : this.urls) { - // convert ConnectRecord to HttpConnectRecord - String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common"); - HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - deliver(url, httpConnectRecord, Collections.emptyMap()); - } - } - - /** * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified * URL using the WebClient. @@ -181,41 +155,92 @@ public Future> deliver(URI url, HttpConnectRecord httpConne } // try callback - tryCallback(httpConnectRecord.getData(), e, attributes); + tryCallback(httpConnectRecord, e, attributes); }).onFailure(err -> { log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err); // try callback - tryCallback(httpConnectRecord.getData(), err, attributes); + tryCallback(httpConnectRecord, err, attributes); }); } /** * Tries to call the callback based on the result of the request. * - * @param record the ConnectRecord to process - * @param e the exception thrown during the request, may be null - * @param attributes additional attributes to be used in processing + * @param httpConnectRecord the HttpConnectRecord to use + * @param e the exception thrown during the request, may be null + * @param attributes additional attributes to be used in processing */ - private void tryCallback(ConnectRecord record, Throwable e, Map attributes) { - // update the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); - if (retryEvent != null) { - retryEvent.setLastException(e); - } + private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes) { + // get the retry event + HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e); + + // get the multi http request context + MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent); - // check if the callback is null - if (record.getCallback() == null) { - return; + + if (multiHttpRequestContext.getRemainingRequests() == 0) { + // do callback + ConnectRecord record = httpConnectRecord.getData(); + if (record.getCallback() == null) { + if (log.isDebugEnabled()) { + log.warn("ConnectRecord callback is null. Ignoring callback. {}", record); + } else { + log.warn("ConnectRecord callback is null. Ignoring callback."); + } + return; + } + + HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent(); + if (lastFailedEvent == null || lastFailedEvent.getLastException() == null) { + // success + record.getCallback().onSuccess(convertToSendResult(record)); + } else { + // failure + record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException())); + } } + } - // If no exception, call onSuccess directly - if (e == null) { - record.getCallback().onSuccess(convertToSendResult(record)); - } else if (retryEvent == null || retryEvent.isMaxRetriesReached()) { - // Determine if you need to continue retrying - record.getCallback().onException(buildSendExceptionContext(record, e)); + + /** + * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord. + * + * @param attributes the attributes to use + * @param retryEvent the retry event to use + * @return the updated multi http request context + */ + private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpRetryEvent retryEvent) { + // get the multi http request context + MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME); + + if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) { + // decrement the counter + multiHttpRequestContext.decrementRemainingRequests(); + + // try set failed event + if (retryEvent.getLastException() != null) { + multiHttpRequestContext.setLastFailedEvent(retryEvent); + } } + + return multiHttpRequestContext; + } + + /** + * Gets and updates the retry event based on the provided attributes and HttpConnectRecord. + * + * @param attributes the attributes to use + * @param httpConnectRecord the HttpConnectRecord to use + * @param e the exception thrown during the request, may be null + * @return the updated retry event + */ + private HttpRetryEvent getAndUpdateRetryEvent(Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) { + // get the retry event + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); + // update the retry event + retryEvent.setLastException(e); + return retryEvent; } @@ -250,6 +275,4 @@ public void stop() { log.warn("WebClient is null, ignore."); } } - - } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java similarity index 64% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java index 7f4220e413..c013abe6cf 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java @@ -15,25 +15,22 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.handle; +package org.apache.eventmesh.connector.http.sink.handler.impl; import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig; import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; +import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler; import org.apache.eventmesh.connector.http.util.HttpUtils; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.net.ConnectException; import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -43,47 +40,24 @@ import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; -import dev.failsafe.RetryPolicyBuilder; +/** + * HttpSinkHandlerRetryWrapper is a wrapper class for the HttpSinkHandler that provides retry functionality for failed HTTP requests. + */ @Slf4j -public class RetryHttpSinkHandler implements HttpSinkHandler { - - private final SinkConnectorConfig connectorConfig; - - // Retry policy builder - private RetryPolicyBuilder> retryPolicyBuilder; +public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler { - private final List urls; + private final HttpRetryConfig httpRetryConfig; private final HttpSinkHandler sinkHandler; - - public RetryHttpSinkHandler(SinkConnectorConfig connectorConfig, HttpSinkHandler sinkHandler) { - this.connectorConfig = connectorConfig; + public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) { + super(sinkConnectorConfig); this.sinkHandler = sinkHandler; - - // Initialize retry - initRetry(); - - // Initialize URLs - String[] urlStrings = connectorConfig.getUrls(); - this.urls = Arrays.stream(urlStrings) - .map(URI::create) - .collect(Collectors.toList()); - } - - private void initRetry() { - HttpRetryConfig httpRetryConfig = this.connectorConfig.getRetryConfig(); - - this.retryPolicyBuilder = RetryPolicy.>builder() - .handleIf(e -> e instanceof ConnectException) - .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) - .withMaxRetries(httpRetryConfig.getMaxRetries()) - .withDelay(Duration.ofMillis(httpRetryConfig.getInterval())); + this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig(); } - /** * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig. */ @@ -93,34 +67,6 @@ public void start() { } - /** - * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. - * - * @param record the ConnectRecord to process - */ - @Override - public void handle(ConnectRecord record) { - for (URI url : this.urls) { - // convert ConnectRecord to HttpConnectRecord - String type = String.format("%s.%s.%s", - this.connectorConfig.getConnectorName(), url.getScheme(), - this.connectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); - HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - - // build the retry event - HttpRetryEvent retryEvent = new HttpRetryEvent(); - retryEvent.setMaxRetries(connectorConfig.getRetryConfig().getMaxRetries()); - - // add the retry event to the attributes - HashMap attributes = new HashMap<>(); - attributes.put(HttpRetryEvent.NAME, retryEvent); - - // handle the HttpConnectRecord - deliver(url, httpConnectRecord, attributes); - } - } - - /** * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the * HttpConnectRecord @@ -136,7 +82,11 @@ public Future> deliver(URI url, HttpConnectRecord httpConne String id = httpConnectRecord.getUuid(); // Build the retry policy - RetryPolicy> retryPolicy = retryPolicyBuilder + RetryPolicy> retryPolicy = RetryPolicy.>builder() + .handleIf(e -> e instanceof ConnectException) + .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) + .withMaxRetries(httpRetryConfig.getMaxRetries()) + .withDelay(Duration.ofMillis(httpRetryConfig.getInterval())) .onRetry(event -> { if (log.isDebugEnabled()) { log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord); @@ -144,13 +94,16 @@ public Future> deliver(URI url, HttpConnectRecord httpConne log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount()); } // update the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.remove(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); retryEvent.increaseCurrentRetries(); retryEvent.setParentId(id); // update the HttpConnectRecord httpConnectRecord.setTime(LocalDateTime.now().toString()); httpConnectRecord.setUuid(UUID.randomUUID().toString()); + + // update the attributes + attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid(), retryEvent); }) .onFailure(event -> { if (log.isDebugEnabled()) { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java similarity index 90% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java index 5e09520d5d..1da2f6c181 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.handle; +package org.apache.eventmesh.connector.http.sink.handler.impl; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; @@ -26,13 +26,11 @@ import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang3.StringUtils; import java.net.URI; import java.time.LocalDateTime; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -64,8 +62,6 @@ @Slf4j public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { - private final SinkConnectorConfig sinkConnectorConfig; - // the configuration for webhook private final HttpWebhookConfig webhookConfig; @@ -89,7 +85,7 @@ public boolean isExportDestroyed() { public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { super(sinkConnectorConfig); - this.sinkConnectorConfig = sinkConnectorConfig; + this.webhookConfig = sinkConnectorConfig.getWebhookConfig(); int maxQueueSize = this.webhookConfig.getMaxStorageSize(); this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize); @@ -97,9 +93,6 @@ public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { doInitExportServer(); } - public SynchronizedCircularFifoQueue getReceivedDataQueue() { - return receivedDataQueue; - } /** * Initialize the server for exporting the received data @@ -205,22 +198,6 @@ public void start() { }); } - /** - * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. - * - * @param record the ConnectRecord to process - */ - @Override - public void handle(ConnectRecord record) { - for (URI url : super.getUrls()) { - // convert ConnectRecord to HttpConnectRecord - String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook"); - HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - // handle the HttpConnectRecord - deliver(url, httpConnectRecord, Collections.emptyMap()); - } - } - /** * Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified @@ -238,7 +215,7 @@ public Future> deliver(URI url, HttpConnectRecord httpConne // store the received data return responseFuture.onComplete(arr -> { // get tryEvent from attributes - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.NAME); + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); HttpResponse response = null; if (arr.succeeded()) { @@ -263,7 +240,7 @@ public Future> deliver(URI url, HttpConnectRecord httpConne * @param url the URI to which the HttpConnectRecord was sent * @param response the response received from the URI * @param httpConnectRecord the HttpConnectRecord that was sent - * @param retryEvent the HttpRetryEvent associated with the request + * @param retryEvent the SingleHttpRetryEvent that was used for retries * @return the HttpExportMetadata object */ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord, @@ -274,7 +251,7 @@ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse // order of precedence: lastException > response > null if (retryEvent != null && retryEvent.getLastException() != null) { msg = retryEvent.getLimitedExceptionMessage(); - retryEvent.clearException(); + retryEvent.setLastException(null); } else if (response != null) { msg = response.statusMessage(); } From 4ee295d651cfef4b9d9ee71ae141b8c4722d24cc Mon Sep 17 00:00:00 2001 From: zaki Date: Fri, 16 Aug 2024 17:02:45 +0800 Subject: [PATCH 4/4] perf: Optimize some logic --- .../http/sink/data/HttpConnectRecord.java | 44 +++++++++++++++---- .../http/sink/data/HttpExportMetadata.java | 10 ++++- .../http/sink/data/HttpExportRecord.java | 6 ++- .../http/sink/data/HttpExportRecordPage.java | 5 ++- .../sink/handler/AbstractHttpSinkHandler.java | 2 +- .../handler/impl/CommonHttpSinkHandler.java | 36 +++++++-------- .../impl/HttpSinkHandlerRetryWrapper.java | 18 ++------ .../handler/impl/WebhookHttpSinkHandler.java | 13 +++--- 8 files changed, 79 insertions(+), 55 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java index a258c6ab53..95b40afe9e 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java @@ -20,31 +20,60 @@ import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import java.io.Serializable; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.UUID; import lombok.Builder; -import lombok.Data; +import lombok.Getter; /** * a special ConnectRecord for HttpSinkConnector */ -@Data +@Getter @Builder -public class HttpConnectRecord { +public class HttpConnectRecord implements Serializable { - private String type; + private static final long serialVersionUID = 5271462532332251473L; + + /** + * The unique identifier for the HttpConnectRecord + */ + private final String httpRecordId = UUID.randomUUID().toString(); - private String time; + /** + * The time when the HttpConnectRecord was created + */ + private LocalDateTime createTime; - private String uuid; + /** + * The type of the HttpConnectRecord + */ + private String type; + /** + * The event id of the HttpConnectRecord + */ private String eventId; + /** + * The ConnectRecord to be sent + */ private ConnectRecord data; + @Override + public String toString() { + return "HttpConnectRecord{" + + "createTime=" + createTime + + ", httpRecordId='" + httpRecordId + + ", type='" + type + + ", eventId='" + eventId + + ", data=" + data + + '}'; + } + /** * Convert ConnectRecord to HttpConnectRecord * @@ -62,11 +91,8 @@ public static HttpConnectRecord convertConnectRecord(ConnectRecord record, Strin } return HttpConnectRecord.builder() .type(type) - .time(LocalDateTime.now().toString()) - .uuid(UUID.randomUUID().toString()) .eventId(type + "-" + offset) .data(record) .build(); } - } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java index 848012f152..41a5087870 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.http.sink.data; +import java.io.Serializable; import java.time.LocalDateTime; import lombok.Builder; @@ -27,7 +28,10 @@ */ @Data @Builder -public class HttpExportMetadata { +public class HttpExportMetadata implements Serializable { + + private static final long serialVersionUID = 1121010466793041920L; + private String url; private int code; @@ -36,7 +40,9 @@ public class HttpExportMetadata { private LocalDateTime receivedTime; - private String uuid; + private String httpRecordId; + + private String recordId; private String retriedBy; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java index b6382aee7a..c6bdb02884 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.connector.http.sink.data; +import java.io.Serializable; + import lombok.AllArgsConstructor; import lombok.Data; @@ -25,7 +27,9 @@ */ @Data @AllArgsConstructor -public class HttpExportRecord { +public class HttpExportRecord implements Serializable { + + private static final long serialVersionUID = 6010283911452947157L; private HttpExportMetadata metadata; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java index 5c44eb3b7f..81e582c33a 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.http.sink.data; +import java.io.Serializable; import java.util.List; import lombok.AllArgsConstructor; @@ -27,7 +28,9 @@ */ @Data @AllArgsConstructor -public class HttpExportRecordPage { +public class HttpExportRecordPage implements Serializable { + + private static final long serialVersionUID = 1143791658357035990L; private int pageNum; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java index 0989db4aff..36d01115bb 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java @@ -78,7 +78,7 @@ public void handle(ConnectRecord record) { // add retry event to attributes HttpRetryEvent retryEvent = new HttpRetryEvent(); retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries()); - attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid(), retryEvent); + attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent); // deliver the record deliver(url, httpConnectRecord, attributes); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java index 811386f52c..0907847455 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java @@ -178,7 +178,6 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) { + // get the retry event + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); + // update the retry event + retryEvent.setLastException(e); + return retryEvent; + } + /** * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord. @@ -227,23 +242,6 @@ private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) { - // get the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); - // update the retry event - retryEvent.setLastException(e); - return retryEvent; - } - - private SendResult convertToSendResult(ConnectRecord record) { SendResult result = new SendResult(); result.setMessageId(record.getRecordId()); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java index c013abe6cf..268d0a0d6d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java @@ -28,9 +28,7 @@ import java.net.ConnectException; import java.net.URI; import java.time.Duration; -import java.time.LocalDateTime; import java.util.Map; -import java.util.UUID; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -78,8 +76,6 @@ public void start() { */ @Override public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) { - // Only webhook mode needs to use the UUID to identify the request - String id = httpConnectRecord.getUuid(); // Build the retry policy RetryPolicy> retryPolicy = RetryPolicy.>builder() @@ -94,27 +90,19 @@ public Future> deliver(URI url, HttpConnectRecord httpConne log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount()); } // update the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.remove(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); retryEvent.increaseCurrentRetries(); - retryEvent.setParentId(id); - - // update the HttpConnectRecord - httpConnectRecord.setTime(LocalDateTime.now().toString()); - httpConnectRecord.setUuid(UUID.randomUUID().toString()); - - // update the attributes - attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid(), retryEvent); }) .onFailure(event -> { if (log.isDebugEnabled()) { - log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, event.getAttemptCount(), + log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(), httpConnectRecord, event.getException()); } else { log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException()); } }).build(); - // Handle the HttpConnectRecord with retry + // Handle the ConnectRecord with retry policy Failsafe.with(retryPolicy) .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes).toCompletionStage()); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java index 1da2f6c181..ff8f69d45a 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java @@ -215,12 +215,12 @@ public Future> deliver(URI url, HttpConnectRecord httpConne // store the received data return responseFuture.onComplete(arr -> { // get tryEvent from attributes - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getUuid()); + HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); HttpResponse response = null; if (arr.succeeded()) { response = arr.result(); - } else if (retryEvent != null) { + } else { retryEvent.setLastException(arr.cause()); } @@ -246,10 +246,9 @@ public Future> deliver(URI url, HttpConnectRecord httpConne private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord, HttpRetryEvent retryEvent) { - String msg = null; // order of precedence: lastException > response > null - if (retryEvent != null && retryEvent.getLastException() != null) { + if (retryEvent.getLastException() != null) { msg = retryEvent.getLimitedExceptionMessage(); retryEvent.setLastException(null); } else if (response != null) { @@ -261,9 +260,9 @@ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse .code(response != null ? response.statusCode() : -1) .message(msg) .receivedTime(LocalDateTime.now()) - .retriedBy(retryEvent != null ? retryEvent.getParentId() : null) - .uuid(httpConnectRecord.getUuid()) - .retryNum(retryEvent != null ? retryEvent.getCurrentRetries() : 0) + .httpRecordId(httpConnectRecord.getHttpRecordId()) + .recordId(httpConnectRecord.getData().getRecordId()) + .retryNum(retryEvent.getCurrentRetries()) .build(); }