Skip to content

Commit da15f85

Browse files
committed
Enhance unit testing
1 parent 59b0540 commit da15f85

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/DefaultErrorProcessorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.michelin.kafka.test.unit;
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNull;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.Mockito.*;
2425

@@ -31,6 +32,10 @@
3132
import java.io.ObjectInputStream;
3233
import java.nio.ByteBuffer;
3334
import org.apache.kafka.clients.consumer.ConsumerRecord;
35+
import org.apache.kafka.common.TopicPartition;
36+
import org.apache.kafka.common.errors.RecordDeserializationException;
37+
import org.apache.kafka.common.header.Headers;
38+
import org.apache.kafka.common.record.TimestampType;
3439
import org.junit.jupiter.api.BeforeEach;
3540
import org.junit.jupiter.api.Test;
3641
import org.mockito.ArgumentCaptor;
@@ -142,4 +147,31 @@ void shouldHandleErrorWithNullThrowable() {
142147
assertEquals(record.key(), capturedErrorModel.getKey());
143148
assertEquals(record.value(), capturedErrorModel.getValue());
144149
}
150+
151+
@Test
152+
void shouldHandleErrorWithRecordDeserializationException() throws IOException {
153+
// Given
154+
ConsumerRecord<String, String> record = new ConsumerRecord<>("testTopic", 12, 13458L, "key", "value");
155+
RecordDeserializationException rde =
156+
new RecordDeserializationException(
157+
RecordDeserializationException.DeserializationExceptionOrigin.KEY,
158+
new TopicPartition(record.topic(),record.partition()),
159+
record.offset(),
160+
1764603801,
161+
TimestampType.CREATE_TIME,
162+
DefaultErrorProcessor.toByteBuffer("MyKey"),
163+
DefaultErrorProcessor.toByteBuffer("MyValue"),
164+
null,
165+
"Record deserialization error",
166+
null);
167+
168+
// When
169+
errorProcessor.processError(rde, record, 32L);
170+
verify(mockDeadLetterProducer, times(1)).send(keyCaptor.capture(), valueCaptor.capture());
171+
172+
GenericErrorModel capturedErrorModel = valueCaptor.getValue();
173+
174+
assertEquals("Record deserialization error", capturedErrorModel.getCause());
175+
assertEquals(record.topic(), capturedErrorModel.getTopic());
176+
}
145177
}

retryable-consumer-core/src/test/java/com/michelin/kafka/test/unit/RetryableConsumerErrorHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void shouldHandleCustomErrorProcessor() {
160160
ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
161161

162162
// When
163-
errorHandlerWithCustomErrorProcessor.handleError(exception, record);
163+
errorHandlerWithCustomErrorProcessor.handleError(exception, record, 2L);
164164

165165
// Then
166166
verify(mockDeadLetterProducer, times(0)).send(any(), any()); // Verify the default error processor is not used
@@ -173,6 +173,7 @@ void shouldHandleCustomErrorProcessor() {
173173
assertEquals(record.topic(), recordCaptor.getValue().topic());
174174
assertEquals(record.offset(), recordCaptor.getValue().offset());
175175
assertEquals(record.partition(), recordCaptor.getValue().partition());
176+
assertEquals(2L, retryCountCaptor.getValue());
176177
}
177178

178179
public static class SerializableObject implements Serializable {

0 commit comments

Comments
 (0)