2929import java .nio .ByteBuffer ;
3030import java .time .Instant ;
3131import java .util .Collections ;
32+ import lombok .extern .slf4j .Slf4j ;
3233import org .apache .kafka .clients .consumer .*;
3334import org .apache .kafka .common .TopicPartition ;
3435import org .apache .kafka .common .errors .RecordDeserializationException ;
3536import org .apache .kafka .common .record .TimestampType ;
36- import org .junit .jupiter .api .AfterEach ;
37- import org .junit .jupiter .api .Assertions ;
38- import org .junit .jupiter .api .BeforeEach ;
39- import org .junit .jupiter .api .Test ;
37+ import org .junit .jupiter .api .*;
4038import org .mockito .Mock ;
4139import org .mockito .MockitoAnnotations ;
4240
41+ @ Slf4j
42+ @ TestMethodOrder (MethodOrderer .OrderAnnotation .class )
4343class RetryableConsumerTest {
44+ private RetryableConsumer <String , String > retryableConsumer ; // The tested class
45+
4446 @ Mock
4547 private KafkaConsumer <String , String > kafkaConsumer ;
4648
@@ -50,9 +52,6 @@ class RetryableConsumerTest {
5052 @ Mock
5153 private RetryableConsumerErrorHandler <String , String > errorHandler ;
5254
53- @ Mock
54- private RetryableConsumer <String , String > retryableConsumer ;
55-
5655 @ Mock
5756 private RetryableConsumerRebalanceListener rebalanceListener ;
5857
@@ -65,7 +64,9 @@ class RetryableConsumerTest {
6564 @ Mock
6665 RecordProcessor <ConsumerRecord <String , String >, Exception > recordProcessorDeserializationError ;
6766
68- private final String topic = "topic" ;
67+ private AutoCloseable closeableMocks ;
68+
69+ private final String topic = "retryable-cons-test-topic" ;
6970 private final int record1Partition = 1 ;
7071 private final long record1Offset = 1L ;
7172 private final TopicPartition record1TopicPartition = new TopicPartition (topic , record1Partition );
@@ -75,8 +76,10 @@ class RetryableConsumerTest {
7576 private final TopicPartition record2TopicPartition = new TopicPartition (topic , record2Partition );
7677
7778 @ BeforeEach
78- void setUp () throws Exception {
79- MockitoAnnotations .openMocks (this );
79+ void setUp (TestInfo testInfo ) throws Exception {
80+ log .info ("Setting up test : {}" , testInfo .getDisplayName ());
81+ closeableMocks = MockitoAnnotations .openMocks (this );
82+ log .info ("Mocks initialized!" );
8083
8184 when (errorHandler .isExceptionRetryable (CustomRetryableException .class )).thenReturn (true );
8285 when (errorHandler .isExceptionRetryable (CustomNotRetryableException .class ))
@@ -103,16 +106,26 @@ void setUp() throws Exception {
103106
104107 retryableConsumer =
105108 new RetryableConsumer <>(retryableConfiguration , kafkaConsumer , errorHandler , rebalanceListener );
109+
110+ log .info ("Test setup completed for test {} !" , testInfo .getDisplayName ());
106111 }
107112
108113 @ AfterEach
109- void teardown () {
114+ void teardown (TestInfo testInfo ) throws Exception {
115+ log .info ("Tearing down test : {} ..." , testInfo .getDisplayName ());
110116 if (retryableConsumer != null ) {
111117 retryableConsumer .close ();
112118 }
119+ if (closeableMocks != null ) {
120+ closeableMocks .close ();
121+ log .info ("Mocks closed" );
122+ }
123+
124+ log .info ("Test tear down completed for test {} !" , testInfo .getDisplayName ());
113125 }
114126
115127 @ Test
128+ @ Order (1 )
116129 void listenAsync_shouldProcessRecords () throws Exception {
117130 ConsumerRecord <String , String > consumerRecord =
118131 new ConsumerRecord <>(topic , record1Partition , record1Offset , "key" , "value" );
@@ -129,56 +142,12 @@ void listenAsync_shouldProcessRecords() throws Exception {
129142 retryableConsumer .listenAsync (r -> recordProcessorNoError .processRecord (r ));
130143 verify (kafkaConsumer , timeout (5000 ).atLeast (1 )).poll (any ());
131144 verify (recordProcessorNoError , timeout (5000 ).times (1 )).processRecord (any ());
132-
133- Assertions .assertEquals (
134- retryableConsumer .getCurrentOffset (record1TopicPartition ).offset (), record1Offset + 1 );
135- }
136-
137- @ Test
138- void listenAsync_shouldHandleDeserializationException () throws Exception {
139- ConsumerRecord <String , String > consumerRecord =
140- new ConsumerRecord <>(topic , record1Partition , record1Offset , "key" , "value" );
141-
142- when (kafkaConsumer .poll (any ()))
143- .thenReturn ( // First poll return one record
144- new ConsumerRecords <>(
145- Collections .singletonMap (
146- record1TopicPartition , Collections .singletonList (consumerRecord )),
147- Collections .singletonMap (
148- record1TopicPartition , new OffsetAndMetadata (1L )) // next records
149- ))
150- .thenReturn (new ConsumerRecords <>(
151- Collections .emptyMap (),
152- Collections .singletonMap (record1TopicPartition , new OffsetAndMetadata (1L )) // next records
153- )); // all subsequent calls return empty record list
154-
155- doThrow (new RecordDeserializationException (
156- RecordDeserializationException .DeserializationExceptionOrigin .VALUE ,
157- record1TopicPartition ,
158- record1Offset ,
159- Instant .now ().toEpochMilli (),
160- TimestampType .NO_TIMESTAMP_TYPE ,
161- ByteBuffer .wrap ("Test Key" .getBytes ()),
162- ByteBuffer .wrap ("Test Value" .getBytes ()),
163- null ,
164- "Fake DeSer Error" ,
165- new Exception ()))
166- .when (recordProcessorNoError )
167- .processRecord (any ());
168-
169- retryableConsumer .listenAsync (r -> recordProcessorNoError .processRecord (r ));
170- verify (kafkaConsumer , timeout (5000 ).atLeast (2 )).poll (any ());
171-
172- // Check the record is sent to DLQ
173- verify (errorHandler , timeout (5000 ).times (1 )).handleConsumerDeserializationError (any ());
174-
175- // Check we have correctly skipped the record
176- Assertions .assertNotNull (retryableConsumer .getCurrentOffset (record1TopicPartition ));
177145 Assertions .assertEquals (
178146 retryableConsumer .getCurrentOffset (record1TopicPartition ).offset (), record1Offset + 1 );
179147 }
180148
181149 @ Test
150+ @ Order (2 )
182151 void listenAsync_shouldHandleNotRetryableError () throws Exception {
183152 ConsumerRecord <String , String > record1 =
184153 new ConsumerRecord <>(topic , record1Partition , record1Offset , "key1" , "value1" );
@@ -202,7 +171,9 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records
202171 Collections .singletonMap (record1TopicPartition , new OffsetAndMetadata (1L )) // next records
203172 )); // all subsequent calls return empty record list
204173
205- doThrow (new CustomNotRetryableException ()).when (recordProcessorNoError ).processRecord (record2 );
174+ doThrow (new RetryableConsumerTest .CustomNotRetryableException ())
175+ .when (recordProcessorNoError )
176+ .processRecord (record2 );
206177
207178 retryableConsumer .listenAsync (r -> recordProcessorNoError .processRecord (r ));
208179 verify (kafkaConsumer , timeout (5000 ).atLeastOnce ()).poll (any ());
@@ -215,6 +186,7 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records
215186 }
216187
217188 @ Test
189+ @ Order (3 )
218190 void listenAsync_shouldHandleInfiniteRetryableError () throws Exception {
219191 ConsumerRecord <String , String > record1 =
220192 new ConsumerRecord <>(topic , record1Partition , record1Offset , "key1" , "value1" );
@@ -238,7 +210,9 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records
238210 Collections .singletonMap (record1TopicPartition , new OffsetAndMetadata (1L )) // next record
239211 )); // all subsequent calls return empty record list
240212
241- doThrow (new CustomRetryableException ()).when (recordProcessorNoError ).processRecord (record2 );
213+ doThrow (new RetryableConsumerTest .CustomRetryableException ())
214+ .when (recordProcessorNoError )
215+ .processRecord (record2 );
242216
243217 retryableConsumer .listenAsync (r -> recordProcessorNoError .processRecord (r ));
244218
@@ -255,6 +229,51 @@ record1TopicPartition, new OffsetAndMetadata(1L)) // next records
255229 retryableConsumer .getCurrentOffset (record1TopicPartition ).offset (), record2Offset );
256230 }
257231
232+ @ Test
233+ @ Order (4 )
234+ void listenAsync_shouldHandleDeserializationException () throws Exception {
235+ ConsumerRecord <String , String > consumerRecord =
236+ new ConsumerRecord <>(topic , record1Partition , record1Offset , "key" , "value" );
237+
238+ when (kafkaConsumer .poll (any ()))
239+ .thenReturn ( // First poll return one record
240+ new ConsumerRecords <>(
241+ Collections .singletonMap (
242+ record1TopicPartition , Collections .singletonList (consumerRecord )),
243+ Collections .singletonMap (
244+ record1TopicPartition , new OffsetAndMetadata (1L )) // next records
245+ ))
246+ .thenReturn (new ConsumerRecords <>(
247+ Collections .emptyMap (),
248+ Collections .singletonMap (record1TopicPartition , new OffsetAndMetadata (1L )) // next records
249+ )); // all subsequent calls return empty record list
250+
251+ doThrow (new RecordDeserializationException (
252+ RecordDeserializationException .DeserializationExceptionOrigin .VALUE ,
253+ record1TopicPartition ,
254+ record1Offset ,
255+ Instant .now ().toEpochMilli (),
256+ TimestampType .NO_TIMESTAMP_TYPE ,
257+ ByteBuffer .wrap ("Test Key" .getBytes ()),
258+ ByteBuffer .wrap ("Test Value" .getBytes ()),
259+ null ,
260+ "Fake DeSer Error" ,
261+ new Exception ()))
262+ .when (recordProcessorNoError )
263+ .processRecord (any ());
264+
265+ retryableConsumer .listenAsync (r -> recordProcessorNoError .processRecord (r ));
266+ verify (kafkaConsumer , timeout (5000 ).atLeast (2 )).poll (any ());
267+
268+ // Check the record is sent to DLQ
269+ verify (errorHandler , timeout (5000 ).times (1 )).handleConsumerDeserializationError (any ());
270+
271+ // Check we have correctly skipped the record
272+ Assertions .assertNotNull (retryableConsumer .getCurrentOffset (record1TopicPartition ));
273+ Assertions .assertEquals (
274+ retryableConsumer .getCurrentOffset (record1TopicPartition ).offset (), record1Offset + 1 );
275+ }
276+
  // Marker exception stubbed as retryable in setUp() — the consumer should
  // keep retrying records whose processing throws it.
  static class CustomRetryableException extends Exception {}
259278
  // Marker exception stubbed as NOT retryable in setUp() — the consumer should
  // hand records whose processing throws it to the error handler and move on.
  static class CustomNotRetryableException extends Exception {}
0 commit comments