Skip to content

Commit 3436e56

Browse files
authored
[ISSUE #5020] Optimize unit tests and code (#5023)
1 parent c615f97 commit 3436e56

File tree

6 files changed

+238
-163
lines changed

6 files changed

+238
-163
lines changed

Diff for: eventmesh-connectors/eventmesh-connector-http/build.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ dependencies {
2424
implementation 'io.vertx:vertx-web-client:4.5.8'
2525
implementation 'dev.failsafe:failsafe:3.3.2'
2626

27+
28+
testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
29+
testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1'
2730
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
28-
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
2931
compileOnly 'org.projectlombok:lombok'
3032
annotationProcessor 'org.projectlombok:lombok'
3133
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
133133

134134
// get timestamp and offset
135135
Long timestamp = httpConnectRecord.getData().getTimestamp();
136-
Map<String, ?> offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
136+
Map<String, ?> offset = null;
137+
try {
138+
// May throw NullPointerException.
139+
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
140+
} catch (NullPointerException e) {
141+
// ignore null pointer exception
142+
}
143+
final Map<String, ?> finalOffset = offset;
137144

138145
// send the request
139146
return this.webClient.post(url.getPath())
@@ -143,26 +150,28 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
143150
.ssl(Objects.equals(url.getScheme(), "https"))
144151
.sendJson(httpConnectRecord)
145152
.onSuccess(res -> {
146-
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
153+
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
147154
// log the response
148155
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
149156
if (log.isDebugEnabled()) {
150157
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
151-
res.statusCode(), timestamp, offset, res.bodyAsString());
158+
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
152159
} else {
153-
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
160+
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
161+
finalOffset);
154162
}
155163
} else {
156164
if (log.isDebugEnabled()) {
157165
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
158-
res.statusCode(), timestamp, offset, res.bodyAsString());
166+
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
159167
} else {
160-
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
168+
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
169+
finalOffset);
161170
}
162171
}
163172

164173
})
165-
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err));
174+
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
166175
}
167176

168177

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java

+30-8
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
7272
// store the received data, when webhook is enabled
7373
private final SynchronizedCircularFifoQueue<HttpExportRecord> receivedDataQueue;
7474

75+
private volatile boolean exportStarted = false;
76+
77+
private volatile boolean exportDestroyed = false;
78+
79+
public boolean isExportStarted() {
80+
return exportStarted;
81+
}
82+
83+
public boolean isExportDestroyed() {
84+
return exportDestroyed;
85+
}
86+
7587
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
7688
super(sinkConnectorConfig);
7789
this.sinkConnectorConfig = sinkConnectorConfig;
@@ -179,10 +191,15 @@ public void start() {
179191
// start the webclient
180192
super.start();
181193
// start the export server
182-
Throwable t = this.exportServer.listen().cause();
183-
if (t != null) {
184-
throw new EventMeshException("Failed to start Vertx server. ", t);
185-
}
194+
this.exportServer.listen(res -> {
195+
if (res.succeeded()) {
196+
this.exportStarted = true;
197+
log.info("WebhookHttpExportServer started on port: {}", this.webhookConfig.getPort());
198+
} else {
199+
log.error("WebhookHttpExportServer failed to start on port: {}", this.webhookConfig.getPort());
200+
throw new EventMeshException("Failed to start Vertx server. ", res.cause());
201+
}
202+
});
186203
}
187204

