Skip to content

Commit 4ecdb6e

Browse files
committed
PLAT-188185:Send all the failed message to the error reporter topic and throw exception
1 parent c776465 commit 4ecdb6e

File tree

6 files changed

+187
-24
lines changed

6 files changed

+187
-24
lines changed

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class AEPPublisher extends AbstractAEPPublisher {
4646
private static final String MESSAGES_KEY = "messages";
4747
private static final String RESPONSES_KEY = "responses";
4848
private static final String STATUS_KEY = "status";
49+
private static final String XACTIONID_KEY = "xactionId";
50+
private static final String DASH = "-";
4951

5052
private int count;
5153
private final HttpProducer producer;
@@ -68,6 +70,7 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
6870
int totalMessageCount;
6971

7072
final ArrayNode jsonMessages = JacksonFactory.OBJECT_MAPPER.createArrayNode();
73+
7174
try {
7275
messages.stream()
7376
.map(Pair::getKey)
@@ -100,7 +103,14 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
100103
for (JsonNode messageResponse : publishMessagesResponses) {
101104
if (messageResponse.hasNonNull(STATUS_KEY)) {
102105
failedMessageCount++;
103-
LOG.debug("Failed to publish message to Adobe Experience Platform: {}", messageResponse);
106+
final Pair<String, SinkRecord> failedMessage = messages.get(getFailedMessageIndex(messageResponse));
107+
LOG.debug("Failed to publish message: {} to Adobe Experience Platform due to the error: {}",
108+
failedMessage, messageResponse);
109+
if (Objects.nonNull(errorReporter)) {
110+
final int responseCode = messageResponse.get(STATUS_KEY).asInt();
111+
errorReporter.report(failedMessage.getRight(),
112+
new HttpException(String.format("error response= %s", messageResponse), responseCode));
113+
}
104114
} else {
105115
successMessageCount++;
106116
}
@@ -110,13 +120,11 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
110120
} else {
111121
LOG.error("Invalid Response received while publishing data to Adobe Experience Platform: {}", response);
112122
}
113-
114123
} catch (JsonProcessingException jsonException) {
115124
LOG.error("Failed to publish data to Adobe Experience Platform", jsonException);
116125
if (Objects.nonNull(errorReporter)) {
117126
messages.forEach(message -> errorReporter.report(message.getValue(), jsonException));
118127
}
119-
throw new AEPStreamingException("Failed to publish invalid JSON", jsonException);
120128
} catch (HttpException httpException) {
121129
LOG.error("Failed to publish data to Adobe Experience Platform", httpException);
122130
if (Objects.nonNull(errorReporter)) {
@@ -129,6 +137,15 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
129137
}
130138
}
131139

140+
private Integer getFailedMessageIndex(final JsonNode messageResponse) throws HttpException {
141+
if (messageResponse.hasNonNull(XACTIONID_KEY)) {
142+
final String xactionId = messageResponse.get(XACTIONID_KEY).asText();
143+
return Integer.parseInt(xactionId.substring(xactionId.lastIndexOf(DASH) + 1));
144+
}
145+
throw new HttpException(String.format("xactionId is missing in the failed message error response : %s",
146+
messageResponse));
147+
}
148+
132149
public void stop() {
133150
LOG.info("Stopping AEP Data Publisher after publishing {} messages", count);
134151
}

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java

+100-9
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class AEPSinkConnectorErrorReporterTest extends AbstractConnectorTest {
4747
private static final String AEP_KAFKA_ERROR_CONNECTOR_CONFIG = "aep-connector-error-reporter.json";
4848
private static final String AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG = "aep-connector-error-reporter-header.json";
4949
private static final String XDM_PAYLOAD_FILE = "xdm-data.json";
50+
private static final String XDM_MULTI_MESSAGE_PAYLOAD_FILE = "xdm-data-multiple-messages.json";
5051
private static final String DEAD_LETTER_TOPIC = "errors.deadletterqueue.topic.name";
5152
private static final String ERROR_CLASS_NAME = "__connect.errors.exception.class.name";
5253
private static final String ERROR_HEADER_MESSAGE = "__connect.errors.exception.message";
@@ -70,7 +71,7 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup
7071
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
7172
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);
7273

73-
String xdmData = xdmData();
74+
String xdmData = xdmData(XDM_PAYLOAD_FILE);
7475
getConnect().kafka().produce(TOPIC_NAME, xdmData);
7576
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);
7677

