Skip to content

Commit 2cbd9c7

Browse files
authored
Merge pull request #2 from rjrudin/feature/1-transform
#1 Adding support for a REST transform with parameters
2 parents 87ab7a8 + 4547f3a commit 2cbd9c7

File tree

3 files changed

+108
-0
lines changed

3 files changed

+108
-0
lines changed

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ public class MarkLogicSinkConfig extends AbstractConfig {
2626

2727
public static final String DMSDK_BATCH_SIZE = "ml.dmsdk.batchSize";
2828
public static final String DMSDK_THREAD_COUNT = "ml.dmsdk.threadCount";
29+
public static final String DMSDK_TRANSFORM = "ml.dmsdk.transform";
30+
public static final String DMSDK_TRANSFORM_PARAMS = "ml.dmsdk.transformParams";
31+
public static final String DMSDK_TRANSFORM_PARAMS_DELIMITER = "ml.dmsdk.transformParamsDelimiter";
2932

3033
public static final String DOCUMENT_COLLECTIONS = "ml.document.collections";
3134
public static final String DOCUMENT_PERMISSIONS = "ml.document.permissions";
@@ -48,6 +51,9 @@ public class MarkLogicSinkConfig extends AbstractConfig {
4851
.define(CONNECTION_EXTERNAL_NAME, Type.STRING, Importance.LOW, "External name for Kerberos authentication")
4952
.define(DMSDK_BATCH_SIZE, Type.INT, 100, Importance.HIGH, "Number of documents to write in each batch")
5053
.define(DMSDK_THREAD_COUNT, Type.INT, 8, Importance.HIGH, "Number of threads for DMSDK to use")
54+
.define(DMSDK_TRANSFORM, Type.STRING, Importance.MEDIUM, "Name of a REST transform to use when writing documents")
55+
.define(DMSDK_TRANSFORM_PARAMS, Type.STRING, Importance.MEDIUM, "Delimited set of transform names and values")
56+
.define(DMSDK_TRANSFORM_PARAMS_DELIMITER, Type.STRING, ",", Importance.LOW, "Delimiter for transform parameter names and values; defaults to a comma")
5157
.define(DOCUMENT_COLLECTIONS, Type.STRING, Importance.MEDIUM, "String-delimited collections to add each document to")
5258
.define(DOCUMENT_FORMAT, Type.STRING, Importance.LOW, "Defines format of each document; can be one of json, xml, text, binary, or unknown")
5359
.define(DOCUMENT_MIMETYPE, Type.STRING, Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type")

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

+39
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.marklogic.client.DatabaseClient;
44
import com.marklogic.client.datamovement.DataMovementManager;
55
import com.marklogic.client.datamovement.WriteBatcher;
6+
import com.marklogic.client.document.ServerTransform;
67
import com.marklogic.kafka.connect.DefaultDatabaseClientCreator;
78
import org.apache.kafka.connect.sink.SinkRecord;
89
import org.apache.kafka.connect.sink.SinkTask;
@@ -37,11 +38,49 @@ public void start(final Map<String, String> config) {
3738
.withBatchSize(Integer.valueOf(config.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE)))
3839
.withThreadCount(Integer.valueOf(config.get(MarkLogicSinkConfig.DMSDK_THREAD_COUNT)));
3940

41+
ServerTransform transform = buildServerTransform(config);
42+
if (transform != null) {
43+
writeBatcher.withTransform(transform);
44+
}
45+
4046
dataMovementManager.startJob(writeBatcher);
4147

4248
logger.info("Started");
4349
}
4450

51+
/**
52+
* Builds a REST ServerTransform object based on the DMSDK parameters in the given config. If no transform name
53+
* is configured, then null will be returned.
54+
*
55+
* @param config
56+
* @return
57+
*/
58+
protected ServerTransform buildServerTransform(final Map<String, String> config) {
59+
String transform = config.get(MarkLogicSinkConfig.DMSDK_TRANSFORM);
60+
if (transform != null && transform.trim().length() > 0) {
61+
ServerTransform t = new ServerTransform(transform);
62+
String params = config.get(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS);
63+
if (params != null && params.trim().length() > 0) {
64+
String delimiter = config.get(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER);
65+
if (delimiter != null && delimiter.trim().length() > 0) {
66+
String[] tokens = params.split(delimiter);
67+
for (int i = 0; i < tokens.length; i += 2) {
68+
if (i + 1 >= tokens.length) {
69+
throw new IllegalArgumentException(String.format("The value of the %s property does not have an even number of " +
70+
"parameter names and values; property value: %s", MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS, params));
71+
}
72+
t.addParameter(tokens[i], tokens[i + 1]);
73+
}
74+
} else {
75+
logger.warn(String.format("Unable to apply transform parameters to transform: %s; please set the " +
76+
"delimiter via the %s property", transform, MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER));
77+
}
78+
}
79+
return t;
80+
}
81+
return null;
82+
}
83+
4584
/**
4685
* Creates a new DatabaseClient based on the configuration properties found in the given map.
4786
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.marklogic.kafka.connect.sink;
2+
3+
import com.marklogic.client.document.ServerTransform;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
9+
import static org.junit.jupiter.api.Assertions.*;
10+
11+
public class BuildServerTransformTest {
12+
13+
private MarkLogicSinkTask task = new MarkLogicSinkTask();
14+
private Map<String, String> config = new HashMap<>();
15+
16+
@Test
17+
public void noTransform() {
18+
assertNull(task.buildServerTransform(config));
19+
}
20+
21+
@Test
22+
public void noParams() {
23+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM, "noParams");
24+
ServerTransform t = task.buildServerTransform(config);
25+
assertEquals("noParams", t.getName());
26+
assertTrue(t.keySet().isEmpty());
27+
}
28+
29+
@Test
30+
public void oneParam() {
31+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM, "oneParam");
32+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS, "param1,value1");
33+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER, ",");
34+
ServerTransform t = task.buildServerTransform(config);
35+
assertEquals(1, t.keySet().size());
36+
assertEquals("value1", t.get("param1").get(0));
37+
}
38+
39+
@Test
40+
public void twoParamsWithCustomDelimiter() {
41+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM, "twoParams");
42+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS, "param1;value1;param2;value2");
43+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER, ";");
44+
ServerTransform t = task.buildServerTransform(config);
45+
assertEquals(2, t.keySet().size());
46+
assertEquals("value1", t.get("param1").get(0));
47+
assertEquals("value2", t.get("param2").get(0));
48+
}
49+
50+
@Test
51+
public void malformedParams() {
52+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM, "malformedParams");
53+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS, "param1,value1,param2");
54+
config.put(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS_DELIMITER, ",");
55+
try {
56+
task.buildServerTransform(config);
57+
fail("The call should have failed because the params property does not have an even number of parameter " +
58+
"names and values");
59+
} catch (IllegalArgumentException ex) {
60+
assertTrue(ex.getMessage().startsWith("The value of the ml.dmsdk.transformParams property"));
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)