diff --git a/README.md b/README.md index 5e0d9fc..7f87e95 100644 --- a/README.md +++ b/README.md @@ -25,13 +25,15 @@ This lib is proposed to ease the usage of kafka for simple data consumer with ex * [Getting Started](#getting-started) * [Vanilla Java](#vanilla-java) * [Springboot](#springboot) - * [Code References](#code-references) +* [Custom Error Processing](#custom-error-processing) +* [Code References](#code-references) ## Features ### Current version - Error management with Dead Letter Queue - Automatic record commit management in case of error or partition rebalance - Retry on error with Exception exclusion management +- Custom Error processing ### To be implemented - Implement core lib Log&Fail feature @@ -205,7 +207,36 @@ the synchronous KafkaRetryableConsumer method ```listen(topics)```. (or use the } } ``` -## Code References + +# Custom error processing +If you need to implement custom error processing (ex: log additional metrics, send alert, specific Dead Letter Queue format ...etc.), +you can implement a custom process through `ErrorProcessor` interface : + +```java + @Slf4j + public class CustomErrorProcessor implements ErrorProcessor> { + @Override + public void processError(Throwable throwable, ConsumerRecord record, Exception exception, int retryAttemptCount) { + // Your custom error processing here + log.error("..."); + } + } +``` + +Then inject this custom error processor in your RetryableConsumer constructor : +```java + try(RetryableConsumer retryableConsumer = new RetryableConsumer<>( + retryableConsumerConfiguration, + new CustomErrorProcessor() + )) { + retryableConsumer.listen( + Collections.singleton("MY_INPUT_TOPIC"), + myBusinessProcessService::processRecord + ); + } +``` + +# Code References This library heavily relies on an original version from @jeanlouisboudart : https://github.com/jeanlouisboudart/retriable-consumer diff --git a/retryable-consumer-core/src/main/java/com/michelin/kafka/ErrorProcessor.java b/retryable-consumer-core/src/main/java/com/michelin/kafka/ErrorProcessor.java new file mode 100644 index 0000000..92fa38b --- /dev/null +++ b/retryable-consumer-core/src/main/java/com/michelin/kafka/ErrorProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafka; + +@FunctionalInterface +public interface ErrorProcessor { + + /** + * Performs this operation on the given argument. + * + * @param throwable the input exception + */ + void processError(Throwable throwable, R record, Long retryCount); +} diff --git a/retryable-consumer-core/src/main/java/com/michelin/kafka/RetryableConsumer.java b/retryable-consumer-core/src/main/java/com/michelin/kafka/RetryableConsumer.java index 56d34a2..de73c0e 100644 --- a/retryable-consumer-core/src/main/java/com/michelin/kafka/RetryableConsumer.java +++ b/retryable-consumer-core/src/main/java/com/michelin/kafka/RetryableConsumer.java @@ -21,7 +21,6 @@ import com.michelin.kafka.configuration.KafkaConfigurationException; import com.michelin.kafka.configuration.KafkaRetryableConfiguration; import com.michelin.kafka.configuration.RetryableConsumerConfiguration; -import com.michelin.kafka.error.DeadLetterProducer; import com.michelin.kafka.error.RetryableConsumerErrorHandler; import java.io.Closeable; import java.time.Duration; @@ -73,6 +72,28 @@ public RetryableConsumer(String name) throws KafkaConfigurationException { this.name = name; } + /** Default constructor with no parameters */ + public RetryableConsumer(String name, ErrorProcessor> errorProcessor) + throws KafkaConfigurationException { + this(KafkaRetryableConfiguration.load(), errorProcessor); + this.name = name; + } + + /** + * Constructor with parameters + * + * @param kafkaRetryableConfiguration kafka properties to set + */ + public RetryableConsumer( + KafkaRetryableConfiguration kafkaRetryableConfiguration, + ErrorProcessor> errorProcessor) { + this.kafkaRetryableConfiguration = kafkaRetryableConfiguration; + this.consumer = new KafkaConsumer<>( + this.kafkaRetryableConfiguration.getConsumer().getProperties()); + this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets); + this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor); + } + /** * Constructor with parameters * @@ -82,9 +103,43 @@ public RetryableConsumer(KafkaRetryableConfiguration kafkaRetryableConfiguration this.kafkaRetryableConfiguration = kafkaRetryableConfiguration; this.consumer = new KafkaConsumer<>( this.kafkaRetryableConfiguration.getConsumer().getProperties()); - this.errorHandler = new RetryableConsumerErrorHandler<>( - new DeadLetterProducer(this.kafkaRetryableConfiguration.getDeadLetter()), - this.kafkaRetryableConfiguration.getConsumer().getNotRetryableExceptions()); + this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration); + this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets); + } + + /** + * Constructor with parameters + * + * @param kafkaRetryableConfiguration kafka properties to set + * @param consumer kafka consumer to set + * @param errorProcessor custom error processor to set + * @param rebalanceListener rebalance listener to set + */ + public RetryableConsumer( + KafkaRetryableConfiguration kafkaRetryableConfiguration, + KafkaConsumer consumer, + ErrorProcessor> errorProcessor, + RetryableConsumerRebalanceListener rebalanceListener) { + this.kafkaRetryableConfiguration = kafkaRetryableConfiguration; + this.consumer = consumer; + this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor); + this.rebalanceListener = rebalanceListener; + } + + /** + * Constructor with parameters + * + * @param kafkaRetryableConfiguration kafka properties to set + * @param consumer kafka consumer to set + * @param errorProcessor custom error processor to set + */ + public RetryableConsumer( + KafkaRetryableConfiguration kafkaRetryableConfiguration, + KafkaConsumer consumer, + ErrorProcessor> errorProcessor) { + this.kafkaRetryableConfiguration = kafkaRetryableConfiguration; + this.consumer = consumer; + this.errorHandler = new RetryableConsumerErrorHandler<>(this.kafkaRetryableConfiguration, errorProcessor); this.rebalanceListener = new RetryableConsumerRebalanceListener(consumer, offsets); } @@ -195,6 +250,24 @@ protected void seekAndCommitToLatestSuccessfulOffset() { }); } + /** + * This method will start a blocking Kafka the consumer that will run in background. The listened topics are the one + * * in the configuration file. Warning : this method is blocker. If you need an asynchronous consumer, use + * listenAsync + * + * @param recordProcessor processor function to process every received records + * @param errorProcessor processor function to be called whenever an unrecoverable error is detected + */ + public void listen( + RecordProcessor, Exception> recordProcessor, + ErrorProcessor> errorProcessor) { + if (kafkaRetryableConfiguration.getConsumer().getTopics() == null + || kafkaRetryableConfiguration.getConsumer().getTopics().isEmpty()) { + throw new IllegalArgumentException("Topic list consumer configuration is not set"); + } + this.listen(kafkaRetryableConfiguration.getConsumer().getTopics(), recordProcessor, errorProcessor); + } + /** * This method will start a blocking Kafka the consumer that will run in background. The listened topics are the one * * in the configuration file. Warning : this method is blocker. If you need an asynchronous consumer, use @@ -218,6 +291,21 @@ public void listen(RecordProcessor, Exception> recordProces * @param recordProcessor processor function to process every received records */ public void listen(Collection topics, RecordProcessor, Exception> recordProcessor) { + this.listen(topics, recordProcessor, null); + } + + /** + * This method will start a blocking Kafka the consumer that will run in background. The listened topics are the one + * in topics parameter. (Topics list of the configuration file is ignored). Warning : this method is blocker. If you + * need an asynchronous consumer, use listenAsync + * + * @param recordProcessor processor function to process every received records + * @param errorProcessor processor function to be called whenever an unrecoverable error is detected + */ + public void listen( + Collection topics, + RecordProcessor, Exception> recordProcessor, + ErrorProcessor> errorProcessor) { log.info("Starting consumer for topics {}", topics); try { consumer.subscribe(topics, this.rebalanceListener); @@ -233,6 +321,19 @@ public void listen(Collection topics, RecordProcessor listenAsync( + RecordProcessor, Exception> recordProcessor, + ErrorProcessor> errorProcessor) { + return CompletableFuture.runAsync(() -> listen(recordProcessor, errorProcessor)); + } + /** * This method will start an asynchronous Kafka the consumer that will run in background. The listened topics are * the one in the configuration file. @@ -254,7 +355,7 @@ public Future listenAsync(RecordProcessor, Exception> */ public Future listenAsync( Collection topics, RecordProcessor, Exception> recordProcessor) { - return CompletableFuture.runAsync(() -> listen(topics, recordProcessor)); + return CompletableFuture.runAsync(() -> listen(topics, recordProcessor, null)); } private void pollAndConsumeRecords(RecordProcessor, Exception> recordProcessor) { @@ -277,7 +378,8 @@ private void pollAndConsumeRecords(RecordProcessor, Excepti } catch (RecordDeserializationException e) { log.error("It looks like we ate a poison pill, let's skip this record!", e); skipCurrentOffset(e.topicPartition(), e.offset()); - errorHandler.handleConsumerDeserializationError(e); + + errorHandler.handleError(e, null); } catch (Exception e) { handleUnknownException(e); } finally { @@ -315,7 +417,8 @@ private void handleUnknownException(Exception e) { this.retryCounter++; } } else { // non-recoverable error - // Send message to DeadLetter Topic + + // Default : Send message to DeadLetter Topic errorHandler.handleError(e, this.currentProcessingRecord); // If config is Log&Fail diff --git a/retryable-consumer-core/src/main/java/com/michelin/kafka/error/DefaultErrorProcessor.java b/retryable-consumer-core/src/main/java/com/michelin/kafka/error/DefaultErrorProcessor.java new file mode 100644 index 0000000..8666898 --- /dev/null +++ b/retryable-consumer-core/src/main/java/com/michelin/kafka/error/DefaultErrorProcessor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafka.error; + +import com.michelin.kafka.ErrorProcessor; +import com.michelin.kafka.avro.GenericErrorModel; +import com.michelin.kafka.configuration.KafkaRetryableConfiguration; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.RecordDeserializationException; + +@Slf4j +public class DefaultErrorProcessor implements ErrorProcessor> { + + private final DeadLetterProducer deadLetterProducer; + + public DefaultErrorProcessor(KafkaRetryableConfiguration kafkaRetryableConfiguration) { + this.deadLetterProducer = new DeadLetterProducer(kafkaRetryableConfiguration.getDeadLetter()); + } + + public DefaultErrorProcessor(DeadLetterProducer deadLetterProducer) { + this.deadLetterProducer = deadLetterProducer; + } + + @Override + public void processError(Throwable throwable, ConsumerRecord consumerRecord, Long retryCount) { + if (throwable instanceof RecordDeserializationException) { + this.handleConsumerDeserializationError((RecordDeserializationException) throwable); + } else { + if (retryCount == null) { + retryCount = 0L; + } + if (throwable != null) { + if (consumerRecord != null) { + this.handleError( + throwable.getMessage(), + "Error processing record after " + retryCount + " retry(ies).", + consumerRecord.offset(), + consumerRecord.partition(), + consumerRecord.topic(), + throwable, + consumerRecord.key(), + consumerRecord.value()); + } else { + this.handleError( + throwable.getMessage(), + "Error processing an unknown record after " + retryCount + " retry(ies).", + 0L, + 0, + "unknown", + throwable, + null, + null); + } + } else { // Throwable is null + if (consumerRecord != null) { + this.handleError( + "Undefined error", + "Error processing record after " + retryCount + " retry(ies).", + consumerRecord.offset(), + consumerRecord.partition(), + consumerRecord.topic(), + null, + consumerRecord.key(), + consumerRecord.value()); + } + } + } + } + + public void handleConsumerDeserializationError(RecordDeserializationException e) { + if (e != null) { + this.handleError( + e.getMessage(), + null, + e.offset(), + e.topicPartition().partition(), + e.topicPartition().topic(), + e, + null, + null); + } + } + + public void handleError( + String cause, + String context, + Long offset, + Integer partition, + String topic, + Throwable exception, + K key, + V value) { + try { + GenericErrorModel errorAvroObject = GenericErrorModel.newBuilder() + .setCause(cause) + .setContextMessage(context) + .setOffset(offset) + .setPartition(partition) + .setTopic(topic) + .build(); + if (exception != null) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + exception.printStackTrace(pw); + errorAvroObject.setStack(sw.toString()); + } + if (value != null) { + if (value instanceof String s) { + errorAvroObject.setValue(s); + } else { + errorAvroObject.setByteValue(toByteBuffer((Serializable) value)); + } + } + if (key != null) { + if (key instanceof String s) { + errorAvroObject.setKey(s); + } else { + errorAvroObject.setByteKey(toByteBuffer((Serializable) key)); + } + } + + // Build the producer and send message + deadLetterProducer.send(UUID.randomUUID().toString(), errorAvroObject); + + } catch (Exception e) { + log.error("An error occurred during error management... good luck with that!", e); + } + } + + public static ByteBuffer toByteBuffer(Serializable obj) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(obj); + oos.flush(); + ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray()); + oos.close(); + baos.close(); + return buffer; + } +} diff --git a/retryable-consumer-core/src/main/java/com/michelin/kafka/error/RetryableConsumerErrorHandler.java b/retryable-consumer-core/src/main/java/com/michelin/kafka/error/RetryableConsumerErrorHandler.java index 83e6da2..5ae0e38 100644 --- a/retryable-consumer-core/src/main/java/com/michelin/kafka/error/RetryableConsumerErrorHandler.java +++ b/retryable-consumer-core/src/main/java/com/michelin/kafka/error/RetryableConsumerErrorHandler.java @@ -18,9 +18,9 @@ */ package com.michelin.kafka.error; -import com.michelin.kafka.avro.GenericErrorModel; +import com.michelin.kafka.ErrorProcessor; +import com.michelin.kafka.configuration.KafkaRetryableConfiguration; import java.io.*; -import java.nio.ByteBuffer; import java.util.*; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -31,17 +31,23 @@ @AllArgsConstructor public class RetryableConsumerErrorHandler { - private final DeadLetterProducer deadLetterProducer; + private final ErrorProcessor> errorProcessor; private final List> notRetryableExceptions = new ArrayList<>(); public RetryableConsumerErrorHandler( - DeadLetterProducer deadLetterProducer, Collection notRetryableExceptions) { - this.deadLetterProducer = deadLetterProducer; - convertStringToException(notRetryableExceptions).forEach(this::addNotRetryableExceptions); + KafkaRetryableConfiguration kafkaRetryableConfiguration, + ErrorProcessor> errorProcessor) { + this.errorProcessor = errorProcessor; + convertStringToException(kafkaRetryableConfiguration.getConsumer().getNotRetryableExceptions()) + .forEach(this::addNotRetryableExceptions); this.initDefaultRetryableExceptions(); } + public RetryableConsumerErrorHandler(KafkaRetryableConfiguration kafkaRetryableConfiguration) { + this(kafkaRetryableConfiguration, new DefaultErrorProcessor<>(kafkaRetryableConfiguration)); + } + private void initDefaultRetryableExceptions() { this.addNotRetryableExceptions( RecordDeserializationException.class, NoSuchMethodException.class, ClassCastException.class); @@ -56,94 +62,12 @@ public boolean isExceptionRetryable(Class exceptionType) { return !notRetryableExceptions.contains(exceptionType); } - public void handleConsumerDeserializationError(RecordDeserializationException e) { - if (e != null) { - this.handleError( - e.getMessage(), - null, - e.offset(), - e.topicPartition().partition(), - e.topicPartition().topic(), - e, - null, - null); - } - } - - public void handleError(Throwable throwable, ConsumerRecord consumerRecord, Long retryNumber) { - if (throwable != null) { - this.handleError( - throwable.getMessage(), - "Error processing record after" + retryNumber + " retry(ies).", - consumerRecord.offset(), - consumerRecord.partition(), - consumerRecord.topic(), - throwable, - consumerRecord.key(), - consumerRecord.value()); - } + public void handleError(Throwable throwable, ConsumerRecord consumerRecord, Long retryCount) { + this.errorProcessor.processError(throwable, consumerRecord, retryCount); } public void handleError(Throwable throwable, ConsumerRecord consumerRecord) { - handleError(throwable, consumerRecord, 0L); - } - - public void handleError( - String cause, - String context, - Long offset, - Integer partition, - String topic, - Throwable exception, - K key, - V value) { - try { - GenericErrorModel errorAvroObject = GenericErrorModel.newBuilder() - .setCause(cause) - .setContextMessage(context) - .setOffset(offset) - .setPartition(partition) - .setTopic(topic) - .build(); - if (exception != null) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - exception.printStackTrace(pw); - errorAvroObject.setStack(sw.toString()); - } - if (value != null) { - if (value instanceof String s) { - errorAvroObject.setValue(s); - } else { - errorAvroObject.setByteValue(toByteBuffer((Serializable) value)); - } - } - if (key != null) { - if (key instanceof String s) { - errorAvroObject.setKey(s); - } else { - errorAvroObject.setByteKey(toByteBuffer((Serializable) key)); - } - } - - // Build the producer and send message - deadLetterProducer.send(UUID.randomUUID().toString(), errorAvroObject); - - } catch (Exception e) { - - log.error("An error occurred during error management... good luck with that!", e); - } - } - - public static ByteBuffer toByteBuffer(Serializable obj) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(obj); - oos.flush(); - ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray()); - oos.close(); - baos.close(); - return buffer; + this.errorProcessor.processError(throwable, consumerRecord, 0L); } public static List> convertStringToException(Collection exceptionNames) { diff --git a/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/DefaultErrorProcessorTest.java b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/DefaultErrorProcessorTest.java new file mode 100644 index 0000000..5431fd1 --- /dev/null +++ b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/DefaultErrorProcessorTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafka.test.unit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.michelin.kafka.ErrorProcessor; +import com.michelin.kafka.avro.GenericErrorModel; +import com.michelin.kafka.error.DeadLetterProducer; +import com.michelin.kafka.error.DefaultErrorProcessor; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class DefaultErrorProcessorTest { + @Captor + private ArgumentCaptor keyCaptor; + + @Captor + private ArgumentCaptor valueCaptor; + + @Mock + private static DeadLetterProducer mockDeadLetterProducer; + + private ErrorProcessor> errorProcessor; + + AutoCloseable mockCloseable; + + @BeforeEach + void setUp() { + mockCloseable = MockitoAnnotations.openMocks(this); + errorProcessor = new DefaultErrorProcessor<>(mockDeadLetterProducer); + doNothing().when(mockDeadLetterProducer).send(any(), any()); + } + + @Test + void testToByteBuffer() throws IOException, ClassNotFoundException { + // Arrange + String testString = "Test string"; + + // Act + ByteBuffer buffer = DefaultErrorProcessor.toByteBuffer(testString); + + // Convert ByteBuffer back to String + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + String resultString = (String) ois.readObject(); + + // Assert + assertEquals(testString, resultString); + } + + @Test + void shouldHandleErrorWithThrowableAndRecord() { + // Given + String cause = "Exception message"; + Exception e = new Exception(cause); + + ConsumerRecord record = new ConsumerRecord<>("testTopic", 12, 13458L, "key", "value"); + + // When + errorProcessor.processError(e, record, 32L); + + // Then + verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); + + GenericErrorModel capturedErrorModel = valueCaptor.getValue(); + + assertEquals(cause, capturedErrorModel.getCause()); + assertEquals("Error processing record after 32 retry(ies).", capturedErrorModel.getContextMessage()); + assertEquals(record.topic(), capturedErrorModel.getTopic()); + assertEquals(record.partition(), capturedErrorModel.getPartition()); + assertEquals(record.offset(), capturedErrorModel.getOffset()); + assertEquals(record.key(), capturedErrorModel.getKey()); + assertEquals(record.value(), capturedErrorModel.getValue()); + } + + @Test + void shouldHandleErrorWithNullRecord() { + // Given + String cause = "Exception message"; + Exception e = new Exception(cause); + + // When + errorProcessor.processError(e, null, 32L); + + // Then + verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); + + GenericErrorModel capturedErrorModel = valueCaptor.getValue(); + + assertEquals(cause, capturedErrorModel.getCause()); + assertEquals("Error processing an unknown record after 32 retry(ies).", capturedErrorModel.getContextMessage()); + } + + @Test + void shouldHandleErrorWithNullThrowable() { + // Given + ConsumerRecord record = new ConsumerRecord<>("testTopic", 12, 13458L, "key", "value"); + + // When + errorProcessor.processError(null, record, 32L); + + // Then + verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); + + GenericErrorModel capturedErrorModel = valueCaptor.getValue(); + + assertEquals("Undefined error", capturedErrorModel.getCause()); + assertEquals("Error processing record after 32 retry(ies).", capturedErrorModel.getContextMessage()); + assertEquals(record.topic(), capturedErrorModel.getTopic()); + assertEquals(record.partition(), capturedErrorModel.getPartition()); + assertEquals(record.offset(), capturedErrorModel.getOffset()); + assertEquals(record.key(), capturedErrorModel.getKey()); + assertEquals(record.value(), capturedErrorModel.getValue()); + } + + @Test + void shouldHandleErrorWithRecordDeserializationException() throws IOException { + // Given + ConsumerRecord record = new ConsumerRecord<>("testTopic", 12, 13458L, "key", "value"); + RecordDeserializationException rde = new RecordDeserializationException( + RecordDeserializationException.DeserializationExceptionOrigin.KEY, + new TopicPartition(record.topic(), record.partition()), + record.offset(), + 1764603801, + TimestampType.CREATE_TIME, + DefaultErrorProcessor.toByteBuffer("MyKey"), + DefaultErrorProcessor.toByteBuffer("MyValue"), + null, + "Record deserialization error", + null); + + // When + errorProcessor.processError(rde, record, 32L); + verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); + + GenericErrorModel capturedErrorModel = valueCaptor.getValue(); + + assertEquals("Record deserialization error", capturedErrorModel.getCause()); + assertEquals(record.topic(), capturedErrorModel.getTopic()); + } +} diff --git a/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerErrorHandlerTest.java b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerErrorHandlerTest.java index 759f7b8..1f26026 100644 --- a/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerErrorHandlerTest.java +++ b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerErrorHandlerTest.java @@ -22,17 +22,18 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import com.michelin.kafka.ErrorProcessor; import com.michelin.kafka.avro.GenericErrorModel; +import com.michelin.kafka.configuration.KafkaRetryableConfiguration; +import com.michelin.kafka.configuration.RetryableConsumerConfiguration; import com.michelin.kafka.error.DeadLetterProducer; +import com.michelin.kafka.error.DefaultErrorProcessor; import com.michelin.kafka.error.RetryableConsumerErrorHandler; import java.io.*; -import java.nio.ByteBuffer; -import java.time.Instant; import java.util.Arrays; import java.util.List; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,14 +49,33 @@ class RetryableConsumerErrorHandlerTest { @Captor private ArgumentCaptor valueCaptor; + + @Captor + private ArgumentCaptor throwableCaptor; + + @Captor + private ArgumentCaptor> recordCaptor; + + @Captor + private ArgumentCaptor retryCountCaptor; + /** Shared test Dead Letter Topic consumer configuration */ @Mock private static DeadLetterProducer mockDeadLetterProducer; + @Mock + private KafkaRetryableConfiguration retryableConfiguration; + + @Mock + private RetryableConsumerConfiguration consumerConfiguration; + private RetryableConsumerErrorHandler errorHandler; private RetryableConsumerErrorHandler serializableObjectErrorHandler; private RetryableConsumerErrorHandler serializableObjectKeyErrorHandler; + @Mock + ErrorProcessor> customErrorProcessor; + AutoCloseable mockCloseable; @BeforeEach @@ -63,12 +83,26 @@ void setUp() { mockCloseable = MockitoAnnotations.openMocks(this); List notRetryableExceptions = Arrays.asList("java.lang.IllegalArgumentException", "java.lang.NullPointerException"); + when(retryableConfiguration.getConsumer()).thenReturn(consumerConfiguration); + when(consumerConfiguration.getNotRetryableExceptions()).thenReturn(notRetryableExceptions); + + keyCaptor = ArgumentCaptor.forClass(String.class); + valueCaptor = ArgumentCaptor.forClass(GenericErrorModel.class); + doNothing().when(mockDeadLetterProducer).send(any(), any()); - errorHandler = new RetryableConsumerErrorHandler<>(mockDeadLetterProducer, notRetryableExceptions); + ErrorProcessor> errorProcessor = + new DefaultErrorProcessor<>(mockDeadLetterProducer); + errorHandler = new RetryableConsumerErrorHandler<>(retryableConfiguration, errorProcessor); + + ErrorProcessor> serializableObjectErrorProcessor = + new DefaultErrorProcessor<>(mockDeadLetterProducer); serializableObjectErrorHandler = - new RetryableConsumerErrorHandler<>(mockDeadLetterProducer, notRetryableExceptions); + new RetryableConsumerErrorHandler<>(retryableConfiguration, serializableObjectErrorProcessor); + + ErrorProcessor> serializableObjectKeyErrorProcessor = + new DefaultErrorProcessor<>(mockDeadLetterProducer); serializableObjectKeyErrorHandler = - new RetryableConsumerErrorHandler<>(mockDeadLetterProducer, notRetryableExceptions); + new RetryableConsumerErrorHandler<>(retryableConfiguration, serializableObjectKeyErrorProcessor); } @AfterEach @@ -110,92 +144,36 @@ void shouldInitializeWithDefaultNotRetryableExceptions() { } @Test - void testHandleConsumerDeserializationErrorExecution() { - TopicPartition topicPartition = new TopicPartition("ExampleTopic", 1); - RecordDeserializationException exception = new RecordDeserializationException( - RecordDeserializationException.DeserializationExceptionOrigin.VALUE, - topicPartition, - 1L, - Instant.now().toEpochMilli(), - TimestampType.NO_TIMESTAMP_TYPE, - ByteBuffer.wrap("Test Key".getBytes()), - ByteBuffer.wrap("Test Value".getBytes()), - null, - "Test message", - null); - assertDoesNotThrow(() -> errorHandler.handleConsumerDeserializationError(exception)); - } - - @Test - void testHandleConsumerDeserializationWithErrorNullTopic() { - RecordDeserializationException exception = new RecordDeserializationException( - RecordDeserializationException.DeserializationExceptionOrigin.VALUE, - null, - 1L, - Instant.now().toEpochMilli(), - TimestampType.NO_TIMESTAMP_TYPE, - ByteBuffer.wrap("Test Key".getBytes()), - ByteBuffer.wrap("Test Value".getBytes()), - null, - "Test message", - null); - - assertThrows(NullPointerException.class, () -> errorHandler.handleConsumerDeserializationError(exception)); - } - - @Test - void shouldHandleErrorWhenAllParametersAreNotNull() { + void shouldHandleCustomErrorProcessor() { // Given - String cause = "cause"; - String context = "context"; - Long offset = 1L; - Integer partition = 1; - String topic = "topic"; - Throwable exception = new RuntimeException("Test exception"); - String key = "key"; - String value = "value"; - doNothing().when(mockDeadLetterProducer).send(any(), any()); + RetryableConsumerErrorHandler errorHandlerWithCustomErrorProcessor = + new RetryableConsumerErrorHandler<>(retryableConfiguration, customErrorProcessor); - // When - errorHandler.handleError(cause, context, offset, partition, topic, exception, key, value); - - // Then - verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); - - GenericErrorModel capturedErrorModel = valueCaptor.getValue(); - - assertEquals(cause, capturedErrorModel.getCause()); - assertEquals(context, capturedErrorModel.getContextMessage()); - assertEquals(offset, capturedErrorModel.getOffset()); - assertEquals(partition, capturedErrorModel.getPartition()); - assertEquals(topic, capturedErrorModel.getTopic()); - } - - @Test - void shouldHandleErrorWhenExceptionIsNull() { - // Given - String cause = "cause"; - String context = "context"; + String cause = "Test exception message"; Long offset = 1L; Integer partition = 1; String topic = "topic"; + Throwable exception = new RuntimeException(cause); String key = "key"; String value = "value"; + doNothing().when(mockDeadLetterProducer).send(any(), any()); + ConsumerRecord record = new ConsumerRecord<>(topic, partition, offset, key, value); // When - errorHandler.handleError(cause, context, offset, partition, topic, null, key, value); + errorHandlerWithCustomErrorProcessor.handleError(exception, record, 2L); // Then - verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); - - GenericErrorModel capturedErrorModel = valueCaptor.getValue(); - - assertEquals(cause, capturedErrorModel.getCause()); - assertEquals(context, capturedErrorModel.getContextMessage()); - assertEquals(offset, capturedErrorModel.getOffset()); - assertEquals(partition, capturedErrorModel.getPartition()); - assertEquals(topic, capturedErrorModel.getTopic()); - assertNull(capturedErrorModel.getStack()); // Check that Stack is null when exception is null + verify(mockDeadLetterProducer, times(0)).send(any(), any()); // Verify the default error processor is not used + verify(customErrorProcessor, times(1)) // Verify the custom error processor is used + .processError(throwableCaptor.capture(), recordCaptor.capture(), retryCountCaptor.capture()); + + assertEquals(exception.getMessage(), throwableCaptor.getValue().getMessage()); + assertEquals(record.key(), recordCaptor.getValue().key()); + assertEquals(record.value(), recordCaptor.getValue().value()); + assertEquals(record.topic(), recordCaptor.getValue().topic()); + assertEquals(record.offset(), recordCaptor.getValue().offset()); + assertEquals(record.partition(), recordCaptor.getValue().partition()); + assertEquals(2L, retryCountCaptor.getValue()); } public static class SerializableObject implements Serializable { @@ -206,18 +184,19 @@ public static class SerializableObject implements Serializable { @Test void shouldHandleErrorWhenValueIsNotString() { // Given - String cause = "cause"; - String context = "context"; + String cause = "Test exception message"; Long offset = 1L; Integer partition = 1; String topic = "topic"; - Throwable exception = new RuntimeException("Test exception"); + Throwable exception = new RuntimeException(cause); String key = "key"; SerializableObject value = new SerializableObject(); doNothing().when(mockDeadLetterProducer).send(any(), any()); + ConsumerRecord record = new ConsumerRecord<>(topic, partition, offset, key, value); + // When - serializableObjectErrorHandler.handleError(cause, context, offset, partition, topic, exception, key, value); + serializableObjectErrorHandler.handleError(exception, record); // Then verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); @@ -225,7 +204,7 @@ void shouldHandleErrorWhenValueIsNotString() { GenericErrorModel capturedErrorModel = valueCaptor.getValue(); assertEquals(cause, capturedErrorModel.getCause()); - assertEquals(context, capturedErrorModel.getContextMessage()); + assertNotNull(capturedErrorModel.getContextMessage()); assertEquals(offset, capturedErrorModel.getOffset()); assertEquals(partition, capturedErrorModel.getPartition()); assertEquals(topic, capturedErrorModel.getTopic()); @@ -235,18 +214,20 @@ void shouldHandleErrorWhenValueIsNotString() { @Test void shouldHandleErrorWhenKeyIsNotString() { // Given - String cause = "cause"; - String context = "context"; + String cause = "Test exception message"; Long offset = 1L; Integer partition = 1; String topic = "topic"; - Throwable exception = new RuntimeException("Test exception"); + + Throwable exception = new RuntimeException(cause); SerializableObject key = new SerializableObject(); String value = "value"; doNothing().when(mockDeadLetterProducer).send(any(), any()); + ConsumerRecord record = new ConsumerRecord<>(topic, partition, offset, key, value); + // When - serializableObjectKeyErrorHandler.handleError(cause, context, offset, partition, topic, exception, key, value); + serializableObjectKeyErrorHandler.handleError(exception, record); // Then verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture()); @@ -254,32 +235,13 @@ void shouldHandleErrorWhenKeyIsNotString() { GenericErrorModel capturedErrorModel = valueCaptor.getValue(); assertEquals(cause, capturedErrorModel.getCause()); - assertEquals(context, capturedErrorModel.getContextMessage()); + assertNotNull(capturedErrorModel.getContextMessage()); assertEquals(offset, capturedErrorModel.getOffset()); assertEquals(partition, capturedErrorModel.getPartition()); assertEquals(topic, capturedErrorModel.getTopic()); assertNotNull(capturedErrorModel.getByteKey()); // Check that ByteKey is not null when key is not a String } - @Test - void testToByteBuffer() throws IOException, ClassNotFoundException { - // Arrange - String testString = "Test string"; - - // Act - ByteBuffer buffer = RetryableConsumerErrorHandler.toByteBuffer(testString); - - // Convert ByteBuffer back to String - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - ObjectInputStream ois = new ObjectInputStream(bais); - String resultString = (String) ois.readObject(); - - // Assert - assertEquals(testString, resultString); - } - @Test void testConvertStringToExceptionWithValidExceptionNames() { diff --git a/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerTest.java b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerTest.java index f2c5f8d..4bfc8ed 100644 --- a/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerTest.java +++ b/retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerTest.java @@ -18,19 +18,27 @@ */ package com.michelin.kafka.test.unit; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.*; +import com.michelin.kafka.ErrorProcessor; import com.michelin.kafka.RecordProcessor; import com.michelin.kafka.RetryableConsumer; import com.michelin.kafka.RetryableConsumerRebalanceListener; +import com.michelin.kafka.configuration.KafkaConfigurationException; import com.michelin.kafka.configuration.KafkaRetryableConfiguration; import com.michelin.kafka.configuration.RetryableConsumerConfiguration; import com.michelin.kafka.error.RetryableConsumerErrorHandler; import java.nio.ByteBuffer; import java.time.Instant; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.record.TimestampType; @@ -94,10 +102,14 @@ void setUp(TestInfo testInfo) throws Exception { when(retryableConfiguration.getName()).thenReturn("Test Consumer"); when(retryableConfiguration.getConsumer()).thenReturn(consumerConfiguration); when(consumerConfiguration.getTopics()).thenReturn(Collections.singletonList(topic)); + when(consumerConfiguration.getNotRetryableExceptions()) + .thenReturn(Collections.singletonList(CustomNotRetryableException.class.getName())); when(retryableConfigurationStopOnError.getName()).thenReturn("Test Consumer with stop on error config"); when(retryableConfigurationStopOnError.getConsumer()).thenReturn(consumerConfigurationStopOnError); when(consumerConfigurationStopOnError.getTopics()).thenReturn(Collections.singletonList(topic)); + when(consumerConfigurationStopOnError.getNotRetryableExceptions()) + .thenReturn(Collections.singletonList(CustomNotRetryableException.class.getName())); when(consumerConfigurationStopOnError.getStopOnError()).thenReturn(true); doNothing().when(recordProcessorNoError).processRecord(any()); @@ -140,8 +152,7 @@ void listenAsync_shouldProcessRecords() throws Exception { retryableConsumer.listenAsync(r -> recordProcessorNoError.processRecord(r)); verify(kafkaConsumer, timeout(5000).atLeast(1)).poll(any()); verify(recordProcessorNoError, timeout(5000).times(1)).processRecord(any()); - Assertions.assertEquals( - retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record1Offset + 1); + assertEquals(retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record1Offset + 1); } @Test @@ -179,8 +190,7 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records // Not retryable error : Check we have correctly skipped the record Assertions.assertNotNull(retryableConsumer.getCurrentOffset(record1TopicPartition)); - Assertions.assertEquals( - retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record2Offset + 1); + assertEquals(retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record2Offset + 1); } @Test @@ -223,8 +233,7 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records // Retryable error : Check we store correctly the offset of second record only Assertions.assertNotNull(retryableConsumer.getCurrentOffset(record1TopicPartition)); - Assertions.assertEquals( - retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record2Offset); + assertEquals(retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record2Offset); } @Test @@ -264,12 +273,11 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records verify(kafkaConsumer, timeout(5000).atLeast(2)).poll(any()); // Check the record is sent to DLQ - verify(errorHandler, timeout(5000).times(1)).handleConsumerDeserializationError(any()); + verify(errorHandler, timeout(5000).times(1)).handleError(any(), any()); // Check we have correctly skipped the record Assertions.assertNotNull(retryableConsumer.getCurrentOffset(record1TopicPartition)); - Assertions.assertEquals( - retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record1Offset + 1); + assertEquals(retryableConsumer.getCurrentOffset(record1TopicPartition).offset(), record1Offset + 1); } @Test @@ -304,7 +312,104 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records Assertions.assertTrue(retryableConsumerStopOnError.isStopped()); } + @Test + @Order(6) + void testRetryableWithCustomErrorProcessor() throws Exception { + CustomErrorProcessor customErrorProcessor = new CustomErrorProcessor(); + ConsumerRecord record1 = + new ConsumerRecord<>(topic, record1Partition, record1Offset, "key1", "value1"); + ConsumerRecord record2 = + new ConsumerRecord<>(topic, record2Partition, record2Offset, "key2", "value2"); + + try (RetryableConsumer retryableConsumerCustomError = + new RetryableConsumer<>(retryableConfigurationStopOnError, kafkaConsumer, customErrorProcessor)) { + when(kafkaConsumer.poll(any())) + .thenReturn( // First poll return one record + new ConsumerRecords<>( + Collections.singletonMap(record1TopicPartition, Collections.singletonList(record1)), + Collections.singletonMap( + record1TopicPartition, new OffsetAndMetadata(1L)) // next records + )) + .thenReturn(new ConsumerRecords<>( + Collections.singletonMap(record2TopicPartition, Collections.singletonList(record2)), + Collections.singletonMap(record1TopicPartition, new OffsetAndMetadata(1L)) // next records + )) + .thenReturn(new ConsumerRecords<>( + Collections.emptyMap(), + Collections.singletonMap(record1TopicPartition, new OffsetAndMetadata(1L)) // next records + )); // all subsequent calls return empty record list + + doThrow(new RetryableConsumerTest.CustomNotRetryableException("Test Custom Error Processor")) + .when(recordProcessorNoError) + .processRecord(record2); + + retryableConsumerCustomError.listenAsync(r -> recordProcessorNoError.processRecord(r)); + verify(kafkaConsumer, timeout(5000).atLeastOnce()).poll(any()); + + assertEquals(1, customErrorProcessor.getErrors().size()); + assertEquals( + "Test Custom Error Processor", + customErrorProcessor.getErrors().get(0)); + } + } + + @Test + @Order(7) + void testRetryableConstructors() throws KafkaConfigurationException { + CustomErrorProcessor customErrorProcessor = new CustomErrorProcessor(); + + try (RetryableConsumer retryableConsumer1 = new RetryableConsumer<>("test")) { + assertNotNull(retryableConsumer1); + } catch (Exception e) { + assertEquals(KafkaException.class, e.getClass()); + } + + KafkaRetryableConfiguration config = KafkaRetryableConfiguration.load(); + try (RetryableConsumer retryableConsumer2 = + new RetryableConsumer<>(config, customErrorProcessor)) { + assertNotNull(retryableConsumer2); + } catch (Exception e) { + assertEquals(KafkaException.class, e.getClass()); + } + + try (RetryableConsumer retryableConsumerCustomError = + new RetryableConsumer<>(retryableConfigurationStopOnError, kafkaConsumer, customErrorProcessor)) { + assertNotNull(retryableConsumerCustomError); + } + + try (RetryableConsumer retryableConsumerCustomError = new RetryableConsumer<>( + retryableConfigurationStopOnError, kafkaConsumer, customErrorProcessor, rebalanceListener)) { + assertNotNull(retryableConsumerCustomError); + } + } + + @Getter + static class CustomErrorProcessor implements ErrorProcessor> { + List errors = new ArrayList<>(); + + @Override + public void processError(Throwable throwable, ConsumerRecord record, Long retryCount) { + // Custom error processing logic + log.error( + "Error processing record with key {} and value {}. Retry count: {}. Error: {}", + record.key(), + record.value(), + retryCount, + throwable.getMessage()); + + errors.add(throwable.getMessage()); + } + } + static class CustomRetryableException extends Exception {} - static class CustomNotRetryableException extends Exception {} + static class CustomNotRetryableException extends Exception { + public CustomNotRetryableException() { + super(); + } + + public CustomNotRetryableException(String message) { + super(message); + } + } } diff --git a/retryable-consumer-core/src/test/resources/application.yml b/retryable-consumer-core/src/test/resources/application.yml index c695629..b595ba1 100644 --- a/retryable-consumer-core/src/test/resources/application.yml +++ b/retryable-consumer-core/src/test/resources/application.yml @@ -20,6 +20,8 @@ kafka: specific: avro: reader: true + key.deserializer: org.apache.kafka.common.serialization.StringDeserializer + value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer topics: TOPIC dead-letter: producer: