Skip to content

Commit d76ee25

Browse files
authored
[ISSUE apache#4869] Add Webhook support for HTTP Source Connector (apache#4913)
* feat: add webhook support * feat: add CommonProtocol and optimize some logic * doc: update some javaDoc * fix: update something * fix: Optimize queue related operations * fix: Optimize queue related operations * fix: Optimize queue related operations * fix: update something * fix: Remove some meaningless code * fix: update something * perf: Updates to protocol-related code, with a focus on request handling * perf: use CircularFifoQueue * perf: Use OkHttp Client to completely replace Apache Client and resolve code conflicts.
1 parent c67116e commit d76ee25

File tree

17 files changed

+1065
-169
lines changed

17 files changed

+1065
-169
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.eventmesh.common.config.connector.http;
1919

20+
import java.util.HashMap;
21+
import java.util.Map;
22+
2023
import lombok.Data;
2124

2225
@Data
@@ -28,5 +31,28 @@ public class SourceConnectorConfig {
2831

2932
private int port;
3033

31-
private int idleTimeout;
34+
// timeunit: ms, default 5000ms
35+
private int idleTimeout = 5000;
36+
37+
/**
38+
* <ul>
39+
* <li>The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data </li>
40+
* <li>Default is 1MB (1024 * 1024 bytes). </li>
41+
* <li>If you receive a "size exceed allowed maximum capacity" error, you can increase this value. </li>
42+
* <li>Note: This applies only when handling form data submissions.</li>
43+
* </ul>
44+
*/
45+
private int maxFormAttributeSize = 1024 * 1024;
46+
47+
// max size of the queue, default 1000
48+
private int maxStorageSize = 1000;
49+
50+
// batch size, default 10
51+
private int batchSize = 10;
52+
53+
// protocol, default CloudEvent
54+
private String protocol = "CloudEvent";
55+
56+
// extra config, e.g. GitHub secret
57+
private Map<String, String> extraConfig = new HashMap<>();
3258
}

Diff for: eventmesh-connectors/eventmesh-connector-http/build.gradle

-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ dependencies {
2424
implementation 'io.vertx:vertx-web-client:4.5.8'
2525
implementation 'dev.failsafe:failsafe:3.3.2'
2626

27-
testImplementation "org.apache.httpcomponents:httpclient"
2827
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
2928
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
3029
compileOnly 'org.projectlombok:lombok'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.common;
19+
20+
import org.apache.commons.collections4.queue.CircularFifoQueue;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.Iterator;
25+
import java.util.List;
26+
27+
28+
/**
29+
* SynchronizedCircularFifoQueue is a synchronized version of CircularFifoQueue.
30+
*/
31+
public class SynchronizedCircularFifoQueue<E> extends CircularFifoQueue<E> {
32+
33+
/**
34+
* <p>Default constructor. capacity = 32</p>
35+
*/
36+
public SynchronizedCircularFifoQueue() {
37+
super();
38+
}
39+
40+
public SynchronizedCircularFifoQueue(Collection<? extends E> coll) {
41+
super(coll);
42+
}
43+
44+
public SynchronizedCircularFifoQueue(int size) {
45+
super(size);
46+
}
47+
48+
@Override
49+
public synchronized boolean add(E element) {
50+
return super.add(element);
51+
}
52+
53+
@Override
54+
public synchronized void clear() {
55+
super.clear();
56+
}
57+
58+
@Override
59+
public synchronized E element() {
60+
return super.element();
61+
}
62+
63+
@Override
64+
public synchronized E get(int index) {
65+
return super.get(index);
66+
}
67+
68+
@Override
69+
public synchronized boolean isAtFullCapacity() {
70+
return super.isAtFullCapacity();
71+
}
72+
73+
@Override
74+
public synchronized boolean isEmpty() {
75+
return super.isEmpty();
76+
}
77+
78+
@Override
79+
public synchronized boolean isFull() {
80+
return super.isFull();
81+
}
82+
83+
@Override
84+
public synchronized int maxSize() {
85+
return super.maxSize();
86+
}
87+
88+
@Override
89+
public synchronized boolean offer(E element) {
90+
return super.offer(element);
91+
}
92+
93+
@Override
94+
public synchronized E peek() {
95+
return super.peek();
96+
}
97+
98+
@Override
99+
public synchronized E poll() {
100+
return super.poll();
101+
}
102+
103+
@Override
104+
public synchronized E remove() {
105+
return super.remove();
106+
}
107+
108+
@Override
109+
public synchronized int size() {
110+
return super.size();
111+
}
112+
113+
/**
114+
* <p>Fetch a range of elements from the queue.</p>
115+
*
116+
* @param start start index
117+
* @param end end index
118+
* @param removed whether to remove the elements from the queue
119+
* @return list of elements
120+
*/
121+
public synchronized List<E> fetchRange(int start, int end, boolean removed) {
122+
123+
if (start < 0 || end > this.size() || start > end) {
124+
throw new IllegalArgumentException("Invalid range");
125+
}
126+
127+
Iterator<E> iterator = this.iterator();
128+
List<E> items = new ArrayList<>(end - start);
129+
130+
int count = 0;
131+
while (iterator.hasNext() && count < end) {
132+
E item = iterator.next();
133+
if (item != null && count >= start) {
134+
// Add the element to the list
135+
items.add(item);
136+
if (removed) {
137+
// Remove the element from the queue
138+
iterator.remove();
139+
}
140+
}
141+
count++;
142+
}
143+
return items;
144+
145+
}
146+
147+
148+
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
132132
// convert the result to an HttpExportRecord
133133
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
134134
// add the data to the queue
135-
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
135+
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
136136
}
137137
})
138138
.onRetry(event -> {
@@ -144,7 +144,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
144144
if (connectorConfig.getWebhookConfig().isActivate()) {
145145
HttpExportRecord exportRecord =
146146
covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id);
147-
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
147+
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
148148
}
149149
// update the HttpConnectRecord
150150
httpConnectRecord.setTime(LocalDateTime.now().toString());
@@ -159,7 +159,7 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
159159
}
160160
if (connectorConfig.getWebhookConfig().isActivate()) {
161161
HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
162-
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
162+
((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
163163
}
164164
}).build();
165165

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java

