Skip to content

Commit 298f3b5

Browse files
authored
[ISSUE apache#5069] Enhancement for http source/sink connector (apache#5070)
* [ISSUE apache#5069] Enhancement for http source/sink connector * update http source connector & config * fix checkstyle error
1 parent ea9e7a1 commit 298f3b5

File tree

4 files changed

+100
-55
lines changed

4 files changed

+100
-55
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ public class SourceConnectorConfig {
5555

5656
// extra config, e.g. GitHub secret
5757
private Map<String, String> extraConfig = new HashMap<>();
58+
59+
// data consistency enabled, default true
60+
private boolean dataConsistencyEnabled = true;
5861
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java

+62-39
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
2222
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
2323
import org.apache.eventmesh.connector.http.util.HttpUtils;
24+
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
2426
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2527

2628
import java.net.URI;
@@ -111,14 +113,70 @@ public void handle(ConnectRecord record) {
111113
// convert ConnectRecord to HttpConnectRecord
112114
String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
113115
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
114-
deliver(url, httpConnectRecord);
116+
// get timestamp and offset
117+
Long timestamp = httpConnectRecord.getData().getTimestamp();
118+
Map<String, ?> offset = null;
119+
try {
120+
// May throw NullPointerException.
121+
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
122+
} catch (NullPointerException e) {
123+
// ignore null pointer exception
124+
}
125+
final Map<String, ?> finalOffset = offset;
126+
Future<HttpResponse<Buffer>> responseFuture = deliver(url, httpConnectRecord);
127+
responseFuture.onSuccess(res -> {
128+
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
129+
// log the response
130+
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
131+
if (log.isDebugEnabled()) {
132+
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
133+
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
134+
} else {
135+
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
136+
finalOffset);
137+
}
138+
record.getCallback().onSuccess(convertToSendResult(record));
139+
} else {
140+
if (log.isDebugEnabled()) {
141+
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
142+
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
143+
} else {
144+
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
145+
finalOffset);
146+
}
147+
record.getCallback()
148+
.onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode())));
149+
}
150+
}).onFailure(err -> {
151+
log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err);
152+
record.getCallback().onException(buildSendExceptionContext(record, err));
153+
});
154+
}
155+
}
156+
157+
private SendResult convertToSendResult(ConnectRecord record) {
158+
SendResult result = new SendResult();
159+
result.setMessageId(record.getRecordId());
160+
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
161+
result.setTopic(record.getExtension("topic"));
162+
}
163+
return result;
164+
}
165+
166+
private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
167+
SendExceptionContext sendExceptionContext = new SendExceptionContext();
168+
sendExceptionContext.setMessageId(record.getRecordId());
169+
sendExceptionContext.setCause(e);
170+
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
171+
sendExceptionContext.setTopic(record.getExtension("topic"));
115172
}
173+
return sendExceptionContext;
116174
}
117175

118176

