diff --git a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/KafkaBundle.java b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/KafkaBundle.java index 155fab5dec..e97f4cd862 100644 --- a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/KafkaBundle.java +++ b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/KafkaBundle.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; @@ -230,6 +231,11 @@ public Future send(K key, V value) { public Future send(K key, V value, Headers headers) { return null; } + + @Override + public Future send(K key, V value, Headers headers, Callback callback) { + return null; + } }; } diff --git a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducer.java b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducer.java index 1437dff7a8..f3b5dba985 100644 --- a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducer.java +++ b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducer.java @@ -1,14 +1,20 @@ package org.sdase.commons.server.kafka.producer; import java.util.concurrent.Future; +import java.util.function.BiFunction; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Headers; import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaMessageProducer implements MessageProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProducer.class); + private String topic; private KafkaProducer producer; @@ -32,14 +38,21 @@ public KafkaMessageProducer( public Future send(K key, V value) { ProducerRecord record = new ProducerRecord<>(topic, key, value); msgCounter.increase(producerName, record.topic()); - return producer.send(record); + return producer.send(record, callbackProxy.apply(key, null)); } @Override public Future send(K key, V value, Headers headers) { ProducerRecord record = new ProducerRecord<>(topic, null, key, value, headers); msgCounter.increase(producerName, record.topic()); - return producer.send(record); + return producer.send(record, callbackProxy.apply(key, null)); + } + + @Override + public Future send(K key, V value, Headers headers, Callback callback) { + ProducerRecord record = new ProducerRecord<>(topic, null, key, value, headers); + msgCounter.increase(producerName, record.topic()); + return producer.send(record, callbackProxy.apply(key, callback)); } public void close() { @@ -50,4 +63,30 @@ public void close() { public void flush() { producer.flush(); } + + private final BiFunction callbackProxy = + (messageKey, callback) -> + (metadata, exception) -> { + if (exception != null) { + LOGGER.error( + "An error occurred while producing a message with key {} to the topic {}.", + messageKey, + topic, + exception); + } else { + // avoid noise + LOGGER.debug( + "Message with key {} was produced to topic {}: partition {}; offset: {}.", + messageKey, + topic, + metadata.partition(), + metadata.offset()); + } + + if (callback == null) { + return; + } + // delegate + callback.onCompletion(metadata, exception); + }; } diff --git a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/MessageProducer.java b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/MessageProducer.java index 91012b2608..46fa5212aa 100644 --- a/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/MessageProducer.java +++ b/sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/producer/MessageProducer.java @@ -1,6 +1,7 @@ package org.sdase.commons.server.kafka.producer; import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Headers; @@ -23,6 +24,17 @@ public interface MessageProducer { Future send(K key, V value, Headers headers); + /** + * @param key key to send + * @param value value to send + * @param headers headers to send + * @param callback A callback that will be triggered once the request is complete (if the ACK is + * received or an exception is raised). + * @return The result of the send is a {@link RecordMetadata} specifying the partition the record + * was sent to, the offset it was assigned and the timestamp of the record. + */ + Future send(K key, V value, Headers headers, Callback callback); + /** * This method is a blank default implementation in order to avoid it being a breaking change. The * implementing class must override this to add behaviour to it. The implementation should call diff --git a/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/health/ExternalKafkaHealthCheckIT.java b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/health/ExternalKafkaHealthCheckIT.java index 0c45dc3a6d..232d0fea16 100644 --- a/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/health/ExternalKafkaHealthCheckIT.java +++ b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/health/ExternalKafkaHealthCheckIT.java @@ -24,7 +24,6 @@ public class ExternalKafkaHealthCheckIT { @Test public void testHealthCheckIt() throws Exception { - KafkaConfiguration config = new KafkaConfiguration(); config.setBrokers( KAFKA.getKafkaBrokers().stream() diff --git a/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerIT.java b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerIT.java new file mode 100644 index 0000000000..0b798b3093 --- /dev/null +++ b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerIT.java @@ -0,0 +1,107 @@ +package org.sdase.commons.server.kafka.producer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; +import java.util.Arrays; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junitpioneer.jupiter.StdIo; +import org.junitpioneer.jupiter.StdOut; +import org.sdase.commons.server.kafka.KafkaConfiguration; +import org.sdase.commons.server.kafka.KafkaProperties; +import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter; + +class KafkaMessageProducerIT { + + @RegisterExtension + static final SharedKafkaTestResource KAFKA = + new SharedKafkaTestResource() + .withBrokerProperty("offsets.topic.num.partitions", "1") + // a broker with limited accepted message size + .withBrokerProperty("message.max.bytes", "100") + .withBrokerProperty("max.message.bytes", "100"); + + static KafkaMessageProducer testee; + private static final String topicName = "test-topic"; + private static ProducerTopicMessageCounter messageCounter; + private static KafkaProducer producer; + + @BeforeAll + static void beforeAll() { + messageCounter = new ProducerTopicMessageCounter(); + KafkaProperties producerProperties = + KafkaProperties.forProducer( + new KafkaConfiguration() + .setBrokers(Arrays.asList(KAFKA.getKafkaConnectString().split(",")))); + producer = + new KafkaProducer<>(producerProperties, new StringSerializer(), new StringSerializer()); + testee = new KafkaMessageProducer(topicName, producer, messageCounter, "test-producer"); + } + + @AfterAll + static void afterAll() { + messageCounter.unregister(); + producer.close(); + } + + @Test + @StdIo + void shouldLogAndDelegate(StdOut out) { + // given + String statement = "A message has been produced."; + // a dummy callback to run on completion + Callback callback = + (metadata, exception) -> { + System.out.println(statement); + }; + + // when + testee.send( + "k", + "3nC8Vd2BE1ZHEnpozFmE7Jay5uwajvtFHI6lkD06UWW3xwYNkL2y3tYNLfa8AxPj9SdUo4r5XisrWYxVMs8mtrfrCOet3Ble56silErvLtAATxgfaiuSF6lNhoHDkqOc9VgJeuZfcSdxfTQcGWmyY5AxJdk5OQnIxUj3JsKWyoFTzzcTNWRMrO1u5JSIcNJmodhuiCoQ", + null, + callback); + + // then + Awaitility.await() + .untilAsserted( + () -> { + assertThat(out.capturedLines()) + .anyMatch(l -> l.contains(statement)) + .anyMatch( + l -> + l.contains( + "An error occurred while producing a message with key k to the topic test-topic.")); + }); + } + + @Test + @StdIo + void shouldLogErrors(StdOut out) { + // given large test value not supported by the broker + // when + testee.send( + "k", + "3nC8Vd2BE1ZHEnpozFmE7Jay5uwajvtFHI6lkD06UWW3xwYNkL2y3tYNLfa8AxPj9SdUo4r5XisrWYxVMs8mtrfrCOet3Ble56silErvLtAATxgfaiuSF6lNhoHDkqOc9VgJeuZfcSdxfTQcGWmyY5AxJdk5OQnIxUj3JsKWyoFTzzcTNWRMrO1u5JSIcNJmodhuiCoQ"); + + // then + Awaitility.await() + .untilAsserted( + () -> { + assertThat(out.capturedLines()) + .anyMatch( + l -> + l.contains( + "An error occurred while producing a message with key k to the topic test-topic.")) + .anyMatch( + l -> l.contains("org.apache.kafka.common.errors.RecordTooLargeException")); + }); + } +} diff --git a/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerTest.java b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerTest.java index 91be3a37f5..0d58b53761 100644 --- a/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerTest.java +++ b/sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/producer/KafkaMessageProducerTest.java @@ -3,6 +3,7 @@ import static org.mockito.Mockito.*; import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Headers; @@ -42,6 +43,12 @@ public Future send(Object key, Object value) { public Future send(Object key, Object value, Headers headers) { return null; } + + @Override + public Future send( + Object key, Object value, Headers headers, Callback callback) { + return null; + } }; messageProducer.flush(); }