Skip to content

Commit 867a763

Browse files
committed
[improve][io] KCA: option to collapse partitioned topics (apache#19923)
(cherry picked from commit 7cb48fd)
1 parent be7c237 commit 867a763

File tree

4 files changed

+119
-2
lines changed

4 files changed

+119
-2
lines changed

pulsar-io/kafka-connect-adaptor/pom.xml

+8
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,17 @@
3939
<scope>provided</scope>
4040
</dependency>
4141

42+
<dependency>
43+
<groupId>${project.groupId}</groupId>
44+
<artifactId>pulsar-io-common</artifactId>
45+
<version>${project.version}</version>
46+
<scope>compile</scope>
47+
</dependency>
48+
4249
<dependency>
4350
<groupId>com.fasterxml.jackson.core</groupId>
4451
<artifactId>jackson-databind</artifactId>
52+
<scope>compile</scope>
4553
</dependency>
4654

4755
<dependency>

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pulsar.client.api.SubscriptionType;
5656
import org.apache.pulsar.client.api.schema.GenericObject;
5757
import org.apache.pulsar.client.api.schema.KeyValueSchema;
58+
import org.apache.pulsar.common.naming.TopicName;
5859
import org.apache.pulsar.common.schema.KeyValue;
5960
import org.apache.pulsar.common.schema.SchemaType;
6061
import org.apache.pulsar.functions.api.Record;
@@ -92,6 +93,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
9293
protected String topicName;
9394

9495
private boolean sanitizeTopicName = false;
96+
// This is a workaround for https://github.com/apache/pulsar/issues/19922
97+
private boolean collapsePartitionedTopics = false;
98+
9599
private final Cache<String, String> sanitizedTopicCache =
96100
CacheBuilder.newBuilder().maximumSize(1000)
97101
.expireAfterAccess(30, TimeUnit.MINUTES).build();
@@ -160,6 +164,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
160164
topicName = kafkaSinkConfig.getTopic();
161165
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
162166
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
167+
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
163168

164169
useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
165170
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -418,8 +423,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me
418423

419424
@SuppressWarnings("rawtypes")
420425
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
421-
final int partition = sourceRecord.getPartitionIndex().orElse(0);
422-
final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
426+
final int partition;
427+
final String topic;
428+
429+
if (collapsePartitionedTopics
430+
&& sourceRecord.getTopicName().isPresent()
431+
&& TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
432+
TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
433+
partition = tn.getPartitionIndex();
434+
topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
435+
} else {
436+
partition = sourceRecord.getPartitionIndex().orElse(0);
437+
topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
438+
}
423439
final Object key;
424440
final Object value;
425441
final Schema keySchema;

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
9595
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
9696
private boolean sanitizeTopicName = false;
9797

98+
@FieldDoc(
99+
defaultValue = "false",
100+
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
101+
private boolean collapsePartitionedTopics = false;
102+
98103
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
99104
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
100105
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);

pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java

+88
Original file line numberDiff line numberDiff line change
@@ -1564,6 +1564,94 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
15641564
assertEquals(ref.getBatchIdx(), batchIdx);
15651565
}
15661566

1567+
@Test
1568+
public void collapsePartitionedTopicEnabledTest() throws Exception {
1569+
testCollapsePartitionedTopic(true,
1570+
"persistent://a/b/fake-topic-partition-0",
1571+
"persistent://a/b/fake-topic",
1572+
0);
1573+
1574+
testCollapsePartitionedTopic(true,
1575+
"persistent://a/b/fake-topic-partition-1",
1576+
"persistent://a/b/fake-topic",
1577+
1);
1578+
1579+
testCollapsePartitionedTopic(true,
1580+
"persistent://a/b/fake-topic",
1581+
"persistent://a/b/fake-topic",
1582+
0);
1583+
1584+
testCollapsePartitionedTopic(true,
1585+
"fake-topic-partition-5",
1586+
"persistent://public/default/fake-topic",
1587+
5);
1588+
}
1589+
1590+
@Test
1591+
public void collapsePartitionedTopicDisabledTest() throws Exception {
1592+
testCollapsePartitionedTopic(false,
1593+
"persistent://a/b/fake-topic-partition-0",
1594+
"persistent://a/b/fake-topic-partition-0",
1595+
0);
1596+
1597+
testCollapsePartitionedTopic(false,
1598+
"persistent://a/b/fake-topic-partition-1",
1599+
"persistent://a/b/fake-topic-partition-1",
1600+
0);
1601+
1602+
testCollapsePartitionedTopic(false,
1603+
"persistent://a/b/fake-topic",
1604+
"persistent://a/b/fake-topic",
1605+
0);
1606+
1607+
testCollapsePartitionedTopic(false,
1608+
"fake-topic-partition-5",
1609+
"fake-topic-partition-5",
1610+
0);
1611+
}
1612+
1613+
private void testCollapsePartitionedTopic(boolean isEnabled,
1614+
String pulsarTopic,
1615+
String expectedKafkaTopic,
1616+
int expectedPartition) throws Exception {
1617+
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
1618+
props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
1619+
1620+
KafkaConnectSink sink = new KafkaConnectSink();
1621+
sink.open(props, context);
1622+
1623+
AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
1624+
= AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
1625+
1626+
final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
1627+
obj.put("field1", (byte) 10);
1628+
obj.put("field2", "test");
1629+
obj.put("field3", (short) 100);
1630+
1631+
final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
1632+
Message msg = mock(MessageImpl.class);
1633+
when(msg.getValue()).thenReturn(rec);
1634+
when(msg.getKey()).thenReturn("key");
1635+
when(msg.hasKey()).thenReturn(true);
1636+
when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
1637+
1638+
final AtomicInteger status = new AtomicInteger(0);
1639+
Record<GenericObject> record = PulsarRecord.<String>builder()
1640+
.topicName(pulsarTopic)
1641+
.message(msg)
1642+
.schema(pulsarAvroSchema)
1643+
.ackFunction(status::incrementAndGet)
1644+
.failFunction(status::decrementAndGet)
1645+
.build();
1646+
1647+
SinkRecord sinkRecord = sink.toSinkRecord(record);
1648+
1649+
Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
1650+
Assert.assertEquals((int)sinkRecord.kafkaPartition(), expectedPartition);
1651+
1652+
sink.close();
1653+
}
1654+
15671655
@SneakyThrows
15681656
private java.util.Date getDateFromString(String dateInString) {
15691657
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");

0 commit comments

Comments
 (0)