|
1 | 1 | package it.gov.pagopa.common.kafka; |
2 | 2 |
|
3 | | -import ch.qos.logback.classic.LoggerContext; |
4 | | -import ch.qos.logback.classic.spi.ILoggingEvent; |
5 | 3 | import com.fasterxml.jackson.core.JsonProcessingException; |
6 | 4 | import com.fasterxml.jackson.databind.ObjectMapper; |
7 | 5 | import it.gov.pagopa.common.kafka.utils.KafkaConstants; |
8 | | -import it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer; |
9 | | -import it.gov.pagopa.common.utils.MemoryAppender; |
10 | 6 | import it.gov.pagopa.common.utils.TestUtils; |
| 7 | +import org.apache.kafka.clients.admin.AdminClient; |
11 | 8 | import org.apache.kafka.clients.admin.OffsetSpec; |
12 | 9 | import org.apache.kafka.clients.admin.RecordsToDelete; |
13 | 10 | import org.apache.kafka.clients.admin.TopicListing; |
|
20 | 17 | import org.apache.kafka.common.serialization.ByteArraySerializer; |
21 | 18 | import org.apache.kafka.common.serialization.StringDeserializer; |
22 | 19 | import org.junit.jupiter.api.Assertions; |
23 | | -import org.slf4j.LoggerFactory; |
24 | 20 | import org.springframework.beans.factory.annotation.Autowired; |
25 | 21 | import org.springframework.beans.factory.annotation.Value; |
26 | 22 | import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; |
@@ -83,28 +79,30 @@ public KafkaTemplate<byte[], byte[]> testPublisher(ProducerFactory<byte[], byte[ |
83 | 79 |
|
84 | 80 | @AfterTestClass |
85 | 81 | void clearTopics() { |
86 | | - kafkaBroker.doWithAdmin(admin -> { |
87 | | - try { |
88 | | - Collection<TopicListing> topics = admin.listTopics().listings().get(); |
89 | | - admin.deleteRecords( |
90 | | - admin.listOffsets( |
91 | | - topics.stream() |
92 | | - .filter(topicListing -> !topicListing.isInternal()) |
93 | | - .flatMap(t -> IntStream.range(0, kafkaBroker.getPartitionsPerTopic()) |
94 | | - .boxed() |
95 | | - .map(p -> new TopicPartition(t.name(), p))) |
96 | | - .collect(Collectors.toMap(tp -> tp, |
97 | | - tp -> OffsetSpec.latest())) |
98 | | - ).all().get().entrySet().stream() |
99 | | - .filter(e -> e.getValue().offset() > 0) |
100 | | - .collect(Collectors.toMap(Map.Entry::getKey, |
101 | | - e -> RecordsToDelete.beforeOffset(e.getValue().offset())))) |
102 | | - .all().get(); |
103 | | - |
104 | | - } catch (InterruptedException | ExecutionException e) { |
105 | | - throw new IllegalStateException("Something gone wrong while emptying topics", e); |
106 | | - } |
107 | | - }); |
| 82 | + Map<String, Object> brokerConfig = Map.of( |
| 83 | + "bootstrap.servers", kafkaBroker.getBrokersAsString() |
| 84 | + ); |
| 85 | + |
| 86 | + try (AdminClient admin = AdminClient.create(brokerConfig)) { |
| 87 | + Collection<TopicListing> topics = admin.listTopics().listings().get(); |
| 88 | + admin.deleteRecords( |
| 89 | + admin.listOffsets( |
| 90 | + topics.stream() |
| 91 | + .filter(topicListing -> !topicListing.isInternal()) |
| 92 | + .flatMap(t -> IntStream.range(0, kafkaBroker.getPartitionsPerTopic()) |
| 93 | + .boxed() |
| 94 | + .map(p -> new TopicPartition(t.name(), p))) |
| 95 | + .collect(Collectors.toMap(tp -> tp, |
| 96 | + tp -> OffsetSpec.latest())) |
| 97 | + ).all().get().entrySet().stream() |
| 98 | + .filter(e -> e.getValue().offset() > 0) |
| 99 | + .collect(Collectors.toMap(Map.Entry::getKey, |
| 100 | + e -> RecordsToDelete.beforeOffset(e.getValue().offset())))) |
| 101 | + .all().get(); |
| 102 | + |
| 103 | + } catch (InterruptedException | ExecutionException e) { |
| 104 | + throw new IllegalStateException("Something went wrong while clearing topics", e); |
| 105 | + } |
108 | 106 | } |
109 | 107 |
|
110 | 108 | /** It will return useful URLs related to embedded Kafka */ |
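For context, the hunk above swaps the broker-managed admin callback for a directly created `AdminClient`. Below is a hedged, standalone sketch of that cleanup pattern; the class name, method signature, and the parameters standing in for `kafkaBroker.getBrokersAsString()` and `kafkaBroker.getPartitionsPerTopic()` are illustrative, not part of the actual class.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TopicCleanupSketch {

    /** Deletes every record published so far on all non-internal topics. */
    public static void clearTopics(String bootstrapServers, int partitionsPerTopic) {
        Map<String, Object> brokerConfig = Map.of("bootstrap.servers", bootstrapServers);

        // try-with-resources closes the AdminClient even when one of the futures fails
        try (AdminClient admin = AdminClient.create(brokerConfig)) {
            Collection<TopicListing> topics = admin.listTopics().listings().get();

            // Build every (topic, partition) pair for the non-internal topics
            Map<TopicPartition, OffsetSpec> latestOffsetSpecs = topics.stream()
                    .filter(topicListing -> !topicListing.isInternal())
                    .flatMap(t -> IntStream.range(0, partitionsPerTopic)
                            .mapToObj(p -> new TopicPartition(t.name(), p)))
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

            // Truncate each partition up to its latest offset, skipping empty partitions
            Map<TopicPartition, RecordsToDelete> recordsToDelete = admin.listOffsets(latestOffsetSpecs)
                    .all().get().entrySet().stream()
                    .filter(e -> e.getValue().offset() > 0)
                    .collect(Collectors.toMap(Map.Entry::getKey,
                            e -> RecordsToDelete.beforeOffset(e.getValue().offset())));

            admin.deleteRecords(recordsToDelete).all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while clearing topics", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Something went wrong while clearing topics", e);
        }
    }
}
```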
@@ -223,10 +221,10 @@ public void publishIntoEmbeddedKafka(String topic, Integer partition, Iterable<H |
223 | 221 | headers = Stream.concat( |
224 | 222 | Arrays.stream(additionalHeaders), |
225 | 223 | StreamSupport.stream(headers.spliterator(), false)) |
226 | | - .collect(Collectors.toList()); |
| 224 | + .toList(); |
227 | 225 | } |
228 | | - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(StandardCharsets.UTF_8), payload.getBytes(StandardCharsets.UTF_8), headers); |
229 | | - template.send(record); |
| 226 | + ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(StandardCharsets.UTF_8), payload.getBytes(StandardCharsets.UTF_8), headers); |
| 227 | + template.send(rec); |
230 | 228 | } |
231 | 229 | //endregion |
232 | 230 |
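The rename from `record` to `rec` and the switch to `Stream.toList()` above do not change behavior. For illustration, here is a minimal sketch of the same send path, assuming an autowired `KafkaTemplate<byte[], byte[]>` such as the `testPublisher` bean defined earlier; the topic, partition, key, payload, and header values are placeholders.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class EmbeddedKafkaPublishSketch {

    public static void publish(KafkaTemplate<byte[], byte[]> template) {
        // In the utility above, additional headers are merged via Stream.concat(...).toList();
        // a fixed immutable list stands in for that merged result here
        List<Header> headers = List.of(
                new RecordHeader("retry", "1".getBytes(StandardCharsets.UTF_8)));

        // Named "rec" rather than "record", mirroring the rename in the diff
        ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>(
                "test-topic",                                  // placeholder topic
                0,                                             // partition
                "key".getBytes(StandardCharsets.UTF_8),        // nullable key
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8), // payload
                headers);

        template.send(rec);
    }
}
```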
|
@@ -279,55 +277,16 @@ public Map<TopicPartition, Long> checkPublishedOffsets(String topic, long expect |
279 | 277 | } |
280 | 278 | //endregion |
281 | 279 |
|
282 | | -//region check commit by logs |
283 | | - protected MemoryAppender commitLogMemoryAppender; |
284 | | - /** To be called before each test in order to perform the asserts on {@link #assertCommitOrder(String, int)} */ |
285 | | - public void setupCommitLogMemoryAppender() { |
286 | | - ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BaseKafkaConsumer.class.getName()); |
287 | | - commitLogMemoryAppender = new MemoryAppender(); |
288 | | - commitLogMemoryAppender.setContext((LoggerContext) LoggerFactory.getILoggerFactory()); |
289 | | - logger.setLevel(ch.qos.logback.classic.Level.INFO); |
290 | | - logger.addAppender(commitLogMemoryAppender); |
291 | | - commitLogMemoryAppender.start(); |
292 | | - } |
293 | | - |
294 | | - private final Pattern partitionCommitsPattern = Pattern.compile("partition (\\d+): (\\d+) - (\\d+)"); |
295 | | - /** It will assert the right offset commit and the total messages by the provided {@link BaseKafkaConsumer#getFlowName()}.<br /> |
296 | | - * In order to be used, you have to call {@link #setupCommitLogMemoryAppender()} before each test */ |
297 | | - public void assertCommitOrder(String flowName, int totalSendMessages) { |
298 | | - Map<Integer, Integer> partition2last = new HashMap<>(Map.of(0, -1, 1, -1)); |
299 | | - for (ILoggingEvent loggedEvent : commitLogMemoryAppender.getLoggedEvents()) { |
300 | | - if(loggedEvent.getMessage().equals("[KAFKA_COMMIT][{}] Committing {} messages: {}") && flowName.equals(loggedEvent.getArgumentArray()[0])){ |
301 | | - Arrays.stream(((String)loggedEvent.getArgumentArray()[2]).split(";")) |
302 | | - .forEach(s -> { |
303 | | - Matcher matcher = partitionCommitsPattern.matcher(s); |
304 | | - Assertions.assertTrue(matcher.matches(), "Unexpected partition commit string: " + s); |
305 | | - int partition = Integer.parseInt(matcher.group(1)); |
306 | | - int startOffset = Integer.parseInt(matcher.group(2)); |
307 | | - int endOffset = Integer.parseInt(matcher.group(3)); |
308 | | - Assertions.assertTrue(endOffset>=startOffset, "EndOffset less than StartOffset!: " + s); |
309 | | - |
310 | | - Integer lastCommittedOffset = partition2last.get(partition); |
311 | | - Assertions.assertEquals(lastCommittedOffset, startOffset-1); |
312 | | - partition2last.put(partition, endOffset); |
313 | | - }); |
314 | | - } |
315 | | - } |
316 | | - |
317 | | - Assertions.assertEquals(totalSendMessages, partition2last.values().stream().mapToInt(x->x+1).sum()); |
318 | | - } |
319 | | -//endregion |
320 | | - |
321 | 280 | //region error topic |
322 | 281 | public void checkErrorsPublished(String topicErrors, Pattern errorUseCaseIdPatternMatch, int expectedErrorMessagesNumber, long maxWaitingMs, List<Pair<Supplier<String>, java.util.function.Consumer<ConsumerRecord<String, String>>>> errorUseCases) { |
323 | 282 | final List<ConsumerRecord<String, String>> errors = consumeMessages(topicErrors, expectedErrorMessagesNumber, maxWaitingMs); |
324 | | - for (final ConsumerRecord<String, String> record : errors) { |
325 | | - final Matcher matcher = errorUseCaseIdPatternMatch.matcher(record.value()); |
| 283 | + for (final ConsumerRecord<String, String> rec : errors) { |
| 284 | + final Matcher matcher = errorUseCaseIdPatternMatch.matcher(rec.value()); |
326 | 285 | int useCaseId = matcher.find() ? Integer.parseInt(matcher.group(1)) : -1; |
327 | 286 | if (useCaseId == -1) { |
328 | | - throw new IllegalStateException("UseCaseId not recognized! " + record.value()); |
| 287 | + throw new IllegalStateException("UseCaseId not recognized! " + rec.value()); |
329 | 288 | } |
330 | | - errorUseCases.get(useCaseId).getSecond().accept(record); |
| 289 | + errorUseCases.get(useCaseId).getSecond().accept(rec); |
331 | 290 | } |
332 | 291 | } |
333 | 292 |
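As a usage illustration (not part of this diff), a test could declare its error use cases as pairs of payload suppliers and record assertions and hand them to `checkErrorsPublished`. The id pattern, payloads, and assertion bodies below are hypothetical, and `Pair` is assumed to be Spring Data's `org.springframework.data.util.Pair`, consistent with the `getSecond()` call above.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Assertions;
import org.springframework.data.util.Pair;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;

class ErrorUseCasesExample {

    // Pattern whose first capturing group extracts the use-case id from the error payload
    static final Pattern ERROR_USE_CASE_ID_PATTERN = Pattern.compile("\"useCaseId\":(\\d+)");

    static List<Pair<Supplier<String>, Consumer<ConsumerRecord<String, String>>>> buildErrorUseCases() {
        // Each pair couples the payload to publish with the assertion run on the
        // record later read back from the error topic
        Pair<Supplier<String>, Consumer<ConsumerRecord<String, String>>> useCase0 = Pair.of(
                () -> "{\"useCaseId\":0,\"reason\":\"deserialization\"}",
                errorRecord -> Assertions.assertTrue(errorRecord.value().contains("deserialization")));

        Pair<Supplier<String>, Consumer<ConsumerRecord<String, String>>> useCase1 = Pair.of(
                () -> "{\"useCaseId\":1,\"reason\":\"validation\"}",
                errorRecord -> Assertions.assertTrue(errorRecord.value().contains("validation")));

        return List.of(useCase0, useCase1);
    }
}
```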
|
|