Skip to content

Commit b8d92a6

Browse files
committed
asyncapi-generator: kafka producer
1 parent 63062bb commit b8d92a6

File tree

37 files changed

+756
-295
lines changed

37 files changed

+756
-295
lines changed

plugins/asyncapi-generator/src/main/java/io/zenwave360/sdk/plugins/AsyncAPIGenerator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ public enum TransactionalOutboxType {
2828
none, modulith
2929
}
3030

31-
@DocumentedOption(description = "Programming style")
32-
public ProgrammingStyle style = ProgrammingStyle.imperative;
31+
// @DocumentedOption(description = "Programming style")
32+
public final ProgrammingStyle style = ProgrammingStyle.imperative;
3333

3434
@DocumentedOption(description = "Transactional outbox type for message producers.")
3535
public TransactionalOutboxType transactionalOutbox = TransactionalOutboxType.none;
3636

37-
@DocumentedOption(description = "Include ApplicationEvent listener for consuming messages within the modulith.")
38-
public boolean includeApplicationEventListener = false;
37+
// @DocumentedOption(description = "Include ApplicationEvent listener for consuming messages within the modulith.")
38+
// public boolean includeApplicationEventListener = false;
3939

4040
@DocumentedOption(description = "Generate only the producer interface and skip the implementation.")
4141
public boolean skipProducerImplementation = false;

plugins/asyncapi-generator/src/main/java/io/zenwave360/sdk/plugins/templates/AsyncAPIHandlebarsHelpers.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ public Object consumerName(Object context, Options options) {
4242
return String.format("%s%s%s", generator.consumerPrefix, context, generator.consumerSuffix);
4343
}
4444

