Skip to content

Commit dc150ff

Browse files
committed
Rename KafkaChannelsScanner to MethodLevelKafkaListenerScanner
1 parent c73d277 commit dc150ff

File tree

2 files changed

+119
-9
lines changed

2 files changed

+119
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;
2+
3+
import com.asyncapi.v2.binding.OperationBinding;
4+
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
5+
import com.google.common.collect.ImmutableMap;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.context.EmbeddedValueResolverAware;
9+
import org.springframework.kafka.annotation.KafkaListener;
10+
import org.springframework.messaging.handler.annotation.Payload;
11+
import org.springframework.stereotype.Service;
12+
import org.springframework.util.StringValueResolver;
13+
14+
import java.lang.annotation.Annotation;
15+
import java.lang.reflect.Method;
16+
import java.util.Arrays;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
import static java.util.stream.Collectors.toList;
21+
22+
@Slf4j
23+
@Service
24+
@RequiredArgsConstructor
25+
public class MethodLevelKafkaListenerScanner extends AbstractChannelScanner<KafkaListener>
26+
implements ChannelsScanner, EmbeddedValueResolverAware {
27+
28+
private StringValueResolver resolver;
29+
30+
@Override
31+
public void setEmbeddedValueResolver(StringValueResolver resolver) {
32+
this.resolver = resolver;
33+
}
34+
35+
@Override
36+
protected Class<KafkaListener> getListenerAnnotationClass() {
37+
return KafkaListener.class;
38+
}
39+
40+
@Override
41+
protected String getChannelName(KafkaListener annotation) {
42+
List<String> resolvedTopics = Arrays.stream(annotation.topics())
43+
.map(resolver::resolveStringValue)
44+
.collect(toList());
45+
46+
log.debug("Found topics: {}", String.join(", ", resolvedTopics));
47+
return resolvedTopics.get(0);
48+
}
49+
50+
@Override
51+
protected Map<String, ? extends OperationBinding> buildOperationBinding(KafkaListener annotation) {
52+
String groupId = resolver.resolveStringValue(annotation.groupId());
53+
if (groupId == null || groupId.isEmpty()) {
54+
log.debug("No group ID found for this listener");
55+
groupId = null;
56+
} else {
57+
log.debug("Found group id: {}", groupId);
58+
}
59+
60+
KafkaOperationBinding binding = new KafkaOperationBinding();
61+
binding.setGroupId(groupId);
62+
return ImmutableMap.of("kafka", binding);
63+
64+
}
65+
66+
@Override
67+
protected Class<?> getPayloadType(Method method) {
68+
String methodName = String.format("%s::%s", method.getDeclaringClass().getSimpleName(), method.getName());
69+
log.debug("Finding payload type for {}", methodName);
70+
71+
Class<?>[] parameterTypes = method.getParameterTypes();
72+
switch (parameterTypes.length) {
73+
case 0:
74+
throw new IllegalArgumentException("Listener methods must not have 0 parameters: " + methodName);
75+
case 1:
76+
return parameterTypes[0];
77+
default:
78+
return getPayloadType(parameterTypes, method.getParameterAnnotations(), methodName);
79+
}
80+
}
81+
82+
private Class<?> getPayloadType(Class<?>[] parameterTypes, Annotation[][] parameterAnnotations, String methodName) {
83+
int payloadAnnotatedParameterIndex = getPayloadAnnotatedParameterIndex(parameterAnnotations);
84+
85+
if (payloadAnnotatedParameterIndex == -1) {
86+
String msg = "Multi-parameter KafkaListener methods must have one parameter annotated with @Payload, "
87+
+ "but none was found: "
88+
+ methodName;
89+
90+
throw new IllegalArgumentException(msg);
91+
}
92+
93+
return parameterTypes[payloadAnnotatedParameterIndex];
94+
}
95+
96+
private int getPayloadAnnotatedParameterIndex(Annotation[][] parameterAnnotations) {
97+
for (int i = 0, length = parameterAnnotations.length; i < length; i++) {
98+
Annotation[] annotations = parameterAnnotations[i];
99+
boolean hasPayloadAnnotation = Arrays.stream(annotations)
100+
.anyMatch(annotation -> annotation instanceof Payload);
101+
102+
if (hasPayloadAnnotation) {
103+
return i;
104+
}
105+
}
106+
107+
return -1;
108+
}
109+
110+
}

springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/KafkaChannelsScannerTest.java renamed to springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/MethodLevelKafkaListenerScannerTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
import static org.mockito.Mockito.when;
3636

3737
@RunWith(SpringRunner.class)
38-
@ContextConfiguration(classes = {KafkaChannelsScanner.class, DefaultSchemasService.class})
38+
@ContextConfiguration(classes = {MethodLevelKafkaListenerScanner.class, DefaultSchemasService.class})
3939
@TestPropertySource(properties = "kafka.topics.test=test-topic")
40-
public class KafkaChannelsScannerTest {
40+
public class MethodLevelKafkaListenerScannerTest {
4141

4242
@Autowired
43-
private KafkaChannelsScanner kafkaChannelsScanner;
43+
private MethodLevelKafkaListenerScanner methodLevelKafkaListenerScanner;
4444

4545
@MockBean
4646
private ComponentsScanner componentsScanner;
@@ -68,7 +68,7 @@ private void setClassToScan(Class<?> classToScan) {
6868
public void scan_componentHasNoKafkaListenerMethods() {
6969
setClassToScan(ClassWithoutKafkaListenerAnnotations.class);
7070

71-
Map<String, ChannelItem> channels = kafkaChannelsScanner.scan();
71+
Map<String, ChannelItem> channels = methodLevelKafkaListenerScanner.scan();
7272

7373
assertThat(channels)
7474
.isEmpty();
@@ -80,7 +80,7 @@ public void scan_componentHasKafkaListenerMethods_hardCodedTopic() {
8080
setClassToScan(ClassWithKafkaListenerAnnotationHardCodedTopic.class);
8181

8282
// When scan is called
83-
Map<String, ChannelItem> actualChannels = kafkaChannelsScanner.scan();
83+
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();
8484

8585
// Then the returned collection contains the channel
8686
Message message = Message.builder()
@@ -106,7 +106,7 @@ public void scan_componentHasKafkaListenerMethods_embeddedValueTopic() {
106106
setClassToScan(ClassWithKafkaListenerAnnotationsEmbeddedValueTopic.class);
107107

108108
// When scan is called
109-
Map<String, ChannelItem> actualChannels = kafkaChannelsScanner.scan();
109+
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();
110110

111111
// Then the returned collection contains the channel
112112
Message message = Message.builder()
@@ -132,7 +132,7 @@ public void scan_componentHasKafkaListenerMethods_withGroupId() {
132132
setClassToScan(ClassWithKafkaListenerAnnotationWithGroupId.class);
133133

134134
// When scan is called
135-
Map<String, ChannelItem> actualChannels = kafkaChannelsScanner.scan();
135+
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();
136136

137137
// Then the returned collection contains a correct binding
138138
Map<String, ? extends OperationBinding> actualBindings = actualChannels.get(TOPIC)
@@ -154,7 +154,7 @@ public void scan_componentHasKafkaListenerMethods_multipleParamsWithoutPayloadAn
154154
setClassToScan(ClassWithKafkaListenerAnnotationMultipleParamsWithoutPayloadAnnotation.class);
155155

156156
// Then an exception is thrown when scan is called
157-
assertThatThrownBy(() -> kafkaChannelsScanner.scan())
157+
assertThatThrownBy(() -> methodLevelKafkaListenerScanner.scan())
158158
.isInstanceOf(IllegalArgumentException.class);
159159
}
160160

@@ -167,7 +167,7 @@ public void scan_componentHasKafkaListenerMethods_multipleParamsWithPayloadAnnot
167167
setClassToScan(ClassWithKafkaListenerAnnotationMultipleParamsWithPayloadAnnotation.class);
168168

169169
// When scan is called
170-
Map<String, ChannelItem> actualChannels = kafkaChannelsScanner.scan();
170+
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();
171171

172172
// Then the returned collection contains the channel, and the payload is of the parameter annotated with @Payload
173173
Message message = Message.builder()

0 commit comments

Comments
 (0)