Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ This lib is proposed to ease the usage of kafka for simple data consumer with ex
* [Getting Started](#getting-started)
* [Vanilla Java](#vanilla-java)
* [Springboot](#springboot)
* [Code References](#code-references)
* [Custom Error Processing](#custom-error-processing)
* [Code References](#code-references)

## Features
### Current version
- Error management with Dead Letter Queue
- Automatic record commit management in case of error or partition rebalance
- Retry on error with Exception exclusion management
- Custom Error processing

### To be implemented
- Implement core lib Log&Fail feature
Expand Down Expand Up @@ -205,7 +207,36 @@ the synchronous KafkaRetryableConsumer method ```listen(topics)```. (or use the
}
}
```
## Code References

## Custom Error Processing
If you need to implement custom error processing (e.g. logging additional metrics, sending alerts, producing a specific Dead Letter Queue format, etc.),
you can provide your own implementation of the `ErrorProcessor` interface:

```java
@Slf4j
public class CustomErrorProcessor implements ErrorProcessor<ConsumerRecord<String, MyAvroObject>> {
@Override
public void processError(Throwable throwable, ConsumerRecord<String, MyAvroObject> failedRecord, Long retryCount) {
// Your custom error processing here
log.error("...");
}
}
```

Then inject this custom error processor into your `RetryableConsumer` constructor:
```java
try(RetryableConsumer<String, MyAvroObject> retryableConsumer = new RetryableConsumer<>(
retryableConsumerConfiguration,
new CustomErrorProcessor()
)) {
retryableConsumer.listen(
Collections.singleton("MY_INPUT_TOPIC"),
myBusinessProcessService::processRecord
);
}
```

## Code References
This library heavily relies on an original version from @jeanlouisboudart : https://github.com/jeanlouisboudart/retriable-consumer


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.michelin.kafka;

@FunctionalInterface
public interface ErrorProcessor<R> {

/**
* Performs this operation on the given argument.
*
* @param throwable the input exception
*/
void processError(Throwable throwable, R record, Long retryCount);

Check warning on line 29 in retryable-consumer-core/src/main/java/com/michelin/kafka/ErrorProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this variable to not match a restricted identifier.

See more on https://sonarcloud.io/project/issues?id=michelin_kafka-retryable-consumer&issues=AZraxnjbbWd8HavMNJdf&open=AZraxnjbbWd8HavMNJdf&pullRequest=6
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.michelin.kafka.configuration.KafkaConfigurationException;
import com.michelin.kafka.configuration.KafkaRetryableConfiguration;
import com.michelin.kafka.configuration.RetryableConsumerConfiguration;
import com.michelin.kafka.error.DeadLetterProducer;
import com.michelin.kafka.error.RetryableConsumerErrorHandler;
import java.io.Closeable;
import java.time.Duration;
Expand Down Expand Up @@ -73,6 +72,28 @@
this.name = name;
}

/**
 * Constructor loading the Kafka configuration from the default configuration file and
 * attaching a custom error processor.
 *
 * @param name the name of this consumer
 * @param errorProcessor custom processor invoked for non-recoverable errors
 * @throws KafkaConfigurationException if the retryable configuration cannot be loaded
 */
public RetryableConsumer(String name, ErrorProcessor<ConsumerRecord<K, V>> errorProcessor)
        throws KafkaConfigurationException {
    this(KafkaRetryableConfiguration.load(), errorProcessor);
    this.name = name;
}

/**
 * Constructor with an explicit configuration and a custom error processor.
 *
 * @param kafkaRetryableConfiguration kafka properties to set
 * @param errorProcessor custom processor invoked for non-recoverable errors
 */
public RetryableConsumer(
        KafkaRetryableConfiguration kafkaRetryableConfiguration,
        ErrorProcessor<ConsumerRecord<K, V>> errorProcessor) {
    this.kafkaRetryableConfiguration = kafkaRetryableConfiguration;
    // The underlying KafkaConsumer is built from the consumer section of the configuration.
    this.consumer = new KafkaConsumer<>(
            this.kafkaRetryableConfiguration.getConsumer().getProperties());
    // The rebalance listener shares the consumer and the tracked offsets map.
    this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets);
    this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor);
}

/**
* Constructor with parameters
*
Expand All @@ -82,9 +103,43 @@
this.kafkaRetryableConfiguration = kafkaRetryableConfiguration;
this.consumer = new KafkaConsumer<>(
this.kafkaRetryableConfiguration.getConsumer().getProperties());
this.errorHandler = new RetryableConsumerErrorHandler<>(
new DeadLetterProducer(this.kafkaRetryableConfiguration.getDeadLetter()),
this.kafkaRetryableConfiguration.getConsumer().getNotRetryableExceptions());
this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration);
this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets);
}

/**
 * Constructor with all collaborators explicitly provided (no internal construction).
 *
 * @param kafkaRetryableConfiguration kafka properties to set
 * @param consumer kafka consumer to set
 * @param errorProcessor custom processor invoked for non-recoverable errors
 * @param rebalanceListener rebalance listener to set
 */
public RetryableConsumer(
        KafkaRetryableConfiguration kafkaRetryableConfiguration,
        KafkaConsumer<K, V> consumer,
        ErrorProcessor<ConsumerRecord<K, V>> errorProcessor,
        RetryableConsumerRebalanceListener rebalanceListener) {
    this.kafkaRetryableConfiguration = kafkaRetryableConfiguration;
    this.consumer = consumer;
    this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor);
    this.rebalanceListener = rebalanceListener;
}

/**
 * Constructor with an externally provided consumer and a custom error processor; the
 * rebalance listener is built internally.
 *
 * @param kafkaRetryableConfiguration kafka properties to set
 * @param consumer kafka consumer to set
 * @param errorProcessor custom processor invoked for non-recoverable errors
 */
public RetryableConsumer(
        KafkaRetryableConfiguration kafkaRetryableConfiguration,
        KafkaConsumer<K, V> consumer,
        ErrorProcessor<ConsumerRecord<K, V>> errorProcessor) {
    this.kafkaRetryableConfiguration = kafkaRetryableConfiguration;
    this.consumer = consumer;
    this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor);
    // Default rebalance listener sharing the consumer and the tracked offsets map.
    this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets);
}

Expand Down Expand Up @@ -195,6 +250,24 @@
});
}

/**
 * Starts a blocking Kafka consumer on the topics declared in the configuration file.
 * Warning: this method blocks the calling thread; if you need an asynchronous consumer,
 * use {@code listenAsync}.
 *
 * @param recordProcessor processor function applied to every received record
 * @param errorProcessor processor invoked whenever a non-recoverable error is detected
 * @throws IllegalArgumentException if no topic is set in the consumer configuration
 */
public void listen(
        RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor,
        ErrorProcessor<ConsumerRecord<K, V>> errorProcessor) {
    if (kafkaRetryableConfiguration.getConsumer().getTopics() == null
            || kafkaRetryableConfiguration.getConsumer().getTopics().isEmpty()) {
        throw new IllegalArgumentException("Topic list consumer configuration is not set");
    }
    this.listen(kafkaRetryableConfiguration.getConsumer().getTopics(), recordProcessor, errorProcessor);
}

/**
* This method will start a blocking Kafka the consumer that will run in background. The listened topics are the one
* * in the configuration file. Warning : this method is blocker. If you need an asynchronous consumer, use
Expand All @@ -218,6 +291,21 @@
* @param recordProcessor processor function to process every received records
*/
public void listen(Collection<String> topics, RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor) {
    // No custom error processor: non-recoverable errors fall back to the default
    // handling (Dead Letter topic).
    this.listen(topics, recordProcessor, null);
}

/**
 * Starts a blocking Kafka consumer on the topics given in parameter (the topic list of the
 * configuration file is ignored). Warning: this method blocks the calling thread; if you
 * need an asynchronous consumer, use listenAsync.
 *
 * @param topics the topics to subscribe to
 * @param recordProcessor processor function applied to every received record
 * @param errorProcessor processor invoked whenever a non-recoverable error is detected
 */
public void listen(
Collection<String> topics,
RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor,
ErrorProcessor<ConsumerRecord<K, V>> errorProcessor) {
log.info("Starting consumer for topics {}", topics);
try {
consumer.subscribe(topics, this.rebalanceListener);
Expand All @@ -233,6 +321,19 @@
}
}

/**
 * Starts an asynchronous Kafka consumer that runs in the background. The listened topics
 * are the ones declared in the configuration file.
 *
 * @param recordProcessor processor function applied to every received record
 * @param errorProcessor processor invoked whenever a non-recoverable error is detected
 * @return a CompletableFuture of the Kafka consumer listener
 */
public Future<Void> listenAsync(
        RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor,
        ErrorProcessor<ConsumerRecord<K, V>> errorProcessor) {
    return CompletableFuture.runAsync(() -> listen(recordProcessor, errorProcessor));
}

/**
* This method will start an asynchronous Kafka the consumer that will run in background. The listened topics are
* the one in the configuration file.
Expand All @@ -254,7 +355,7 @@
*/
public Future<Void> listenAsync(
Collection<String> topics, RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor) {
return CompletableFuture.runAsync(() -> listen(topics, recordProcessor));
return CompletableFuture.runAsync(() -> listen(topics, recordProcessor, null));
}

private void pollAndConsumeRecords(RecordProcessor<ConsumerRecord<K, V>, Exception> recordProcessor) {
Expand All @@ -277,7 +378,8 @@
} catch (RecordDeserializationException e) {
log.error("It looks like we ate a poison pill, let's skip this record!", e);
skipCurrentOffset(e.topicPartition(), e.offset());
errorHandler.handleConsumerDeserializationError(e);

errorHandler.handleError(e, null);
} catch (Exception e) {
handleUnknownException(e);
} finally {
Expand Down Expand Up @@ -315,11 +417,12 @@
this.retryCounter++;
}
} else { // non-recoverable error
// Send message to DeadLetter Topic

// Default : Send message to DeadLetter Topic
errorHandler.handleError(e, this.currentProcessingRecord);

// If config is Log&Fail
if (consumerConfig.getStopOnError()) {

Check warning on line 425 in retryable-consumer-core/src/main/java/com/michelin/kafka/RetryableConsumer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=michelin_kafka-retryable-consumer&issues=AZraxnj8bWd8HavMNJdj&open=AZraxnj8bWd8HavMNJdj&pullRequest=6
log.error(
"Non-recoverable error occurred (Not retryable, or retry limit reached). Stopping consumer after 'stop-on-error' configuration...",
e);
Expand Down
Loading
Loading