Skip to content

[ISSUE #5105] Fix the retry mechanism of the HttpSinkConnector #5106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,82 @@

package org.apache.eventmesh.connector.http.sink.data;

import lombok.Data;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Single HTTP retry event
* Single HTTP attempt event
*/
@Data
public class HttpRetryEvent {
public class HttpAttemptEvent {

public static final String PREFIX = "http-retry-event-";
public static final String PREFIX = "http-attempt-event-";

private String parentId;
private final int maxAttempts;

private int maxRetries;

private int currentRetries;
private final AtomicInteger attempts;

private Throwable lastException;


public HttpAttemptEvent(int maxAttempts) {
this.maxAttempts = maxAttempts;
this.attempts = new AtomicInteger(0);
}

/**
* Increment the attempts
*/
public void incrementAttempts() {
attempts.incrementAndGet();
}

/**
* Increase the current retries by 1
* Update the event, incrementing the attempts and setting the last exception
*
* @param exception the exception to update, can be null
*/
public void increaseCurrentRetries() {
this.currentRetries++;
public void updateEvent(Throwable exception) {
// increment the attempts
incrementAttempts();

// update the last exception
lastException = exception;
}

/**
* Check if the current retries is greater than or equal to the max retries
* @return true if the current retries is greater than or equal to the max retries
* Check if the attempts are less than the maximum attempts
*
* @return true if the attempts are less than the maximum attempts, false otherwise
*/
public boolean isMaxRetriesReached() {
return this.currentRetries >= this.maxRetries;
public boolean canAttempt() {
return attempts.get() < maxAttempts;
}

public boolean isComplete() {
if (attempts.get() == 0) {
// No start yet
return false;
}

// If no attempt can be made or the last exception is null, the event completed
return !canAttempt() || lastException == null;
}


public int getMaxAttempts() {
return maxAttempts;
}

public int getAttempts() {
return attempts.get();
}

public Throwable getLastException() {
return lastException;
}

/**
* Get the limited exception message with the default limit of 256
*
* @return the limited exception message
*/
public String getLimitedExceptionMessage() {
Expand All @@ -60,6 +101,7 @@ public String getLimitedExceptionMessage() {

/**
* Get the limited exception message with the specified limit
*
* @param maxLimit the maximum limit of the exception message
* @return the limited exception message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ public class MultiHttpRequestContext {

/**
* The last failed event.
* If there are no retries or retries are not enabled, it will be null.
* If retries occur but still fail, it will be logged, and only the last one will be retained.
*/
private HttpRetryEvent lastFailedEvent;
private HttpAttemptEvent lastFailedEvent;

public MultiHttpRequestContext(int remainingEvents) {
this.remainingRequests = new AtomicInteger(remainingEvents);
Expand All @@ -50,15 +49,24 @@ public void decrementRemainingRequests() {
remainingRequests.decrementAndGet();
}

/**
* Check if all requests have been processed.
*
* @return true if all requests have been processed, false otherwise.
*/
public boolean isAllRequestsProcessed() {
return remainingRequests.get() == 0;
}

public int getRemainingRequests() {
return remainingRequests.get();
}

public HttpRetryEvent getLastFailedEvent() {
public HttpAttemptEvent getLastFailedEvent() {
return lastFailedEvent;
}

public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) {
this.lastFailedEvent = lastFailedEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.eventmesh.connector.http.sink.handler;

import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -75,10 +75,9 @@ public void handle(ConnectRecord record) {
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add retry event to attributes
HttpRetryEvent retryEvent = new HttpRetryEvent();
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent);
// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
Expand Down Expand Up @@ -176,13 +176,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
* @param attributes additional attributes to be used in processing
*/
private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) {
// get the retry event
HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
// get and update the attempt event
HttpAttemptEvent attemptEvent = (HttpAttemptEvent) attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
attemptEvent.updateEvent(e);

// get the multi http request context
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
// get and update the multiHttpRequestContext
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, attemptEvent);

if (multiHttpRequestContext.getRemainingRequests() == 0) {
if (multiHttpRequestContext.isAllRequestsProcessed()) {
// do callback
if (record.getCallback() == null) {
if (log.isDebugEnabled()) {
Expand All @@ -193,7 +194,8 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
return;
}

HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
// get the last failed event
HttpAttemptEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
if (lastFailedEvent == null) {
// success
record.getCallback().onSuccess(convertToSendResult(record));
Expand All @@ -204,41 +206,26 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
}
}

/**
* Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
*
* @param attributes the attributes to use
* @param httpConnectRecord the HttpConnectRecord to use
* @param e the exception thrown during the request, may be null
* @return the updated retry event
*/
private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object> attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
// get the retry event
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
// update the retry event
retryEvent.setLastException(e);
return retryEvent;
}


/**
* Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
*
* @param attributes the attributes to use
* @param retryEvent the retry event to use
* @param attributes the attributes to use
* @param attemptEvent the HttpAttemptEvent to use
* @return the updated multi http request context
*/
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpRetryEvent retryEvent) {
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpAttemptEvent attemptEvent) {
// get the multi http request context
MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);

if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
// Check if the current attempted event has completed
if (attemptEvent.isComplete()) {
// decrement the counter
multiHttpRequestContext.decrementRemainingRequests();

// try set failed event
if (retryEvent.getLastException() != null) {
multiHttpRequestContext.setLastFailedEvent(retryEvent);
if (attemptEvent.getLastException() != null) {
// if all attempts are exhausted, set the last failed event
multiHttpRequestContext.setLastFailedEvent(attemptEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
Expand Down Expand Up @@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {

private final HttpSinkHandler sinkHandler;

private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;

public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
super(sinkConnectorConfig);
this.sinkHandler = sinkHandler;
this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
this.retryPolicy = buildRetryPolicy();
}

private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
return RetryPolicy.<HttpResponse<Buffer>>builder()
.handleIf(e -> e instanceof ConnectException)
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
.withMaxRetries(httpRetryConfig.getMaxRetries())
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
.onRetry(event -> {
if (log.isDebugEnabled()) {
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}",
event.getAttemptCount(), httpRetryConfig.getInterval(), event.getLastException());
} else {
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.",
event.getAttemptCount(), httpRetryConfig.getInterval());
}
}).onFailure(event -> {
if (log.isDebugEnabled()) {
log.error("Failed to deliver message after {} attempts. Error: {}",
event.getAttemptCount(), event.getException());
} else {
log.error("Failed to deliver message after {} attempts.",
event.getAttemptCount());
}
}).build();
}

/**
Expand All @@ -78,36 +105,8 @@ public void start() {
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
ConnectRecord connectRecord) {

// Build the retry policy
RetryPolicy<HttpResponse<Buffer>> retryPolicy = RetryPolicy.<HttpResponse<Buffer>>builder()
.handleIf(e -> e instanceof ConnectException)
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
.withMaxRetries(httpRetryConfig.getMaxRetries())
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
.onRetry(event -> {
if (log.isDebugEnabled()) {
log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
} else {
log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
}
// update the retry event
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
retryEvent.increaseCurrentRetries();
})
.onFailure(event -> {
if (log.isDebugEnabled()) {
log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
httpConnectRecord, event.getException());
} else {
log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
}
}).build();

// Handle the ConnectRecord with retry policy
Failsafe.with(retryPolicy)
.getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes, connectRecord).toCompletionStage());

return null;
}

Expand Down
Loading
Loading