@@ -87,7 +88,89 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup
8788

8889
// Verify inlet endpoint received 1 XDM record
8990
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
90-
.withRequestBody(equalToJson(payloadReceivedXdmData())));
91+
.withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE))));
92+
}
93+
94+
@Test
95+
public void kafkaErrorReporterMultiMessageTest() throws HttpException, IOException, InterruptedException {
96+
inletMultiStatusSuccessfulResponse();
97+
getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION);
98+
99+
// Create error topic to dump failed data
100+
Map<String, String> connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_CONFIG);
101+
getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION);
102+
103+
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
104+
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);
105+
106+
String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE);
107+
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
108+
String failedMessage = xdmDataValues.get(0).toString();
109+
String successMessage = xdmDataValues.get(1).toString();
110+
111+
getConnect().kafka().produce(TOPIC_NAME, failedMessage);
112+
getConnect().kafka().produce(TOPIC_NAME, successMessage);
113+
114+
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);
115+
116+
// Check if error record sent to error topic
117+
ConsumerRecords<byte[], byte[]> consumerRecords = getConnect().kafka()
118+
.consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC));
119+
120+
Assertions.assertEquals(1, consumerRecords.count());
121+
122+
ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.iterator().next();
123+
JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value());
124+
125+
Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString());
126+
127+
// Verify inlet endpoint received 2 XDM record
128+
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
129+
.withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE))));
130+
}
131+
132+
@Test
133+
public void kafkaErrorReporterMultiMessageWithHeadersTest() throws HttpException, IOException, InterruptedException {
134+
inletMultiStatusSuccessfulResponse();
135+
136+
getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION);
137+
138+
// Create error topic to dump failed data
139+
Map<String, String> connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG);
140+
getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION);
141+
142+
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
143+
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);
144+
145+
String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE);
146+
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
147+
String failedMessage = xdmDataValues.get(0).toString();
148+
String successMessage = xdmDataValues.get(1).toString();
149+
getConnect().kafka().produce(TOPIC_NAME, failedMessage);
150+
getConnect().kafka().produce(TOPIC_NAME, successMessage);
151+
152+
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);
153+
154+
// Check if error record sent to error topic
155+
ConsumerRecords<byte[], byte[]> consumerRecords = getConnect().kafka()
156+
.consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC));
157+
158+
Assertions.assertEquals(1, consumerRecords.count());
159+
160+
ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.iterator().next();
161+
JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value());
162+
163+
Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString());
164+
165+
final Headers errorHeaders = consumerRecord.headers();
166+
errorHeaders.headers(ERROR_CLASS_NAME)
167+
.forEach(header -> Assertions.assertEquals(EXPECTED_EXCEPTION_CLASS, new String(header.value())));
168+
errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header ->
169+
Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_BAD_REQUEST_ERROR_CODE))));
170+
171+
// Verify inlet endpoint received 2 XDM record
172+
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
173+
.withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE))));
91174
}
92175

93176
@Test
@@ -101,7 +184,7 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
101184
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
102185
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);
103186

104-
String xdmData = xdmData();
187+
String xdmData = xdmData(XDM_PAYLOAD_FILE);
105188
getConnect().kafka().produce(TOPIC_NAME, xdmData);
106189
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);
107190

@@ -122,13 +205,21 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
122205
errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header ->
123206
Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_SERVER_SIDE_ERROR_CODE))));
124207

125-
// Verify inlet endpoint received 1 XDM record
208+
// Verify inlet endpoint received 2 XDM record
126209
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
127-
.withRequestBody(equalToJson(payloadReceivedXdmData())));
210+
.withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE))));
128211
}
129212

130-
public String payloadReceivedXdmData() throws HttpException, JsonProcessingException {
131-
String xdmData = xdmData();
213+
public String payloadReceivedMultiMessageXdmData(String payloadfile) throws HttpException, JsonProcessingException {
214+
String xdmData = xdmData(payloadfile);
215+
ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode();
216+
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
217+
messageNode.set("messages", xdmDataValues);
218+
return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode);
219+
}
220+
221+
public String payloadReceivedXdmData(String payloadfile) throws HttpException, JsonProcessingException {
222+
String xdmData = xdmData(payloadfile);
132223
ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode();
133224
ArrayNode xdmDataValues = JacksonFactory.OBJECT_MAPPER.createArrayNode();
134225
xdmDataValues.add(JacksonFactory.OBJECT_MAPPER.readTree(xdmData));
@@ -137,8 +228,8 @@ public String payloadReceivedXdmData() throws HttpException, JsonProcessingExcep
137228
return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode);
138229
}
139230

