Skip to content

Commit c776465

Browse files
maniskumvivetiwa
andauthored
Improved logging for the message published in AEP (#68)
* Improved logging for the message published in AEP * Remove HttpException --------- Co-authored-by: vivetiwa <[email protected]>
1 parent a050275 commit c776465

File tree

5 files changed

+57
-6
lines changed

5 files changed

+57
-6
lines changed

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class AEPPublisher extends AbstractAEPPublisher {
4444
private static final Logger LOG = LoggerFactory.getLogger(AEPPublisher.class);
4545

4646
private static final String MESSAGES_KEY = "messages";
47+
private static final String RESPONSES_KEY = "responses";
48+
private static final String STATUS_KEY = "status";
4749

4850
private int count;
4951
private final HttpProducer producer;
@@ -61,6 +63,9 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
6163
LOG.debug("No messages to publish");
6264
return;
6365
}
66+
int failedMessageCount = 0;
67+
int successMessageCount = 0;
68+
int totalMessageCount;
6469

6570
final ArrayNode jsonMessages = JacksonFactory.OBJECT_MAPPER.createArrayNode();
6671
try {
@@ -80,14 +85,32 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
8085
final JsonNode payload = JacksonFactory.OBJECT_MAPPER.createObjectNode()
8186
.set(MESSAGES_KEY, jsonMessages);
8287

88+
totalMessageCount = jsonMessages.size();
89+
8390
final JsonNode response = producer.post(
8491
StringUtils.EMPTY,
8592
JacksonFactory.OBJECT_MAPPER.writeValueAsBytes(payload),
8693
ContentHandler.jsonHandler()
8794
);
8895

8996
count++;
90-
LOG.debug("Successfully published data to Adobe Experience Platform: {}", response);
97+
98+
if (!response.isNull() && !response.isEmpty()) {
99+
JsonNode publishMessagesResponses = response.get(RESPONSES_KEY);
100+
for (JsonNode messageResponse : publishMessagesResponses) {
101+
if (messageResponse.hasNonNull(STATUS_KEY)) {
102+
failedMessageCount++;
103+
LOG.debug("Failed to publish message to Adobe Experience Platform: {}", messageResponse);
104+
} else {
105+
successMessageCount++;
106+
}
107+
}
108+
LOG.debug("Total publish message count = {}. Success message count = {}. Failed " +
109+
"message count = {}.", totalMessageCount, successMessageCount, failedMessageCount);
110+
} else {
111+
LOG.error("Invalid Response received while publishing data to Adobe Experience Platform: {}", response);
112+
}
113+
91114
} catch (JsonProcessingException jsonException) {
92115
LOG.error("Failed to publish data to Adobe Experience Platform", jsonException);
93116
if (Objects.nonNull(errorReporter)) {

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class AEPSinkConnectorErrorReporterTest extends AbstractConnectorTest {
5454

5555
@BeforeEach
5656
@Override
57-
public void setup() throws JsonProcessingException {
57+
public void setup() throws IOException {
5858
super.setup();
5959
inletFailedResponse();
6060
}

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import java.io.IOException;
3031
import java.util.Map;
3132

3233
import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
@@ -54,7 +55,7 @@ public class AEPSinkConnectorTest extends AbstractConnectorTest {
5455

5556
@BeforeEach
5657
@Override
57-
public void setup() throws JsonProcessingException {
58+
public void setup() throws IOException {
5859
super.setup();
5960
inletSuccessfulResponse();
6061
inletSuccessfulResponseViaProxy();

streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.junit.jupiter.api.extension.ExtendWith;
2929
import org.junit.jupiter.api.extension.RegisterExtension;
3030

31+
import java.io.IOException;
3132
import java.util.HashMap;
3233
import java.util.Map;
3334
import java.util.Properties;
@@ -52,6 +53,9 @@ public abstract class AbstractConnectorTest {
5253
private static final String AUTH_TOKEN_RESPONSE_OAUTH2 = "{\"access_token\":\"accessToken\"," +
5354
"\"token_type\":\"bearer\",\"expires_in\":86400}";
5455

56+
private static final String AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE =
57+
"aep-connector-inlet-successful-response.json";
58+
5559
protected static final int TOPIC_PARTITION = 1;
5660
protected static final int NUMBER_OF_TASKS = 1;
5761
protected static final String CONNECTOR_NAME = "aep-sink-connector";
@@ -75,7 +79,7 @@ public abstract class AbstractConnectorTest {
7579
public static final WiremockExtension wiremockExtensionViaProxy = new WiremockExtension(PORT_VIA_PROXY);
7680

7781
@BeforeEach
78-
public void setup() throws JsonProcessingException {
82+
public void setup() throws IOException {
7983
connect = new EmbeddedConnectCluster.Builder()
8084
.name("aep-connect-cluster")
8185
.numWorkers(numberOfWorkers)
@@ -119,12 +123,13 @@ public void stop() {
119123
connect.stop();
120124
}
121125

122-
public void inletSuccessfulResponse() throws JsonProcessingException {
126+
public void inletSuccessfulResponse() throws IOException {
123127
wiremockExtension.getWireMockServer()
124128
.stubFor(WireMock
125129
.post(WireMock.urlEqualTo(getRelativeUrl()))
126130
.willReturn(ResponseDefinitionBuilder.responseDefinition()
127-
.withJsonBody(JacksonFactory.OBJECT_MAPPER.readTree("{\"payloadReceived\": true}"))));
131+
.withJsonBody(JacksonFactory.OBJECT_MAPPER.readTree(this.getClass().getClassLoader()
132+
.getResourceAsStream(AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE)))));
128133
}
129134

130135
public void inletIMSAuthenticationSuccessfulResponse() throws JsonProcessingException {
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"inletId": "9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5",
3+
"batchId": "1565638336649:1750:244",
4+
"receivedTimeMs": 1565638336705,
5+
"responses": [
6+
{
7+
"xactionId": "1565650704337:2124:92:3"
8+
},
9+
{
10+
"status": 400,
11+
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
12+
},
13+
{
14+
"status": 400,
15+
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has an absent or wrong ims org in the header"
16+
},
17+
{
18+
"status": 400,
19+
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
20+
}
21+
]
22+
}

0 commit comments

Comments
 (0)