Skip to content

Commit 38b7414

Browse files
vivetiwaswapdesaivivetiwa
authored
Added support for http proxy (#28)
* Added support to use proxy configs for HTTP connection * Added tets case for porxy support * Removed unnecessary functions * Refactored test case to use mock extension port as proxy config * Corrected log input * 1. PORT parsing set to fail fast in case not integer value passed to connector configuration 2. Remove unused variable 3. AEPSinkConnectorTest to verify call to baseUrl 4. Update readMe for proxy configuration * Review Comments : 1. Example to export http proxy configuration 2. Add reference to oracle documentation on proxy configuration Co-authored-by: Swapnil Desai <[email protected]> Co-authored-by: vivetiwa <[email protected]>
1 parent 2199959 commit 38b7414

File tree

7 files changed

+189
-2
lines changed

7 files changed

+189
-2
lines changed

DEVELOPER_GUIDE.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ AEP Sink connector configurations can be supplied in the call register the conne
3434
| key.converter.schemas.enable | enables conversion of schemas | false | no | |
3535
| value.converter.schemas.enable | enables conversion of schemas | false | no | |
3636
| aep.endpoint | aep streaming endpoint url | | yes | |
37+
| aep.connection.proxy.host | address of the proxy host to connect through | | no | |
38+
| aep.connection.proxy.port | port of the proxy host to connect through | 443 | no | |
39+
| aep.connection.proxy.user | username for the proxy host | | no | |
40+
| aep.connection.proxy.password | password for the proxy host | | no | |
3741
| aep.connection.auth.enabled | required for authenticated streaming endpoint | false | no | |
3842
| aep.connection.auth.token.type | always set to access_token | access_token | no | |
3943
| aep.connection.auth.client.id | IMS client id | | no | |
@@ -281,6 +285,27 @@ curl -s -X POST \
281285
}' http://localhost:8083/connectors
282286
```
283287

288+
#### Poxy host configuration
289+
There are 2 ways to route request to aep endpoint through proxy server :
290+
1. **Using Environment Variable** : Export poxyHost and proxyPort on each kafka node, then restart kafka connect node.
291+
292+
For HTTPS use following :
293+
```
294+
export KAFKA_OPTS="-Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=8085 -Dhttps.proxyUser=proxyUsername -Dhttps.proxyPassword=proxyPassword"
295+
```
296+
For HTTP use following:
297+
```
298+
export KAFKA_OPTS="-Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=8085 -Dhttp.proxyUser=proxyUsername -Dhttp.proxyPassword=proxyPassword"
299+
```
300+
2. **Using Connector Properties** : While creating connector set following properties, default values mentioned in [connect configurations](#configuration-options).
301+
```
302+
aep.connection.proxy.host
303+
aep.connection.proxy.port
304+
aep.connection.proxy.user
305+
aep.connection.proxy.password
306+
```
307+
For reference, more details are in oracle documentation on configuring proxy settings in java : https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html
308+
284309
#### Use the Kafka Topics UI to view your topics
285310
286311
The docker setup comes with Topics UI to view the topic and messages within.

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpConnection.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,21 @@
1414

1515
import com.adobe.platform.streaming.auth.AuthException;
1616
import com.adobe.platform.streaming.auth.AuthProvider;
17+
18+
import org.apache.commons.lang3.StringUtils;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
1921

2022
import java.io.ByteArrayOutputStream;
2123
import java.io.IOException;
2224
import java.io.InputStream;
2325
import java.io.OutputStream;
26+
import java.net.Authenticator;
2427
import java.net.HttpURLConnection;
28+
import java.net.InetSocketAddress;
2529
import java.net.MalformedURLException;
30+
import java.net.PasswordAuthentication;
31+
import java.net.Proxy;
2632
import java.net.URL;
2733
import java.util.HashMap;
2834
import java.util.Map;
@@ -42,6 +48,12 @@ public class HttpConnection {
4248

4349
private String endpoint;
4450
private String url;
51+
52+
private String proxyHost;
53+
private int proxyPort;
54+
private String proxyUser;
55+
private String proxyPassword;
56+
4557
private Map<String, String> headers;
4658
private byte[] postData;
4759
private int maxRetries;
@@ -69,9 +81,29 @@ HttpURLConnection connect() throws HttpException {
6981

7082
while (retries++ < maxRetries) {
7183
try {
72-
URL request = new URL(new URL(endpoint), url);
84+
final URL request = new URL(new URL(endpoint), url);
7385
LOG.debug("opening connection for: {}", request);
74-
conn = (HttpURLConnection) request.openConnection();
86+
87+
if (isBasicProxyConfigured()) {
88+
if (isProxyWithAuthenticationConfigured()) {
89+
LOG.debug("proxyUser: {}, proxyPassword: {}", proxyUser, proxyPassword);
90+
Authenticator.setDefault(
91+
new Authenticator() {
92+
@Override
93+
protected PasswordAuthentication getPasswordAuthentication() {
94+
return new PasswordAuthentication(proxyUser, proxyPassword.toCharArray());
95+
}
96+
}
97+
);
98+
}
99+
100+
LOG.debug("proxyHost: {}, proxyPort: {}", proxyHost, proxyPort);
101+
final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
102+
conn = (HttpURLConnection) request.openConnection(proxy);
103+
} else {
104+
conn = (HttpURLConnection) request.openConnection();
105+
}
106+
75107
conn.setDoInput(true);
76108
conn.setUseCaches(false);
77109
conn.setConnectTimeout(connectTimeout);
@@ -176,6 +208,15 @@ void close() {
176208
}
177209
}
178210

211+
public boolean isBasicProxyConfigured() {
212+
return StringUtils.isNotEmpty(proxyHost);
213+
}
214+
215+
public boolean isProxyWithAuthenticationConfigured() {
216+
return StringUtils.isNotEmpty(proxyUser) &&
217+
StringUtils.isNotEmpty(proxyPassword);
218+
}
219+
179220
public InputStream getInputStream() throws HttpException {
180221
try {
181222
if (isGzip()) {
@@ -205,6 +246,26 @@ HttpConnectionBuilder withUrl(String url) {
205246
return this;
206247
}
207248

249+
HttpConnectionBuilder withProxyHost(String proxyHost) {
250+
instance.proxyHost = proxyHost;
251+
return this;
252+
}
253+
254+
HttpConnectionBuilder withProxyPort(int proxyPort) {
255+
instance.proxyPort = proxyPort;
256+
return this;
257+
}
258+
259+
HttpConnectionBuilder withProxyUser(String proxyUser) {
260+
instance.proxyUser = proxyUser;
261+
return this;
262+
}
263+
264+
HttpConnectionBuilder withProxyPassword(String proxyPassword) {
265+
instance.proxyPassword = proxyPassword;
266+
return this;
267+
}
268+
208269
HttpConnectionBuilder withHeaders(Map<String, String> headers) {
209270
instance.headers.putAll(headers);
210271
return this;

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpProducer.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ public class HttpProducer implements Serializable {
3030
private static final String CONTENT_TYPE = "Content-type";
3131

3232
private final String endpoint;
33+
34+
private String proxyHost;
35+
private int proxyPort;
36+
private String proxyUser;
37+
private String proxyPassword;
38+
3339
private final boolean enableGzip;
3440
private int connectTimeout;
3541
private transient AuthProvider auth;
@@ -75,6 +81,10 @@ private <T> T post(String url, byte[] postData, Map<String, String> headers, Con
7581
private HttpConnection.HttpConnectionBuilder newConnectionBuilder() {
7682
return new HttpConnection.HttpConnectionBuilder()
7783
.withEndpoint(endpoint)
84+
.withProxyHost(proxyHost)
85+
.withProxyPort(proxyPort)
86+
.withProxyUser(proxyUser)
87+
.withProxyPassword(proxyPassword)
7888
.withConnectTimeout(connectTimeout)
7989
.withAuth(auth)
8090
.withReadTimeout(readTimeout)
@@ -96,6 +106,26 @@ public static class HttpProducerBuilder {
96106
this.instance = instance;
97107
}
98108

109+
public HttpProducerBuilder withProxyHost(String proxyHost) {
110+
instance.proxyHost = proxyHost;
111+
return this;
112+
}
113+
114+
public HttpProducerBuilder withProxyPort(int proxyPort) {
115+
instance.proxyPort = proxyPort;
116+
return this;
117+
}
118+
119+
public HttpProducerBuilder withProxyUser(String proxyUser) {
120+
instance.proxyUser = proxyUser;
121+
return this;
122+
}
123+
124+
public HttpProducerBuilder withProxyPassword(String proxyPassword) {
125+
instance.proxyPassword = proxyPassword;
126+
return this;
127+
}
128+
99129
public HttpProducerBuilder withReadTimeout(int readTimeout) {
100130
instance.readTimeout = readTimeout;
101131
return this;

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/AbstractAEPPublisher.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ public abstract class AbstractAEPPublisher implements DataPublisher {
3838
private static final Logger LOG = LoggerFactory.getLogger(AbstractAEPPublisher.class);
3939

4040
private static final String AEP_ENDPOINT = "aep.endpoint";
41+
42+
private static final String AEP_CONNECTION_PROXY_HOST = "aep.connection.proxy.host";
43+
private static final String AEP_CONNECTION_PROXY_PORT = "aep.connection.proxy.port";
44+
private static final String AEP_CONNECTION_PROXY_USER = "aep.connection.proxy.user";
45+
private static final String AEP_CONNECTION_PROXY_PASSWORD = "aep.connection.proxy.password";
46+
4147
private static final String AEP_CONNECTION_TIMEOUT = "aep.connection.timeout";
4248
private static final String AEP_CONNECTION_MAX_RETRIES = "aep.connection.maxRetries";
4349
private static final String AEP_CONNECTION_MAX_RETRIES_BACKOFF = "aep.connection.retryBackoff";
@@ -62,6 +68,10 @@ public abstract class AbstractAEPPublisher implements DataPublisher {
6268

6369
protected HttpProducer getHttpProducer(Map<String, String> props) throws AEPStreamingException {
6470
return HttpProducer.newBuilder(getAepEndpoint(props.get(AEP_ENDPOINT)))
71+
.withProxyHost(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_HOST, null))
72+
.withProxyPort(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PORT, 443))
73+
.withProxyUser(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_USER, null))
74+
.withProxyPassword(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PASSWORD, null))
6575
.withConnectTimeout(SinkUtils.getProperty(props, AEP_CONNECTION_TIMEOUT, 5000))
6676
.withReadTimeout(SinkUtils.getProperty(props, AEP_CONNECTION_READ_TIMEOUT, 60000))
6777
.withMaxRetries(SinkUtils.getProperty(props, AEP_CONNECTION_MAX_RETRIES, 3))

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@ public class AEPSinkConnectorTest extends AbstractConnectorTest {
4242
private static final Logger LOG = LoggerFactory.getLogger(AEPSinkConnectorTest.class);
4343

4444
private static final String AEP_CONNECTOR_CONFIG = "aep-connector.json";
45+
private static final String AEP_CONNECTOR_CONFIG_WITH_PROXY = "aep-connector-with-proxy.json";
4546
private static final String XDM_PAYLOAD_FILE = "xdm-data.json";
4647

4748
@BeforeEach
4849
@Override
4950
public void setup() throws JsonProcessingException {
5051
super.setup();
5152
inletSuccessfulResponse();
53+
inletSuccessfulResponseViaProxy();
5254
}
5355

5456
@Test
@@ -68,6 +70,25 @@ public void aepSinkConnectorTest() throws HttpException, JsonProcessingException
6870
.withRequestBody(equalToJson(payloadReceivedXdmData())));
6971
}
7072

73+
@Test
74+
public void aepSinkConnectorTestViaProxy() throws HttpException, JsonProcessingException, InterruptedException {
75+
getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION);
76+
77+
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
78+
getConnect().configureConnector(CONNECTOR_NAME, connectorConfigWithProxy());
79+
80+
// Send single XDM data to aep sink connector
81+
getConnect().kafka().produce(TOPIC_NAME, xdmData());
82+
83+
waitForConnectorStart(CONNECTOR_NAME, 1, 1000);
84+
85+
// Verify inlet endpoint received 1 XDM record
86+
getWiremockServerViaProxy().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
87+
.withRequestBody(equalToJson(payloadReceivedXdmData())));
88+
// Check if request from proxy server forward to AEP endpoint
89+
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl())));
90+
}
91+
7192
@AfterEach
7293
@Override
7394
public void stop() {
@@ -101,4 +122,17 @@ public Map<String, String> connectorConfig() throws HttpException, JsonProcessin
101122
return connectorConfig;
102123
}
103124

125+
public Map<String, String> connectorConfigWithProxy() throws HttpException, JsonProcessingException {
126+
String connectorProperties = String.format(HttpUtil.streamToString(this.getClass().getClassLoader()
127+
.getResourceAsStream(AEP_CONNECTOR_CONFIG_WITH_PROXY)),
128+
NUMBER_OF_TASKS,
129+
getInletUrl(),
130+
PORT_VIA_PROXY);
131+
132+
Map<String, String> connectorConfig = MAPPER.readValue(connectorProperties,
133+
new TypeReference<Map<String, String>>() {});
134+
connectorConfig.put("name", CONNECTOR_NAME);
135+
return connectorConfig;
136+
}
137+
104138
}

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public abstract class AbstractConnectorTest {
5050
protected static final String CONNECTOR_NAME = "aep-sink-connector";
5151
protected static final String TOPIC_NAME = "connect-test";
5252
protected static final int PORT = 8089;
53+
protected static final int PORT_VIA_PROXY = 8090;
5354
private EmbeddedConnectCluster connect;
5455
private int numberOfWorkers = 1;
5556
private String inletId = "876e1041c16801b8b3038ec86bb4510e8c89356152191b587367b592e79d91d5";
@@ -59,6 +60,9 @@ public abstract class AbstractConnectorTest {
5960
@RegisterExtension
6061
public static final WiremockExtension wiremockExtension = new WiremockExtension(PORT);
6162

63+
@RegisterExtension
64+
public static final WiremockExtension wiremockExtensionViaProxy = new WiremockExtension(PORT_VIA_PROXY);
65+
6266
@BeforeEach
6367
public void setup() throws JsonProcessingException {
6468
connect = new EmbeddedConnectCluster.Builder()
@@ -109,6 +113,13 @@ public void inletSuccessfulResponse() throws JsonProcessingException {
109113
.withJsonBody(MAPPER.readTree("{\"payloadReceived\": true}"))));
110114
}
111115

116+
public void inletSuccessfulResponseViaProxy() {
117+
wiremockExtensionViaProxy.getWireMockServer()
118+
.stubFor(WireMock
119+
.post(WireMock.urlEqualTo(getRelativeUrl()))
120+
.willReturn(ResponseDefinitionBuilder.responseDefinition().proxiedFrom(baseUrl)));
121+
}
122+
112123
public void inletFailedResponse() {
113124
wiremockExtension.getWireMockServer()
114125
.stubFor(WireMock
@@ -120,6 +131,10 @@ protected WireMockServer getWiremockServer() {
120131
return wiremockExtension.getWireMockServer();
121132
}
122133

134+
protected WireMockServer getWiremockServerViaProxy() {
135+
return wiremockExtensionViaProxy.getWireMockServer();
136+
}
137+
123138
protected String getRelativeUrl() {
124139
return relativePath.replace("/collection/", "/collection/batch/");
125140
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"topics": "connect-test",
3+
"tasks.max": "%s",
4+
"aep.flush.interval.seconds": "1",
5+
"aep.flush.bytes.kb": "4",
6+
"connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
7+
"key.converter.schemas.enable": "false",
8+
"value.converter.schemas.enable": "false",
9+
"aep.endpoint": "%s",
10+
"aep.connection.proxy.host": "localhost",
11+
"aep.connection.proxy.port": "%s"
12+
}

0 commit comments

Comments
 (0)