|
7 | 7 | import java.util.List; |
8 | 8 | import java.util.concurrent.CompletableFuture; |
9 | 9 | import java.util.concurrent.ExecutionException; |
| 10 | +import java.util.concurrent.atomic.AtomicInteger; |
10 | 11 | import lombok.Getter; |
11 | 12 | import lombok.SneakyThrows; |
12 | 13 | import lombok.extern.slf4j.Slf4j; |
13 | 14 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
14 | 15 | import org.springframework.beans.factory.annotation.Autowired; |
| 16 | +import org.springframework.beans.factory.annotation.Value; |
15 | 17 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
16 | 18 | import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent; |
17 | 19 | import org.springframework.context.annotation.Configuration; |
| 20 | +import org.springframework.context.event.ContextClosedEvent; |
18 | 21 | import org.springframework.context.event.ContextRefreshedEvent; |
19 | 22 | import org.springframework.context.event.EventListener; |
20 | 23 | import org.springframework.core.task.TaskExecutor; |
@@ -51,6 +54,17 @@ public class DynamicKafkaListener { |
51 | 54 |
|
52 | 55 | @Autowired private CommonErrorHandler topicTransactionErrorHandler; |
53 | 56 |
|
| 57 | + private final AtomicInteger inFlightRecords = new AtomicInteger(0); |
| 58 | + |
| 59 | + @Value("${kafka.general.consumer-shutdown-timeout-in-sec:30}") |
| 60 | + private int shutdownTimeoutInSeconds; |
| 61 | + |
| 62 | + @EventListener(ContextClosedEvent.class) |
| 63 | + public void onContextClosed() { |
| 64 | + log.info("Application context closing, performing graceful Kafka shutdown"); |
| 65 | + performGracefulShutdown(); |
| 66 | + } |
| 67 | + |
54 | 68 | @EventListener |
55 | 69 | public void handleEvent(ContextRefreshedEvent event) { |
56 | 70 | log.info("Initializing Kafka Consumers.."); |
@@ -128,58 +142,122 @@ private Object determineMessageListenerForTransactions(KafkaProperties.Consumer |
128 | 142 | */ |
129 | 143 | private Object getMultithreadedBatchAcknowledgingMessageListener() { |
130 | 144 | return new BatchAcknowledgingMessageListener<String, String>() { |
131 | | - |
132 | 145 | @SneakyThrows |
133 | 146 | @Override |
134 | 147 | public void onMessage( |
135 | 148 | List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) { |
136 | 149 | log.debug("Consumer got assigned with a Batch of size : {}", consumerRecords.size()); |
137 | 150 |
|
| 151 | + // Track the number of records we're processing |
| 152 | + inFlightRecords.addAndGet(consumerRecords.size()); |
| 153 | + |
138 | 154 | List<CompletableFuture<Void>> transactionSubmissionTasks = new ArrayList<>(); |
139 | 155 |
|
140 | 156 | // Dispatch workers for asynchronously processing Individual records |
141 | 157 | for (ConsumerRecord<String, String> message : consumerRecords) { |
142 | 158 | transactionSubmissionTasks.add( |
143 | 159 | CompletableFuture.runAsync( |
144 | 160 | () -> { |
145 | | - transactionConsumer.listen(message); |
| 161 | + try { |
| 162 | + transactionConsumer.listen(message); |
| 163 | + } finally { |
| 164 | + // No need to decrement here as we'll do it after all tasks complete or fail |
| 165 | + } |
146 | 166 | }, |
147 | 167 | defaultTaskExecutor)); |
148 | 168 | } |
149 | 169 |
|
| 170 | + boolean batchSuccess = true; |
| 171 | + int failedIndex = -1; |
| 172 | + |
150 | 173 | for (int i = 0; i < transactionSubmissionTasks.size(); i++) { |
151 | 174 | try { |
152 | 175 | transactionSubmissionTasks.get(i).get(); |
153 | 176 | } catch (InterruptedException | ExecutionException e) { |
| 177 | + batchSuccess = false; |
| 178 | + failedIndex = i; |
154 | 179 |
|
155 | 180 | final Throwable cause = e.getCause(); |
156 | 181 |
|
157 | 182 | if (cause instanceof ServiceException) { |
158 | 183 | log.error( |
159 | 184 | "One of the Consumer Record in Async Batch Processor failed with message {}", |
160 | 185 | cause.getMessage()); |
161 | | - throw new BatchListenerFailedException( |
162 | | - "Failed to process a Consumer Record from the Batch", i); |
163 | 186 | } |
164 | 187 |
|
165 | 188 | if (cause instanceof InterruptedException) { |
166 | 189 | throw e; |
167 | 190 | } |
168 | 191 | } |
169 | 192 | } |
| 193 | + |
| 194 | + // Always decrement the counter for all records in the batch |
| 195 | + inFlightRecords.addAndGet(-consumerRecords.size()); |
| 196 | + |
170 | 197 | // If the entire Records were processed successfully, Ack & commit the entire Batch |
171 | | - acknowledgment.acknowledge(); |
| 198 | + if (batchSuccess) { |
| 199 | + acknowledgment.acknowledge(); |
| 200 | + } else { |
| 201 | + throw new BatchListenerFailedException( |
| 202 | + "Failed to process a Consumer Record from the Batch", failedIndex); |
| 203 | + } |
172 | 204 | } |
173 | 205 | }; |
174 | 206 | } |
175 | 207 |
|
176 | 208 | private Object getPerRecordAcknowledgingListener() { |
177 | | - |
178 | 209 | return (AcknowledgingMessageListener<String, String>) |
179 | 210 | (message, acknowledgment) -> { |
180 | | - transactionConsumer.listen(message); |
181 | | - // Manually ack the single Record |
182 | | - acknowledgment.acknowledge(); |
| 211 | + try { |
| 212 | + // Increment counter before processing |
| 213 | + inFlightRecords.incrementAndGet(); |
| 214 | + |
| 215 | + transactionConsumer.listen(message); |
| 216 | + // Manually ack the single Record |
| 217 | + acknowledgment.acknowledge(); |
| 218 | + } finally { |
| 219 | + // Always decrement counter, even if exception occurred |
| 220 | + inFlightRecords.decrementAndGet(); |
| 221 | + } |
183 | 222 | }; |
184 | 223 | } |
| 224 | + |
| 225 | + private void performGracefulShutdown() { |
| 226 | + log.info("Starting graceful shutdown of Kafka consumers"); |
| 227 | + |
| 228 | + // Stop all containers from polling new messages |
| 229 | + if (!CollectionUtils.isEmpty(existingContainers)) { |
| 230 | + existingContainers.forEach( |
| 231 | + container -> { |
| 232 | + log.info("Stopping container: {}", container.metrics().keySet().iterator().next()); |
| 233 | + container.stop(); |
| 234 | + }); |
| 235 | + } |
| 236 | + |
| 237 | + // Wait for in-flight messages to be processed |
| 238 | + log.info( |
| 239 | + "All Kafka containers stopped from polling. Waiting for {} in-flight records to be processed...", |
| 240 | + inFlightRecords.get()); |
| 241 | + |
| 242 | + long startTime = System.currentTimeMillis(); |
| 243 | + |
| 244 | + try { |
| 245 | + while (inFlightRecords.get() > 0 |
| 246 | + && System.currentTimeMillis() - startTime < (shutdownTimeoutInSeconds * 1000L)) { |
| 247 | + log.info("Still waiting for {} records to be acknowledged", inFlightRecords.get()); |
| 248 | + Thread.sleep(500); |
| 249 | + } |
| 250 | + } catch (InterruptedException e) { |
| 251 | + Thread.currentThread().interrupt(); |
| 252 | + log.error("Interrupted during shutdown wait", e); |
| 253 | + } |
| 254 | + |
| 255 | + if (inFlightRecords.get() > 0) { |
| 256 | + log.warn("{} records were not acknowledged before shutdown timeout", inFlightRecords.get()); |
| 257 | + } else { |
| 258 | + log.info("All records successfully processed and acknowledged"); |
| 259 | + } |
| 260 | + |
| 261 | + log.info("Kafka consumer graceful shutdown completed"); |
| 262 | + } |
185 | 263 | } |
0 commit comments