Skip to content

Commit 71648c9

Browse files
authored
[ISSUE apache#5077] HTTP Sink Connector supports result callback (apache#5078)
* feat: Support CallBack for ConnectRecord * doc: Improve some documentation * feat: Support for multi-server data callbacks * perf: Optimize some logic
1 parent 240ea62 commit 71648c9

16 files changed

+603
-344
lines changed

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ public synchronized int size() {
120120
*/
121121
public synchronized List<E> fetchRange(int start, int end, boolean removed) {
122122

123-
if (start < 0 || end > this.size() || start > end) {
123+
if (start < 0 || start > end) {
124124
throw new IllegalArgumentException("Invalid range");
125125
}
126+
end = Math.min(end, this.size());
126127

127128
Iterator<E> iterator = this.iterator();
128129
List<E> items = new ArrayList<>(end - start);

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import org.apache.eventmesh.common.config.connector.Config;
2121
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
2222
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
23-
import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
24-
import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
25-
import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler;
26-
import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
23+
import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
24+
import org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler;
25+
import org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper;
26+
import org.apache.eventmesh.connector.http.sink.handler.impl.WebhookHttpSinkHandler;
2727
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
2828
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
2929
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
@@ -86,7 +86,7 @@ private void doInit() {
8686
this.sinkHandler = nonRetryHandler;
8787
} else if (maxRetries > 0) {
8888
// Wrap the sink handler with a retry handler
89-
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
89+
this.sinkHandler = new HttpSinkHandlerRetryWrapper(this.httpSinkConfig.connectorConfig, nonRetryHandler);
9090
} else {
9191
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
9292
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public class HttpRetryConfig {
2424
// maximum number of retries, default 2, minimum 0
2525
private int maxRetries = 2;
2626

27-
// retry interval, default 2000ms
28-
private int interval = 2000;
27+
// retry interval, default 1000ms
28+
private int interval = 1000;
2929

3030
// Default value is false, indicating that only requests with network-level errors will be retried.
3131
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java

+35-9
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,60 @@
2020
import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
2121
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2222

23+
import java.io.Serializable;
2324
import java.time.LocalDateTime;
2425
import java.util.HashMap;
2526
import java.util.Map;
2627
import java.util.UUID;
2728

2829
import lombok.Builder;
29-
import lombok.Data;
30+
import lombok.Getter;
3031

3132
/**
3233
* a special ConnectRecord for HttpSinkConnector
3334
*/
34-
@Data
35+
@Getter
3536
@Builder
36-
public class HttpConnectRecord {
37+
public class HttpConnectRecord implements Serializable {
3738

38-
private String type;
39+
private static final long serialVersionUID = 5271462532332251473L;
40+
41+
/**
42+
* The unique identifier for the HttpConnectRecord
43+
*/
44+
private final String httpRecordId = UUID.randomUUID().toString();
3945

40-
private String time;
46+
/**
47+
* The time when the HttpConnectRecord was created
48+
*/
49+
private LocalDateTime createTime;
4150

42-
private String uuid;
51+
/**
52+
* The type of the HttpConnectRecord
53+
*/
54+
private String type;
4355

56+
/**
57+
* The event id of the HttpConnectRecord
58+
*/
4459
private String eventId;
4560

61+
/**
62+
* The ConnectRecord to be sent
63+
*/
4664
private ConnectRecord data;
4765

66+
@Override
67+
public String toString() {
68+
return "HttpConnectRecord{"
69+
+ "createTime=" + createTime
70+
+ ", httpRecordId='" + httpRecordId
71+
+ ", type='" + type
72+
+ ", eventId='" + eventId
73+
+ ", data=" + data
74+
+ '}';
75+
}
76+
4877
/**
4978
* Convert ConnectRecord to HttpConnectRecord
5079
*
@@ -62,11 +91,8 @@ public static HttpConnectRecord convertConnectRecord(ConnectRecord record, Strin
6291
}
6392
return HttpConnectRecord.builder()
6493
.type(type)
65-
.time(LocalDateTime.now().toString())
66-
.uuid(UUID.randomUUID().toString())
6794
.eventId(type + "-" + offset)
6895
.data(record)
6996
.build();
7097
}
71-
7298
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.http.sink.data;
1919

20+
import java.io.Serializable;
2021
import java.time.LocalDateTime;
2122

2223
import lombok.Builder;
@@ -27,7 +28,10 @@
2728
*/
2829
@Data
2930
@Builder
30-
public class HttpExportMetadata {
31+
public class HttpExportMetadata implements Serializable {
32+
33+
private static final long serialVersionUID = 1121010466793041920L;
34+
3135
private String url;
3236

3337
private int code;
@@ -36,7 +40,9 @@ public class HttpExportMetadata {
3640

3741
private LocalDateTime receivedTime;
3842

39-
private String uuid;
43+
private String httpRecordId;
44+
45+
private String recordId;
4046

4147
private String retriedBy;
4248

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.connector.http.sink.data;
1919

20+
import java.io.Serializable;
21+
2022
import lombok.AllArgsConstructor;
2123
import lombok.Data;
2224

@@ -25,7 +27,9 @@
2527
*/
2628
@Data
2729
@AllArgsConstructor
28-
public class HttpExportRecord {
30+
public class HttpExportRecord implements Serializable {
31+
32+
private static final long serialVersionUID = 6010283911452947157L;
2933

3034
private HttpExportMetadata metadata;
3135

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.http.sink.data;
1919

20+
import java.io.Serializable;
2021
import java.util.List;
2122

2223
import lombok.AllArgsConstructor;
@@ -27,7 +28,9 @@
2728
*/
2829
@Data
2930
@AllArgsConstructor
30-
public class HttpExportRecordPage {
31+
public class HttpExportRecordPage implements Serializable {
32+
33+
private static final long serialVersionUID = 1143791658357035990L;
3134

3235
private int pageNum;
3336

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.data;
19+
20+
import lombok.Data;
21+
22+
/**
23+
* Single HTTP retry event
24+
*/
25+
@Data
26+
public class HttpRetryEvent {
27+
28+
public static final String PREFIX = "http-retry-event-";
29+
30+
private String parentId;
31+
32+
private int maxRetries;
33+
34+
private int currentRetries;
35+
36+
private Throwable lastException;
37+
38+
/**
39+
* Increase the current retries by 1
40+
*/
41+
public void increaseCurrentRetries() {
42+
this.currentRetries++;
43+
}
44+
45+
/**
46+
* Check if the current retries is greater than or equal to the max retries
47+
* @return true if the current retries is greater than or equal to the max retries
48+
*/
49+
public boolean isMaxRetriesReached() {
50+
return this.currentRetries >= this.maxRetries;
51+
}
52+
53+
/**
54+
* Get the limited exception message with the default limit of 256
55+
* @return the limited exception message
56+
*/
57+
public String getLimitedExceptionMessage() {
58+
return getLimitedExceptionMessage(256);
59+
}
60+
61+
/**
62+
* Get the limited exception message with the specified limit
63+
* @param maxLimit the maximum limit of the exception message
64+
* @return the limited exception message
65+
*/
66+
public String getLimitedExceptionMessage(int maxLimit) {
67+
if (lastException == null) {
68+
return "";
69+
}
70+
String message = lastException.getMessage();
71+
if (message == null) {
72+
return "";
73+
}
74+
if (message.length() > maxLimit) {
75+
return message.substring(0, maxLimit);
76+
}
77+
return message;
78+
}
79+
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.data;
19+
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
23+
/**
24+
* Multi HTTP request context
25+
*/
26+
public class MultiHttpRequestContext {
27+
28+
public static final String NAME = "multi-http-request-context";
29+
30+
/**
31+
* The remaining requests to be processed.
32+
*/
33+
private final AtomicInteger remainingRequests;
34+
35+
/**
36+
* The last failed event.
37+
* If there are no retries or retries are not enabled, it will be null.
38+
* If retries occur but still fail, it will be logged, and only the last one will be retained.
39+
*/
40+
private HttpRetryEvent lastFailedEvent;
41+
42+
public MultiHttpRequestContext(int remainingEvents) {
43+
this.remainingRequests = new AtomicInteger(remainingEvents);
44+
}
45+
46+
/**
47+
* Decrement the remaining requests by 1.
48+
*/
49+
public void decrementRemainingRequests() {
50+
remainingRequests.decrementAndGet();
51+
}
52+
53+
public int getRemainingRequests() {
54+
return remainingRequests.get();
55+
}
56+
57+
public HttpRetryEvent getLastFailedEvent() {
58+
return lastFailedEvent;
59+
}
60+
61+
public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
62+
this.lastFailedEvent = lastFailedEvent;
63+
}
64+
}

0 commit comments

Comments
 (0)