45-
public Object messageType(Object operation, Options options) {
46-
List<String> messageTypes = JSONPath.get(operation, "$.x--messages[*].x--javaType");
47-
List<String> envelopTypes = JSONPath.get(operation, "$.x--messages[*]." + generator.envelopeJavaTypeExtensionName);
48-
String operationEnvelop = JSONPath.get(operation, "$." + generator.envelopeJavaTypeExtensionName);
49-
if(operationEnvelop != null) {
50-
envelopTypes.add(operationEnvelop);
45+
public Object messageType(List<Map<String, Object>> operations, Options options) {
46+
List<String> messageTypes = JSONPath.get(operations, "$[*].x--messages[*].x--javaType");
47+
List<String> envelopTypes = JSONPath.get(operations, "$[*].x--messages[*]." + generator.envelopeJavaTypeExtensionName);
48+
List<String> operationEnvelop = JSONPath.get(operations, "$[*]." + generator.envelopeJavaTypeExtensionName);
49+
if(operationEnvelop != null && !operationEnvelop.isEmpty()) {
50+
envelopTypes.addAll(operationEnvelop);
5151
}
5252
if(generator.useEnterpriseEnvelope && !envelopTypes.isEmpty()) {
5353
return envelopTypes.size() == 1 ? envelopTypes.get(0) : "Object";
@@ -56,10 +56,14 @@ public Object messageType(Object operation, Options options) {
5656
}
5757

5858
public Object hasEnterpriseEnvelope(Object operation, Options options) {
59-
List<String> envelopTypes = JSONPath.get(operation, "$.x--messages[*]." + generator.envelopeJavaTypeExtensionName);
60-
String operationEnvelop = JSONPath.get(operation, "$." + generator.envelopeJavaTypeExtensionName);
61-
if(operationEnvelop != null) {
62-
envelopTypes.add(operationEnvelop);
59+
var arrayPrefix = operation instanceof List? "$[*]." : "$.";
60+
List<String> envelopTypes = JSONPath.get(operation, arrayPrefix + "x--messages[*]." + generator.envelopeJavaTypeExtensionName);
61+
Object operationEnvelop = JSONPath.get(operation, arrayPrefix + generator.envelopeJavaTypeExtensionName);
62+
if(operationEnvelop instanceof String) {
63+
envelopTypes.add((String) operationEnvelop);
64+
}
65+
if(operationEnvelop instanceof List) {
66+
envelopTypes.addAll((List) operationEnvelop);
6367
}
6468
return generator.useEnterpriseEnvelope && !envelopTypes.isEmpty();
6569
}

plugins/asyncapi-generator/src/main/java/io/zenwave360/sdk/plugins/templates/SpringCloudStreamTemplates.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public SpringCloudStreamTemplates(AsyncAPIGenerator generator) {
2222
}
2323
addTemplate(producerByServiceTemplates, "shared/producer/mocks/InMemoryEventsProducer.java", "src/test/java/{{asPackageFolder producerApiPackage}}/{{producerInMemoryName serviceName operationRoleType}}.java");
2424

25-
addTemplate(consumerByOperationTemplates, "scs/consumer/{{style}}/Consumer.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerName operation.x--operationIdCamelCase}}.java");
2625
addTemplate(consumerByOperationTemplates, "shared/consumer/{{style}}/IService.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerServiceInterfaceName operation.x--operationIdCamelCase}}.java");
27-
26+
addTemplate(consumerByChannelTemplates, "scs/consumer/{{style}}/Consumer.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerName (camelCase channelName)}}.java");
2827
}
2928
}

plugins/asyncapi-generator/src/main/java/io/zenwave360/sdk/plugins/templates/SpringKafkaTemplates.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public SpringKafkaTemplates(AsyncAPIGenerator generator) {
2222
}
2323
addTemplate(producerByServiceTemplates, "shared/producer/mocks/InMemoryEventsProducer.java", "src/test/java/{{asPackageFolder producerApiPackage}}/{{producerInMemoryName serviceName operationRoleType}}.java");
2424

25-
addTemplate(consumerByOperationTemplates, "kafka/consumer/{{style}}/Consumer.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerName operation.x--operationIdCamelCase}}.java");
26-
addTemplate(consumerByOperationTemplates, "shared/consumer/{{style}}/IService.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerServiceInterfaceName operation.x--operationIdCamelCase}}.java");
27-
25+
addTemplate(consumerByChannelTemplates, "shared/consumer/{{style}}/IServiceByChannel.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerServiceInterfaceName (camelCase channelName)}}.java");
26+
addTemplate(consumerByChannelTemplates, "kafka/consumer/{{style}}/Consumer.java", "src/main/java/{{asPackageFolder consumerApiPackage}}/{{consumerName (camelCase channelName)}}.java");
2827
}
2928
}

plugins/asyncapi-generator/src/main/resources/io/zenwave360/sdk/plugins/AsyncAPIGenerator/kafka/consumer/imperative/Consumer.java.hbs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,23 @@ import org.springframework.stereotype.Component;
2323
import {{modelPackage}}.*;
2424
{{/if}}
2525

26-
{{#assign "messageCount"}}{{size operation.x--messages}}{{/assign~}}
27-
28-
@Component("{{bindingPrefix}}{{kebabCase operation.operationId}}")
26+
@Component("{{bindingPrefix}}{{kebabCase channelName}}")
2927
@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.AsyncAPIGeneratorPlugin", date = "{{date}}")
3028
{{~#if generatedAnnotationClass}}@{{generatedAnnotationClass}}{{~/if}}
31-
public class {{consumerName operation.x--operationIdCamelCase}} {
29+
public class {{consumerName (camelCase channelName)}} {
3230

3331
protected Logger log = LoggerFactory.getLogger(getClass());
3432

35-
protected {{consumerServiceInterfaceName operation.x--operationIdCamelCase}} service;
33+
protected {{consumerServiceInterfaceName (camelCase channelName)}} service;
3634
protected KafkaTemplate<String, Object> kafkaTemplate;
3735

38-
@Value("#{${app.kafka.topics.{{bindingPrefix}}{{operation.operationId}}.dead-letter-queue-error-map:{:}}}")
36+
@Value("#{${app.kafka.topics.{{bindingPrefix}}{{kebabCase channelName}}.dead-letter-queue-error-map:{:}}}")
3937
protected Map<Class<? extends Exception>, String> errorQueueMap;
4038
{{~#if useEnterpriseEnvelope}}
4139
public EnvelopeUnWrapper envelopeUnWrapper;
4240
{{~/if}}
4341

44-
public {{consumerName operation.x--operationIdCamelCase}}({{consumerServiceInterfaceName operation.x--operationIdCamelCase}} service, @Autowired(required=false) KafkaTemplate<String, Object> kafkaTemplate) {
42+
public {{consumerName (camelCase channelName)}}({{consumerServiceInterfaceName (camelCase channelName)}} service, @Autowired(required=false) KafkaTemplate<String, Object> kafkaTemplate) {
4543
this.service = service;
4644
this.kafkaTemplate = kafkaTemplate;
4745
}
@@ -58,34 +56,39 @@ public class {{consumerName operation.x--operationIdCamelCase}} {
5856

5957
{{~#if includeApplicationEventListener}}
6058
@org.springframework.modulith.events.ApplicationModuleListener
61-
public void on(Message<{{messageType operation}}> message) {
59+
public void on(Message<{{messageType operations}}> message) {
6260
accept(message);
6361
}
6462
{{/if}}
6563

6664
@KafkaListener(
67-
topics = "${app.kafka.topics.{{bindingPrefix}}{{operation.operationId}}.topic}",
68-
groupId = "${app.kafka.topics.{{bindingPrefix}}{{operation.operationId}}.groupId:}",
69-
containerFactory = "${app.kafka.topics.{{bindingPrefix}}{{operation.operationId}}.containerFactory:}")
70-
public void listen(@Payload {{messageType operation}} payload,
65+
topics = "${app.kafka.topics.{{bindingPrefix}}{{kebabCase channelName}}.topic}",
66+
groupId = "${app.kafka.topics.{{bindingPrefix}}{{kebabCase channelName}}.groupId:}",
67+
containerFactory = "${app.kafka.topics.{{bindingPrefix}}{{kebabCase channelName}}.containerFactory:}")
68+
public void listen(@Payload {{messageType operations}} payload,
7169
@Header Map<String, Object> headers,
7270
Acknowledgment acknowledgment) {
7371
log.debug("Received message: {}", payload);
7472
try {
75-
Object unwrappedPayload = {{#if (hasEnterpriseEnvelope operation)}}unwrap(payload){{else}}payload{{/if}};
73+
Object unwrappedPayload = {{#if (hasEnterpriseEnvelope operations)}}unwrap(payload){{else}}payload{{/if}};
74+
{{~initVisited 'messageType'}}
75+
{{~#each operations as |operation|}}
7676
{{~#each operation.x--messages as |message|}}
77+
{{~#unless (isVisited 'messageType' message.x--javaType register=true)}}
7778
if(unwrappedPayload instanceof {{message.x--javaType}}) {
7879
{{~#if exposeMessage}}
7980
service.{{operation.operationId}}{{methodSuffix message operation}}(MessageBuilder.createMessage(({{message.x--javaType}}) unwrappedPayload, new MessageHeaders(headers)));
8081
{{~else}}
81-
var serviceHeaders = new {{consumerServiceInterfaceName operation.x--operationIdCamelCase}}.{{message.x--javaTypeSimpleName}}Headers();
82+
var serviceHeaders = new {{consumerServiceInterfaceName (camelCase channelName)}}.{{message.x--javaTypeSimpleName}}Headers();
8283
serviceHeaders.putAll(headers);
8384
service.{{operation.operationId}}{{methodSuffix message operation}}(({{message.x--javaType}}) unwrappedPayload, serviceHeaders);
8485
{{~/if}}
8586
acknowledgment.acknowledge();
8687
return;
8788
}
89+
{{~/unless}}
8890
{{~/each}}
91+
{{~/each}}
8992
log.warn("Received message without any business handler: [payload: {}]", unwrappedPayload.getClass().getName());
9093
{{~#if exposeMessage}}
9194
service.defaultHandler(MessageBuilder.createMessage(unwrappedPayload, new MessageHeaders(headers)));

plugins/asyncapi-generator/src/main/resources/io/zenwave360/sdk/plugins/AsyncAPIGenerator/kafka/consumer/reactive/Consumer.java.hbs

Lines changed: 0 additions & 54 deletions
This file was deleted.

plugins/asyncapi-generator/src/main/resources/io/zenwave360/sdk/plugins/AsyncAPIGenerator/scs/consumer/imperative/Consumer.java.hbs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,26 @@ import org.springframework.stereotype.Component;
2121
import {{modelPackage}}.*;
2222
{{/if}}
2323

24-
{{#assign "messageCount"}}{{size operation.x--messages}}{{/assign~}}
25-
26-
@Component("{{bindingPrefix}}{{kebabCase operation.operationId}}")
24+
@Component("{{bindingPrefix}}{{kebabCase channelName}}")
2725
@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.AsyncAPIGeneratorPlugin", date = "{{date}}")
2826
{{~#if generatedAnnotationClass}}@{{generatedAnnotationClass}}{{~/if}}
29-
public class {{consumerName operation.x--operationIdCamelCase}} implements Consumer<Message<{{messageType operation}}>> {
27+
public class {{consumerName (camelCase channelName)}} implements Consumer<Message<{{messageType operations}}>> {
3028

3129
protected Logger log = LoggerFactory.getLogger(getClass());
3230

33-
protected {{consumerServiceInterfaceName operation.x--operationIdCamelCase}} service;
31+
protected {{consumerServiceInterfaceName (camelCase channelName)}} service;
3432
protected StreamBridge streamBridge;
3533
protected Map<Class<? extends Exception>, String> errorQueueMap;
3634
{{~#if useEnterpriseEnvelope}}
3735
public EnvelopeUnWrapper envelopeUnWrapper;
3836
{{~/if}}
3937

40-
public {{consumerName operation.x--operationIdCamelCase}}({{consumerServiceInterfaceName operation.x--operationIdCamelCase}} service, @Autowired(required=false) StreamBridge streamBridge) {
38+
public {{consumerName (camelCase channelName)}}({{consumerServiceInterfaceName (camelCase channelName)}} service, @Autowired(required=false) StreamBridge streamBridge) {
4139
this.service = service;
4240
this.streamBridge = streamBridge;
4341
}
4442

45-
@Value("#{${spring.cloud.stream.bindings.{{bindingPrefix}}{{kebabCase operation.operationId}}-in-0.dead-letter-queue-error-map:{:}}}")
43+
@Value("#{${spring.cloud.stream.bindings.{{bindingPrefix}}{{kebabCase channelName}}-in-0.dead-letter-queue-error-map:{:}}}")
4644
public void setErrorQueueMap(Map<Class<? extends Exception>, String> errorQueueMap) {
4745
this.errorQueueMap = errorQueueMap;
4846
}
@@ -55,29 +53,34 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu
5553

5654
{{~#if includeApplicationEventListener}}
5755
@org.springframework.modulith.events.ApplicationModuleListener
58-
public void on(Message<{{messageType operation}}> message) {
56+
public void on(Message<{{messageType operations}}> message) {
5957
accept(message);
6058
}
6159
{{/if}}
6260

6361
@Override
64-
public void accept(Message<{{messageType operation}}> message) {
62+
public void accept(Message<{{messageType operations}}> message) {
6563
log.debug("Received message: {}", message);
6664
AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
6765
try {
68-
Object payload = {{#if (hasEnterpriseEnvelope operation)}}unwrap(message.getPayload()){{else}}message.getPayload(){{/if}};
66+
Object payload = {{#if (hasEnterpriseEnvelope operations)}}unwrap(message.getPayload()){{else}}message.getPayload(){{/if}};
67+
{{~initVisited 'messageType'}}
68+
{{~#each operations as |operation|}}
6969
{{~#each operation.x--messages as |message|}}
70+
{{~#unless (isVisited 'messageType' message.x--javaType register=true)}}
7071
if(payload instanceof {{message.x--javaType}}) {
7172
{{~#if exposeMessage}}
7273
service.{{operation.operationId}}{{methodSuffix message operation}}(MessageBuilder.createMessage(({{message.x--javaType}}) payload, message.getHeaders()));
7374
{{~else}}
74-
var headers = new {{consumerServiceInterfaceName operation.x--operationIdCamelCase}}.{{message.x--javaTypeSimpleName}}Headers();
75+
var headers = new {{consumerServiceInterfaceName (camelCase channelName)}}.{{message.x--javaTypeSimpleName}}Headers();
7576
headers.putAll(message.getHeaders());
7677
service.{{operation.operationId}}{{methodSuffix message operation}}(({{message.x--javaType}}) payload, headers);
7778
{{~/if}}
7879
return;
7980
}
81+
{{~/unless}}
8082
{{~/each}}
83+
{{~/each}}
8184
log.warn("Received message without any business handler: [payload: {}, message: {}]", payload.getClass().getName(), message);
8285
{{~#if exposeMessage}}
8386
service.defaultHandler(MessageBuilder.createMessage(payload, message.getHeaders()));

plugins/asyncapi-generator/src/main/resources/io/zenwave360/sdk/plugins/AsyncAPIGenerator/scs/consumer/reactive/Consumer.java.hbs

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)