Skip to content

Commit 4be7ed5

Browse files
victorcastanogutierrezjpechane
authored andcommitted
DBZ-8752 New property rabbitmq.routingKey.source for static, topic and key states
1 parent af86c42 commit 4be7ed5

File tree

4 files changed

+243
-7
lines changed

4 files changed

+243
-7
lines changed

debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java

+66-6
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import com.rabbitmq.client.ConnectionFactoryConfigurator;
3232

3333
import io.debezium.DebeziumException;
34+
import io.debezium.annotation.VisibleForTesting;
3435
import io.debezium.engine.ChangeEvent;
3536
import io.debezium.engine.DebeziumEngine;
3637
import io.debezium.engine.DebeziumEngine.RecordCommitter;
3738
import io.debezium.engine.Header;
3839
import io.debezium.server.BaseChangeConsumer;
40+
import io.debezium.server.StreamNameMapper;
3941

4042
/**
4143
* Implementation of the consumer that delivers the messages into RabbitMQ Stream destination.
@@ -52,6 +54,23 @@ public class RabbitMqStreamChangeConsumer extends BaseChangeConsumer implements
5254
private static final String PROP_PREFIX = "debezium.sink.rabbitmq.";
5355
private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection.";
5456

57+
/**
58+
* Routing key is calculated from topic name using stream name mapper
59+
*/
60+
private static final String TOPIC_ROUTING_KEY_SOURCE = "topic";
61+
62+
/**
63+
* Routing key statically defined
64+
*/
65+
private static final String STATIC_ROUTING_KEY_SOURCE = "static";
66+
67+
/**
68+
* Routing key is the record key
69+
*/
70+
private static final String KEY_ROUTING_KEY_SOURCE = "key";
71+
72+
private static final String EMPTY_ROUTING_KEY = "";
73+
5574
@ConfigProperty(name = PROP_PREFIX + "exchange", defaultValue = "")
5675
Optional<String> exchange;
5776

@@ -64,10 +83,16 @@ public class RabbitMqStreamChangeConsumer extends BaseChangeConsumer implements
6483
@ConfigProperty(name = PROP_PREFIX + "routingKeyDurable", defaultValue = "true")
6584
Boolean routingKeyDurable;
6685

86+
@ConfigProperty(name = PROP_PREFIX + "routingKey.source", defaultValue = STATIC_ROUTING_KEY_SOURCE)
87+
String routingKeySource;
88+
6789
/**
6890
* When true, the routing key is calculated from topic name using stream name mapper.
6991
* When false the routingKey value or empty string is used.
92+
*
93+
* @deprecated Use `routingKeySource` with value `topic` instead
7094
*/
95+
@Deprecated
7196
@ConfigProperty(name = PROP_PREFIX + "routingKeyFromTopicName", defaultValue = "false")
7297
Boolean routingKeyFromTopicName;
7398

