Skip to content

Commit 07c7690

Browse files
Add ability to describe exchange and routing key for AMQP consumer and producer. (#69)
Fix crash when having multiple instances of RabbitTemplate. related to #67
1 parent c7d7266 commit 07c7690

File tree

23 files changed

+601
-39
lines changed

23 files changed

+601
-39
lines changed

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiSerializerService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package io.github.stavshamir.springwolf.asyncapi;
22

3+
import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
34
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
45
import com.fasterxml.jackson.annotation.JsonInclude;
56
import com.fasterxml.jackson.core.JsonProcessingException;
67
import com.fasterxml.jackson.databind.ObjectMapper;
78
import com.fasterxml.jackson.databind.module.SimpleModule;
89
import com.google.common.collect.ImmutableMap;
10+
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaChannelBindingSerializer;
911
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaOperationBindingSerializer;
1012
import io.github.stavshamir.springwolf.asyncapi.types.AsyncAPI;
1113
import org.springframework.stereotype.Service;
@@ -26,6 +28,7 @@ void postConstruct() {
2628

2729
private void registerKafkaOperationBindingSerializer() {
2830
SimpleModule module = new SimpleModule();
31+
module.addSerializer(KafkaChannelBinding.class, new KafkaChannelBindingSerializer());
2932
module.addSerializer(KafkaOperationBinding.class, new KafkaOperationBindingSerializer());
3033
jsonMapper.registerModule(module);
3134
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/AbstractChannelScanner.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
22

3+
import com.asyncapi.v2.binding.ChannelBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import com.asyncapi.v2.model.channel.ChannelItem;
56
import com.asyncapi.v2.model.channel.operation.Operation;
@@ -48,7 +49,13 @@ public Map<String, ChannelItem> scan() {
4849

4950
/**
5051
* @param annotation An instance of a listener annotation.
51-
* @return A map containing an operation binding pointed to by the the protocol binding name.
52+
* @return A map containing the channel binding pointed to by the protocol binding name.
53+
*/
54+
protected abstract Map<String, ? extends ChannelBinding> buildChannelBinding(T annotation);
55+
56+
/**
57+
* @param annotation An instance of a listener annotation.
58+
* @return A map containing an operation binding pointed to by the protocol binding name.
5259
*/
5360
protected abstract Map<String, ? extends OperationBinding> buildOperationBinding(T annotation);
5461

@@ -76,14 +83,17 @@ private Map.Entry<String, ChannelItem> mapMethodToChannel(Method method) {
7683

7784
String channelName = getChannelName(annotation);
7885

86+
Map<String, ? extends ChannelBinding> channelBinding = buildChannelBinding(annotation);
7987
Map<String, ? extends OperationBinding> operationBinding = buildOperationBinding(annotation);
8088
Class<?> payload = getPayloadType(method);
81-
ChannelItem channel = buildChannel(payload, operationBinding);
89+
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding);
8290

8391
return Maps.immutableEntry(channelName, channel);
8492
}
8593

86-
private ChannelItem buildChannel(Class<?> payloadType, Map<String, ? extends OperationBinding> operationBinding) {
94+
private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBinding,
95+
Class<?> payloadType,
96+
Map<String, ? extends OperationBinding> operationBinding) {
8797
String modelName = schemasService.register(payloadType);
8898

8999
Message message = Message.builder()
@@ -98,6 +108,7 @@ private ChannelItem buildChannel(Class<?> payloadType, Map<String, ? extends Ope
98108
.build();
99109

100110
return ChannelItem.builder()
111+
.bindings(channelBinding)
101112
.publish(operation)
102113
.build();
103114
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ProducerChannelScanner.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
22

3+
import com.asyncapi.v2.binding.ChannelBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import com.asyncapi.v2.model.channel.ChannelItem;
56
import com.asyncapi.v2.model.channel.operation.Operation;
@@ -41,7 +42,7 @@ public Map<String, ChannelItem> scan() {
4142
private boolean allFieldsAreNonNull(ProducerData producerData) {
4243
boolean allNonNull = producerData.getChannelName() != null
4344
&& producerData.getPayloadType() != null
44-
&& producerData.getBinding() != null;
45+
&& producerData.getOperationBinding() != null;
4546

4647
if (!allNonNull) {
4748
log.warn("Some producer data fields are null - this producer will not be documented: {}", producerData);
@@ -53,14 +54,16 @@ private boolean allFieldsAreNonNull(ProducerData producerData) {
5354
private ChannelItem buildChannel(List<ProducerData> producerDataList) {
5455
// All bindings in the group are assumed to be the same
5556
// AsyncApi does not support multiple bindings on a single channel
56-
Map<String, ? extends OperationBinding> binding = producerDataList.get(0).getBinding();
57+
Map<String, ? extends ChannelBinding> channelBinding = producerDataList.get(0).getChannelBinding();
58+
Map<String, ? extends OperationBinding> operationBinding = producerDataList.get(0).getOperationBinding();
5759

5860
Operation operation = Operation.builder()
5961
.message(getMessageObject(producerDataList))
60-
.bindings(binding)
62+
.bindings(operationBinding)
6163
.build();
6264

6365
return ChannelItem.builder()
66+
.bindings(channelBinding)
6467
.subscribe(operation)
6568
.build();
6669
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.github.stavshamir.springwolf.asyncapi.serializers;
2+
3+
4+
import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
5+
import com.fasterxml.jackson.core.JsonGenerator;
6+
import com.fasterxml.jackson.databind.SerializerProvider;
7+
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
8+
9+
import java.io.IOException;
10+
11+
public class KafkaChannelBindingSerializer extends StdSerializer<KafkaChannelBinding> {
12+
13+
public KafkaChannelBindingSerializer() {
14+
this(null);
15+
}
16+
17+
public KafkaChannelBindingSerializer(Class<KafkaChannelBinding> t) {
18+
super(t);
19+
}
20+
21+
@Override
22+
public void serialize(KafkaChannelBinding value, JsonGenerator gen, SerializerProvider provider) throws IOException {
23+
gen.writeStartObject();
24+
gen.writeEndObject();
25+
}
26+
27+
}

springwolf-core/src/main/java/io/github/stavshamir/springwolf/asyncapi/types/ProducerData.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.types;
22

3+
import com.asyncapi.v2.binding.ChannelBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import lombok.*;
56

@@ -20,19 +21,29 @@ public class ProducerData {
2021
*/
2122
private String channelName;
2223

24+
/**
25+
* The channel binding of the producer.
26+
* <br>
27+
* For example:
28+
* <code>
29+
* ImmutableMap.of("kafka", new KafkaChannelBinding())
30+
* </code>
31+
*/
32+
private Map<String, ? extends ChannelBinding> channelBinding;
33+
2334
/**
2435
* The class object of the payload published by this producer.
2536
*/
2637
private Class<?> payloadType;
2738

2839
/**
29-
* The binding of the producer.
40+
* The operation binding of the producer.
3041
* <br>
3142
* For example:
3243
* <code>
3344
* ImmutableMap.of("kafka", new KafkaOperationBinding())
3445
* </code>
3546
*/
36-
private Map<String, ? extends OperationBinding> binding;
47+
private Map<String, ? extends OperationBinding> operationBinding;
3748

3849
}

springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/DefaultAsyncApiServiceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.asyncapi.v2.model.server.Server;
77
import com.google.common.collect.ImmutableMap;
88
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ProducerChannelScanner;
9-
import io.github.stavshamir.springwolf.asyncapi.scanners.components.DefaultClassPathComponentsScanner;
109
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
1110
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
1211
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
@@ -46,7 +45,7 @@ public AsyncApiDocket docket() {
4645
ProducerData kafkaProducerData = ProducerData.builder()
4746
.channelName("producer-topic")
4847
.payloadType(String.class)
49-
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
48+
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
5049
.build();
5150

5251
return AsyncApiDocket.builder()

springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/AbstractChannelScannerTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,14 @@ public void scan_componentHasListenerMethod() {
7979
.build();
8080

8181
Operation operation = Operation.builder()
82-
.bindings(ImmutableMap.of("test", new TestChannelScanner.TestBinding()))
82+
.bindings(ImmutableMap.of("test-operation-binding", new TestChannelScanner.TestOperationBinding()))
8383
.message(message)
8484
.build();
8585

86-
ChannelItem expectedChannel = ChannelItem.builder().publish(operation).build();
86+
ChannelItem expectedChannel = ChannelItem.builder()
87+
.bindings(ImmutableMap.of("test-channel-binding", new TestChannelScanner.TestChannelBinding()))
88+
.publish(operation)
89+
.build();
8790

8891
assertThat(actualChannels)
8992
.containsExactly(Maps.immutableEntry("test-channel", expectedChannel));

springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/ProducerChannelScannerTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
22

3+
import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
34
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
45
import com.asyncapi.v2.model.channel.ChannelItem;
56
import com.asyncapi.v2.model.channel.operation.Operation;
@@ -41,7 +42,8 @@ public void allFieldsProducerData() {
4142
String channelName = "example-producer-topic-foo1";
4243
ProducerData producerData = ProducerData.builder()
4344
.channelName(channelName)
44-
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
45+
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
46+
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
4547
.payloadType(ExamplePayloadDto.class)
4648
.build();
4749

@@ -64,6 +66,7 @@ public void allFieldsProducerData() {
6466
.build();
6567

6668
ChannelItem expectedChannel = ChannelItem.builder()
69+
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
6770
.subscribe(operation)
6871
.build();
6972

@@ -95,13 +98,15 @@ public void multipleProducersForSameTopic() {
9598

9699
ProducerData producerData1 = ProducerData.builder()
97100
.channelName(channelName)
98-
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
101+
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
102+
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
99103
.payloadType(ExamplePayloadDto.class)
100104
.build();
101105

102106
ProducerData producerData2 = ProducerData.builder()
103107
.channelName(channelName)
104-
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
108+
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
109+
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
105110
.payloadType(AnotherExamplePayloadDto.class)
106111
.build();
107112

@@ -134,6 +139,7 @@ public void multipleProducersForSameTopic() {
134139
.build();
135140

136141
ChannelItem expectedChannel = ChannelItem.builder()
142+
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
137143
.subscribe(operation)
138144
.build();
139145

springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/TestChannelScanner.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
22

3+
import com.asyncapi.v2.binding.ChannelBinding;
34
import com.asyncapi.v2.binding.OperationBinding;
45
import com.google.common.collect.ImmutableMap;
56
import lombok.EqualsAndHashCode;
@@ -19,9 +20,14 @@ protected String getChannelName(AbstractChannelScannerTest.TestChannelListener a
1920
return "test-channel";
2021
}
2122

23+
@Override
24+
protected Map<String, ? extends ChannelBinding> buildChannelBinding(AbstractChannelScannerTest.TestChannelListener annotation) {
25+
return ImmutableMap.of("test-channel-binding", new TestChannelBinding());
26+
}
27+
2228
@Override
2329
protected Map<String, ? extends OperationBinding> buildOperationBinding(AbstractChannelScannerTest.TestChannelListener annotation) {
24-
return ImmutableMap.of("test", new TestBinding());
30+
return ImmutableMap.of("test-operation-binding", new TestOperationBinding());
2531
}
2632

2733
@Override
@@ -34,9 +40,13 @@ protected Class<?> getPayloadType(Method method) {
3440
return parameterTypes[0];
3541
}
3642

43+
@EqualsAndHashCode(callSuper = true)
44+
public static class TestChannelBinding extends ChannelBinding {
45+
}
46+
3747

3848
@EqualsAndHashCode(callSuper = true)
39-
public static class TestBinding extends OperationBinding {
49+
public static class TestOperationBinding extends OperationBinding {
4050
}
4151

4252
}

springwolf-examples/springwolf-amqp-example/src/main/java/io/github/stavshamir/springwolf/example/configuration/AsyncApiConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package io.github.stavshamir.springwolf.example.configuration;
22

3+
import com.asyncapi.v2.binding.amqp.AMQPChannelBinding;
4+
import com.asyncapi.v2.binding.amqp.AMQPOperationBinding;
35
import com.asyncapi.v2.model.info.Info;
46
import com.asyncapi.v2.model.server.Server;
7+
import com.google.common.collect.ImmutableMap;
8+
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
59
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
610
import io.github.stavshamir.springwolf.configuration.EnableAsyncApi;
11+
import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto;
712
import org.springframework.beans.factory.annotation.Value;
813
import org.springframework.context.annotation.Bean;
914
import org.springframework.context.annotation.Configuration;
1015

16+
import java.util.Collections;
17+
1118
@Configuration
1219
@EnableAsyncApi
1320
public class AsyncApiConfiguration {
@@ -34,10 +41,25 @@ public AsyncApiDocket asyncApiDocket() {
3441
.url(String.format("%s:%s", amqpHost, amqpPort))
3542
.build();
3643

44+
AMQPChannelBinding.ExchangeProperties exchangeProperties = new AMQPChannelBinding.ExchangeProperties();
45+
exchangeProperties.setName("example-topic-exchange");
46+
ProducerData exampleProducer = ProducerData.builder()
47+
.channelName("example-producer-channel")
48+
.channelBinding(ImmutableMap.of("amqp", AMQPChannelBinding.builder()
49+
.is("routingKey")
50+
.exchange(exchangeProperties)
51+
.build()))
52+
.payloadType(AnotherPayloadDto.class)
53+
.operationBinding(ImmutableMap.of("amqp", AMQPOperationBinding.builder()
54+
.cc(Collections.singletonList("example-topic-routing-key"))
55+
.build()))
56+
.build();
57+
3758
return AsyncApiDocket.builder()
3859
.basePackage("io.github.stavshamir.springwolf.example.consumers")
3960
.info(info)
4061
.server("amqp", amqp)
62+
.producer(exampleProducer)
4163
.build();
4264
}
4365

0 commit comments

Comments
 (0)