Skip to content

Commit 6b0a4c0

Browse files
authored
[ISSUE apache#4824] Add HTTP Sink Connector (apache#4837)
* feat: Add HTTP Sink Connector * refactor: Replace okHttpClient with vertx.WebClient * fix: Resolving dependency conflicts * test: Add HttpSinkConnectorTest * fix: Add License * fix: Solving dependency issues * fix: License Check * feat: Add HTTPS/SSL support * fix: Optimize logging * feat: Add webhook functionality * fix: Fix some bugs * test: add callback test * refactor: Add webhook Support * fix: Optimization tests and configuration additions * fix: code style * feat: rebuild WebhookHttpSinkHandler and add RetryHttpSinkHandler * fix: fix ci * refactor: Use failsafe alternative resilience4j and optimize webhook functionality * fix: fix License Check * fix: update something * fix: fix ci * fix: update something * fix: Optimized naming * fix: fix ci * fix: fix style check error * test: update HttpSinkConnectorTest
1 parent 046c177 commit 6b0a4c0

File tree

20 files changed

+1595
-2
lines changed

20 files changed

+1595
-2
lines changed

Diff for: eventmesh-connectors/eventmesh-connector-http/build.gradle

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
dependencies {
1919
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
2020
implementation project(":eventmesh-common")
21-
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
21+
22+
implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
2223
implementation 'io.vertx:vertx-web:4.4.6'
24+
implementation 'io.vertx:vertx-web-client:4.4.6'
25+
implementation 'dev.failsafe:failsafe:3.3.2'
2326

2427
testImplementation "org.apache.httpcomponents:httpclient"
28+
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
29+
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
2530
compileOnly 'org.projectlombok:lombok'
2631
annotationProcessor 'org.projectlombok:lombok'
2732
}

Diff for: eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.eventmesh.connector.http.server;
1919

2020
import org.apache.eventmesh.connector.http.config.HttpServerConfig;
21+
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
2122
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
2223
import org.apache.eventmesh.openconnect.Application;
2324
import org.apache.eventmesh.openconnect.util.ConfigUtil;
@@ -33,7 +34,8 @@ public static void main(String[] args) throws Exception {
3334
}
3435

3536
if (serverConfig.isSinkEnable()) {
36-
// TODO support sink connector
37+
Application httpSinkApp = new Application();
38+
httpSinkApp.run(HttpSinkConnector.class);
3739
}
3840
}
3941

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink;
19+
20+
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
21+
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
22+
import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
23+
import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
24+
import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler;
25+
import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
26+
import org.apache.eventmesh.openconnect.api.config.Config;
27+
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
28+
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
29+
import org.apache.eventmesh.openconnect.api.sink.Sink;
30+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
31+
32+
import java.util.List;
33+
import java.util.Objects;
34+
35+
import lombok.Getter;
36+
import lombok.SneakyThrows;
37+
import lombok.extern.slf4j.Slf4j;
38+
39+
@Slf4j
40+
public class HttpSinkConnector implements Sink {
41+
42+
private HttpSinkConfig httpSinkConfig;
43+
44+
@Getter
45+
private HttpSinkHandler sinkHandler;
46+
47+
@Override
48+
public Class<? extends Config> configClass() {
49+
return HttpSinkConfig.class;
50+
}
51+
52+
@Override
53+
public void init(Config config) throws Exception {
54+
this.httpSinkConfig = (HttpSinkConfig) config;
55+
doInit();
56+
}
57+
58+
@Override
59+
public void init(ConnectorContext connectorContext) throws Exception {
60+
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
61+
this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig();
62+
doInit();
63+
}
64+
65+
@SneakyThrows
66+
private void doInit() {
67+
// Fill default values if absent
68+
SinkConnectorConfig.populateFieldsWithDefaults(this.httpSinkConfig.connectorConfig);
69+
// Create different handlers for different configurations
70+
HttpSinkHandler nonRetryHandler;
71+
if (this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) {
72+
nonRetryHandler = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
73+
} else {
74+
nonRetryHandler = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
75+
}
76+
77+
int maxRetries = this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
78+
if (maxRetries == 0) {
79+
// Use the original sink handler
80+
this.sinkHandler = nonRetryHandler;
81+
} else if (maxRetries > 0) {
82+
// Wrap the sink handler with a retry handler
83+
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
84+
} else {
85+
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
86+
}
87+
}
88+
89+
@Override
90+
public void start() throws Exception {
91+
this.sinkHandler.start();
92+
}
93+
94+
@Override
95+
public void commit(ConnectRecord record) {
96+
97+
}
98+
99+
@Override
100+
public String name() {
101+
return this.httpSinkConfig.connectorConfig.getConnectorName();
102+
}
103+
104+
@Override
105+
public void stop() throws Exception {
106+
this.sinkHandler.stop();
107+
}
108+
109+
@Override
110+
public void put(List<ConnectRecord> sinkRecords) {
111+
for (ConnectRecord sinkRecord : sinkRecords) {
112+
try {
113+
if (Objects.isNull(sinkRecord)) {
114+
log.warn("ConnectRecord data is null, ignore.");
115+
continue;
116+
}
117+
// Handle the ConnectRecord
118+
this.sinkHandler.handle(sinkRecord);
119+
} catch (Exception e) {
120+
log.error("Failed to sink message via HTTP. ", e);
121+
}
122+
}
123+
}
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class HttpRetryConfig {
24+
// maximum number of retries, default 3, minimum 0
25+
private int maxRetries = 3;
26+
27+
// retry interval, default 2000ms
28+
private int interval = 2000;
29+
30+
// Default value is false, indicating that only requests with network-level errors will be retried.
31+
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
32+
private boolean retryOnNonSuccess = false;
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class HttpSinkConfig extends SinkConfig {
28+
29+
public SinkConnectorConfig connectorConfig;
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class HttpWebhookConfig {
24+
25+
private boolean activate = false;
26+
27+
// Path to display/export callback data
28+
private String exportPath = "/export";
29+
30+
private int port;
31+
32+
// timeunit: ms
33+
private int serverIdleTimeout = 5000;
34+
35+
// max size of the storage queue
36+
private int maxStorageSize = 5000;
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.config;
19+
20+
import io.vertx.core.http.HttpClientOptions;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class SinkConnectorConfig {
26+
27+
private String connectorName;
28+
29+
private String[] urls;
30+
31+
// keepAlive, default true
32+
private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE;
33+
34+
// timeunit: ms, default 60000ms
35+
private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent
36+
37+
// timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
38+
private int connectionTimeout = 5000;
39+
40+
// timeunit: ms, default 5000ms
41+
private int idleTimeout;
42+
43+
// maximum number of HTTP/1 connections a client will pool, default 5
44+
private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE;
45+
46+
// retry config
47+
private HttpRetryConfig retryConfig = new HttpRetryConfig();
48+
49+
// webhook config
50+
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();
51+
52+
53+
/**
54+
* Fill default values if absent (When there are multiple default values for a field)
55+
*
56+
* @param config SinkConnectorConfig
57+
*/
58+
public static void populateFieldsWithDefaults(SinkConnectorConfig config) {
59+
/*
60+
* set default values for idleTimeout
61+
* recommended scope: common(5s - 10s), webhook(15s - 30s)
62+
*/
63+
final int commonHttpIdleTimeout = 5000;
64+
final int webhookHttpIdleTimeout = 15000;
65+
66+
// Set default values for idleTimeout
67+
if (config.getIdleTimeout() == 0) {
68+
int idleTimeout = config.webhookConfig.isActivate() ? webhookHttpIdleTimeout : commonHttpIdleTimeout;
69+
config.setIdleTimeout(idleTimeout);
70+
}
71+
72+
}
73+
}

0 commit comments

Comments
 (0)