119177
/**
120-
* Processes HttpConnectRecord on specified URL while returning its own processing logic.
121-
* This method sends the HttpConnectRecord to the specified URL using the WebClient.
178+
* Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
179+
* URL using the WebClient.
122180
*
123181
* @param url URI to which the HttpConnectRecord should be sent
124182
* @param httpConnectRecord HttpConnectRecord to process
@@ -130,48 +188,13 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
130188
MultiMap headers = HttpHeaders.headers()
131189
.set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
132190
.set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
133-
134-
// get timestamp and offset
135-
Long timestamp = httpConnectRecord.getData().getTimestamp();
136-
Map<String, ?> offset = null;
137-
try {
138-
// May throw NullPointerException.
139-
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
140-
} catch (NullPointerException e) {
141-
// ignore null pointer exception
142-
}
143-
final Map<String, ?> finalOffset = offset;
144-
145191
// send the request
146192
return this.webClient.post(url.getPath())
147193
.host(url.getHost())
148194
.port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
149195
.putHeaders(headers)
150196
.ssl(Objects.equals(url.getScheme(), "https"))
151-
.sendJson(httpConnectRecord)
152-
.onSuccess(res -> {
153-
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
154-
// log the response
155-
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
156-
if (log.isDebugEnabled()) {
157-
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
158-
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
159-
} else {
160-
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
161-
finalOffset);
162-
}
163-
} else {
164-
if (log.isDebugEnabled()) {
165-
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
166-
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
167-
} else {
168-
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
169-
finalOffset);
170-
}
171-
}
172-
173-
})
174-
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
197+
.sendJson(httpConnectRecord);
175198
}
176199

177200

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java

+25-11
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
import java.util.List;
3535
import java.util.concurrent.TimeUnit;
3636

37+
import io.netty.handler.codec.http.HttpResponseStatus;
3738
import io.vertx.core.Vertx;
3839
import io.vertx.core.http.HttpServer;
3940
import io.vertx.core.http.HttpServerOptions;
4041
import io.vertx.ext.web.Route;
4142
import io.vertx.ext.web.Router;
4243
import io.vertx.ext.web.handler.LoggerHandler;
4344

45+
import lombok.Getter;
4446
import lombok.extern.slf4j.Slf4j;
4547

4648
@Slf4j
@@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, ConnectorCreateService<Sourc
5254

5355
private int batchSize;
5456

57+
private Route route;
58+
5559
private Protocol protocol;
5660

5761
private HttpServer server;
5862

63+
@Getter
5964
private volatile boolean started = false;
6065

66+
@Getter
6167
private volatile boolean destroyed = false;
6268

63-
public boolean isStarted() {
64-
return started;
65-
}
66-
67-
public boolean isDestroyed() {
68-
return destroyed;
69-
}
70-
7169

7270
@Override
7371
public Class<? extends Config> configClass() {
@@ -106,7 +104,7 @@ private void doInit() {
106104

107105
final Vertx vertx = Vertx.vertx();
108106
final Router router = Router.router(vertx);
109-
final Route route = router.route()
107+
route = router.route()
110108
.path(this.sourceConfig.connectorConfig.getPath())
111109
.handler(LoggerHandler.create());
112110

@@ -136,7 +134,15 @@ public void start() {
136134

137135
@Override
138136
public void commit(ConnectRecord record) {
139-
137+
if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
138+
this.route.handler(ctx -> {
139+
// Return 200 OK
140+
ctx.response()
141+
.putHeader("content-type", "application/json")
142+
.setStatusCode(HttpResponseStatus.OK.code())
143+
.end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}");
144+
});
145+
}
140146
}
141147

142148
@Override
@@ -146,7 +152,15 @@ public String name() {
146152

147153
@Override
148154
public void onException(ConnectRecord record) {
149-
155+
if (this.route != null) {
156+
this.route.failureHandler(ctx -> {
157+
log.error("Failed to handle the request, recordId {}. ", record.getRecordId(), ctx.failure());
158+
// Return Bad Response
159+
ctx.response()
160+
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
161+
.end("{\"status\":\"failed\",\"recordId\":\"" + record.getRecordId() + "\"}");
162+
});
163+
}
150164
}
151165

152166
@Override

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,16 @@ public class CommonProtocol implements Protocol {
4545

4646
public static final String PROTOCOL_NAME = "Common";
4747

48+
private SourceConnectorConfig sourceConnectorConfig;
49+
4850
/**
4951
* Initialize the protocol
5052
*
5153
* @param sourceConnectorConfig source connector config
5254
*/
5355
@Override
5456
public void initialize(SourceConnectorConfig sourceConnectorConfig) {
55-
57+
this.sourceConnectorConfig = sourceConnectorConfig;
5658
}
5759

5860
/**
@@ -77,10 +79,13 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue)
7779
throw new IllegalStateException("Failed to store the request.");
7880
}
7981

80-
// Return 200 OK
81-
ctx.response()
82-
.setStatusCode(HttpResponseStatus.OK.code())
83-
.end(CommonResponse.success().toJsonStr());
82+
if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
83+
// Return 200 OK
84+
ctx.response()
85+
.setStatusCode(HttpResponseStatus.OK.code())
86+
.end(CommonResponse.success().toJsonStr());
87+
}
88+
8489
})
8590
.failureHandler(ctx -> {
8691
log.error("Failed to handle the request. ", ctx.failure());

0 commit comments

Comments
 (0)