Skip to content

Commit 5412bbb

Browse files
authored
[ISSUE apache#5105] Fix the retry mechanism of the HttpSinkConnector (apache#5106)
1 parent ee646a8 commit 5412bbb

File tree

7 files changed

+130
-102
lines changed

7 files changed

+130
-102
lines changed
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,82 @@
1717

1818
package org.apache.eventmesh.connector.http.sink.data;
1919

20-
import lombok.Data;
20+
import java.util.concurrent.atomic.AtomicInteger;
2121

2222
/**
23-
* Single HTTP retry event
23+
* Single HTTP attempt event
2424
*/
25-
@Data
26-
public class HttpRetryEvent {
25+
public class HttpAttemptEvent {
2726

28-
public static final String PREFIX = "http-retry-event-";
27+
public static final String PREFIX = "http-attempt-event-";
2928

30-
private String parentId;
29+
private final int maxAttempts;
3130

32-
private int maxRetries;
33-
34-
private int currentRetries;
31+
private final AtomicInteger attempts;
3532

3633
private Throwable lastException;
3734

35+
36+
public HttpAttemptEvent(int maxAttempts) {
37+
this.maxAttempts = maxAttempts;
38+
this.attempts = new AtomicInteger(0);
39+
}
40+
41+
/**
42+
* Increment the attempts
43+
*/
44+
public void incrementAttempts() {
45+
attempts.incrementAndGet();
46+
}
47+
3848
/**
39-
* Increase the current retries by 1
49+
* Update the event, incrementing the attempts and setting the last exception
50+
*
51+
* @param exception the exception to update, can be null
4052
*/
41-
public void increaseCurrentRetries() {
42-
this.currentRetries++;
53+
public void updateEvent(Throwable exception) {
54+
// increment the attempts
55+
incrementAttempts();
56+
57+
// update the last exception
58+
lastException = exception;
4359
}
4460

4561
/**
46-
* Check if the current retries is greater than or equal to the max retries
47-
* @return true if the current retries is greater than or equal to the max retries
62+
* Check if the attempts are less than the maximum attempts
63+
*
64+
* @return true if the attempts are less than the maximum attempts, false otherwise
4865
*/
49-
public boolean isMaxRetriesReached() {
50-
return this.currentRetries >= this.maxRetries;
66+
public boolean canAttempt() {
67+
return attempts.get() < maxAttempts;
68+
}
69+
70+
public boolean isComplete() {
71+
if (attempts.get() == 0) {
72+
// No start yet
73+
return false;
74+
}
75+
76+
// If no attempt can be made or the last exception is null, the event completed
77+
return !canAttempt() || lastException == null;
78+
}
79+
80+
81+
public int getMaxAttempts() {
82+
return maxAttempts;
83+
}
84+
85+
public int getAttempts() {
86+
return attempts.get();
87+
}
88+
89+
public Throwable getLastException() {
90+
return lastException;
5191
}
5292

5393
/**
5494
* Get the limited exception message with the default limit of 256
95+
*
5596
* @return the limited exception message
5697
*/
5798
public String getLimitedExceptionMessage() {
@@ -60,6 +101,7 @@ public String getLimitedExceptionMessage() {
60101

61102
/**
62103
* Get the limited exception message with the specified limit
104+
*
63105
* @param maxLimit the maximum limit of the exception message
64106
* @return the limited exception message
65107
*/

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ public class MultiHttpRequestContext {
3434

3535
/**
3636
* The last failed event.
37-
* If there are no retries or retries are not enabled, it will be null.
3837
* If retries occur but still fail, it will be logged, and only the last one will be retained.
3938
*/
40-
private HttpRetryEvent lastFailedEvent;
39+
private HttpAttemptEvent lastFailedEvent;
4140

4241
public MultiHttpRequestContext(int remainingEvents) {
4342
this.remainingRequests = new AtomicInteger(remainingEvents);
@@ -50,15 +49,24 @@ public void decrementRemainingRequests() {
5049
remainingRequests.decrementAndGet();
5150
}
5251

52+
/**
53+
* Check if all requests have been processed.
54+
*
55+
* @return true if all requests have been processed, false otherwise.
56+
*/
57+
public boolean isAllRequestsProcessed() {
58+
return remainingRequests.get() == 0;
59+
}
60+
5361
public int getRemainingRequests() {
5462
return remainingRequests.get();
5563
}
5664

57-
public HttpRetryEvent getLastFailedEvent() {
65+
public HttpAttemptEvent getLastFailedEvent() {
5866
return lastFailedEvent;
5967
}
6068

61-
public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
69+
public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) {
6270
this.lastFailedEvent = lastFailedEvent;
6371
}
6472
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.eventmesh.connector.http.sink.handler;
1919

2020
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
21+
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
2122
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
22-
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
2323
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
2424
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2525

@@ -75,10 +75,9 @@ public void handle(ConnectRecord record) {
7575
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
7676
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
7777

78-
// add retry event to attributes
79-
HttpRetryEvent retryEvent = new HttpRetryEvent();
80-
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
81-
attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent);
78+
// add AttemptEvent to the attributes
79+
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
80+
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);
8281

8382
// deliver the record
8483
deliver(url, httpConnectRecord, attributes, record);

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java

+17-30
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
2121
import org.apache.eventmesh.common.utils.JsonUtils;
22+
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
2223
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
23-
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
2424
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
2525
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
2626
import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -176,13 +176,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
176176
* @param attributes additional attributes to be used in processing
177177
*/
178178
private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) {
179-
// get the retry event
180-
HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
179+
// get and update the attempt event
180+
HttpAttemptEvent attemptEvent = (HttpAttemptEvent) attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
181+
attemptEvent.updateEvent(e);
181182

182-
// get the multi http request context
183-
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
183+
// get and update the multiHttpRequestContext
184+
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, attemptEvent);
184185

185-
if (multiHttpRequestContext.getRemainingRequests() == 0) {
186+
if (multiHttpRequestContext.isAllRequestsProcessed()) {
186187
// do callback
187188
if (record.getCallback() == null) {
188189
if (log.isDebugEnabled()) {
@@ -193,7 +194,8 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
193194
return;
194195
}
195196

196-
HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
197+
// get the last failed event
198+
HttpAttemptEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
197199
if (lastFailedEvent == null) {
198200
// success
199201
record.getCallback().onSuccess(convertToSendResult(record));
@@ -204,41 +206,26 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
204206
}
205207
}
206208

207-
/**
208-
* Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
209-
*
210-
* @param attributes the attributes to use
211-
* @param httpConnectRecord the HttpConnectRecord to use
212-
* @param e the exception thrown during the request, may be null
213-
* @return the updated retry event
214-
*/
215-
private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object> attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
216-
// get the retry event
217-
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
218-
// update the retry event
219-
retryEvent.setLastException(e);
220-
return retryEvent;
221-
}
222-
223209

224210
/**
225211
* Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
226212
*
227-
* @param attributes the attributes to use
228-
* @param retryEvent the retry event to use
213+
* @param attributes the attributes to use
214+
* @param attemptEvent the HttpAttemptEvent to use
229215
* @return the updated multi http request context
230216
*/
231-
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpRetryEvent retryEvent) {
217+
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpAttemptEvent attemptEvent) {
232218
// get the multi http request context
233219
MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
234220

235-
if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
221+
// Check if the current attempted event has completed
222+
if (attemptEvent.isComplete()) {
236223
// decrement the counter
237224
multiHttpRequestContext.decrementRemainingRequests();
238225

239-
// try set failed event
240-
if (retryEvent.getLastException() != null) {
241-
multiHttpRequestContext.setLastFailedEvent(retryEvent);
226+
if (attemptEvent.getLastException() != null) {
227+
// if all attempts are exhausted, set the last failed event
228+
multiHttpRequestContext.setLastFailedEvent(attemptEvent);
242229
}
243230
}
244231

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java

+28-29
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
2121
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
2222
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
23-
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
2423
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
2524
import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
2625
import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {
5150

5251
private final HttpSinkHandler sinkHandler;
5352

53+
private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;
54+
5455
public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
5556
super(sinkConnectorConfig);
5657
this.sinkHandler = sinkHandler;
5758
this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
59+
this.retryPolicy = buildRetryPolicy();
60+
}
61+
62+
private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
63+
return RetryPolicy.<HttpResponse<Buffer>>builder()
64+
.handleIf(e -> e instanceof ConnectException)
65+
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
66+
.withMaxRetries(httpRetryConfig.getMaxRetries())
67+
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
68+
.onRetry(event -> {
69+
if (log.isDebugEnabled()) {
70+
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}",
71+
event.getAttemptCount(), httpRetryConfig.getInterval(), event.getLastException());
72+
} else {
73+
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.",
74+
event.getAttemptCount(), httpRetryConfig.getInterval());
75+
}
76+
}).onFailure(event -> {
77+
if (log.isDebugEnabled()) {
78+
log.error("Failed to deliver message after {} attempts. Error: {}",
79+
event.getAttemptCount(), event.getException());
80+
} else {
81+
log.error("Failed to deliver message after {} attempts.",
82+
event.getAttemptCount());
83+
}
84+
}).build();
5885
}
5986

6087
/**
@@ -78,36 +105,8 @@ public void start() {
78105
@Override
79106
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
80107
ConnectRecord connectRecord) {
81-
82-
// Build the retry policy
83-
RetryPolicy<HttpResponse<Buffer>> retryPolicy = RetryPolicy.<HttpResponse<Buffer>>builder()
84-
.handleIf(e -> e instanceof ConnectException)
85-
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
86-
.withMaxRetries(httpRetryConfig.getMaxRetries())
87-
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
88-
.onRetry(event -> {
89-
if (log.isDebugEnabled()) {
90-
log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
91-
} else {
92-
log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
93-
}
94-
// update the retry event
95-
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
96-
retryEvent.increaseCurrentRetries();
97-
})
98-
.onFailure(event -> {
99-
if (log.isDebugEnabled()) {
100-
log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
101-
httpConnectRecord, event.getException());
102-
} else {
103-
log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
104-
}
105-
}).build();
106-
107-
// Handle the ConnectRecord with retry policy
108108
Failsafe.with(retryPolicy)
109109
.getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes, connectRecord).toCompletionStage());
110-
111110
return null;
112111
}
113112

0 commit comments

Comments
 (0)