diff --git a/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaAdditionalConfig.java b/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaAdditionalConfig.java new file mode 100644 index 0000000..1f344a8 --- /dev/null +++ b/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaAdditionalConfig.java @@ -0,0 +1,32 @@ +/** + * Copyright 2017 Red Hat, Inc, and individual contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.aerogear.kafka.cdi.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * Additional configuration annotation. Specifcy custom configuration for {@link KafkaConfig}. + */ +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface KafkaAdditionalConfig { + String key(); + String value(); +} diff --git a/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaConfig.java b/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaConfig.java index 5ed96b1..8ab7933 100644 --- a/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaConfig.java +++ b/src/main/java/org/aerogear/kafka/cdi/annotation/KafkaConfig.java @@ -31,4 +31,6 @@ @Documented public @interface KafkaConfig { String bootstrapServers(); + + KafkaAdditionalConfig[] additionalConfig() default {}; } diff --git a/src/main/java/org/aerogear/kafka/cdi/extension/KafkaExtension.java b/src/main/java/org/aerogear/kafka/cdi/extension/KafkaExtension.java index be32541..93f95b0 100644 --- a/src/main/java/org/aerogear/kafka/cdi/extension/KafkaExtension.java +++ b/src/main/java/org/aerogear/kafka/cdi/extension/KafkaExtension.java @@ -64,6 +64,7 @@ public class KafkaExtension implements Extension { private String bootstrapServers = null; + private Properties additionalConfig = null; private final Set> listenerMethods = newSetFromMap(new ConcurrentHashMap<>()); private final Set> streamProcessorMethods = newSetFromMap(new ConcurrentHashMap<>()); private final Set managedConsumers = newSetFromMap(new ConcurrentHashMap<>()); @@ -81,6 +82,11 @@ public void kafkaConfig(@Observes @WithAnnotations(KafkaConfig.class) ProcessAnn if (kafkaConfig != null && bootstrapServers == null) { logger.info("setting bootstrap.servers IP for, {}", kafkaConfig.bootstrapServers()); bootstrapServers = VerySimpleEnvironmentResolver.simpleBootstrapServerResolver(kafkaConfig.bootstrapServers()); + + additionalConfig = new Properties(); + Arrays.stream(kafkaConfig.additionalConfig()).forEach(kafkaAdditionalConfig -> { + additionalConfig.put(kafkaAdditionalConfig.key(), kafkaAdditionalConfig.value()); + }); } } @@ -120,7 +126,7 @@ public void afterDeploymentValidation(@Observes AfterDeploymentValidation adv, f final DelegationKafkaConsumer frameworkConsumer = (DelegationKafkaConsumer) bm.getReference(bean, DelegationKafkaConsumer.class, ctx); // hooking it all together - frameworkConsumer.initialize(bootstrapServers, consumerMethod, bm); + frameworkConsumer.initialize(bootstrapServers, consumerMethod, bm, additionalConfig); managedConsumers.add(frameworkConsumer); submitToExecutor(frameworkConsumer); @@ -237,12 +243,14 @@ private void submitToExecutor(final DelegationKafkaConsumer delegationKafkaConsu } private org.apache.kafka.clients.producer.Producer createInjectionProducer(final String bootstrapServers, final Class keySerializerClass, final Class valSerializerClass, final Serializer keySerializer, final Serializer valSerializer ) { - final Properties properties = new Properties(); + properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass); properties.put(VALUE_SERIALIZER_CLASS_CONFIG, valSerializerClass); + properties.putAll(additionalConfig); + return new InjectedKafkaProducer(properties, keySerializer, valSerializer); } diff --git a/src/main/java/org/aerogear/kafka/impl/DelegationKafkaConsumer.java b/src/main/java/org/aerogear/kafka/impl/DelegationKafkaConsumer.java index dcdbaf6..289a37f 100644 --- a/src/main/java/org/aerogear/kafka/impl/DelegationKafkaConsumer.java +++ b/src/main/java/org/aerogear/kafka/impl/DelegationKafkaConsumer.java @@ -104,7 +104,7 @@ private void createKafkaConsumer(final Class keyType, final Class v } - public void initialize(final String bootstrapServers, final AnnotatedMethod annotatedMethod, final BeanManager beanManager) { + public void initialize(final String bootstrapServers, final AnnotatedMethod annotatedMethod, final BeanManager beanManager, Properties additionalConfig) { final Consumer consumerAnnotation = annotatedMethod.getAnnotation(Consumer.class); this.topics = Arrays.stream(consumerAnnotation.topics()) @@ -125,6 +125,8 @@ public void initialize(final String bootstrapServers, final AnnotatedMethod anno properties.put(KEY_DESERIALIZER_CLASS_CONFIG, CafdiSerdes.serdeFrom(keyTypeClass).deserializer().getClass()); properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,CafdiSerdes.serdeFrom(valTypeClass).deserializer().getClass()); + properties.putAll(additionalConfig); + createKafkaConsumer(keyTypeClass, valTypeClass, properties); this.consumerRebalanceListener = createConsumerRebalanceListener(consumerAnnotation.consumerRebalanceListener());