Skip to content

Commit 8c547e8

Browse files
authored
Always check if producer is initialized (#2915)
1 parent af917a5 commit 8c547e8

File tree

1 file changed

+16
-6
lines changed
  • services/libs/queue/src/vendors/kafka

1 file changed

+16
-6
lines changed

services/libs/queue/src/vendors/kafka/client.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
2828
private consumers: Map<string, Consumer>
2929
private processingMessages: number
3030
private started: boolean
31-
private producer: Producer
31+
private producer?: Producer | undefined = undefined
3232

3333
public constructor(
3434
public readonly client: Kafka,
@@ -42,9 +42,18 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
4242
this.consumers = new Map<string, Consumer>()
4343
this.reconnectAttempts = new Map<string, number>()
4444
this.consumerStatus = new Map<string, boolean>()
45+
}
46+
47+
async getProducer(): Promise<Producer> {
48+
if (!this.producer) {
49+
const producer = this.client.producer()
50+
await producer.connect()
51+
this.producer = producer
52+
}
4553

46-
this.producer = this.client.producer()
54+
return this.producer
4755
}
56+
4857
async getQueueMessageCount(conf: IKafkaChannelConfig): Promise<number> {
4958
const groupId = conf.name
5059
const topic = conf.name
@@ -89,8 +98,9 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
8998
service: SERVICE,
9099
})
91100

101+
const producer = await this.getProducer()
92102
// send message to kafka
93-
const result = await this.producer.send({
103+
const result = await producer.send({
94104
topic: channel.name,
95105
messages: [
96106
{
@@ -227,8 +237,6 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
227237
const admin = this.client.admin()
228238
await admin.connect()
229239

230-
await this.producer.connect()
231-
232240
let partitionCount
233241

234242
if (level && config.partitions[level]) {
@@ -337,7 +345,9 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
337345
})
338346
}
339347

340-
const result = await this.producer.send({
348+
const producer = await this.getProducer()
349+
350+
const result = await producer.send({
341351
topic: channel.name,
342352
messages: messages.map((m) => ({
343353
value: JSON.stringify(m.payload),

0 commit comments

Comments
 (0)