Skip to content

Commit 5de2031

Browse files
committed
Add ClassLevelKafkaListenerScanner
1 parent dc150ff commit 5de2031

File tree

4 files changed

+400
-110
lines changed

4 files changed

+400
-110
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.stavshamir.springwolf.asyncapi;
2+
3+
public class Constants {
4+
public static final String ONE_OF = "oneOf";
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
2+
3+
import com.asyncapi.v2.binding.OperationBinding;
4+
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
5+
import com.asyncapi.v2.model.channel.ChannelItem;
6+
import com.asyncapi.v2.model.channel.operation.Operation;
7+
import com.google.common.collect.ImmutableMap;
8+
import com.google.common.collect.Maps;
9+
import io.github.stavshamir.springwolf.asyncapi.scanners.components.ComponentsScanner;
10+
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
11+
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
12+
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
13+
import io.github.stavshamir.springwolf.schemas.SchemasService;
14+
import lombok.RequiredArgsConstructor;
15+
import lombok.extern.slf4j.Slf4j;
16+
import org.springframework.beans.factory.annotation.Autowired;
17+
import org.springframework.context.EmbeddedValueResolverAware;
18+
import org.springframework.kafka.annotation.KafkaHandler;
19+
import org.springframework.kafka.annotation.KafkaListener;
20+
import org.springframework.messaging.handler.annotation.Payload;
21+
import org.springframework.stereotype.Service;
22+
import org.springframework.util.StringValueResolver;
23+
24+
import java.lang.annotation.Annotation;
25+
import java.lang.reflect.Method;
26+
import java.util.*;
27+
28+
import static io.github.stavshamir.springwolf.asyncapi.Constants.ONE_OF;
29+
import static java.util.stream.Collectors.*;
30+
31+
@Slf4j
32+
@Service
33+
@RequiredArgsConstructor
34+
public class ClassLevelKafkaListenerScanner
35+
implements ChannelsScanner, EmbeddedValueResolverAware {
36+
37+
38+
private StringValueResolver resolver;
39+
40+
@Autowired
41+
private AsyncApiDocket docket;
42+
43+
@Autowired
44+
private ComponentsScanner componentsScanner;
45+
46+
@Autowired
47+
private SchemasService schemasService;
48+
49+
@Override
50+
public void setEmbeddedValueResolver(StringValueResolver resolver) {
51+
this.resolver = resolver;
52+
}
53+
54+
public Map<String, ChannelItem> scan() {
55+
return componentsScanner.scanForComponents(docket.getBasePackage()).stream()
56+
.filter(this::isAnnotatedWithKafkaListener)
57+
.map(this::mapClassToChannel)
58+
.filter(Optional::isPresent).map(Optional::get)
59+
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
60+
}
61+
62+
private boolean isAnnotatedWithKafkaListener(Class<?> component) {
63+
return component.isAnnotationPresent(KafkaListener.class);
64+
}
65+
66+
private Optional<Map.Entry<String, ChannelItem>> mapClassToChannel(Class<?> component) {
67+
log.debug("Mapping class \"{}\" to channel", component.getName());
68+
69+
KafkaListener annotation = component.getAnnotation(KafkaListener.class);
70+
String channelName = getChannelName(annotation);
71+
Map<String, ? extends OperationBinding> operationBinding = buildOperationBinding(annotation);
72+
Set<Method> annotatedMethods = getAnnotatedMethods(component);
73+
74+
if (annotatedMethods.isEmpty()) {
75+
return Optional.empty();
76+
}
77+
78+
ChannelItem channelItem = buildChannel(annotatedMethods, operationBinding);
79+
return Optional.of(Maps.immutableEntry(channelName, channelItem));
80+
}
81+
82+
protected String getChannelName(KafkaListener annotation) {
83+
List<String> resolvedTopics = Arrays.stream(annotation.topics())
84+
.map(resolver::resolveStringValue)
85+
.collect(toList());
86+
87+
log.debug("Found topics: {}", String.join(", ", resolvedTopics));
88+
return resolvedTopics.get(0);
89+
}
90+
91+
protected Map<String, ? extends OperationBinding> buildOperationBinding(KafkaListener annotation) {
92+
String groupId = resolver.resolveStringValue(annotation.groupId());
93+
if (groupId == null || groupId.isEmpty()) {
94+
log.debug("No group ID found for this listener");
95+
groupId = null;
96+
} else {
97+
log.debug("Found group id: {}", groupId);
98+
}
99+
100+
KafkaOperationBinding binding = new KafkaOperationBinding();
101+
binding.setGroupId(groupId);
102+
return ImmutableMap.of("kafka", binding);
103+
104+
}
105+
106+
protected Class<?> getPayloadType(Method method) {
107+
String methodName = String.format("%s::%s", method.getDeclaringClass().getSimpleName(), method.getName());
108+
log.debug("Finding payload type for {}", methodName);
109+
110+
Class<?>[] parameterTypes = method.getParameterTypes();
111+
switch (parameterTypes.length) {
112+
case 0:
113+
throw new IllegalArgumentException("Listener methods must not have 0 parameters: " + methodName);
114+
case 1:
115+
return parameterTypes[0];
116+
default:
117+
return getPayloadType(parameterTypes, method.getParameterAnnotations(), methodName);
118+
}
119+
}
120+
121+
private Class<?> getPayloadType(Class<?>[] parameterTypes, Annotation[][] parameterAnnotations, String methodName) {
122+
int payloadAnnotatedParameterIndex = getPayloadAnnotatedParameterIndex(parameterAnnotations);
123+
124+
if (payloadAnnotatedParameterIndex == -1) {
125+
String msg = "Multi-parameter methods must have one parameter annotated with @Payload, "
126+
+ "but none was found: "
127+
+ methodName;
128+
129+
throw new IllegalArgumentException(msg);
130+
}
131+
132+
return parameterTypes[payloadAnnotatedParameterIndex];
133+
}
134+
135+
private int getPayloadAnnotatedParameterIndex(Annotation[][] parameterAnnotations) {
136+
for (int i = 0, length = parameterAnnotations.length; i < length; i++) {
137+
Annotation[] annotations = parameterAnnotations[i];
138+
boolean hasPayloadAnnotation = Arrays.stream(annotations)
139+
.anyMatch(annotation -> annotation instanceof Payload);
140+
141+
if (hasPayloadAnnotation) {
142+
return i;
143+
}
144+
}
145+
146+
return -1;
147+
}
148+
149+
private Set<Method> getAnnotatedMethods(Class<?> component) {
150+
Class<KafkaHandler> annotationClass = KafkaHandler.class;
151+
log.debug("Scanning class \"{}\" for @\"{}\" annotated methods", component.getName(), annotationClass.getName());
152+
153+
return Arrays.stream(component.getDeclaredMethods())
154+
.filter(method -> method.isAnnotationPresent(annotationClass))
155+
.collect(toSet());
156+
}
157+
158+
private ChannelItem buildChannel(Set<Method> methods, Map<String, ? extends OperationBinding> operationBinding) {
159+
Operation operation = Operation.builder()
160+
.message(getMessageObject(methods))
161+
.bindings(operationBinding)
162+
.build();
163+
164+
return ChannelItem.builder()
165+
.publish(operation)
166+
.build();
167+
}
168+
169+
private Object getMessageObject(Set<Method> methods) {
170+
Set<Message> messages = methods.stream()
171+
.map(this::buildMessage)
172+
.collect(toSet());
173+
174+
return methods.size() == 1
175+
? messages.toArray()[0]
176+
: ImmutableMap.of(ONE_OF, messages);
177+
}
178+
179+
private Message buildMessage(Method method) {
180+
Class<?> payloadType = getPayloadType(method);
181+
String modelName = schemasService.register(payloadType);
182+
183+
return Message.builder()
184+
.name(payloadType.getName())
185+
.title(modelName)
186+
.payload(PayloadReference.fromModelName(modelName))
187+
.build();
188+
}
189+
190+
}

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/KafkaChannelsScanner.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

0 commit comments

Comments
 (0)