diff --git a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java index 0300cd13ef..7c93076c26 100644 --- a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java +++ b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java @@ -151,7 +151,9 @@ public void send(MQDestination mqDestination, Message message, Callback callback // 针对不同的topic,引入多线程提升效率 for (Map.Entry entry : messageMap.entrySet()) { - final String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + final String topicName = mqDestination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); final Message messageSub = entry.getValue(); template.submit((Callable) () -> { try { diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index b13003dfd8..e6213b8abf 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -158,7 +158,9 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes .messageTopics(message, destination.getTopic(), destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> { try { diff --git a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java index 9e4c8b197e..259d11e76f 100644 --- a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java +++ b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java @@ -142,7 +142,9 @@ public void send(final MQDestination destination, Message message, Callback call .messageTopics(message, destination.getTopic(), destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - final String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + final String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> send(destination, topicName, messageSub)); diff --git a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java index b2024e0364..a3f787a6f4 100644 --- a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java +++ b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java @@ -154,7 +154,9 @@ private void loadRocketMQProperties(Properties properties) { destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> { try {