+12-70
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.eventmesh.connector.http.sink.handle;
1919

2020
import org.apache.eventmesh.common.exception.EventMeshException;
21+
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
2122
import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
2223
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
2324
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
@@ -30,13 +31,9 @@
3031

3132
import java.net.URI;
3233
import java.time.LocalDateTime;
33-
import java.util.ArrayList;
34-
import java.util.Iterator;
3534
import java.util.List;
3635
import java.util.Objects;
37-
import java.util.concurrent.ConcurrentLinkedQueue;
3836
import java.util.concurrent.TimeUnit;
39-
import java.util.concurrent.atomic.AtomicInteger;
4037

4138
import io.netty.handler.codec.http.HttpResponseStatus;
4239
import io.vertx.core.Future;
@@ -73,25 +70,22 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
7370
private HttpServer exportServer;
7471

7572
// store the received data, when webhook is enabled
76-
private final ConcurrentLinkedQueue<HttpExportRecord> receivedDataQueue;
77-
78-
// the maximum queue size
79-
private final int maxQueueSize;
80-
81-
// the current queue size
82-
private final AtomicInteger currentQueueSize;
73+
private final SynchronizedCircularFifoQueue<HttpExportRecord> receivedDataQueue;
8374

8475
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
8576
super(sinkConnectorConfig);
8677
this.sinkConnectorConfig = sinkConnectorConfig;
8778
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
88-
this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
89-
this.currentQueueSize = new AtomicInteger(0);
90-
this.receivedDataQueue = new ConcurrentLinkedQueue<>();
79+
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
80+
this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
9181
// init the export server
9282
doInitExportServer();
9383
}
9484

