Skip to content

Commit 7cb48fd

Browse files
authored
[improve][io] KCA: option to collapse partitioned topics (#19923)
1 parent bbf5273 commit 7cb48fd

File tree

4 files changed

+118
-2
lines changed

4 files changed

+118
-2
lines changed

pulsar-io/kafka-connect-adaptor/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@
3939
<scope>provided</scope>
4040
</dependency>
4141

42+
<dependency>
43+
<groupId>${project.groupId}</groupId>
44+
<artifactId>pulsar-common</artifactId>
45+
<version>${project.version}</version>
46+
<scope>compile</scope>
47+
</dependency>
48+
4249
<dependency>
4350
<groupId>com.fasterxml.jackson.core</groupId>
4451
<artifactId>jackson-databind</artifactId>

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.pulsar.client.api.SubscriptionType;
5555
import org.apache.pulsar.client.api.schema.GenericObject;
5656
import org.apache.pulsar.client.api.schema.KeyValueSchema;
57+
import org.apache.pulsar.common.naming.TopicName;
5758
import org.apache.pulsar.common.schema.KeyValue;
5859
import org.apache.pulsar.common.schema.SchemaType;
5960
import org.apache.pulsar.functions.api.Record;
@@ -91,6 +92,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
9192
protected String topicName;
9293

9394
private boolean sanitizeTopicName = false;
95+
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
96+
private boolean collapsePartitionedTopics = false;
97+
9498
private final Cache<String, String> sanitizedTopicCache =
9599
CacheBuilder.newBuilder().maximumSize(1000)
96100
.expireAfterAccess(30, TimeUnit.MINUTES).build();
@@ -159,6 +163,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
159163
topicName = kafkaSinkConfig.getTopic();
160164
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
161165
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
166+
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
162167

163168
useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
164169
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -417,8 +422,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me
417422

418423
@SuppressWarnings("rawtypes")
419424
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
420-
final int partition = sourceRecord.getPartitionIndex().orElse(0);
421-
final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
425+
final int partition;
426+
final String topic;
427+
428+
if (collapsePartitionedTopics
429+
&& sourceRecord.getTopicName().isPresent()
430+
&& TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
431+
TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
432+
partition = tn.getPartitionIndex();
433+
topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
434+
} else {
435+
partition = sourceRecord.getPartitionIndex().orElse(0);
436+
topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
437+
}
422438
final Object key;
423439
final Object value;
424440
final Schema keySchema;

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
9494
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
9595
private boolean sanitizeTopicName = false;
9696

97+
@FieldDoc(
98+
defaultValue = "false",
99+
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
100+
private boolean collapsePartitionedTopics = false;
101+
97102
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
98103
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
99104
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);

pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java

+88
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,94 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
15711571
assertEquals(ref.getBatchIdx(), batchIdx);
15721572
}
15731573

1574+
@Test
1575+
public void collapsePartitionedTopicEnabledTest() throws Exception {
1576+
testCollapsePartitionedTopic(true,
1577+
"persistent://a/b/fake-topic-partition-0",
1578+
"persistent://a/b/fake-topic",
1579+
0);
1580+
1581+
testCollapsePartitionedTopic(true,
1582+
"persistent://a/b/fake-topic-partition-1",
1583+
"persistent://a/b/fake-topic",
1584+
1);
1585+
1586+
testCollapsePartitionedTopic(true,
1587+
"persistent://a/b/fake-topic",
1588+
"persistent://a/b/fake-topic",
1589+
0);
1590+
1591+
testCollapsePartitionedTopic(true,
1592+
"fake-topic-partition-5",
1593+
"persistent://public/default/fake-topic",
1594+
5);
1595+
}
1596+
1597+
@Test
1598+
public void collapsePartitionedTopicDisabledTest() throws Exception {
1599+
testCollapsePartitionedTopic(false,
1600+
"persistent://a/b/fake-topic-partition-0",
1601+
"persistent://a/b/fake-topic-partition-0",
1602+
0);
1603+
1604+
testCollapsePartitionedTopic(false,
1605+
"persistent://a/b/fake-topic-partition-1",
1606+
"persistent://a/b/fake-topic-partition-1",
1607+
0);
1608+
1609+
testCollapsePartitionedTopic(false,
1610+
"persistent://a/b/fake-topic",
1611+
"persistent://a/b/fake-topic",
1612+
0);
1613+
1614+
testCollapsePartitionedTopic(false,
1615+
"fake-topic-partition-5",
1616+
"fake-topic-partition-5",
1617+
0);
1618+
}
1619+
1620+
private void testCollapsePartitionedTopic(boolean isEnabled,
1621+
String pulsarTopic,
1622+
String expectedKafkaTopic,
1623+
int expectedPartition) throws Exception {
1624+
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
1625+
props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
1626+
1627+
KafkaConnectSink sink = new KafkaConnectSink();
1628+
sink.open(props, context);
1629+
1630+
AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
1631+
= AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
1632+
1633+
final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
1634+
obj.put("field1", (byte) 10);
1635+
obj.put("field2", "test");
1636+
obj.put("field3", (short) 100);
1637+
1638+
final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
1639+
Message msg = mock(MessageImpl.class);
1640+
when(msg.getValue()).thenReturn(rec);
1641+
when(msg.getKey()).thenReturn("key");
1642+
when(msg.hasKey()).thenReturn(true);
1643+
when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
1644+
1645+
final AtomicInteger status = new AtomicInteger(0);
1646+
Record<GenericObject> record = PulsarRecord.<String>builder()
1647+
.topicName(pulsarTopic)
1648+
.message(msg)
1649+
.schema(pulsarAvroSchema)
1650+
.ackFunction(status::incrementAndGet)
1651+
.failFunction(status::decrementAndGet)
1652+
.build();
1653+
1654+
SinkRecord sinkRecord = sink.toSinkRecord(record);
1655+
1656+
Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
1657+
Assert.assertEquals(sinkRecord.kafkaPartition(), expectedPartition);
1658+
1659+
sink.close();
1660+
}
1661+
15741662
@SneakyThrows
15751663
private java.util.Date getDateFromString(String dateInString) {
15761664
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");

0 commit comments

Comments
 (0)