Skip to content

feat: provides an API to register a callback #2055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
Expand Down Expand Up @@ -230,6 +231,11 @@ public Future<RecordMetadata> send(K key, V value) {
// No-op test stub: ignores the arguments and produces nothing.
// NOTE(review): returns null instead of a completed Future — callers of this stub
// must not dereference the result. TODO confirm this is intentional for the test.
public Future<RecordMetadata> send(K key, V value, Headers headers) {
return null;
}

@Override
// No-op test stub for the new callback-accepting overload: the callback is never invoked.
public Future<RecordMetadata> send(K key, V value, Headers headers, Callback callback) {
return null;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package org.sdase.commons.server.kafka.producer;

import java.util.concurrent.Future;
import java.util.function.BiFunction;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageProducer<K, V> implements MessageProducer<K, V> {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProducer.class);

private String topic;

private KafkaProducer<K, V> producer;
Expand All @@ -32,14 +38,21 @@ public KafkaMessageProducer(
/**
 * Sends the given key/value pair to the configured topic and counts the message in the
 * Prometheus producer metric.
 *
 * @param key key to send
 * @param value value to send
 * @return the future returned by the underlying {@code KafkaProducer#send}
 */
public Future<RecordMetadata> send(K key, V value) {
  ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
  msgCounter.increase(producerName, record.topic());
  // No caller-supplied callback here; callbackProxy still logs the send outcome.
  // (The stale pre-diff line `return producer.send(record);` is removed — the span
  // contained both the old and the new diff line, leaving two return statements.)
  return producer.send(record, callbackProxy.apply(key, null));
}

@Override
/**
 * Sends the given key/value pair with the given headers to the configured topic and counts the
 * message in the Prometheus producer metric.
 *
 * @param key key to send
 * @param value value to send
 * @param headers headers to send
 * @return the future returned by the underlying {@code KafkaProducer#send}
 */
public Future<RecordMetadata> send(K key, V value, Headers headers) {
  // Partition is null so the producer's partitioner chooses the partition.
  ProducerRecord<K, V> record = new ProducerRecord<>(topic, null, key, value, headers);
  msgCounter.increase(producerName, record.topic());
  // No caller-supplied callback; callbackProxy still logs the send outcome.
  // (Removes the stale pre-diff line `return producer.send(record);` that made the
  // span contain two return statements.)
  return producer.send(record, callbackProxy.apply(key, null));
}

@Override
public Future<RecordMetadata> send(K key, V value, Headers headers, Callback callback) {
  // Partition is null so the producer's partitioner picks the target partition.
  ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, null, key, value, headers);
  msgCounter.increase(producerName, producerRecord.topic());
  // Wrap the caller's callback so the outcome is always logged before delegating to it.
  return producer.send(producerRecord, callbackProxy.apply(key, callback));
}

public void close() {
Expand All @@ -50,4 +63,30 @@ public void close() {
// Forces all buffered records to be sent immediately by delegating to the wrapped producer.
public void flush() {
producer.flush();
}

// Wraps an optional caller-supplied Callback: the returned Callback first logs the send
// outcome (error level on failure, debug on success) and then delegates to the caller's
// callback if one was given. The first argument is the message key, used only for logging.
private final BiFunction<K, Callback, Callback> callbackProxy =
    (messageKey, delegate) ->
        (metadata, exception) -> {
          if (exception == null) {
            // Success path stays at debug level to avoid noisy logs.
            LOGGER.debug(
                "Message with key {} was produced to topic {}: partition {}; offset: {}.",
                messageKey,
                topic,
                metadata.partition(),
                metadata.offset());
          } else {
            LOGGER.error(
                "An error occurred while producing a message with key {} to the topic {}.",
                messageKey,
                topic,
                exception);
          }

          if (delegate != null) {
            // Delegate to the caller-provided callback after logging.
            delegate.onCompletion(metadata, exception);
          }
        };
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.sdase.commons.server.kafka.producer;

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

Expand All @@ -23,6 +24,17 @@ public interface MessageProducer<K, V> {

Future<RecordMetadata> send(K key, V value, Headers headers);

/**
 * Sends the given key/value pair with the given headers to the producer's topic.
 *
 * @param key key to send
 * @param value value to send
 * @param headers headers to send
 * @param callback a callback that is triggered once the request is complete, i.e. both when
 *     the ACK is received and when an exception is raised
 * @return a future holding the {@link RecordMetadata} specifying the partition the record
 *     was sent to, the offset it was assigned and the timestamp of the record
 */
Future<RecordMetadata> send(K key, V value, Headers headers, Callback callback);

/**
* This method is a blank default implementation in order to avoid it being a breaking change. The
* implementing class must override this to add behaviour to it. The implementation should call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class ExternalKafkaHealthCheckIT {

@Test
public void testHealthCheckIt() throws Exception {

KafkaConfiguration config = new KafkaConfiguration();
config.setBrokers(
KAFKA.getKafkaBrokers().stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.sdase.commons.server.kafka.producer;

import static org.assertj.core.api.Assertions.assertThat;

import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
import java.util.Arrays;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junitpioneer.jupiter.StdIo;
import org.junitpioneer.jupiter.StdOut;
import org.sdase.commons.server.kafka.KafkaConfiguration;
import org.sdase.commons.server.kafka.KafkaProperties;
import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter;

class KafkaMessageProducerIT {

// Shared embedded Kafka broker for all tests in this class.
@RegisterExtension
static final SharedKafkaTestResource KAFKA =
new SharedKafkaTestResource()
.withBrokerProperty("offsets.topic.num.partitions", "1")
// a broker with limited accepted message size, so oversized records fail deterministically
.withBrokerProperty("message.max.bytes", "100")
.withBrokerProperty("max.message.bytes", "100");

// The producer under test, wired against the embedded broker in beforeAll().
static KafkaMessageProducer<String, String> testee;
private static final String topicName = "test-topic";
private static ProducerTopicMessageCounter messageCounter;
private static KafkaProducer<String, String> producer;

// Wires the testee against the embedded broker started by the SharedKafkaTestResource.
@BeforeAll
static void beforeAll() {
  messageCounter = new ProducerTopicMessageCounter();
  KafkaProperties producerProperties =
      KafkaProperties.forProducer(
          new KafkaConfiguration()
              .setBrokers(Arrays.asList(KAFKA.getKafkaConnectString().split(","))));
  producer =
      new KafkaProducer<>(producerProperties, new StringSerializer(), new StringSerializer());
  // Fix: use the diamond operator — the original instantiated the raw type
  // `new KafkaMessageProducer(...)`, bypassing generic type checks and producing an
  // unchecked-assignment warning.
  testee = new KafkaMessageProducer<>(topicName, producer, messageCounter, "test-producer");
}

// Releases Prometheus registrations and the Kafka producer after the last test.
@AfterAll
static void afterAll() {
messageCounter.unregister();
producer.close();
}

@Test
@StdIo
void shouldLogAndDelegate(StdOut out) {
// given
String statement = "A message has been produced.";
// a dummy callback to run on completion
Callback callback =
(metadata, exception) -> {
System.out.println(statement);
};

// when
testee.send(
"k",
"3nC8Vd2BE1ZHEnpozFmE7Jay5uwajvtFHI6lkD06UWW3xwYNkL2y3tYNLfa8AxPj9SdUo4r5XisrWYxVMs8mtrfrCOet3Ble56silErvLtAATxgfaiuSF6lNhoHDkqOc9VgJeuZfcSdxfTQcGWmyY5AxJdk5OQnIxUj3JsKWyoFTzzcTNWRMrO1u5JSIcNJmodhuiCoQ",
null,
callback);

// then
Awaitility.await()
.untilAsserted(
() -> {
assertThat(out.capturedLines())
.anyMatch(l -> l.contains(statement))
.anyMatch(
l ->
l.contains(
"An error occurred while producing a message with key k to the topic test-topic."));
Comment on lines +76 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above it is documented that the callback is called when the message is sent. In my understanding that means success, but here you verify that the callback is called and that an error occurred. That seems to be different from the documentation. Can you explain that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right — the documentation was not accurate; the callback should be triggered in both cases, "when the request is complete".
Changed

});
}

// Verifies the built-in error logging of KafkaMessageProducer when no caller callback is
// given: the value exceeds the broker's 100-byte limit, so the broker rejects the record
// and the internal callback proxy logs the RecordTooLargeException.
@Test
@StdIo
void shouldLogErrors(StdOut out) {
// given large test value not supported by the broker
// when
testee.send(
"k",
"3nC8Vd2BE1ZHEnpozFmE7Jay5uwajvtFHI6lkD06UWW3xwYNkL2y3tYNLfa8AxPj9SdUo4r5XisrWYxVMs8mtrfrCOet3Ble56silErvLtAATxgfaiuSF6lNhoHDkqOc9VgJeuZfcSdxfTQcGWmyY5AxJdk5OQnIxUj3JsKWyoFTzzcTNWRMrO1u5JSIcNJmodhuiCoQ");

// then: awaiting because the producer delivers and completes the callback asynchronously
Awaitility.await()
.untilAsserted(
() -> {
assertThat(out.capturedLines())
.anyMatch(
l ->
l.contains(
"An error occurred while producing a message with key k to the topic test-topic."))
.anyMatch(
l -> l.contains("org.apache.kafka.common.errors.RecordTooLargeException"));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.mockito.Mockito.*;

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
Expand Down Expand Up @@ -42,6 +43,12 @@ public Future<RecordMetadata> send(Object key, Object value) {
public Future<RecordMetadata> send(Object key, Object value, Headers headers) {
return null;
}

@Override
// No-op test stub for the callback overload; the callback is never invoked and the
// returned Future is null — the surrounding test only exercises flush().
public Future<RecordMetadata> send(
Object key, Object value, Headers headers, Callback callback) {
return null;
}
};
messageProducer.flush();
}
Expand Down