85+
public SynchronizedCircularFifoQueue<HttpExportRecord> getReceivedDataQueue() {
86+
return receivedDataQueue;
87+
}
88+
9589
/**
9690
* Initialize the server for exporting the received data
9791
*/
@@ -135,7 +129,7 @@ private void doInitExportServer() {
135129
int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : Integer.parseInt(pageNumStr);
136130
int pageSize = Integer.parseInt(pageSizeStr);
137131

138-
if (currentQueueSize.get() == 0) {
132+
if (receivedDataQueue.isEmpty()) {
139133
ctx.response()
140134
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
141135
.setStatusCode(HttpResponseStatus.NO_CONTENT.code())
@@ -148,12 +142,12 @@ private void doInitExportServer() {
148142
List<HttpExportRecord> exportRecords;
149143
if (Objects.equals(type, TypeEnum.POLL.getValue())) {
150144
// If the type is poll, only the first page of data is exported and removed
151-
exportRecords = getDataFromQueue(0, pageSize, true);
145+
exportRecords = receivedDataQueue.fetchRange(0, pageSize, true);
152146
} else {
153147
// If the type is peek, the specified page of data is exported without removing
154148
int startIndex = (pageNum - 1) * pageSize;
155149
int endIndex = startIndex + pageSize;
156-
exportRecords = getDataFromQueue(startIndex, endIndex, false);
150+
exportRecords = receivedDataQueue.fetchRange(startIndex, endIndex, false);
157151
}
158152

159153
// Create HttpExportRecordPage
@@ -242,63 +236,11 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
242236
// create ExportRecord
243237
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
244238
// add the data to the queue
245-
addDataToQueue(exportRecord);
239+
receivedDataQueue.offer(exportRecord);
246240
});
247241
}
248242

249243

250-
/**
251-
* Adds the received data to the queue.
252-
*
253-
* @param exportRecord the received data to add to the queue
254-
*/
255-
public void addDataToQueue(HttpExportRecord exportRecord) {
256-
// If the current queue size is greater than or equal to the maximum queue size, remove the oldest element
257-
if (currentQueueSize.get() >= maxQueueSize) {
258-
Object removedData = receivedDataQueue.poll();
259-
if (log.isDebugEnabled()) {
260-
log.debug("The queue is full, remove the oldest element: {}", removedData);
261-
} else {
262-
log.info("The queue is full, remove the oldest element");
263-
}
264-
currentQueueSize.decrementAndGet();
265-
}
266-
// Try to put the received data into the queue
267-
if (receivedDataQueue.offer(exportRecord)) {
268-
currentQueueSize.incrementAndGet();
269-
log.debug("Successfully put the received data into the queue: {}", exportRecord);
270-
} else {
271-
log.error("Failed to put the received data into the queue: {}", exportRecord);
272-
}
273-
}
274-
275-
/**
276-
* Gets the received data from the queue.
277-
*
278-
* @param startIndex the start index of the data to get
279-
* @param endIndex the end index of the data to get
280-
* @param removed whether to remove the data from the queue
281-
* @return the received data
282-
*/
283-
private List<HttpExportRecord> getDataFromQueue(int startIndex, int endIndex, boolean removed) {
284-
Iterator<HttpExportRecord> iterator = receivedDataQueue.iterator();
285-
286-
List<HttpExportRecord> pageItems = new ArrayList<>(endIndex - startIndex);
287-
int count = 0;
288-
while (iterator.hasNext() && count < endIndex) {
289-
HttpExportRecord item = iterator.next();
290-
if (count >= startIndex) {
291-
pageItems.add(item);
292-
if (removed) {
293-
iterator.remove();
294-
currentQueueSize.decrementAndGet();
295-
}
296-
}
297-
count++;
298-
}
299-
return pageItems;
300-
}
301-
302244
/**
303245
* Cleans up and releases resources used by the HTTP/HTTPS handler.
304246
*/

0 commit comments

Comments
 (0)