|
7 | 7 | import java.util.List; |
8 | 8 | import java.util.concurrent.CompletableFuture; |
9 | 9 | import java.util.concurrent.ExecutionException; |
| 10 | +import java.util.concurrent.atomic.AtomicInteger; |
10 | 11 | import lombok.Getter; |
11 | 12 | import lombok.SneakyThrows; |
12 | 13 | import lombok.extern.slf4j.Slf4j; |
13 | 14 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
14 | 15 | import org.springframework.beans.factory.annotation.Autowired; |
| 16 | +import org.springframework.beans.factory.annotation.Value; |
15 | 17 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
16 | 18 | import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent; |
17 | 19 | import org.springframework.context.annotation.Configuration; |
| 20 | +import org.springframework.context.event.ContextClosedEvent; |
18 | 21 | import org.springframework.context.event.ContextRefreshedEvent; |
19 | 22 | import org.springframework.context.event.EventListener; |
20 | 23 | import org.springframework.core.task.TaskExecutor; |
@@ -51,6 +54,30 @@ public class DynamicKafkaListener { |
51 | 54 |
|
52 | 55 | @Autowired private CommonErrorHandler topicTransactionErrorHandler; |
53 | 56 |
|
| 57 | + private final AtomicInteger inFlightRecords = new AtomicInteger(0); |
| 58 | + |
| 59 | + @Value("${kafka.general.consumer-shutdown-timeout-in-sec:30}") |
| 60 | + private int shutdownTimeoutInSeconds; |
| 61 | + |
| 62 | + // Disabled: registration of a 'graceful' JVM shutdown hook in PostConstruct (commented out below)
| 63 | + /* @PostConstruct |
| 64 | + public void init() { |
| 65 | + Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
| 66 | + log.info("JVM shutdown hook triggered for Kafka consumers"); |
| 67 | + try { |
| 68 | + performGracefulShutdown(); |
| 69 | + } catch (Exception e) { |
| 70 | + log.error("Error during Kafka shutdown", e); |
| 71 | + } |
| 72 | + })); |
| 73 | + }*/ |
| 74 | + |
| 75 | + @EventListener(ContextClosedEvent.class) |
| 76 | + public void onContextClosed() { |
| 77 | + log.info("Application context closing, performing graceful Kafka shutdown"); |
| 78 | + performGracefulShutdown(); |
| 79 | + } |
| 80 | + |
54 | 81 | @EventListener |
55 | 82 | public void handleEvent(ContextRefreshedEvent event) { |
56 | 83 | log.info("Initializing Kafka Consumers.."); |
@@ -128,58 +155,122 @@ private Object determineMessageListenerForTransactions(KafkaProperties.Consumer |
128 | 155 | */ |
129 | 156 | private Object getMultithreadedBatchAcknowledgingMessageListener() { |
130 | 157 | return new BatchAcknowledgingMessageListener<String, String>() { |
131 | | - |
132 | 158 | @SneakyThrows |
133 | 159 | @Override |
134 | 160 | public void onMessage( |
135 | 161 | List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) { |
136 | 162 | log.debug("Consumer got assigned with a Batch of size : {}", consumerRecords.size()); |
137 | 163 |
|
| 164 | + // Track the number of records we're processing |
| 165 | + inFlightRecords.addAndGet(consumerRecords.size()); |
| 166 | + |
138 | 167 | List<CompletableFuture<Void>> transactionSubmissionTasks = new ArrayList<>(); |
139 | 168 |
|
140 | 169 | // Dispatch workers for asynchronously processing Individual records |
141 | 170 | for (ConsumerRecord<String, String> message : consumerRecords) { |
142 | 171 | transactionSubmissionTasks.add( |
143 | 172 | CompletableFuture.runAsync( |
144 | 173 | () -> { |
145 | | - transactionConsumer.listen(message); |
| 174 | + try { |
| 175 | + transactionConsumer.listen(message); |
| 176 | + } finally { |
| 177 | + // No need to decrement here as we'll do it after all tasks complete or fail |
| 178 | + } |
146 | 179 | }, |
147 | 180 | defaultTaskExecutor)); |
148 | 181 | } |
149 | 182 |
|
| 183 | + boolean batchSuccess = true; |
| 184 | + int failedIndex = -1; |
| 185 | + |
150 | 186 | for (int i = 0; i < transactionSubmissionTasks.size(); i++) { |
151 | 187 | try { |
152 | 188 | transactionSubmissionTasks.get(i).get(); |
153 | 189 | } catch (InterruptedException | ExecutionException e) { |
| 190 | + batchSuccess = false; |
| 191 | + failedIndex = i; |
154 | 192 |
|
155 | 193 | final Throwable cause = e.getCause(); |
156 | 194 |
|
157 | 195 | if (cause instanceof ServiceException) { |
158 | 196 | log.error( |
159 | 197 | "One of the Consumer Record in Async Batch Processor failed with message {}", |
160 | 198 | cause.getMessage()); |
161 | | - throw new BatchListenerFailedException( |
162 | | - "Failed to process a Consumer Record from the Batch", i); |
163 | 199 | } |
164 | 200 |
|
165 | 201 | if (cause instanceof InterruptedException) { |
166 | 202 | throw e; |
167 | 203 | } |
168 | 204 | } |
169 | 205 | } |
| 206 | + |
| 207 | + // Always decrement the counter for all records in the batch |
| 208 | + inFlightRecords.addAndGet(-consumerRecords.size()); |
| 209 | + |
170 | 210 | // If the entire Records were processed successfully, Ack & commit the entire Batch |
171 | | - acknowledgment.acknowledge(); |
| 211 | + if (batchSuccess) { |
| 212 | + acknowledgment.acknowledge(); |
| 213 | + } else { |
| 214 | + throw new BatchListenerFailedException( |
| 215 | + "Failed to process a Consumer Record from the Batch", failedIndex); |
| 216 | + } |
172 | 217 | } |
173 | 218 | }; |
174 | 219 | } |
175 | 220 |
|
176 | 221 | private Object getPerRecordAcknowledgingListener() { |
177 | | - |
178 | 222 | return (AcknowledgingMessageListener<String, String>) |
179 | 223 | (message, acknowledgment) -> { |
180 | | - transactionConsumer.listen(message); |
181 | | - // Manually ack the single Record |
182 | | - acknowledgment.acknowledge(); |
| 224 | + try { |
| 225 | + // Increment counter before processing |
| 226 | + inFlightRecords.incrementAndGet(); |
| 227 | + |
| 228 | + transactionConsumer.listen(message); |
| 229 | + // Manually ack the single Record |
| 230 | + acknowledgment.acknowledge(); |
| 231 | + } finally { |
| 232 | + // Always decrement counter, even if exception occurred |
| 233 | + inFlightRecords.decrementAndGet(); |
| 234 | + } |
183 | 235 | }; |
184 | 236 | } |
| 237 | + |
| 238 | + private void performGracefulShutdown() { |
| 239 | + log.info("Starting graceful shutdown of Kafka consumers"); |
| 240 | + |
| 241 | + // Stop all containers from polling new messages |
| 242 | + if (!CollectionUtils.isEmpty(existingContainers)) { |
| 243 | + existingContainers.forEach( |
| 244 | + container -> { |
| 245 | + log.info("Stopping container: {}", container.metrics().keySet().iterator().next()); |
| 246 | + container.stop(); |
| 247 | + }); |
| 248 | + } |
| 249 | + |
| 250 | + // Wait for in-flight messages to be processed |
| 251 | + log.info( |
| 252 | + "All Kafka containers stopped from polling. Waiting for {} in-flight records to be processed...", |
| 253 | + inFlightRecords.get()); |
| 254 | + |
| 255 | + long startTime = System.currentTimeMillis(); |
| 256 | + |
| 257 | + try { |
| 258 | + while (inFlightRecords.get() > 0 |
| 259 | + && System.currentTimeMillis() - startTime < (shutdownTimeoutInSeconds * 1000L)) { |
| 260 | + log.info("Still waiting for {} records to be acknowledged", inFlightRecords.get()); |
| 261 | + Thread.sleep(500); |
| 262 | + } |
| 263 | + } catch (InterruptedException e) { |
| 264 | + Thread.currentThread().interrupt(); |
| 265 | + log.error("Interrupted during shutdown wait", e); |
| 266 | + } |
| 267 | + |
| 268 | + if (inFlightRecords.get() > 0) { |
| 269 | + log.warn("{} records were not acknowledged before shutdown timeout", inFlightRecords.get()); |
| 270 | + } else { |
| 271 | + log.info("All records successfully processed and acknowledged"); |
| 272 | + } |
| 273 | + |
| 274 | + log.info("Kafka consumer graceful shutdown completed"); |
| 275 | + } |
185 | 276 | } |
0 commit comments