Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright 2017 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.aerogear.kafka.cdi.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/**
* Additional configuration annotation. Specifcy custom configuration for {@link KafkaConfig}.
*/
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaAdditionalConfig {
String key();
String value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@
@Documented
public @interface KafkaConfig {
String bootstrapServers();

KafkaAdditionalConfig[] additionalConfig() default {};
}
12 changes: 10 additions & 2 deletions src/main/java/org/aerogear/kafka/cdi/extension/KafkaExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
public class KafkaExtension<X> implements Extension {

private String bootstrapServers = null;
private Properties additionalConfig = null;
private final Set<AnnotatedMethod<?>> listenerMethods = newSetFromMap(new ConcurrentHashMap<>());
private final Set<AnnotatedMethod<?>> streamProcessorMethods = newSetFromMap(new ConcurrentHashMap<>());
private final Set<DelegationKafkaConsumer> managedConsumers = newSetFromMap(new ConcurrentHashMap<>());
Expand All @@ -81,6 +82,11 @@ public void kafkaConfig(@Observes @WithAnnotations(KafkaConfig.class) ProcessAnn
if (kafkaConfig != null && bootstrapServers == null) {
logger.info("setting bootstrap.servers IP for, {}", kafkaConfig.bootstrapServers());
bootstrapServers = VerySimpleEnvironmentResolver.simpleBootstrapServerResolver(kafkaConfig.bootstrapServers());

additionalConfig = new Properties();
Arrays.stream(kafkaConfig.additionalConfig()).forEach(kafkaAdditionalConfig -> {
additionalConfig.put(kafkaAdditionalConfig.key(), kafkaAdditionalConfig.value());
});
}
}

Expand Down Expand Up @@ -120,7 +126,7 @@ public void afterDeploymentValidation(@Observes AfterDeploymentValidation adv, f
final DelegationKafkaConsumer frameworkConsumer = (DelegationKafkaConsumer) bm.getReference(bean, DelegationKafkaConsumer.class, ctx);

// hooking it all together
frameworkConsumer.initialize(bootstrapServers, consumerMethod, bm);
frameworkConsumer.initialize(bootstrapServers, consumerMethod, bm, additionalConfig);

managedConsumers.add(frameworkConsumer);
submitToExecutor(frameworkConsumer);
Expand Down Expand Up @@ -237,12 +243,14 @@ private void submitToExecutor(final DelegationKafkaConsumer delegationKafkaConsu
}

private org.apache.kafka.clients.producer.Producer createInjectionProducer(final String bootstrapServers, final Class<?> keySerializerClass, final Class<?> valSerializerClass, final Serializer<?> keySerializer, final Serializer<?> valSerializer ) {

final Properties properties = new Properties();

properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, valSerializerClass);

properties.putAll(additionalConfig);

return new InjectedKafkaProducer(properties, keySerializer, valSerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private <K, V> void createKafkaConsumer(final Class<K> keyType, final Class<V> v
}


public void initialize(final String bootstrapServers, final AnnotatedMethod annotatedMethod, final BeanManager beanManager) {
public void initialize(final String bootstrapServers, final AnnotatedMethod annotatedMethod, final BeanManager beanManager, Properties additionalConfig) {
final Consumer consumerAnnotation = annotatedMethod.getAnnotation(Consumer.class);

this.topics = Arrays.stream(consumerAnnotation.topics())
Expand All @@ -125,6 +125,8 @@ public void initialize(final String bootstrapServers, final AnnotatedMethod anno
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, CafdiSerdes.serdeFrom(keyTypeClass).deserializer().getClass());
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,CafdiSerdes.serdeFrom(valTypeClass).deserializer().getClass());

properties.putAll(additionalConfig);

createKafkaConsumer(keyTypeClass, valTypeClass, properties);
this.consumerRebalanceListener = createConsumerRebalanceListener(consumerAnnotation.consumerRebalanceListener());

Expand Down