Skip to content

Commit 909f810

Browse files
vivetiwa
and authored
ISSUE-57 Add support for json patch transformation as connect SMT (#58)
# Conflicts:
#	dependencies.gradle

Co-authored-by: vivetiwa <[email protected]>
1 parent 8b7fbd4 commit 909f810

File tree

6 files changed

+266
-0
lines changed

6 files changed

+266
-0
lines changed

dependencies.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ versions.jmxPrometheusJavaAgent = '0.12.0'
2626
versions.junitJupiter = '5.2.0'
2727
versions.jjwt = '0.9.0'
2828
versions.orgJson = '20230227'
29+
versions.jsonPatch = '0.4.4'
2930
versions.kafkaConnect = '2.8.0'
3031
versions.slf4j = '1.7.25'
3132
versions.jaxb = '2.3.0'
@@ -98,6 +99,10 @@ libraries.orgJson = [
9899
"org.json:json:$versions.orgJson"
99100
]
100101

102+
libraries.jsonPatch = [
103+
"com.flipkart.zjsonpatch:zjsonpatch:$versions.jsonPatch"
104+
]
105+
101106
libraries.slf4j = [
102107
"org.slf4j:slf4j-api:$versions.slf4j"
103108
]

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/serializer/SerializerDeserializerUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ public static JsonNode convertStringToJsonNode(Object value) {
5757
public static ObjectNode createObjectNode() {
5858
return MAPPER.createObjectNode();
5959
}
60+
61+
public static ObjectMapper getMapper() {
62+
return MAPPER;
63+
}
6064
}

streaming-connect-sink/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dependencies {
2323
libraries.jackson, libraries.httpClient,
2424
libraries.kafkaConnect, libraries.orgJson,
2525
libraries.slf4j
26+
implementation libraries.jsonPatch
2627
compileOnly libraries.kafkaConnectRuntime
2728
jmxAgent libraries.jmxPrometheusJavaAgent
2829
testImplementation libraries.jmockit, libraries.junitJupiter,
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2023 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.sink.transformation;
14+
15+
import com.adobe.platform.streaming.AEPStreamingRuntimeException;
16+
import com.adobe.platform.streaming.http.serializer.SerializerDeserializerUtil;
17+
18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.databind.JsonNode;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import com.fasterxml.jackson.databind.node.ObjectNode;
22+
import com.flipkart.zjsonpatch.InvalidJsonPatchException;
23+
import com.flipkart.zjsonpatch.JsonPatch;
24+
25+
import org.apache.kafka.common.config.ConfigDef;
26+
import org.apache.kafka.connect.connector.ConnectRecord;
27+
import org.apache.kafka.connect.transforms.Transformation;
28+
import org.apache.kafka.connect.transforms.util.SimpleConfig;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.Map;
33+
import java.util.Objects;
34+
35+
/**
36+
* @author Adobe Inc.
37+
*/
38+
public class JsonPatchTransform<R extends ConnectRecord<R>> implements Transformation<R> {
39+
40+
private static final Logger LOG = LoggerFactory.getLogger(JsonPatchTransform.class);
41+
private static final ObjectMapper MAPPER = SerializerDeserializerUtil.getMapper();
42+
private static final ObjectNodeConverter objectNodeConverter = new ObjectNodeConverter(MAPPER);
43+
private static final String JSON_PATCH_CONFIG = "operations";
44+
private static final String JSON_PATHS_DOC = "Json pointers to evaluate for filtering";
45+
private static final ConfigDef CONFIG_DEF = new ConfigDef().define(
46+
JSON_PATCH_CONFIG,
47+
ConfigDef.Type.STRING,
48+
ConfigDef.NO_DEFAULT_VALUE,
49+
ConfigDef.Importance.HIGH,
50+
JSON_PATHS_DOC
51+
);
52+
private JsonNode jsonPatch;
53+
54+
@Override
55+
public R apply(R record) {
56+
if (LOG.isDebugEnabled()) {
57+
LOG.debug("Received record in Patch Transformation {} ", record.value());
58+
}
59+
60+
final Object value = record.value();
61+
if (Objects.isNull(value)) {
62+
return record;
63+
}
64+
try {
65+
final ObjectNode jsonValue = objectNodeConverter.convert(value);
66+
final JsonNode transformedValue = JsonPatch.apply(jsonPatch, jsonValue);
67+
Object out = MAPPER.writeValueAsString(transformedValue);
68+
if (LOG.isDebugEnabled()) {
69+
LOG.debug("Transformed record {}", jsonValue);
70+
}
71+
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
72+
record.key(), record.valueSchema(), out, System.currentTimeMillis());
73+
} catch (InvalidJsonPatchException patchAppException) {
74+
LOG.error("Message validation failed.", patchAppException);
75+
} catch (Exception ex) {
76+
LOG.error("Record failed during transformation.", ex);
77+
}
78+
return record;
79+
}
80+
81+
@Override
82+
public ConfigDef config() {
83+
return CONFIG_DEF;
84+
}
85+
86+
@Override
87+
public void close() {
88+
89+
}
90+
91+
@Override
92+
public void configure(Map<String, ?> configs) {
93+
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
94+
try {
95+
jsonPatch = MAPPER.readTree(config.getString(JSON_PATCH_CONFIG));
96+
} catch (JsonProcessingException e) {
97+
throw new AEPStreamingRuntimeException("Failed to read json patch config.", e);
98+
}
99+
}
100+
101+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.sink.transformation;
14+
15+
import com.adobe.platform.streaming.AEPStreamingRuntimeException;
16+
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import com.fasterxml.jackson.databind.node.ObjectNode;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.io.IOException;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
27+
/**
28+
* @author Adobe Inc.
29+
*/
30+
public class ObjectNodeConverter {
31+
32+
private static final Logger LOG = LoggerFactory.getLogger(ObjectNodeConverter.class);
33+
private final ObjectMapper objectMapper;
34+
35+
public ObjectNodeConverter(ObjectMapper objectMapper) {
36+
this.objectMapper = objectMapper;
37+
}
38+
39+
public ObjectNode convert(Object from) {
40+
if (Objects.isNull(from)) {
41+
return null;
42+
}
43+
ObjectNode convertedVal = null;
44+
try {
45+
if (from instanceof String) {
46+
convertedVal = (ObjectNode) objectMapper.readTree(from.toString());
47+
} else if (from instanceof byte[]) {
48+
convertedVal = (ObjectNode) objectMapper.readTree((byte[]) from);
49+
} else if (from instanceof Map) {
50+
convertedVal = objectMapper.convertValue(from, ObjectNode.class);
51+
} else if (from instanceof ObjectNode) {
52+
return (ObjectNode) from;
53+
} else {
54+
throw new AEPStreamingRuntimeException("Unsupported type for conversion to objectNode");
55+
}
56+
} catch (IOException e) {
57+
LOG.error("Exception occurred while de-serializing to ObjectNode", e);
58+
}
59+
return convertedVal;
60+
}
61+
62+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2023 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
package com.adobe.platform.streaming.unit;
14+
15+
import com.adobe.platform.streaming.http.serializer.SerializerDeserializerUtil;
16+
import com.adobe.platform.streaming.sink.transformation.JsonPatchTransform;
17+
18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.databind.JsonNode;
20+
21+
import mockit.Mocked;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.data.ConnectSchema;
24+
import org.apache.kafka.connect.header.ConnectHeaders;
25+
import org.apache.kafka.connect.source.SourceRecord;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import static org.apache.kafka.connect.data.Schema.Type.STRING;
33+
import static org.junit.jupiter.api.Assertions.assertEquals;
34+
import static org.junit.jupiter.api.Assertions.assertNotNull;
35+
36+
/**
37+
* @author Adobe Inc.
38+
*/
39+
public class JsonPatchTransformTest {
40+
41+
@Mocked
42+
protected ConnectHeaders headers;
43+
44+
private final String jsonPatchString = "[\n" +
45+
" {\n" +
46+
" \"op\": \"add\",\n" +
47+
" \"path\": \"/header\",\n" +
48+
" \"value\": {\"operations\":{\"data\":\"merge\", \"identity\":\"create\", " +
49+
"\"identityDatasetId\":\"2474b6c6c8bb6a1c72643ac8\"}}\n" +
50+
" }\n" +
51+
"]";
52+
53+
private String recordObjectString = "{\n" +
54+
" \"payload\": {\n" +
55+
" \"crmId\": \"VM-121-SHARE\",\n" +
56+
" \"id\": 2388398\n" +
57+
" }\n" +
58+
"}";
59+
60+
private static JsonPatchTransform jsonPatchTransform;
61+
private static SourceRecord record;
62+
63+
@BeforeEach
64+
public void initJsonPatchTransform() throws Exception {
65+
final Map<String, String> configs = new HashMap<>();
66+
configs.put("operations", jsonPatchString);
67+
68+
jsonPatchTransform = new JsonPatchTransform();
69+
jsonPatchTransform.configure(configs);
70+
71+
record = new SourceRecord(new HashMap<>(), new HashMap<>(), "test-topic", 1, new ConnectSchema(STRING),
72+
"", new ConnectSchema(STRING), recordObjectString, 2L, headers);
73+
}
74+
75+
@Test
76+
public void testJsonPatch() throws JsonProcessingException {
77+
final ConnectRecord transformedRecord = jsonPatchTransform.apply(record);
78+
final JsonNode recordValueNode = SerializerDeserializerUtil.getMapper()
79+
.readTree(transformedRecord.value().toString());
80+
final JsonNode operations = recordValueNode.get("header").get("operations");
81+
final JsonNode dataNode = operations.get("data");
82+
final JsonNode identity = operations.get("identity");
83+
84+
assertNotNull(operations);
85+
assertNotNull(dataNode);
86+
assertNotNull(identity);
87+
88+
assertEquals("merge", dataNode.asText());
89+
assertEquals("create", identity.asText());
90+
91+
}
92+
93+
}

0 commit comments

Comments
 (0)