Skip to content

Commit 32a2c88

Browse files
mibutec and JoergSiebahn
authored and committed
feat(kafka): support max. number of retries in RetryProcessingErrorMLS
1 parent 4b43533 commit 32a2c88

File tree

4 files changed

+484
-14
lines changed

4 files changed

+484
-14
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.sdase.commons.server.kafka.consumer.strategies.retryprocessingerror;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import org.apache.kafka.clients.consumer.ConsumerRecord;
7+
8+
/**
9+
* Helper class counting the number of executions for a given record.
10+
*
11+
* <p>Internally it increases a counter for the current offset of the record as long as the offset
12+
* remains the same. If the offset changes, the counter is reset to 0.
13+
*/
14+
public class RetryCounter {
15+
private final Map<Integer, OffsetCounter> offsetCountersByPartition =
16+
Collections.synchronizedMap(new HashMap<>());
17+
18+
private final long maxRetryCount;
19+
20+
public RetryCounter(long maxRetryCount) {
21+
this.maxRetryCount = maxRetryCount;
22+
}
23+
24+
public void incErrorCount(ConsumerRecord<?, ?> consumerRecord) {
25+
getOffsetCounterInternal(consumerRecord).inc();
26+
}
27+
28+
public boolean isMaxRetryCountReached(ConsumerRecord<?, ?> consumerRecord) {
29+
long counter = getOffsetCounter(consumerRecord);
30+
return counter > maxRetryCount;
31+
}
32+
33+
public long getOffsetCounter(ConsumerRecord<?, ?> consumerRecord) {
34+
return getOffsetCounterInternal(consumerRecord).count;
35+
}
36+
37+
public long getMaxRetryCount() {
38+
return maxRetryCount;
39+
}
40+
41+
private OffsetCounter getOffsetCounterInternal(ConsumerRecord<?, ?> consumerRecord) {
42+
OffsetCounter counter =
43+
offsetCountersByPartition.computeIfAbsent(
44+
consumerRecord.partition(), k -> new OffsetCounter(0));
45+
if (counter.offset != consumerRecord.offset()) {
46+
counter.offset = consumerRecord.offset();
47+
counter.count = 0;
48+
}
49+
return counter;
50+
}
51+
52+
private static class OffsetCounter {
53+
private long offset = 0;
54+
private long count = 0;
55+
56+
public OffsetCounter(long offset) {
57+
this.offset = offset;
58+
this.count = 0;
59+
}
60+
61+
public long inc() {
62+
return ++count;
63+
}
64+
}
65+
}

sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/consumer/strategies/retryprocessingerror/RetryProcessingErrorMLS.java

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,57 @@
2121

2222
/**
2323
* {@link MessageListenerStrategy} commits records for each partition. In case of processing errors
24-
* an error handler can decide if processing should be retried or stopped.
24+
* the message will be retried for a configured amount of times.
25+
*
26+
* <p>After each error an error handler can decide if processing should be retried or stopped.
27+
*
28+
* <p>If the retry count exceeds:
29+
*
30+
* <ul>
31+
* <li>The failed message handling will be logged as ERROR
32+
* <li>a retryLimitExceededErrorHandler will be called allowing to do more error handling (i.e.
33+
* write to DLT)
34+
* <li>the offset will get increased, so the message handling gets ignored.
35+
* </ul>
2536
*/
2637
public class RetryProcessingErrorMLS<K, V> extends MessageListenerStrategy<K, V> {
2738

2839
private static final Logger LOGGER = LoggerFactory.getLogger(RetryProcessingErrorMLS.class);
2940
private final MessageHandler<K, V> handler;
3041
private final ErrorHandler<K, V> errorHandler;
42+
private final ErrorHandler<K, V> retryLimitExceededErrorHandler;
3143
private String consumerName;
44+
private final RetryCounter retryCounter;
3245

46+
/**
47+
* Creates a new instance of {@link RetryProcessingErrorMLS} retrying the message on error
48+
* infinite times.
49+
*
50+
* @param handler the message handler
51+
* @param errorHandler the error handler called after each error
52+
*/
3353
public RetryProcessingErrorMLS(MessageHandler<K, V> handler, ErrorHandler<K, V> errorHandler) {
54+
this(handler, errorHandler, Long.MAX_VALUE, null);
55+
}
56+
57+
/**
58+
* Creates a new instance of {@link RetryProcessingErrorMLS} retrying the message on error for a
59+
* given amount of times
60+
*
61+
* @param handler the message handler
62+
* @param errorHandler the error handler called after each error
63+
* @param maxRetryCount the maximum number of retries
64+
* @param retryLimitExceededErrorHandler the error handler called if the retry limit is exceeded
65+
*/
66+
public RetryProcessingErrorMLS(
67+
MessageHandler<K, V> handler,
68+
ErrorHandler<K, V> errorHandler,
69+
long maxRetryCount,
70+
ErrorHandler<K, V> retryLimitExceededErrorHandler) {
3471
this.handler = handler;
3572
this.errorHandler = errorHandler;
73+
this.retryLimitExceededErrorHandler = retryLimitExceededErrorHandler;
74+
this.retryCounter = new RetryCounter(maxRetryCount);
3675
}
3776

3877
@Override
@@ -57,9 +96,7 @@ private void processRecordsByPartition(
5796
try {
5897
Instant timerStart = Instant.now();
5998
handler.handle(consumerRecord);
60-
// mark last successful processed record for commit
61-
lastCommitOffset = new OffsetAndMetadata(consumerRecord.offset() + 1);
62-
addOffsetToCommitOnClose(consumerRecord);
99+
lastCommitOffset = markConsumerRecordProcessed(consumerRecord);
63100

64101
Instant timerEnd = Instant.now();
65102
if (LOGGER.isTraceEnabled()) {
@@ -69,23 +106,28 @@ private void processRecordsByPartition(
69106
consumerName,
70107
consumerRecord.topic());
71108
}
72-
73109
} catch (RuntimeException e) {
74-
LOGGER.error(
75-
"Error while handling record {} in message handler {}",
76-
consumerRecord.key(),
77-
handler.getClass(),
78-
e);
79-
boolean shouldContinue = errorHandler.handleError(consumerRecord, e, consumer);
80-
if (!shouldContinue) {
81-
throw new StopListenerException(e);
110+
retryCounter.incErrorCount(consumerRecord);
111+
if (retryCounter.isMaxRetryCountReached(consumerRecord)) {
112+
LOGGER.error(
113+
"Error while handling record {} in message handler {}, no more retries",
114+
consumerRecord.key(),
115+
handler.getClass(),
116+
e);
117+
118+
callErrorHandler(retryLimitExceededErrorHandler, consumerRecord, e, consumer);
119+
lastCommitOffset = markConsumerRecordProcessed(consumerRecord);
82120
} else {
83121
LOGGER.warn(
84-
"Error while handling record {} in message handler {}, will be retried",
122+
"Error while handling record {} in message handler {}, will be retried ({} / {})...",
85123
consumerRecord.key(),
86124
handler.getClass(),
125+
retryCounter.getOffsetCounter(consumerRecord),
126+
retryCounter.getMaxRetryCount(),
87127
e);
88128

129+
callErrorHandler(errorHandler, consumerRecord, e, consumer);
130+
89131
// seek to the current offset of the failing record for retry
90132
consumer.seek(partition, consumerRecord.offset());
91133
break;
@@ -94,6 +136,7 @@ private void processRecordsByPartition(
94136
}
95137
}
96138
if (lastCommitOffset != null) {
139+
97140
consumer.commitSync(Collections.singletonMap(partition, lastCommitOffset));
98141
}
99142
}
@@ -105,4 +148,22 @@ public void verifyConsumerConfig(Map<String, String> config) {
105148
"The strategy should commit explicitly by partition but property 'enable.auto.commit' in consumer config is set to 'true'");
106149
}
107150
}
151+
152+
private void callErrorHandler(
153+
ErrorHandler<K, V> errorHandler,
154+
ConsumerRecord<K, V> consumerRecord,
155+
RuntimeException e,
156+
KafkaConsumer<K, V> consumer) {
157+
if (errorHandler != null) {
158+
boolean shouldContinue = errorHandler.handleError(consumerRecord, e, consumer);
159+
if (!shouldContinue) {
160+
throw new StopListenerException(e);
161+
}
162+
}
163+
}
164+
165+
private OffsetAndMetadata markConsumerRecordProcessed(ConsumerRecord<K, V> consumerRecord) {
166+
addOffsetToCommitOnClose(consumerRecord);
167+
return new OffsetAndMetadata(consumerRecord.offset() + 1);
168+
}
108169
}

0 commit comments

Comments
 (0)