Skip to content

Commit 521c255

Browse files
timonbacksam0r040Copilot
authored
feat(kafka): add kafka channel bindings (#1388)
* feat(kafka): add kafka channel bindings * Update springwolf-bindings/springwolf-kafka-binding/src/main/java/io/github/springwolf/bindings/kafka/scanners/channels/KafkaChannelBindingProcessor.java Co-authored-by: Copilot <[email protected]> * Update KafkaChannelBindingProcessor.java --------- Co-authored-by: sam0r040 <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 7bed7e9 commit 521c255

File tree

26 files changed

+597
-17
lines changed

26 files changed

+597
-17
lines changed

springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/kafka/KafkaChannelTopicConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,23 @@ public class KafkaChannelTopicConfiguration {
3434
*/
3535
@Min(value = -1, message = "retention.ms must be greater or equals to -1")
3636
@JsonProperty("retention.ms")
37-
private Integer retentionMs;
37+
private Long retentionMs;
3838

3939
/**
4040
* The <a href="https://kafka.apache.org/documentation/#topicconfigs_retention.bytes">retention.bytes</a>
4141
* configuration option.
4242
*/
4343
@Min(value = -1, message = "retention.bytes must be greater or equals to -1")
4444
@JsonProperty("retention.bytes")
45-
private Integer retentionBytes;
45+
private Long retentionBytes;
4646

4747
/**
4848
* The <a href="https://kafka.apache.org/documentation/#topicconfigs_delete.retention.ms">delete.retention.ms</a>
4949
* configuration option.
5050
*/
5151
@PositiveOrZero
5252
@JsonProperty("delete.retention.ms")
53-
private Integer deleteRetentionMs;
53+
private Long deleteRetentionMs;
5454

5555
/**
5656
* The <a href="https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes">max.message.bytes</a>

springwolf-asyncapi/src/test/java/io/github/springwolf/asyncapi/v3/bindings/kafka/KafkaBindingTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ void shouldSerializeKafkaChannel() throws IOException {
8787
.cleanupPolicy(List.of(
8888
KafkaChannelTopicCleanupPolicy.DELETE,
8989
KafkaChannelTopicCleanupPolicy.COMPACT))
90-
.retentionMs(604800000)
91-
.retentionBytes(1000000000)
92-
.deleteRetentionMs(86400000)
90+
.retentionMs(604800000L)
91+
.retentionBytes(1000000000L)
92+
.deleteRetentionMs(86400000L)
9393
.maxMessageBytes(1048588)
9494
.build())
9595
.build()))
@@ -111,9 +111,9 @@ void shouldSerializeKafkaTopic() throws IOException {
111111
KafkaChannelTopicConfiguration.builder()
112112
.cleanupPolicy(
113113
List.of(KafkaChannelTopicCleanupPolicy.DELETE, KafkaChannelTopicCleanupPolicy.COMPACT))
114-
.retentionMs(604800000)
115-
.retentionBytes(1000000000)
116-
.deleteRetentionMs(86400000)
114+
.retentionMs(604800000L)
115+
.retentionBytes(1000000000L)
116+
.deleteRetentionMs(86400000L)
117117
.maxMessageBytes(1048588)
118118
.confluentKeySchemaValidation(true)
119119
.confluentKeySubjectNameStrategy("TopicNameStrategy")

springwolf-bindings/springwolf-kafka-binding/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ dependencies {
1616
implementation libs.spring.core
1717
implementation libs.spring.boot.autoconfigure
1818

19+
implementation libs.commons.lang3
20+
1921
implementation libs.jakarta.annotation.api
2022

2123
compileOnly libs.lombok
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.springwolf.bindings.kafka.annotations;
3+
4+
import io.github.springwolf.core.asyncapi.annotations.AsyncChannelBinding;
5+
import io.github.springwolf.core.asyncapi.annotations.AsyncListener;
6+
import io.github.springwolf.core.asyncapi.annotations.AsyncPublisher;
7+
8+
import java.lang.annotation.ElementType;
9+
import java.lang.annotation.Inherited;
10+
import java.lang.annotation.Retention;
11+
import java.lang.annotation.RetentionPolicy;
12+
import java.lang.annotation.Target;
13+
14+
/**
15+
* {@code @KafkaAsyncChannelBinding} is a method-level annotation used in combination with {@link AsyncPublisher} or @{@link AsyncListener}.
16+
* It configures the channel binding for the Kafka protocol.
17+
*/
18+
@Retention(RetentionPolicy.RUNTIME)
19+
@Target(value = {ElementType.METHOD, ElementType.ANNOTATION_TYPE})
20+
@AsyncChannelBinding
21+
@Inherited
22+
public @interface KafkaAsyncChannelBinding {
23+
24+
String topic() default "";
25+
26+
int partitions() default VALUE_NOT_SET;
27+
28+
int replicas() default VALUE_NOT_SET;
29+
30+
KafkaChannelTopicConfiguration topicConfiguration() default @KafkaChannelTopicConfiguration();
31+
32+
@Retention(RetentionPolicy.CLASS)
33+
@Target({})
34+
@interface KafkaChannelTopicConfiguration {
35+
36+
CleanupPolicy[] cleanup() default {};
37+
38+
long retentionMs() default VALUE_NOT_SET;
39+
40+
long retentionBytes() default VALUE_NOT_SET;
41+
42+
long deleteRetentionMs() default VALUE_NOT_SET;
43+
44+
int maxMessageBytes() default VALUE_NOT_SET;
45+
46+
enum CleanupPolicy {
47+
COMPACT,
48+
DELETE,
49+
}
50+
}
51+
52+
int VALUE_NOT_SET = Integer.MIN_VALUE;
53+
}

springwolf-bindings/springwolf-kafka-binding/src/main/java/io/github/springwolf/bindings/kafka/configuration/SpringwolfKafkaBindingAutoConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package io.github.springwolf.bindings.kafka.configuration;
33

4+
import io.github.springwolf.bindings.kafka.scanners.channels.KafkaChannelBindingProcessor;
45
import io.github.springwolf.bindings.kafka.scanners.messages.KafkaMessageBindingProcessor;
56
import io.github.springwolf.bindings.kafka.scanners.operations.KafkaOperationBindingProcessor;
67
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingProcessorPriority;
@@ -21,6 +22,13 @@
2122
@StandaloneConfiguration
2223
public class SpringwolfKafkaBindingAutoConfiguration {
2324

25+
@Bean
26+
@Order(value = BindingProcessorPriority.PROTOCOL_BINDING)
27+
@ConditionalOnMissingBean
28+
public KafkaChannelBindingProcessor kafkaChannelBindingProcessor(StringValueResolver stringValueResolver) {
29+
return new KafkaChannelBindingProcessor(stringValueResolver);
30+
}
31+
2432
@Bean
2533
@Order(value = BindingProcessorPriority.PROTOCOL_BINDING)
2634
@ConditionalOnMissingBean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.springwolf.bindings.kafka.scanners.channels;
3+
4+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelBinding;
5+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicCleanupPolicy;
6+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicConfiguration;
7+
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncChannelBinding;
8+
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.AbstractChannelBindingProcessor;
9+
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ProcessedChannelBinding;
10+
import org.apache.commons.lang3.StringUtils;
11+
import org.springframework.util.StringValueResolver;
12+
13+
import java.util.Arrays;
14+
15+
public class KafkaChannelBindingProcessor extends AbstractChannelBindingProcessor<KafkaAsyncChannelBinding> {
16+
17+
private static final KafkaChannelTopicConfiguration EMPTY_TOPIC_CONFIGURATION =
18+
KafkaChannelTopicConfiguration.builder().build();
19+
20+
public KafkaChannelBindingProcessor(StringValueResolver stringValueResolver) {
21+
super(stringValueResolver);
22+
}
23+
24+
protected ProcessedChannelBinding mapToChannelBinding(KafkaAsyncChannelBinding bindingAnnotation) {
25+
KafkaChannelBinding.KafkaChannelBindingBuilder bindingBuilder = KafkaChannelBinding.builder();
26+
if (StringUtils.isNotBlank(bindingAnnotation.topic())) {
27+
bindingBuilder.topic(resolveOrNull(bindingAnnotation.topic()));
28+
}
29+
if (bindingAnnotation.partitions() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
30+
bindingBuilder.partitions(bindingAnnotation.partitions());
31+
}
32+
if (bindingAnnotation.replicas() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
33+
bindingBuilder.replicas(bindingAnnotation.replicas());
34+
}
35+
bindingBuilder.topicConfiguration(mapToTopicConfiguration(bindingAnnotation));
36+
37+
return new ProcessedChannelBinding("kafka", bindingBuilder.build());
38+
}
39+
40+
private KafkaChannelTopicConfiguration mapToTopicConfiguration(KafkaAsyncChannelBinding bindingAnnotation) {
41+
KafkaChannelTopicConfiguration.KafkaChannelTopicConfigurationBuilder topicConfiguration =
42+
KafkaChannelTopicConfiguration.builder();
43+
44+
if (bindingAnnotation.topicConfiguration().cleanup().length > 0) {
45+
topicConfiguration.cleanupPolicy(
46+
Arrays.stream(bindingAnnotation.topicConfiguration().cleanup())
47+
.map(this::toKafkaChannelTopicCleanupPolicy)
48+
.toList());
49+
}
50+
51+
if (bindingAnnotation.topicConfiguration().retentionMs() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
52+
topicConfiguration.retentionMs(
53+
bindingAnnotation.topicConfiguration().retentionMs());
54+
}
55+
if (bindingAnnotation.topicConfiguration().retentionBytes() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
56+
topicConfiguration.retentionBytes(
57+
bindingAnnotation.topicConfiguration().retentionBytes());
58+
}
59+
if (bindingAnnotation.topicConfiguration().deleteRetentionMs() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
60+
topicConfiguration.deleteRetentionMs(
61+
bindingAnnotation.topicConfiguration().deleteRetentionMs());
62+
}
63+
if (bindingAnnotation.topicConfiguration().maxMessageBytes() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
64+
topicConfiguration.maxMessageBytes(
65+
bindingAnnotation.topicConfiguration().maxMessageBytes());
66+
}
67+
68+
KafkaChannelTopicConfiguration buildTopicConfiguration = topicConfiguration.build();
69+
if (EMPTY_TOPIC_CONFIGURATION.equals(buildTopicConfiguration)) {
70+
return null;
71+
}
72+
return buildTopicConfiguration;
73+
}
74+
75+
private KafkaChannelTopicCleanupPolicy toKafkaChannelTopicCleanupPolicy(
76+
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy cleanupType) {
77+
return switch (cleanupType) {
78+
case COMPACT -> KafkaChannelTopicCleanupPolicy.COMPACT;
79+
case DELETE -> KafkaChannelTopicCleanupPolicy.DELETE;
80+
};
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.springwolf.bindings.kafka.scanners.channels;
3+
4+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelBinding;
5+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicCleanupPolicy;
6+
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicConfiguration;
7+
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncChannelBinding;
8+
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ProcessedChannelBinding;
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.util.StringValueResolver;
11+
12+
import java.lang.reflect.Method;
13+
import java.util.List;
14+
import java.util.Optional;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.mockito.Mockito.mock;
18+
import static org.mockito.Mockito.when;
19+
20+
class KafkaChannelBindingProcessorTest {
21+
private final StringValueResolver stringValueResolver = mock();
22+
private final KafkaChannelBindingProcessor processor = new KafkaChannelBindingProcessor(stringValueResolver);
23+
24+
@Test
25+
void processTest() throws NoSuchMethodException {
26+
Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithAnnotation");
27+
28+
ProcessedChannelBinding binding = processor.process(method).get();
29+
30+
assertThat(binding.getType()).isEqualTo("kafka");
31+
assertThat(binding.getBinding()).isEqualTo(new KafkaChannelBinding());
32+
}
33+
34+
@Test
35+
void processWithoutAnnotationTest() throws NoSuchMethodException {
36+
Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithoutAnnotation");
37+
38+
Optional<ProcessedChannelBinding> binding = processor.process(method);
39+
40+
assertThat(binding).isNotPresent();
41+
}
42+
43+
@Test
44+
void processTestWithFullConfiguration() throws NoSuchMethodException {
45+
when(stringValueResolver.resolveStringValue("test-topic")).thenReturn("resolved-test-topic");
46+
47+
Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithFullConfiguration");
48+
49+
ProcessedChannelBinding binding = processor.process(method).get();
50+
51+
assertThat(binding.getType()).isEqualTo("kafka");
52+
assertThat(binding.getBinding())
53+
.isEqualTo(KafkaChannelBinding.builder()
54+
.topic("resolved-test-topic")
55+
.partitions(3)
56+
.replicas(2)
57+
.topicConfiguration(KafkaChannelTopicConfiguration.builder()
58+
.cleanupPolicy(List.of(
59+
KafkaChannelTopicCleanupPolicy.COMPACT, KafkaChannelTopicCleanupPolicy.DELETE))
60+
.retentionMs(86400000L)
61+
.retentionBytes(-1L)
62+
.deleteRetentionMs(86400000L)
63+
.maxMessageBytes(1048588)
64+
.build())
65+
.build());
66+
}
67+
68+
@KafkaAsyncChannelBinding
69+
public void methodWithAnnotation() {}
70+
71+
public void methodWithoutAnnotation() {}
72+
73+
@KafkaAsyncChannelBinding(
74+
topic = "test-topic",
75+
partitions = 3,
76+
replicas = 2,
77+
topicConfiguration =
78+
@KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration(
79+
cleanup = {
80+
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy.COMPACT,
81+
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy.DELETE
82+
},
83+
retentionMs = 86400000L,
84+
retentionBytes = -1L,
85+
deleteRetentionMs = 86400000L,
86+
maxMessageBytes = 1048588))
87+
public void methodWithFullConfiguration() {}
88+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.springwolf.core.asyncapi.annotations;
3+
4+
import java.lang.annotation.ElementType;
5+
import java.lang.annotation.Inherited;
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.RetentionPolicy;
8+
import java.lang.annotation.Target;
9+
10+
/**
11+
* {@code @AsyncChannelBinding} is a meta-annotation used to identify Channel Binding annotations.
12+
* </p>
13+
* The annotations annotated with {@code @AsyncChannelBinding} are intended to provide the Channel Bindings
14+
* Object documentation. Those implementations are usually available in its own plugin, like {@code springwolf-kafka-plugin}
15+
*/
16+
@Retention(RetentionPolicy.RUNTIME)
17+
@Target(value = {ElementType.ANNOTATION_TYPE})
18+
@Inherited
19+
public @interface AsyncChannelBinding {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.springwolf.core.asyncapi.scanners.bindings.channels;
3+
4+
import io.github.springwolf.core.asyncapi.annotations.AsyncChannelBinding;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.util.StringUtils;
8+
import org.springframework.util.StringValueResolver;
9+
10+
import java.lang.annotation.Annotation;
11+
import java.lang.reflect.AnnotatedElement;
12+
import java.lang.reflect.ParameterizedType;
13+
import java.util.Arrays;
14+
import java.util.Optional;
15+
import java.util.stream.Stream;
16+
17+
@Slf4j
18+
@RequiredArgsConstructor
19+
public abstract class AbstractChannelBindingProcessor<A> implements ChannelBindingProcessor {
20+
21+
private final StringValueResolver stringValueResolver;
22+
23+
private final Class<A> specificAnnotationClazz =
24+
(Class<A>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
25+
26+
@Override
27+
public Optional<ProcessedChannelBinding> process(AnnotatedElement annotatedElement) {
28+
return Arrays.stream(annotatedElement.getAnnotations())
29+
.filter(annotation -> annotation.annotationType().isAnnotationPresent(AsyncChannelBinding.class))
30+
.flatMap(this::tryCast)
31+
.findAny()
32+
.map(this::mapToChannelBinding);
33+
}
34+
35+
/**
36+
* Attempt to cast the annotation to the specific annotation
37+
* <p>
38+
* Casting might fail, when multiple, different binding annotations are used,
39+
* which results in an (expected) exception.
40+
* <p>
41+
* If there is an option to previously test casting without casting, then lets change the code here.
42+
*/
43+
private Stream<A> tryCast(Annotation obj) {
44+
try {
45+
return Stream.of(specificAnnotationClazz.cast(obj));
46+
} catch (ClassCastException ex) {
47+
log.trace("Method has multiple bindings defined.", ex);
48+
}
49+
return Stream.empty();
50+
}
51+
52+
protected abstract ProcessedChannelBinding mapToChannelBinding(A bindingAnnotation);
53+
54+
protected String resolveOrNull(String stringValue) {
55+
return StringUtils.hasText(stringValue) ? stringValueResolver.resolveStringValue(stringValue) : null;
56+
}
57+
}

0 commit comments

Comments
 (0)