@@ -47,6 +47,7 @@ public class AEPSinkConnectorErrorReporterTest extends AbstractConnectorTest {
47
47
private static final String AEP_KAFKA_ERROR_CONNECTOR_CONFIG = "aep-connector-error-reporter.json" ;
48
48
private static final String AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG = "aep-connector-error-reporter-header.json" ;
49
49
private static final String XDM_PAYLOAD_FILE = "xdm-data.json" ;
50
+ private static final String XDM_MULTI_MESSAGE_PAYLOAD_FILE = "xdm-data-multiple-messages.json" ;
50
51
private static final String DEAD_LETTER_TOPIC = "errors.deadletterqueue.topic.name" ;
51
52
private static final String ERROR_CLASS_NAME = "__connect.errors.exception.class.name" ;
52
53
private static final String ERROR_HEADER_MESSAGE = "__connect.errors.exception.message" ;
@@ -70,7 +71,7 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup
70
71
LOG .info ("Starting connector cluster with connector : {}" , CONNECTOR_NAME );
71
72
getConnect ().configureConnector (CONNECTOR_NAME , connectorConfig );
72
73
73
- String xdmData = xdmData ();
74
+ String xdmData = xdmData (XDM_PAYLOAD_FILE );
74
75
getConnect ().kafka ().produce (TOPIC_NAME , xdmData );
75
76
waitForConnectorStart (CONNECTOR_NAME , 1 , 8000 );
76
77
@@ -87,7 +88,39 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup
87
88
88
89
// Verify inlet endpoint received 1 XDM record
89
90
getWiremockServer ().verify (postRequestedFor (urlEqualTo (getRelativeUrl ()))
90
- .withRequestBody (equalToJson (payloadReceivedXdmData ())));
91
+ .withRequestBody (equalToJson (payloadReceivedXdmData (XDM_PAYLOAD_FILE ))));
92
+ }
93
+
94
+ @ Test
95
+ public void kafkaErrorReporterMultiMessageTest () throws HttpException , IOException , InterruptedException {
96
+ inletMultiStatusSuccessfulResponse ();
97
+ getConnect ().kafka ().createTopic (TOPIC_NAME , TOPIC_PARTITION );
98
+
99
+ // Create error topic to dump failed data
100
+ Map <String , String > connectorConfig = connectorConfig (AEP_KAFKA_ERROR_CONNECTOR_CONFIG );
101
+ getConnect ().kafka ().createTopic (connectorConfig .get (DEAD_LETTER_TOPIC ), TOPIC_PARTITION );
102
+
103
+ LOG .info ("Starting connector cluster with connector : {}" , CONNECTOR_NAME );
104
+ getConnect ().configureConnector (CONNECTOR_NAME , connectorConfig );
105
+
106
+ String xdmData = xdmData (XDM_MULTI_MESSAGE_PAYLOAD_FILE );
107
+ getConnect ().kafka ().produce (TOPIC_NAME , xdmData );
108
+ waitForConnectorStart (CONNECTOR_NAME , 1 , 8000 );
109
+
110
+ // Check if error record sent to error topic
111
+ ConsumerRecords <byte [], byte []> consumerRecords = getConnect ().kafka ()
112
+ .consume (1 , 8000 , connectorConfig .get (DEAD_LETTER_TOPIC ));
113
+
114
+ Assertions .assertEquals (1 , consumerRecords .count ());
115
+
116
+ ConsumerRecord <byte [], byte []> consumerRecord = consumerRecords .iterator ().next ();
117
+ JsonNode record = JacksonFactory .OBJECT_MAPPER .readTree (consumerRecord .value ());
118
+
119
+ Assertions .assertEquals (JacksonFactory .OBJECT_MAPPER .readTree (xdmData ).toString (), record .toString ());
120
+
121
+ // Verify inlet endpoint received 1 XDM record
122
+ getWiremockServer ().verify (postRequestedFor (urlEqualTo (getRelativeUrl ()))
123
+ .withRequestBody (equalToJson (payloadReceivedXdmData (XDM_MULTI_MESSAGE_PAYLOAD_FILE ))));
91
124
}
92
125
93
126
@ Test
@@ -101,7 +134,7 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
101
134
LOG .info ("Starting connector cluster with connector : {}" , CONNECTOR_NAME );
102
135
getConnect ().configureConnector (CONNECTOR_NAME , connectorConfig );
103
136
104
- String xdmData = xdmData ();
137
+ String xdmData = xdmData (XDM_PAYLOAD_FILE );
105
138
getConnect ().kafka ().produce (TOPIC_NAME , xdmData );
106
139
waitForConnectorStart (CONNECTOR_NAME , 1 , 8000 );
107
140
@@ -124,11 +157,11 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
124
157
125
158
// Verify inlet endpoint received 1 XDM record
126
159
getWiremockServer ().verify (postRequestedFor (urlEqualTo (getRelativeUrl ()))
127
- .withRequestBody (equalToJson (payloadReceivedXdmData ())));
160
+ .withRequestBody (equalToJson (payloadReceivedXdmData (XDM_PAYLOAD_FILE ))));
128
161
}
129
162
130
- public String payloadReceivedXdmData () throws HttpException , JsonProcessingException {
131
- String xdmData = xdmData ();
163
+ public String payloadReceivedXdmData (String payloadfile ) throws HttpException , JsonProcessingException {
164
+ String xdmData = xdmData (payloadfile );
132
165
ObjectNode messageNode = JacksonFactory .OBJECT_MAPPER .createObjectNode ();
133
166
ArrayNode xdmDataValues = JacksonFactory .OBJECT_MAPPER .createArrayNode ();
134
167
xdmDataValues .add (JacksonFactory .OBJECT_MAPPER .readTree (xdmData ));
@@ -137,8 +170,8 @@ public String payloadReceivedXdmData() throws HttpException, JsonProcessingExcep
137
170
return JacksonFactory .OBJECT_MAPPER .writeValueAsString (messageNode );
138
171
}
139
172
140
- public String xdmData () throws HttpException {
141
- return HttpUtil .streamToString (this .getClass ().getClassLoader ().getResourceAsStream (XDM_PAYLOAD_FILE ));
173
+ public String xdmData (String payloadfile ) throws HttpException {
174
+ return HttpUtil .streamToString (this .getClass ().getClassLoader ().getResourceAsStream (payloadfile ));
142
175
}
143
176
144
177
public Map <String , String > connectorConfig (String configFile ) throws HttpException , JsonProcessingException {
0 commit comments