@@ -96,12 +121,18 @@ void connect() {
96121

97122
LOGGER.info("Using connection to {}:{}", factory.getHost(), factory.getPort());
98123

124+
if (Boolean.TRUE.equals(routingKeyFromTopicName)) {
125+
routingKeySource = TOPIC_ROUTING_KEY_SOURCE;
126+
LOGGER.warn("Using deprecated `{}` config value. Please, use `{}` with value `topic` instead", PROP_PREFIX + "routingKeyFromTopicName",
127+
PROP_PREFIX + "routingKey.source");
128+
}
129+
99130
try {
100131
connection = factory.newConnection();
101132
channel = connection.createChannel();
102133
channel.confirmSelect();
103134

104-
if (!routingKeyFromTopicName && autoCreateRoutingKey) {
135+
if (!isTopicRoutingKeySource() && autoCreateRoutingKey) {
105136
final var routingKeyName = routingKey.orElse("");
106137
LOGGER.info("Creating queue for routing key named '{}'", routingKeyName);
107138
channel.queueDeclare(routingKeyName, routingKeyDurable, false, false, null);
@@ -135,12 +166,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
135166
for (ChangeEvent<Object, Object> record : records) {
136167
LOGGER.trace("Received event '{}'", record);
137168

138-
final var routingKeyName = routingKey
139-
.orElse(routingKeyFromTopicName ? streamNameMapper.map(record.destination()) : "");
140-
final var exchangeName = exchange.orElse(streamNameMapper.map(record.destination()));
169+
final String exchangeName = exchange.orElse(streamNameMapper.map(record.destination()));
170+
final String routingKeyName = getRoutingKey(record).orElse(EMPTY_ROUTING_KEY);
141171

142172
try {
143-
if (routingKeyFromTopicName && autoCreateRoutingKey) {
173+
if (isTopicRoutingKeySource() && autoCreateRoutingKey) {
144174
LOGGER.trace("Creating queue for routing key named '{}'", routingKeyName);
145175
channel.queueDeclare(routingKeyName, routingKeyDurable, false, false, null);
146176
}
@@ -174,12 +204,42 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
174204
LOGGER.trace("Batch marked finished");
175205
}
176206

177-
private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> record) {
207+
private Optional<String> getRoutingKey(ChangeEvent<Object, Object> eventRecord) {
208+
if (isStaticRoutingKeySource()) {
209+
return routingKey;
210+
}
211+
else if (isTopicRoutingKeySource()) {
212+
return Optional.of(streamNameMapper.map(eventRecord.destination()));
213+
}
214+
else if (isKeyRoutingKeySource()) {
215+
return Optional.ofNullable(eventRecord.key()).map(this::getString);
216+
}
217+
return Optional.empty();
218+
}
219+
220+
private boolean isStaticRoutingKeySource() {
221+
return STATIC_ROUTING_KEY_SOURCE.equals(routingKeySource);
222+
}
223+
224+
private boolean isTopicRoutingKeySource() {
225+
return TOPIC_ROUTING_KEY_SOURCE.equals(routingKeySource);
226+
}
227+
228+
private boolean isKeyRoutingKeySource() {
229+
return KEY_ROUTING_KEY_SOURCE.equals(routingKeySource);
230+
}
231+
232+
private static Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> record) {
178233
List<Header<Object>> headers = record.headers();
179234
Map<String, Object> rabbitMqHeaders = new HashMap<>();
180235
for (Header<Object> header : headers) {
181236
rabbitMqHeaders.put(header.getKey(), header.getValue());
182237
}
183238
return rabbitMqHeaders;
184239
}
240+
241+
@VisibleForTesting
242+
void setStreamNameMapper(StreamNameMapper streamNameMapper) {
243+
this.streamNameMapper = streamNameMapper;
244+
}
185245
}

debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,12 @@ void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException
8383
connection = factory.newConnection();
8484
channel = connection.createChannel();
8585

86+
// With direct exchange and 'debezium.sink.rabbitmq.routingKey.source: topic'
87+
// the routing key is expected to be the same as the topic name. Otherwise, the
88+
// message won't reach the queue
8689
channel.exchangeDeclare(RabbitMqTestConfigSource.TOPIC_NAME, BuiltinExchangeType.DIRECT);
8790
String queue = channel.queueDeclare().getQueue();
88-
channel.queueBind(queue, RabbitMqTestConfigSource.TOPIC_NAME, "");
91+
channel.queueBind(queue, RabbitMqTestConfigSource.TOPIC_NAME, RabbitMqTestConfigSource.TOPIC_NAME);
8992

9093
channel.basicConsume(queue, new DefaultConsumer(channel) {
9194
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server.rabbitmq;
7+
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.anyBoolean;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.never;
12+
import static org.mockito.Mockito.verify;
13+
import static org.mockito.Mockito.when;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.Optional;
19+
import java.util.concurrent.TimeoutException;
20+
import java.util.stream.Stream;
21+
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.params.ParameterizedTest;
25+
import org.junit.jupiter.params.provider.Arguments;
26+
import org.junit.jupiter.params.provider.MethodSource;
27+
28+
import com.rabbitmq.client.AMQP;
29+
import com.rabbitmq.client.Channel;
30+
31+
import io.debezium.engine.ChangeEvent;
32+
import io.debezium.engine.DebeziumEngine.RecordCommitter;
33+
import io.debezium.server.StreamNameMapper;
34+
35+
class RabbitMqStreamChangeConsumerTest {
36+
37+
private static final int DELIVERY_MODE = 2;
38+
private static final int ACK_TIMEOUT = 1000;
39+
40+
private Channel channelMock;
41+
private StreamNameMapper streamNameMapperMock;
42+
private ChangeEvent<Object, Object> eventMock;
43+
private RecordCommitter<ChangeEvent<Object, Object>> committerMock;
44+
45+
private RabbitMqStreamChangeConsumer rabbitMqStreamChangeConsumer;
46+
47+
public static Stream<Arguments> testHandleBatch_StaticRoutingKeySourceParameters() {
48+
return Stream.of(
49+
Arguments.of("static-routing-key", "static-routing-key"),
50+
Arguments.of(null, ""));
51+
}
52+
53+
public static Stream<Arguments> testHandleBatch_KeyRoutingKeySourceParameters() {
54+
return Stream.of(
55+
Arguments.of("test-routing-key", "test-routing-key"),
56+
Arguments.of(null, ""));
57+
}
58+
59+
@BeforeEach
60+
@SuppressWarnings("unchecked")
61+
void setUp() {
62+
eventMock = mock(ChangeEvent.class);
63+
channelMock = mock(Channel.class);
64+
committerMock = mock(RecordCommitter.class);
65+
streamNameMapperMock = mock(StreamNameMapper.class);
66+
67+
rabbitMqStreamChangeConsumer = new RabbitMqStreamChangeConsumer();
68+
rabbitMqStreamChangeConsumer.channel = channelMock;
69+
rabbitMqStreamChangeConsumer.setStreamNameMapper(streamNameMapperMock);
70+
rabbitMqStreamChangeConsumer.deliveryMode = DELIVERY_MODE;
71+
rabbitMqStreamChangeConsumer.ackTimeout = ACK_TIMEOUT;
72+
}
73+
74+
@Test
75+
void testHandleBatch_TopicRoutingKeySource() throws InterruptedException, IOException, TimeoutException {
76+
// given
77+
String topicName = "test-topic";
78+
String payload = "test content";
79+
List<ChangeEvent<Object, Object>> records = List.of(eventMock);
80+
81+
when(eventMock.destination()).thenReturn(topicName);
82+
when(eventMock.value()).thenReturn(payload);
83+
when(eventMock.headers()).thenReturn(List.of());
84+
when(streamNameMapperMock.map(topicName)).thenReturn(topicName);
85+
86+
rabbitMqStreamChangeConsumer.exchange = Optional.of(topicName);
87+
rabbitMqStreamChangeConsumer.routingKeySource = "topic";
88+
rabbitMqStreamChangeConsumer.autoCreateRoutingKey = true;
89+
rabbitMqStreamChangeConsumer.routingKeyDurable = true;
90+
rabbitMqStreamChangeConsumer.routingKey = Optional.of("ignored");
91+
92+
// when
93+
rabbitMqStreamChangeConsumer.handleBatch(records, committerMock);
94+
95+
// then
96+
verify(channelMock).queueDeclare(topicName, true, false, false, null);
97+
98+
final AMQP.BasicProperties expectedProperties = new AMQP.BasicProperties.Builder()
99+
.deliveryMode(DELIVERY_MODE)
100+
.headers(Map.of())
101+
.build();
102+
103+
verify(channelMock).basicPublish(topicName, topicName, expectedProperties, payload.getBytes());
104+
verify(channelMock).waitForConfirmsOrDie(ACK_TIMEOUT);
105+
}
106+
107+
@ParameterizedTest
108+
@MethodSource("testHandleBatch_StaticRoutingKeySourceParameters")
109+
void testHandleBatch_StaticRoutingKeySource(String staticRoutingKey, String expectedRoutingKey) throws InterruptedException, IOException, TimeoutException {
110+
// given
111+
String topicName = "test-topic";
112+
String payload = "test content";
113+
List<ChangeEvent<Object, Object>> records = List.of(eventMock);
114+
115+
when(eventMock.destination()).thenReturn(topicName);
116+
when(eventMock.value()).thenReturn(payload);
117+
when(eventMock.headers()).thenReturn(List.of());
118+
when(streamNameMapperMock.map(topicName)).thenReturn(topicName);
119+
120+
rabbitMqStreamChangeConsumer.exchange = Optional.of(topicName);
121+
rabbitMqStreamChangeConsumer.routingKeySource = "static";
122+
rabbitMqStreamChangeConsumer.routingKey = Optional.ofNullable(staticRoutingKey);
123+
124+
// when
125+
rabbitMqStreamChangeConsumer.handleBatch(records, committerMock);
126+
127+
// then
128+
verify(channelMock, never()).queueDeclare(any(), anyBoolean(), anyBoolean(), anyBoolean(), any());
129+
130+
final AMQP.BasicProperties expectedProperties = new AMQP.BasicProperties.Builder()
131+
.deliveryMode(DELIVERY_MODE)
132+
.headers(Map.of())
133+
.build();
134+
135+
verify(channelMock).basicPublish(topicName, expectedRoutingKey, expectedProperties, payload.getBytes());
136+
verify(channelMock).waitForConfirmsOrDie(ACK_TIMEOUT);
137+
}
138+
139+
@ParameterizedTest
140+
@MethodSource("testHandleBatch_KeyRoutingKeySourceParameters")
141+
void testHandleBatch_KeyRoutingKeySource(String routingKey, String expectedRoutingKey) throws InterruptedException, IOException, TimeoutException {
142+
// given
143+
String topicName = "test-topic";
144+
String payload = "test content";
145+
List<ChangeEvent<Object, Object>> records = List.of(eventMock);
146+
147+
when(eventMock.destination()).thenReturn(topicName);
148+
when(eventMock.value()).thenReturn(payload);
149+
when(eventMock.key()).thenReturn(routingKey);
150+
when(eventMock.headers()).thenReturn(List.of());
151+
when(streamNameMapperMock.map(topicName)).thenReturn(topicName);
152+
when(streamNameMapperMock.map(routingKey)).thenReturn(routingKey);
153+
154+
rabbitMqStreamChangeConsumer.exchange = Optional.of(topicName);
155+
rabbitMqStreamChangeConsumer.routingKeySource = "key";
156+
rabbitMqStreamChangeConsumer.routingKey = Optional.of("ignored");
157+
158+
// when
159+
rabbitMqStreamChangeConsumer.handleBatch(records, committerMock);
160+
161+
// then
162+
verify(channelMock, never()).queueDeclare(any(), anyBoolean(), anyBoolean(), anyBoolean(), any());
163+
164+
final AMQP.BasicProperties expectedProperties = new AMQP.BasicProperties.Builder()
165+
.deliveryMode(DELIVERY_MODE)
166+
.headers(Map.of())
167+
.build();
168+
169+
verify(channelMock).basicPublish(topicName, expectedRoutingKey, expectedProperties, payload.getBytes());
170+
verify(channelMock).waitForConfirmsOrDie(ACK_TIMEOUT);
171+
}
172+
}

debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public RabbitMqTestConfigSource() {
3333
rabbitmqConfig.put("debezium.source.topic.prefix", "testc");
3434
rabbitmqConfig.put("debezium.source.schema.include.list", "inventory");
3535
rabbitmqConfig.put("debezium.source.table.include.list", "inventory.customers");
36+
rabbitmqConfig.put("debezium.sink.rabbitmq.routingKey.source", "topic");
3637
config = rabbitmqConfig;
3738
}
3839

0 commit comments

Comments
 (0)