Skip to content

Commit 2c14ff1

Browse files
feat: add case transform
1 parent ed40086 commit 2c14ff1

File tree

10 files changed

+889
-55
lines changed

10 files changed

+889
-55
lines changed

README.md

+26
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,32 @@ transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.Ex
196196
transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
197197
```
198198

199+
### `CaseTransform`
200+
201+
This transformation transforms the case a string value from the record field to uppercase or lowercase.
202+
203+
This transform can modify fields of `STRING` type.
204+
205+
It supports fields with (e.g. Avro) or without schema (e.g. JSON).
206+
207+
Exists in two variants:
208+
- `io.aiven.kafka.connect.transforms.CaseTransform$Key` - works on keys;
209+
- `io.aiven.kafka.connect.transforms.CaseTransform$Value` - works on values.
210+
211+
The transformation defines the following configurations:
212+
213+
- `field.names` - The name of the fields which should be case transformed.
214+
- `case` - either `lower` or `upper` for transforming the case as desired.
215+
216+
Here is an example of this transformation configuration:
217+
218+
```properties
219+
transforms=caseTransform
220+
transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Value
221+
transforms.caseTransform.field.names=inner_field_name_1, inner_field_name_2
222+
```
223+
224+
199225
## License
200226

201227
This project is licensed under the [Apache License, Version 2.0](LICENSE).

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ sourceCompatibility = JavaVersion.VERSION_11
4242
targetCompatibility = JavaVersion.VERSION_11
4343

4444
ext {
45+
jacksonVersion = "2.18.2"
4546
kafkaVersion = "2.0.1"
4647
testcontainersVersion = "1.20.4"
4748
debeziumVersion = "1.3.0.Final"
@@ -114,6 +115,7 @@ dependencies {
114115
integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
115116

116117
integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
118+
integrationTestImplementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion")
117119
integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // this is not Kafka version
118120
// Make test utils from 'test' available in 'integration-test'
119121
integrationTestImplementation sourceSets.test.output
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
import org.apache.kafka.common.config.ConfigDef;
24+
import org.apache.kafka.connect.source.SourceConnector;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public abstract class AbstractTestSourceConnector extends SourceConnector {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTestSourceConnector.class);
32+
33+
@Override
34+
public void start(final Map<String, String> props) {
35+
// no-op
36+
}
37+
38+
@Override
39+
public void stop() {
40+
// no-op
41+
}
42+
43+
@Override
44+
public List<Map<String, String>> taskConfigs(final int maxTasks) {
45+
return Collections.singletonList(Collections.emptyMap());
46+
}
47+
48+
@Override
49+
public ConfigDef config() {
50+
return new ConfigDef();
51+
}
52+
53+
@Override
54+
public String version() {
55+
return null;
56+
}
57+
}

src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java

+58-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.io.File;
2020
import java.io.IOException;
2121
import java.nio.file.Files;
22+
import java.time.Duration;
2223
import java.util.Arrays;
24+
import java.util.Collections;
2325
import java.util.HashMap;
2426
import java.util.Map;
2527
import java.util.Properties;
@@ -30,9 +32,12 @@
3032
import org.apache.kafka.clients.admin.AdminClientConfig;
3133
import org.apache.kafka.clients.admin.NewTopic;
3234
import org.apache.kafka.clients.consumer.ConsumerConfig;
35+
import org.apache.kafka.clients.consumer.ConsumerRecord;
3336
import org.apache.kafka.clients.consumer.KafkaConsumer;
3437
import org.apache.kafka.common.TopicPartition;
3538

39+
import com.fasterxml.jackson.core.type.TypeReference;
40+
import com.fasterxml.jackson.databind.ObjectMapper;
3641
import org.junit.jupiter.api.AfterEach;
3742
import org.junit.jupiter.api.BeforeAll;
3843
import org.junit.jupiter.api.BeforeEach;
@@ -130,6 +135,9 @@ void setUp() throws ExecutionException, InterruptedException {
130135
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
131136
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
132137
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
138+
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
139+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
140+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
133141
consumer = new KafkaConsumer<>(consumerProps);
134142

135143
final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
@@ -156,7 +164,7 @@ final void tearDown() {
156164

157165
@Test
158166
@Timeout(10)
159-
final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
167+
final void testExtractTopic() throws ExecutionException, InterruptedException {
160168
final Map<String, String> connectorConfig = new HashMap<>();
161169
connectorConfig.put("name", "test-source-connector");
162170
connectorConfig.put("connector.class", TestSourceConnector.class.getName());
@@ -194,6 +202,55 @@ final void testExtractTopicFromValueSchemaName() throws ExecutionException, Inte
194202

195203
}
196204

205+
@Test
206+
@Timeout(10)
207+
final void testCaseTransform() throws ExecutionException, InterruptedException, IOException {
208+
adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.SOURCE_TOPIC, 1, (short) 1)))
209+
.all().get();
210+
adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.TARGET_TOPIC, 1, (short) 1)))
211+
.all().get();
212+
213+
final Map<String, String> connectorConfig = new HashMap<>();
214+
connectorConfig.put("name", "test-source-connector");
215+
connectorConfig.put("connector.class", TestCaseTransformConnector.class.getName());
216+
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
217+
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
218+
connectorConfig.put("value.converter.value.subject.name.strategy",
219+
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
220+
connectorConfig.put("tasks.max", "1");
221+
connectorConfig.put("transforms", "regexRouteToTargetTopic, caseTransform");
222+
connectorConfig.put("transforms.caseTransform.case", "upper");
223+
connectorConfig.put("transforms.caseTransform.field.names", TestCaseTransformConnector.TRANSFORM_FIELD);
224+
connectorConfig.put("transforms.caseTransform.type", "io.aiven.kafka.connect.transforms.CaseTransform$Value");
225+
connectorConfig.put("transforms.regexRouteToTargetTopic.type",
226+
"org.apache.kafka.connect.transforms.RegexRouter");
227+
connectorConfig.put("transforms.regexRouteToTargetTopic.regex", "(.*)-source-(.*)");
228+
connectorConfig.put("transforms.regexRouteToTargetTopic.replacement", String.format("$1-target-$2"));
229+
230+
connectRunner.createConnector(connectorConfig);
231+
checkMessageTransformInTopic(
232+
new TopicPartition(TestCaseTransformConnector.TARGET_TOPIC, 0),
233+
TestCaseTransformConnector.MESSAGES_TO_PRODUCE
234+
);
235+
}
236+
237+
final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
238+
throws InterruptedException, IOException {
239+
waitForCondition(
240+
() -> consumer.endOffsets(Arrays.asList(topicPartition))
241+
.values().stream().reduce(Long::sum).map(s -> s == expectedNumberOfMessages)
242+
.orElse(false), 5000, "Messages appear in target topic"
243+
);
244+
consumer.subscribe(Collections.singletonList(topicPartition.topic()));
245+
final ObjectMapper objectMapper = new ObjectMapper();
246+
final TypeReference<Map<String, Object>> tr = new TypeReference<>() {};
247+
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
248+
final Map<String, Object> value = objectMapper.readValue(consumerRecord.value(), tr);
249+
final Map<String, String> payload = (Map<String, String>) value.get("payload");
250+
assertThat(payload.get("transform")).isEqualTo("LOWER-CASE-DATA-TRANSFORMS-TO-UPPERCASE");
251+
}
252+
}
253+
197254
final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
198255
throws InterruptedException {
199256
waitForCondition(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2025 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.connect.connector.Task;
25+
import org.apache.kafka.connect.data.Schema;
26+
import org.apache.kafka.connect.data.SchemaBuilder;
27+
import org.apache.kafka.connect.data.Struct;
28+
import org.apache.kafka.connect.source.SourceRecord;
29+
import org.apache.kafka.connect.source.SourceTask;
30+
31+
public class TestCaseTransformConnector extends AbstractTestSourceConnector {
32+
static final long MESSAGES_TO_PRODUCE = 10L;
33+
34+
static final String SOURCE_TOPIC = "case-transform-source-topic";
35+
static final String TARGET_TOPIC = "case-transform-target-topic";
36+
static final String TRANSFORM_FIELD = "transform";
37+
38+
@Override
39+
public Class<? extends Task> taskClass() {
40+
return TestCaseTransformConnector.TestSourceConnectorTask.class;
41+
}
42+
43+
public static class TestSourceConnectorTask extends SourceTask {
44+
private int counter = 0;
45+
46+
private final Schema valueSchema = SchemaBuilder.struct()
47+
.field(TRANSFORM_FIELD, SchemaBuilder.STRING_SCHEMA)
48+
.schema();
49+
private final Struct value =
50+
new Struct(valueSchema).put(TRANSFORM_FIELD, "lower-case-data-transforms-to-uppercase");
51+
52+
@Override
53+
public void start(final Map<String, String> props) {
54+
}
55+
56+
@Override
57+
public List<SourceRecord> poll() {
58+
if (counter >= MESSAGES_TO_PRODUCE) {
59+
return null; // indicate pause
60+
}
61+
62+
final Map<String, String> sourcePartition = new HashMap<>();
63+
sourcePartition.put("partition", "0");
64+
final Map<String, String> sourceOffset = new HashMap<>();
65+
sourceOffset.put("offset", Integer.toString(counter));
66+
67+
counter += 1;
68+
69+
return Collections.singletonList(
70+
new SourceRecord(sourcePartition, sourceOffset,
71+
SOURCE_TOPIC,
72+
valueSchema, value)
73+
);
74+
}
75+
76+
@Override
77+
public void stop() {
78+
}
79+
80+
@Override
81+
public String version() {
82+
return null;
83+
}
84+
}
85+
}

src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java

+2-27
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24-
import org.apache.kafka.common.config.ConfigDef;
2524
import org.apache.kafka.connect.connector.Task;
2625
import org.apache.kafka.connect.data.Schema;
2726
import org.apache.kafka.connect.data.SchemaBuilder;
2827
import org.apache.kafka.connect.data.Struct;
29-
import org.apache.kafka.connect.source.SourceConnector;
3028
import org.apache.kafka.connect.source.SourceRecord;
3129
import org.apache.kafka.connect.source.SourceTask;
3230

@@ -35,41 +33,18 @@
3533
*
3634
* <p>It just produces a fixed number of struct records.
3735
*/
38-
public class TestSourceConnector extends SourceConnector {
36+
public final class TestSourceConnector extends AbstractTestSourceConnector {
3937
static final long MESSAGES_TO_PRODUCE = 10L;
4038

4139
static final String ORIGINAL_TOPIC = "original-topic";
4240
static final String NEW_TOPIC = "new-topic";
4341
static final String ROUTING_FIELD = "field-0";
4442

45-
@Override
46-
public void start(final Map<String, String> props) {
47-
}
48-
4943
@Override
5044
public Class<? extends Task> taskClass() {
5145
return TestSourceConnectorTask.class;
5246
}
5347

54-
@Override
55-
public List<Map<String, String>> taskConfigs(final int maxTasks) {
56-
return Collections.singletonList(Collections.emptyMap());
57-
}
58-
59-
@Override
60-
public void stop() {
61-
}
62-
63-
@Override
64-
public ConfigDef config() {
65-
return new ConfigDef();
66-
}
67-
68-
@Override
69-
public String version() {
70-
return null;
71-
}
72-
7348
public static class TestSourceConnectorTask extends SourceTask {
7449
private int counter = 0;
7550

@@ -83,7 +58,7 @@ public void start(final Map<String, String> props) {
8358
}
8459

8560
@Override
86-
public List<SourceRecord> poll() throws InterruptedException {
61+
public List<SourceRecord> poll() {
8762
if (counter >= MESSAGES_TO_PRODUCE) {
8863
return null; // indicate pause
8964
}

0 commit comments

Comments
 (0)