From b65d5588878a560273b8da733743768a113c7b25 Mon Sep 17 00:00:00 2001 From: 52CL Date: Sun, 10 Dec 2023 15:21:59 +0800 Subject: [PATCH] fixed MQ serverMode lost default topic --- .../canal/connector/kafka/producer/CanalKafkaProducer.java | 4 +++- .../connector/pulsarmq/producer/CanalPulsarMQProducer.java | 4 +++- .../connector/rabbitmq/producer/CanalRabbitMQProducer.java | 4 +++- .../connector/rocketmq/producer/CanalRocketMQProducer.java | 4 +++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java index 0300cd13ef..7c93076c26 100644 --- a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java +++ b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java @@ -151,7 +151,9 @@ public void send(MQDestination mqDestination, Message message, Callback callback // 针对不同的topic,引入多线程提升效率 for (Map.Entry entry : messageMap.entrySet()) { - final String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + final String topicName = mqDestination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); final Message messageSub = entry.getValue(); template.submit((Callable) () -> { try { diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index b13003dfd8..e6213b8abf 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -158,7 +158,9 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes .messageTopics(message, destination.getTopic(), destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> { try { diff --git a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java index 9e4c8b197e..259d11e76f 100644 --- a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java +++ b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java @@ -142,7 +142,9 @@ public void send(final MQDestination destination, Message message, Callback call .messageTopics(message, destination.getTopic(), destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - final String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + final String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> send(destination, topicName, messageSub)); diff --git a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java index b2024e0364..a3f787a6f4 100644 --- a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java +++ b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java @@ -154,7 +154,9 @@ private void loadRocketMQProperties(Properties properties) { destination.getDynamicTopic()); for (Map.Entry entry : messageMap.entrySet()) { - String topicName = entry.getKey().replace('.', '_'); + // 与默认topic匹配直接返回默认topic + String topicName = destination.getTopic().equals(entry.getKey()) ? + entry.getKey() : entry.getKey().replace('.', '_'); com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); template.submit(() -> { try {