Skip to content

Commit 2199959

Browse files
vivetiwavivetiwa
andauthored
Integration Test and Deadletter queue support (#23)
* 1.Deadletter topic support to send data to deadtopic in case AEP connector fails to send data to AEP after trying n retries. 2. Integration test for verifying common scenario to check if main use case is not effected due to changes * Fix java 8 issue to support backward compatible Co-authored-by: vivetiwa <[email protected]>
1 parent cb79cdb commit 2199959

25 files changed

+1043
-18
lines changed

DEVELOPER_GUIDE.md

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,21 @@ To install the connector, drop the jar file into the plug in directory of Kafka
2727
AEP Sink connector configurations can be supplied in the call register the connector.
2828

2929

30-
| Config Name | Description | Default | Required | Example |
31-
|-----------------------------------|-----------------------------------------------|---------------------------------------------------------|----------|---------|
32-
| topics | comma separated list of topics | | yes | |
33-
| connector.class | classname of impl | com.adobe.platform.streaming.sink.impl.AEPSinkConnector | yes | |
34-
| key.converter.schemas.enable | enables conversion of schemas | false | no | |
35-
| value.converter.schemas.enable | enables conversion of schemas | false | no | |
36-
| aep.endpoint | aep streaming endpoint url | | yes | |
37-
| aep.connection.auth.enabled | required for authenticated streaming endpoint | false | no | |
38-
| aep.connection.auth.token.type | always set to access_token | access_token | no | |
39-
| aep.connection.auth.client.id | IMS client id | | no | |
40-
| aep.connection.auth.client.code | IMS client code | | no | |
41-
| aep.connection.auth.client.secret | IME client secret | | | |
42-
| aep.flush.bytes.kb | bytes threshold to determine the batch | 4 | no | |
30+
| Config Name | Description | Default | Required | Example |
31+
|-----------------------------------|-------------------------------------------------|---------------------------------------------------------|----------|-------------------------|
32+
| topics | comma separated list of topics | | yes | |
33+
| connector.class | classname of impl | com.adobe.platform.streaming.sink.impl.AEPSinkConnector | yes | |
34+
| key.converter.schemas.enable | enables conversion of schemas | false | no | |
35+
| value.converter.schemas.enable | enables conversion of schemas | false | no | |
36+
| aep.endpoint | aep streaming endpoint url | | yes | |
37+
| aep.connection.auth.enabled | required for authenticated streaming endpoint | false | no | |
38+
| aep.connection.auth.token.type | always set to access_token | access_token | no | |
39+
| aep.connection.auth.client.id | IMS client id | | no | |
40+
| aep.connection.auth.client.code | IMS client code | | no | |
41+
| aep.connection.auth.client.secret | IME client secret | | no | |
42+
| aep.flush.bytes.kb | bytes threshold to determine the batch | 4 | no | |
43+
| aep.error.logger | put failed message to dead letter topic or log | none | no | kafka, log, both, none |
44+
| aep.error.topic | deadletter topic name | none | no | |
4345

4446
## Step-by-Step Workflow
4547

dependencies.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ versions.orgJson = '20180130'
2929
versions.kafkaConnect = '2.8.0'
3030
versions.slf4j = '1.7.25'
3131
versions.jaxb = '2.3.0'
32+
versions.wiremock = '2.27.2'
3233

3334
libraries.collections4 = [
3435
"org.apache.commons:commons-collections4:$versions.collections4"
@@ -70,6 +71,10 @@ libraries.junitJupiter = [
7071
"org.junit.jupiter:junit-jupiter-params:$versions.junitJupiter"
7172
]
7273

74+
libraries.wiremock = [
75+
"com.github.tomakehurst:wiremock:$versions.wiremock"
76+
]
77+
7378
libraries.jjwt = [
7479
"io.jsonwebtoken:jjwt:$versions.jjwt"
7580
]
@@ -78,6 +83,17 @@ libraries.kafkaConnect = [
7883
"org.apache.kafka:connect-api:$versions.kafkaConnect"
7984
]
8085

86+
libraries.kafkaConnectRuntime = [
87+
"org.apache.kafka:connect-runtime:$versions.kafkaConnect",
88+
"org.apache.kafka:kafka_2.12:$versions.kafkaConnect",
89+
]
90+
91+
libraries.kafkaConnectTest = [
92+
"org.apache.kafka:connect-runtime:$versions.kafkaConnect:test",
93+
"org.apache.kafka:kafka-clients:$versions.kafkaConnect:test",
94+
"org.apache.kafka:kafka_2.12:$versions.kafkaConnect:test"
95+
]
96+
8197
libraries.orgJson = [
8298
"org.json:json:$versions.orgJson"
8399
]
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2021 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming;
14+
15+
import java.util.Optional;
16+
17+
/**
18+
* @author Adobe Inc.
19+
*/
20+
public class AEPStreamingRuntimeException extends RuntimeException {
21+
22+
private final Object errorContext;
23+
24+
public AEPStreamingRuntimeException(String message) {
25+
super(message);
26+
this.errorContext = null;
27+
}
28+
29+
public AEPStreamingRuntimeException(String message, Throwable t) {
30+
super(message, t);
31+
this.errorContext = null;
32+
}
33+
34+
public AEPStreamingRuntimeException(String message, Object errorContext) {
35+
super(message);
36+
this.errorContext = errorContext;
37+
}
38+
39+
public AEPStreamingRuntimeException(String message, Throwable t, Object errorContext) {
40+
super(message, t);
41+
this.errorContext = errorContext;
42+
}
43+
44+
public Optional<?> getErrorContext() {
45+
return Optional.ofNullable(errorContext);
46+
}
47+
}

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ void close() {
169169
is.close();
170170
}
171171
} catch (IOException ioe) {
172-
LOG.info("close(): failed to close input stream: {}", ioe.getMessage());
172+
LOG.error("close(): failed to close input stream: {}", ioe.getMessage());
173173
}
174174

175175
conn = null;

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
package com.adobe.platform.streaming.http;
1414

15+
import com.google.common.annotations.VisibleForTesting;
16+
1517
import java.io.BufferedReader;
1618
import java.io.IOException;
1719
import java.io.InputStream;
@@ -39,7 +41,7 @@ public static boolean isUnauthorized(int responseCode) {
3941
return responseCode == 401 || responseCode == 403;
4042
}
4143

42-
static String streamToString(InputStream in) throws HttpException {
44+
public static String streamToString(InputStream in) throws HttpException {
4345
StringBuilder sb = new StringBuilder(128);
4446
try (BufferedReader r = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
4547
String str;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2021 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.http.serializer;
14+
15+
import com.adobe.platform.streaming.AEPStreamingRuntimeException;
16+
17+
import com.fasterxml.jackson.core.JsonProcessingException;
18+
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.fasterxml.jackson.databind.node.ObjectNode;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* @author Adobe Inc.
29+
*/
30+
public class SerializerDeserializerUtil {
31+
32+
private static final Logger errorLogger = LoggerFactory.getLogger(SerializerDeserializerUtil.class);
33+
private static final ObjectMapper MAPPER = new ObjectMapper();
34+
35+
public static String serialize(Object value) {
36+
try {
37+
return MAPPER.writeValueAsString(value);
38+
} catch (JsonProcessingException e) {
39+
errorLogger.error("Failed to serialize value : {}", value);
40+
throw new AEPStreamingRuntimeException("Failed to serialize value", value);
41+
}
42+
}
43+
44+
public static JsonNode convertToJsonNode(Object value) {
45+
return MAPPER.convertValue(value, JsonNode.class);
46+
}
47+
48+
public static JsonNode convertStringToJsonNode(Object value) {
49+
try {
50+
return MAPPER.readTree(value.toString());
51+
} catch (IOException e) {
52+
errorLogger.error("Failed to serialize value : {}", value);
53+
throw new AEPStreamingRuntimeException("Failed to serialize value", value);
54+
}
55+
}
56+
57+
public static ObjectNode createObjectNode() {
58+
return MAPPER.createObjectNode();
59+
}
60+
}

streaming-connect-sink/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ dependencies {
2323
libraries.jackson, libraries.httpClient,
2424
libraries.kafkaConnect, libraries.orgJson,
2525
libraries.slf4j
26+
compileOnly libraries.kafkaConnectRuntime
2627
jmxAgent libraries.jmxPrometheusJavaAgent
27-
testImplementation libraries.jmockit, libraries.junitJupiter
28+
testImplementation libraries.jmockit, libraries.junitJupiter,
29+
libraries.wiremock, libraries.kafkaConnectTest,
30+
libraries.kafkaConnectRuntime
2831
}
2932

3033
task copyJmxAgent(type: Copy) {

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/AbstractAEPPublisher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public abstract class AbstractAEPPublisher implements DataPublisher {
5757

5858
private static final String AEP_CONNECTION_AUTH_ENABLED_VALUE = "true";
5959
private static final String AEP_CONNECTION_AUTH_DISABLED_VALUE = "false";
60+
public static final String AEP_ERROR_LOGGER = "aep.error.logger";
61+
public static final String AEP_ERROR_TOPIC = "aep.error.topic";
6062

6163
protected HttpProducer getHttpProducer(Map<String, String> props) throws AEPStreamingException {
6264
return HttpProducer.newBuilder(getAepEndpoint(props.get(AEP_ENDPOINT)))

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/AbstractSinkTask.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,27 @@
1414

1515
import com.adobe.platform.streaming.AEPStreamingException;
1616
import com.adobe.platform.streaming.sink.utils.SinkUtils;
17+
1718
import com.google.gson.Gson;
1819
import com.google.gson.GsonBuilder;
20+
1921
import org.apache.commons.collections4.CollectionUtils;
2022
import org.apache.kafka.common.utils.AppInfoParser;
23+
import org.apache.kafka.common.utils.Utils;
2124
import org.apache.kafka.connect.errors.ConnectException;
25+
import org.apache.kafka.connect.runtime.WorkerConfig;
2226
import org.apache.kafka.connect.sink.SinkRecord;
2327
import org.apache.kafka.connect.sink.SinkTask;
28+
import org.apache.kafka.connect.sink.SinkTaskContext;
29+
import org.codehaus.plexus.util.ReflectionUtils;
2430
import org.slf4j.Logger;
2531
import org.slf4j.LoggerFactory;
2632

2733
import java.util.ArrayList;
2834
import java.util.Collection;
2935
import java.util.List;
3036
import java.util.Map;
37+
import java.util.Objects;
3138

3239
/**
3340
* @author Adobe Inc.
@@ -48,15 +55,25 @@ public abstract class AbstractSinkTask<T> extends SinkTask {
4855
private int flushIntervalMillis;
4956
private int flushBytesCount;
5057
private long lastFlushMilliSec = System.currentTimeMillis();
58+
private String bootstrapServers;
5159

5260
@Override
5361
public String version() {
5462
return AppInfoParser.getVersion();
5563
}
5664

65+
@Override
66+
public void initialize(SinkTaskContext context) {
67+
super.initialize(context);
68+
bootstrapServers = getBootstrapServers(context);
69+
}
70+
5771
@Override
5872
public void start(Map<String, String> props) {
5973
LOG.info("Started Sink Task with props: {}", props);
74+
if (Objects.nonNull(bootstrapServers)) {
75+
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
76+
}
6077

6178
try {
6279
flushIntervalMillis = SinkUtils.getProperty(props, FLUSH_INTERVAL_SECS, DEFAULT_FLUSH_INTERVAL, MILLIS_IN_A_SEC);
@@ -133,6 +150,19 @@ private void publishAndLogIfRequired(List<T> eventsToPublish) {
133150
}
134151
}
135152

153+
private String getBootstrapServers(SinkTaskContext context) {
154+
try {
155+
Object workerSinkTask = ReflectionUtils.getValueIncludingSuperclasses("sinkTask", context);
156+
WorkerConfig workerConfig = (WorkerConfig) ReflectionUtils
157+
.getValueIncludingSuperclasses("workerConfig", workerSinkTask);
158+
159+
return Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",");
160+
} catch (IllegalAccessException exception) {
161+
LOG.error("Failed to get bootstrap server.", exception);
162+
}
163+
return null;
164+
}
165+
136166
private void reset(List<T> eventsToPublish, long tempCurrentTime) {
137167
lastFlushMilliSec = tempCurrentTime;
138168
bytesRead = 0;

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.adobe.platform.streaming.http.HttpProducer;
1919
import com.adobe.platform.streaming.http.HttpUtil;
2020
import com.adobe.platform.streaming.sink.AbstractAEPPublisher;
21+
import com.adobe.platform.streaming.sink.impl.reporter.CompositeErrorReporter;
22+
import com.adobe.platform.streaming.sink.impl.reporter.ErrorReporter;
2123
import org.apache.commons.collections4.CollectionUtils;
2224
import org.apache.commons.lang3.StringUtils;
2325
import org.apache.http.entity.ContentType;
@@ -28,21 +30,24 @@
2830

2931
import java.util.List;
3032
import java.util.Map;
33+
import java.util.concurrent.Future;
3134

3235
/**
3336
* @author Adobe Inc.
3437
*/
35-
class AEPPublisher extends AbstractAEPPublisher {
38+
public class AEPPublisher extends AbstractAEPPublisher {
3639

3740
private static final Logger LOG = LoggerFactory.getLogger(AEPPublisher.class);
3841
private static final String MESSAGES_KEY = "messages";
3942

4043
private int count;
4144
private final HttpProducer producer;
45+
private final ErrorReporter<List<Future<?>>> errorReporter;
4246

4347
AEPPublisher(Map<String, String> props) throws AEPStreamingException {
4448
count = 0;
4549
producer = getHttpProducer(props);
50+
errorReporter = new CompositeErrorReporter(props);
4651
}
4752

4853
@Override
@@ -69,6 +74,7 @@ public void publishData(List<String> messages) throws AEPStreamingException {
6974
LOG.debug("Successfully published data to Adobe Experience Platform: {}", response);
7075
} catch (HttpException httpException) {
7176
LOG.error("Failed to publish data to Adobe Experience Platform", httpException);
77+
errorReporter.report(messages, httpException);
7278
if (HttpUtil.is500(httpException.getResponseCode()) || HttpUtil.isUnauthorized(httpException.getResponseCode())) {
7379
throw new AEPStreamingException("Failed to publish", httpException);
7480
}

0 commit comments

Comments
 (0)