188205
/**
@@ -250,10 +267,15 @@ public void stop() {
250267
super.stop();
251268
// stop the export server
252269
if (this.exportServer != null) {
253-
Throwable t = this.exportServer.close().cause();
254-
if (t != null) {
255-
throw new EventMeshException("Failed to stop Vertx server. ", t);
256-
}
270+
this.exportServer.close(res -> {
271+
if (res.succeeded()) {
272+
this.exportDestroyed = true;
273+
log.info("WebhookHttpExportServer stopped on port: {}", this.webhookConfig.getPort());
274+
} else {
275+
log.error("WebhookHttpExportServer failed to stop on port: {}", this.webhookConfig.getPort());
276+
throw new EventMeshException("Failed to stop Vertx server. ", res.cause());
277+
}
278+
});
257279
} else {
258280
log.warn("Callback server is null, ignore.");
259281
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java

+34-7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source {
5555

5656
private HttpServer server;
5757

58+
private volatile boolean started = false;
59+
60+
private volatile boolean destroyed = false;
61+
62+
public boolean isStarted() {
63+
return started;
64+
}
65+
66+
public boolean isDestroyed() {
67+
return destroyed;
68+
}
69+
5870

5971
@Override
6072
public Class<? extends Config> configClass() {
@@ -105,10 +117,15 @@ private void doInit() {
105117

106118
@Override
107119
public void start() {
108-
Throwable t = this.server.listen().cause();
109-
if (t != null) {
110-
throw new EventMeshException("failed to start Vertx server", t);
111-
}
120+
this.server.listen(res -> {
121+
if (res.succeeded()) {
122+
this.started = true;
123+
log.info("HttpSourceConnector started on port: {}", this.sourceConfig.getConnectorConfig().getPort());
124+
} else {
125+
log.error("HttpSourceConnector failed to start on port: {}", this.sourceConfig.getConnectorConfig().getPort());
126+
throw new EventMeshException("failed to start Vertx server", res.cause());
127+
}
128+
});
112129
}
113130

114131
@Override
@@ -123,9 +140,19 @@ public String name() {
123140

124141
@Override
125142
public void stop() {
126-
Throwable t = this.server.close().cause();
127-
if (t != null) {
128-
throw new EventMeshException("failed to stop Vertx server", t);
143+
if (this.server != null) {
144+
this.server.close(res -> {
145+
if (res.succeeded()) {
146+
this.destroyed = true;
147+
log.info("HttpSourceConnector stopped on port: {}", this.sourceConfig.getConnectorConfig().getPort());
148+
} else {
149+
log.error("HttpSourceConnector failed to stop on port: {}", this.sourceConfig.getConnectorConfig().getPort());
150+
throw new EventMeshException("failed to stop Vertx server", res.cause());
151+
}
152+
}
153+
);
154+
} else {
155+
log.warn("HttpSourceConnector server is null, ignore.");
129156
}
130157
}
131158

Diff for: eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java

+56-54
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.http.source.connector;
1919

20+
2021
import static org.mockserver.model.HttpRequest.request;
2122

2223
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
@@ -25,59 +26,64 @@
2526
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2627
import org.apache.eventmesh.openconnect.util.ConfigUtil;
2728

29+
import org.apache.hc.client5.http.fluent.Request;
30+
import org.apache.hc.core5.http.HttpStatus;
31+
import org.apache.hc.core5.net.URIBuilder;
32+
2833
import java.net.URI;
34+
import java.net.URL;
2935
import java.util.ArrayList;
3036
import java.util.List;
3137
import java.util.UUID;
38+
import java.util.concurrent.atomic.AtomicInteger;
3239

3340
import org.junit.jupiter.api.AfterEach;
41+
import org.junit.jupiter.api.Assertions;
3442
import org.junit.jupiter.api.BeforeEach;
3543
import org.junit.jupiter.api.Test;
3644
import org.mockserver.integration.ClientAndServer;
37-
import org.mockserver.model.HttpRequest;
3845
import org.mockserver.model.HttpResponse;
3946
import org.mockserver.model.MediaType;
4047

48+
4149
import com.alibaba.fastjson2.JSON;
4250
import com.alibaba.fastjson2.JSONArray;
4351
import com.alibaba.fastjson2.JSONObject;
4452

45-
import okhttp3.HttpUrl;
46-
import okhttp3.OkHttpClient;
47-
import okhttp3.Request;
48-
import okhttp3.Response;
49-
import okhttp3.ResponseBody;
5053

5154
public class HttpSinkConnectorTest {
5255

5356
private HttpSinkConnector sinkConnector;
5457

5558
private HttpSinkConfig sinkConfig;
5659

57-
private URI severUri;
60+
private URL url;
5861

5962
private ClientAndServer mockServer;
6063

64+
private static final AtomicInteger counter = new AtomicInteger(0);
6165

6266
@BeforeEach
6367
void before() throws Exception {
6468
// init sinkConnector
65-
this.sinkConnector = new HttpSinkConnector();
66-
this.sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
67-
this.sinkConnector.init(this.sinkConfig);
68-
this.sinkConnector.start();
69+
sinkConnector = new HttpSinkConnector();
70+
sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
71+
sinkConnector.init(this.sinkConfig);
72+
sinkConnector.start();
6973

70-
this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
74+
url = new URL(sinkConfig.connectorConfig.getUrls()[0]);
7175
// start mockServer
72-
mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
76+
mockServer = ClientAndServer.startClientAndServer(url.getPort());
7377
mockServer.reset()
7478
.when(
7579
request()
7680
.withMethod("POST")
77-
.withPath(severUri.getPath())
81+
.withPath(url.getPath())
7882
)
7983
.respond(
8084
httpRequest -> {
85+
// Increase the number of requests received
86+
counter.incrementAndGet();
8187
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
8288
return HttpResponse.response()
8389
.withContentType(MediaType.APPLICATION_JSON)
@@ -90,6 +96,7 @@ void before() throws Exception {
9096
); // .withDelay(TimeUnit.SECONDS, 10);
9197
}
9298
);
99+
93100
}
94101

95102
@AfterEach
@@ -101,62 +108,57 @@ void after() throws Exception {
101108
@Test
102109
void testPut() throws Exception {
103110
// Create a list of ConnectRecord
104-
final int times = 10;
111+
final int size = 10;
105112
List<ConnectRecord> connectRecords = new ArrayList<>();
106-
for (int i = 0; i < times; i++) {
113+
for (int i = 0; i < size; i++) {
107114
ConnectRecord record = createConnectRecord();
108115
connectRecords.add(record);
109116
}
110117
// Put ConnectRecord
111118
sinkConnector.put(connectRecords);
112119

113-
// sleep 5s
114-
Thread.sleep(5000);
115-
116-
// verify request
117-
HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null);
118-
// assert recordedRequests.length == times;
120+
// wait for receiving request
121+
final int times = 5000; // 5 seconds
122+
long start = System.currentTimeMillis();
123+
while (counter.get() < size) {
124+
if (System.currentTimeMillis() - start > times) {
125+
// timeout
126+
Assertions.fail("The number of requests received=" + counter.get() + " is less than the number of ConnectRecord=" + size);
127+
} else {
128+
Thread.sleep(100);
129+
}
130+
}
119131

120132
// verify response
121133
HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
122-
String url = new HttpUrl.Builder()
123-
.scheme("http")
124-
.host(severUri.getHost())
125-
.port(webhookConfig.getPort())
126-
.addPathSegments(webhookConfig.getExportPath())
127-
.addQueryParameter("pageNum", "1")
128-
.addQueryParameter("pageSize", "10")
129-
.addQueryParameter("type", "poll")
130-
.build().toString();
131-
132-
// build request
133-
Request request = new Request.Builder()
134-
.url(url)
135-
.addHeader("Content-Type", "application/json")
134+
135+
URI exportUrl = new URIBuilder()
136+
.setScheme("http")
137+
.setHost(url.getHost())
138+
.setPort(webhookConfig.getPort())
139+
.setPath(webhookConfig.getExportPath())
140+
.addParameter("pageNum", "1")
141+
.addParameter("pageSize", "10")
142+
.addParameter("type", "poll")
136143
.build();
137144

138-
OkHttpClient client = new OkHttpClient();
139-
try (Response response = client.newCall(request).execute()) {
140-
// check response code
141-
if (!response.isSuccessful()) {
142-
throw new RuntimeException("Unexpected response code: " + response);
143-
}
144-
// check response body
145-
ResponseBody responseBody = response.body();
146-
if (responseBody != null) {
147-
JSONObject jsonObject = JSON.parseObject(responseBody.string());
145+
Request.get(exportUrl)
146+
.execute()
147+
.handleResponse(response -> {
148+
// check response code
149+
Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
150+
// check response body
151+
JSONObject jsonObject = JSON.parseObject(response.getEntity().getContent());
148152
JSONArray pageItems = jsonObject.getJSONArray("pageItems");
149153

150-
assert pageItems != null && pageItems.size() == times;
151-
152-
for (int i = 0; i < times; i++) {
154+
Assertions.assertNotNull(pageItems);
155+
Assertions.assertEquals(size, pageItems.size());
156+
for (int i = 0; i < size; i++) {
153157
JSONObject pageItem = pageItems.getJSONObject(i);
154-
assert pageItem != null;
155-
// assert pageItem.getJSONObject("data") != null;
156-
// assert pageItem.getJSONObject("metadata") != null;
158+
Assertions.assertNotNull(pageItem);
157159
}
158-
}
159-
}
160+
return null;
161+
});
160162
}
161163

162164
private ConnectRecord createConnectRecord() {

0 commit comments

Comments
 (0)