Commit 9d08082

Merge pull request #171 from companieshouse/feature/JU-89-java-21-migration
custom kafka producer implementation
2 parents: 6d97e9b + 02facff

File tree

4 files changed: +107 additions, -10 deletions

pom.xml
src/main/java/uk/gov/companieshouse/orders/api/kafka/OrdersApiKafkaProducer.java
src/main/java/uk/gov/companieshouse/orders/api/kafka/OrdersKafkaProducer.java
src/main/resources/application.properties

pom.xml

Lines changed: 7 additions & 7 deletions

@@ -61,17 +61,17 @@
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <version>${test-containers.version}</version>
+                <scope>import</scope>
+                <type>pom</type>
+            </dependency>
         </dependencies>
     </dependencyManagement>

     <dependencies>
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers-bom</artifactId>
-            <version>${test-containers.version}</version>
-            <type>pom</type>
-            <scope>import</scope>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-actuator</artifactId>
src/main/java/uk/gov/companieshouse/orders/api/kafka/OrdersApiKafkaProducer.java

Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
+package uk.gov.companieshouse.orders.api.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import uk.gov.companieshouse.kafka.message.Message;
+import uk.gov.companieshouse.kafka.producer.ProducerConfig;
+import uk.gov.companieshouse.kafka.producer.factory.KafkaProducerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class OrdersApiKafkaProducer {
+
+    private KafkaProducer<String, byte[]> kafkaProducer;
+
+    /**
+     * Instantiate the necessary configuration for producing and a producer for Kafka.
+     * The {@link KafkaProducerFactory} will be used to create the producer.
+     *
+     * @param config Configuration for the producer
+     */
+    public OrdersApiKafkaProducer(ProducerConfig config) {
+        this(config, new KafkaProducerFactory());
+    }
+
+    /**
+     * Instantiate the necessary configuration for producing and a producer for Kafka.
+     *
+     * @param config          Configuration for the producer
+     * @param producerFactory Factory to create the producer
+     */
+    public OrdersApiKafkaProducer(ProducerConfig config, KafkaProducerFactory producerFactory) {
+        Properties props = new Properties();
+
+        props.put("bootstrap.servers", String.join(",", config.getBrokerAddresses()));
+        props.put("acks", config.getAcks().getCode());
+        props.put("key.serializer", config.getKeySerializer());
+        props.put("value.serializer", config.getValueSerializer());
+        props.put("retries", config.getRetries());
+        props.put("max.block.ms", config.getMaxBlockMilliseconds());
+        props.put("request.timeout.ms", config.getRequestTimeoutMilliseconds());
+        props.put("enable.idempotence", false);
+
+        if (config.isRoundRobinPartitioner()) {
+            props.put("partition.assignment.strategy", "roundrobin");
+        }
+
+        kafkaProducer = producerFactory.getProducer(props);
+    }
+
+    /**
+     * Send a message to a topic in Kafka.
+     *
+     * The data in the message is sent to Kafka in a byte array
+     * to avoid unrecognised encoding characters which can occur
+     * when Strings are used with Kafka.
+     *
+     */
+    public void send(Message msg) throws ExecutionException, InterruptedException {
+
+        ProducerRecord<String, byte[]> record = getProducerRecordFromMessage(msg);
+
+        kafkaProducer.send(record).get();
+    }
+
+    /**
+     * Send a message to a topic in Kafka, and return a {@link Future}<{@link RecordMetadata}> for processing manually.
+     *
+     * The data in the message is sent to Kafka in a byte array
+     * to avoid unrecognised encoding characters which can occur
+     * when Strings are used with Kafka.
+     *
+     */
+    public Future<RecordMetadata> sendAndReturnFuture(Message msg) {
+
+        ProducerRecord<String, byte[]> record = getProducerRecordFromMessage(msg);
+
+        return kafkaProducer.send(record);
+    }
+
+    private ProducerRecord<String, byte[]> getProducerRecordFromMessage(Message msg) {
+
+        return new ProducerRecord<>(
+                msg.getTopic(),
+                msg.getPartition(),
+                msg.getTimestamp(),
+                msg.getKey(),
+                msg.getValue()
+        );
+    }
+
+    public void close() {
+        kafkaProducer.close();
+    }
+
+}
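
For context, a minimal usage sketch of the new class outside Spring. It assumes that ProducerConfig and Message from uk.gov.companieshouse.kafka expose setters matching the getters used above (setBrokerAddresses, setTopic and setValue are not shown in this diff), that Message has a no-argument constructor, that ProducerConfig supplies default key/value serializers, and that Acks lives alongside ProducerConfig; the broker address and topic name are placeholders.

package uk.gov.companieshouse.orders.api.kafka;

import java.util.concurrent.ExecutionException;

import uk.gov.companieshouse.kafka.message.Message;
import uk.gov.companieshouse.kafka.producer.Acks;           // package assumed from ProducerConfig
import uk.gov.companieshouse.kafka.producer.ProducerConfig;

public class OrdersApiKafkaProducerExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Mirrors the settings OrdersKafkaProducer applies in afterPropertiesSet (see the diff below),
        // plus a placeholder broker address.
        ProducerConfig config = new ProducerConfig();
        config.setBrokerAddresses(new String[] {"localhost:9092"}); // assumed setter
        config.setAcks(Acks.WAIT_FOR_ALL);
        config.setRetries(10);
        config.setMaxBlockMilliseconds(10000);

        OrdersApiKafkaProducer producer = new OrdersApiKafkaProducer(config);

        Message message = new Message();        // assumed no-arg constructor
        message.setTopic("order-received");     // assumed setter; topic name is illustrative
        message.setValue("payload".getBytes()); // assumed setter; the payload travels as bytes

        try {
            producer.send(message); // blocks until the broker acknowledges the record
        } finally {
            producer.close();
        }
    }
}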

src/main/java/uk/gov/companieshouse/orders/api/kafka/OrdersKafkaProducer.java

Lines changed: 2 additions & 2 deletions

@@ -24,7 +24,7 @@
 @Service
 public class OrdersKafkaProducer implements InitializingBean {
     private static final Logger LOGGER = LoggerFactory.getLogger(APPLICATION_NAME_SPACE);
-    private CHKafkaProducer chKafkaProducer;
+    private OrdersApiKafkaProducer chKafkaProducer;
     @Value("${spring.kafka.producer.bootstrap-servers}")
     private String brokerAddresses;

@@ -61,6 +61,6 @@ public void afterPropertiesSet() {
         config.setAcks(Acks.WAIT_FOR_ALL);
         config.setRetries(10);
         config.setMaxBlockMilliseconds(10000);
-        chKafkaProducer = new CHKafkaProducer(config);
+        chKafkaProducer = new OrdersApiKafkaProducer(config);
     }
 }
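
Because OrdersKafkaProducer builds the real producer in afterPropertiesSet, the two-argument constructor of OrdersApiKafkaProducer is what allows the Kafka client to be substituted in tests. A rough unit-test sketch, assuming JUnit 5 and Mockito are on the classpath, that KafkaProducer and KafkaProducerFactory are mockable, and that the ProducerConfig/Message setters named below exist (none of this is confirmed by the diff):

package uk.gov.companieshouse.orders.api.kafka;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import uk.gov.companieshouse.kafka.message.Message;
import uk.gov.companieshouse.kafka.producer.Acks;           // package assumed
import uk.gov.companieshouse.kafka.producer.ProducerConfig;
import uk.gov.companieshouse.kafka.producer.factory.KafkaProducerFactory;

class OrdersApiKafkaProducerTest {

    @Test
    @SuppressWarnings("unchecked")
    void sendAndReturnFutureDelegatesToUnderlyingKafkaProducer() {
        // Stub the factory so no real Kafka connection is attempted.
        KafkaProducer<String, byte[]> kafkaProducer = mock(KafkaProducer.class);
        KafkaProducerFactory factory = mock(KafkaProducerFactory.class);
        when(factory.getProducer(any())).thenReturn(kafkaProducer);

        // Mirrors the configuration applied in afterPropertiesSet above;
        // assumes ProducerConfig defaults the key/value serializers.
        ProducerConfig config = new ProducerConfig();
        config.setBrokerAddresses(new String[] {"localhost:9092"}); // assumed setter
        config.setAcks(Acks.WAIT_FOR_ALL);
        config.setRetries(10);
        config.setMaxBlockMilliseconds(10000);

        OrdersApiKafkaProducer producer = new OrdersApiKafkaProducer(config, factory);

        Message message = new Message();        // assumed no-arg constructor
        message.setTopic("order-received");     // assumed setter
        message.setValue("payload".getBytes()); // assumed setter

        producer.sendAndReturnFuture(message);

        // The custom producer should delegate straight to the wrapped KafkaProducer.
        verify(kafkaProducer).send(any(ProducerRecord.class));
    }
}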

src/main/resources/application.properties

Lines changed: 0 additions & 1 deletion

@@ -16,7 +16,6 @@ spring.data.mongodb.uri=${MONGODB_URL}
 spring.data.mongodb.field-naming-strategy=uk.gov.companieshouse.orders.api.model.NoIsSnakeCaseFieldNamingStrategy

 spring.kafka.producer.bootstrap-servers: ${KAFKA_BROKER_ADDR}
-spring.kafka.producer.properties.enable.idempotence=false

 feature.options.ordersSearchEndpointEnabled=${ORDERS_SEARCH_ENDPOINT_ENABLED:false}
 feature.options.multiItemBasketSearchEnabled=${ORDERS_SEARCH_MULTIBASKET_ENABLED:false}
