Skip to content

Commit da5fdbf

Browse files
authored
[ISSUE #5101] Define and standardize some common configurations for all Sources(#5102)
1 parent 510cf2d commit da5fdbf

File tree

25 files changed

+267
-112
lines changed

25 files changed

+267
-112
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java

+17
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,21 @@ public class Constants {
3030
public static final int DEFAULT_ATTEMPT = 3;
3131

3232
public static final int DEFAULT_PORT = 8080;
33+
34+
// ======================== Source Constants ========================
35+
/**
36+
* Default capacity
37+
*/
38+
public static final int DEFAULT_CAPACITY = 1024;
39+
40+
/**
41+
* Default poll batch size
42+
*/
43+
public static final int DEFAULT_POLL_BATCH_SIZE = 10;
44+
45+
/**
46+
* Default poll timeout (unit: ms)
47+
*/
48+
public static final long DEFAULT_POLL_TIMEOUT = 5000L;
49+
3350
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector;
19+
20+
import lombok.Data;
21+
22+
/**
23+
* Source Poll Config
24+
*/
25+
@Data
26+
public class PollConfig {
27+
28+
/**
29+
* Capacity of the poll queue
30+
*/
31+
private int capacity = Constants.DEFAULT_CAPACITY;
32+
33+
/**
34+
* Max batch size of the poll
35+
*/
36+
private int maxBatchSize = Constants.DEFAULT_POLL_BATCH_SIZE;
37+
38+
/**
39+
* Max wait time of the poll
40+
*/
41+
private long maxWaitTime = Constants.DEFAULT_POLL_TIMEOUT;
42+
43+
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,7 @@ public abstract class SourceConfig extends Config {
3030

3131
private OffsetStorageConfig offsetStorageConfig;
3232

33+
// Polling configuration, e.g. capacity, batch size, wait time, etc.
34+
private PollConfig pollConfig = new PollConfig();
35+
3336
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,7 @@ public class SourceConnectorConfig {
4444
*/
4545
private int maxFormAttributeSize = 1024 * 1024;
4646

47-
// max size of the queue, default 1000
48-
private int maxStorageSize = 1000;
49-
50-
// batch size, default 10
51-
private int batchSize = 10;
52-
53-
// protocol, default CloudEvent
47+
// protocol, default Common
5448
private String protocol = "Common";
5549

5650
// extra config, e.g. GitHub secret

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java

-1
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,4 @@ public class SourceConnectorConfig {
3232
private String enableAutoCommit = "false";
3333
private String sessionTimeoutMS = "10000";
3434
private String maxPollRecords = "1000";
35-
private int pollTimeOut = 100;
3635
}

Diff for: eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@
5050

5151
@Slf4j
5252
public class CanalSourceCheckConnector extends AbstractComponent implements Source, ConnectorCreateService<Source> {
53+
5354
private CanalSourceFullConfig config;
5455
private CanalFullPositionMgr positionMgr;
5556
private RdbTableMgr tableMgr;
5657
private ThreadPoolExecutor executor;
57-
private final BlockingQueue<List<ConnectRecord>> queue = new LinkedBlockingQueue<>();
58+
private BlockingQueue<List<ConnectRecord>> queue;
5859
private final AtomicBoolean flag = new AtomicBoolean(true);
60+
private long maxPollWaitTime;
5961

6062
@Override
6163
protected void run() throws Exception {
@@ -140,6 +142,8 @@ private void init() {
140142
DatabaseConnection.initSourceConnection();
141143
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource);
142144
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
145+
this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
146+
this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
143147
}
144148

145149
@Override
@@ -168,7 +172,7 @@ public void onException(ConnectRecord record) {
168172
public List<ConnectRecord> poll() {
169173
while (flag.get()) {
170174
try {
171-
List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
175+
List<ConnectRecord> records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS);
172176
if (records == null || records.isEmpty()) {
173177
continue;
174178
}

Diff for: eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ public class CanalSourceFullConnector extends AbstractComponent implements Sourc
5656
private CanalFullPositionMgr positionMgr;
5757
private RdbTableMgr tableMgr;
5858
private ThreadPoolExecutor executor;
59-
private final BlockingQueue<List<ConnectRecord>> queue = new LinkedBlockingQueue<>();
59+
private BlockingQueue<List<ConnectRecord>> queue;
6060
private final AtomicBoolean flag = new AtomicBoolean(true);
61+
private long maxPollWaitTime;
6162

6263
@Override
6364
protected void run() throws Exception {
@@ -137,6 +138,8 @@ private void init() {
137138
DatabaseConnection.initSourceConnection();
138139
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource);
139140
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
141+
this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
142+
this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
140143
}
141144

142145
@Override
@@ -166,7 +169,7 @@ public void onException(ConnectRecord record) {
166169
public List<ConnectRecord> poll() {
167170
while (flag.get()) {
168171
try {
169-
List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
172+
List<ConnectRecord> records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS);
170173
if (records == null || records.isEmpty()) {
171174
continue;
172175
}

Diff for: eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@
6161
@Slf4j
6262
public class ChatGPTSourceConnector implements Source {
6363

64-
private static final int DEFAULT_BATCH_SIZE = 10;
65-
6664
private ChatGPTSourceConfig sourceConfig;
6765
private BlockingQueue<CloudEvent> queue;
6866
private HttpServer server;
@@ -79,6 +77,9 @@ public class ChatGPTSourceConnector implements Source {
7977
private static final String APPLICATION_JSON = "application/json";
8078
private static final String TEXT_PLAIN = "text/plain";
8179

80+
private int maxBatchSize;
81+
private long maxPollWaitTime;
82+
8283

8384
@Override
8485
public Class<? extends Config> configClass() {
@@ -129,7 +130,9 @@ private void doInit() {
129130
if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
130131
this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr);
131132
}
132-
this.queue = new LinkedBlockingQueue<>(1024);
133+
this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
134+
this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
135+
this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
133136
final Vertx vertx = Vertx.vertx();
134137
final Router router = Router.router(vertx);
135138
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> {
@@ -239,14 +242,21 @@ public void stop() {
239242

240243
@Override
241244
public List<ConnectRecord> poll() {
242-
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
243-
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
245+
long startTime = System.currentTimeMillis();
246+
long remainingTime = maxPollWaitTime;
247+
248+
List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
249+
for (int i = 0; i < maxBatchSize; i++) {
244250
try {
245-
CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
251+
CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
246252
if (event == null) {
247253
break;
248254
}
249255
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
256+
257+
// calculate elapsed time and update remaining time for next poll
258+
long elapsedTime = System.currentTimeMillis() - startTime;
259+
remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0;
250260
} catch (InterruptedException e) {
251261
break;
252262
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java

-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ public synchronized List<E> fetchRange(int start, int end, boolean removed) {
142142
count++;
143143
}
144144
return items;
145-
146145
}
147146

148147

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java

+30-19
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.eventmesh.common.config.connector.Config;
2121
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
2222
import org.apache.eventmesh.common.exception.EventMeshException;
23-
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
2423
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
2524
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
2625
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
@@ -30,8 +29,9 @@
3029
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
3130

3231
import java.util.ArrayList;
33-
import java.util.Collections;
3432
import java.util.List;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.LinkedBlockingQueue;
3535
import java.util.concurrent.TimeUnit;
3636

3737
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -50,9 +50,11 @@ public class HttpSourceConnector implements Source, ConnectorCreateService<Sourc
5050

5151
private HttpSourceConfig sourceConfig;
5252

53-
private SynchronizedCircularFifoQueue<Object> queue;
53+
private BlockingQueue<Object> queue;
5454

55-
private int batchSize;
55+
private int maxBatchSize;
56+
57+
private long maxPollWaitTime;
5658

5759
private Route route;
5860

@@ -92,11 +94,11 @@ public void init(ConnectorContext connectorContext) {
9294

9395
private void doInit() {
9496
// init queue
95-
int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize();
96-
this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
97+
this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
9798

98-
// init batch size
99-
this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
99+
// init poll batch size and timeout
100+
this.maxBatchSize = this.sourceConfig.getPollConfig().getMaxBatchSize();
101+
this.maxPollWaitTime = this.sourceConfig.getPollConfig().getMaxWaitTime();
100102

101103
// init protocol
102104
String protocolName = this.sourceConfig.getConnectorConfig().getProtocol();
@@ -183,20 +185,29 @@ public void stop() {
183185

184186
@Override
185187
public List<ConnectRecord> poll() {
186-
// if queue is empty, return empty list
187-
if (queue.isEmpty()) {
188-
return Collections.emptyList();
189-
}
188+
// record current time
189+
long startTime = System.currentTimeMillis();
190+
long remainingTime = maxPollWaitTime;
191+
190192
// poll from queue
191-
List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
192-
for (int i = 0; i < batchSize; i++) {
193-
Object obj = queue.poll();
194-
if (obj == null) {
193+
List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
194+
for (int i = 0; i < maxBatchSize; i++) {
195+
try {
196+
Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
197+
if (obj == null) {
198+
break;
199+
}
200+
// convert to ConnectRecord
201+
ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
202+
connectRecords.add(connectRecord);
203+
204+
// calculate elapsed time and update remaining time for next poll
205+
long elapsedTime = System.currentTimeMillis() - startTime;
206+
remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0;
207+
} catch (Exception e) {
208+
log.error("Failed to poll from queue.", e);
195209
break;
196210
}
197-
// convert to ConnectRecord
198-
ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
199-
connectRecords.add(connectRecord);
200211
}
201212
return connectRecords;
202213
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package org.apache.eventmesh.connector.http.source.protocol;
1919

2020
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
21-
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
2221
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2322

23+
import java.util.concurrent.BlockingQueue;
24+
2425
import io.vertx.ext.web.Route;
2526

2627

@@ -45,7 +46,7 @@ public interface Protocol {
4546
* @param route route
4647
* @param queue queue info
4748
*/
48-
void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
49+
void setHandler(Route route, BlockingQueue<Object> queue);
4950

5051

5152
/**

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package org.apache.eventmesh.connector.http.source.protocol.impl;
1919

2020
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
21-
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
2221
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
2322
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
2423
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2524
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
2625

26+
import java.util.concurrent.BlockingQueue;
27+
2728
import io.cloudevents.CloudEvent;
2829
import io.cloudevents.http.vertx.VertxMessageFactory;
2930
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -60,7 +61,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) {
6061
* @param queue queue info
6162
*/
6263
@Override
63-
public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
64+
public void setHandler(Route route, BlockingQueue<Object> queue) {
6465
route.method(HttpMethod.POST)
6566
.handler(ctx -> VertxMessageFactory.createReader(ctx.request())
6667
.map(reader -> {

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import org.apache.eventmesh.common.Constants;
2121
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
2222
import org.apache.eventmesh.common.utils.JsonUtils;
23-
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
2423
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
2524
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
2625
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
2726
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2827

2928
import java.util.Base64;
3029
import java.util.Map;
30+
import java.util.concurrent.BlockingQueue;
3131
import java.util.stream.Collectors;
3232

3333
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -66,7 +66,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) {
6666
* @param queue queue info
6767
*/
6868
@Override
69-
public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
69+
public void setHandler(Route route, BlockingQueue<Object> queue) {
7070
route.method(HttpMethod.POST)
7171
.handler(BodyHandler.create())
7272
.handler(ctx -> {

0 commit comments

Comments
 (0)