140-
public String xdmData() throws HttpException {
141-
return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(XDM_PAYLOAD_FILE));
231+
public String xdmData(String payloadfile) throws HttpException {
232+
return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(payloadfile));
142233
}
143234

144235
public Map<String, String> connectorConfig(String configFile) throws HttpException, JsonProcessingException {

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public abstract class AbstractConnectorTest {
4848
.constructMapLikeType(TreeMap.class, String.class, String.class);
4949
private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
5050
protected static final int HTTP_SERVER_SIDE_ERROR_CODE = 500;
51+
protected static final int HTTP_BAD_REQUEST_ERROR_CODE = 400;
52+
5153
private static final String AUTH_TOKEN_RESPONSE = "{\"access_token\":\"accessToken\"," +
5254
"\"refresh_token\":\"refreshToken\",\"token_type\":\"bearer\",\"expires_in\":82399996}";
5355
private static final String AUTH_TOKEN_RESPONSE_OAUTH2 = "{\"access_token\":\"accessToken\"," +
@@ -56,6 +58,8 @@ public abstract class AbstractConnectorTest {
5658
private static final String AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE =
5759
"aep-connector-inlet-successful-response.json";
5860

61+
private static final String AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE =
62+
"aep-connector-inlet-multi-message-response.json";
5963
protected static final int TOPIC_PARTITION = 1;
6064
protected static final int NUMBER_OF_TASKS = 1;
6165
protected static final String CONNECTOR_NAME = "aep-sink-connector";
@@ -132,6 +136,15 @@ public void inletSuccessfulResponse() throws IOException {
132136
.getResourceAsStream(AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE)))));
133137
}
134138

139+
public void inletMultiStatusSuccessfulResponse() throws IOException {
140+
wiremockExtension.getWireMockServer()
141+
.stubFor(WireMock
142+
.post(WireMock.urlEqualTo(getRelativeUrl()))
143+
.willReturn(ResponseDefinitionBuilder.responseDefinition()
144+
.withJsonBody(JacksonFactory.OBJECT_MAPPER.readTree(this.getClass().getClassLoader()
145+
.getResourceAsStream(AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE)))));
146+
}
147+
135148
public void inletIMSAuthenticationSuccessfulResponse() throws JsonProcessingException {
136149
wiremockExtension.getWireMockServer()
137150
.stubFor(WireMock
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"inletId": "9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5",
3+
"batchId": "1565638336649:1750:244",
4+
"receivedTimeMs": 1565638336705,
5+
"responses": [
6+
{
7+
"xactionId":"9341f8eb-494a-4c89-9879-4d06a58d2dc7-0",
8+
"status":400,
9+
"message":"The 'header' field is mandatory. Provide a valid 'header' value and try again."
10+
},
11+
{
12+
"xactionId": "9341f8eb-494a-4c89-9879-4d06a58d2dc7-1"
13+
}
14+
]
15+
}

streaming-connect-sink/src/test/resources/aep-connector-inlet-successful-response.json

-12
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,6 @@
55
"responses": [
66
{
77
"xactionId": "1565650704337:2124:92:3"
8-
},
9-
{
10-
"status": 400,
11-
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
12-
},
13-
{
14-
"status": 400,
15-
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has an absent or wrong ims org in the header"
16-
},
17-
{
18-
"status": 400,
19-
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
208
}
219
]
2210
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
[
2+
{
3+
"body": {
4+
"xdmMeta": {
5+
"schemaRef": {
6+
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
7+
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
8+
}
9+
},
10+
"xdmEntity": {
11+
"firstname": "abc",
12+
"lastname": "def"
13+
}
14+
}
15+
},
16+
{
17+
"header": {
18+
"schemaRef": {
19+
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
20+
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
21+
},
22+
"source": {
23+
"name": "aep-sink-connector"
24+
}
25+
},
26+
"body": {
27+
"xdmMeta": {
28+
"schemaRef": {
29+
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
30+
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
31+
}
32+
},
33+
"xdmEntity": {
34+
"firstname": "abc",
35+
"lastname": "def"
36+
}
37+
}
38+
}
39+
]

0 commit comments

Comments
 (0)