File tree 1 file changed +7
-2
lines changed
services/libs/queue/src/vendors/kafka
1 file changed +7
-2
lines changed Original file line number Diff line number Diff line change @@ -221,8 +221,8 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
221
221
222
222
this . log . trace ( { topic : queueConf . name } , 'Subscribed to topic! Starting the consmer...' )
223
223
await consumer . run ( {
224
- eachMessage : async ( { message } : EachMessagePayload ) => {
225
- if ( this . isAvailable ( maxConcurrentMessageProcessing ) ) {
224
+ eachMessage : async ( { message, topic } : EachMessagePayload ) => {
225
+ if ( message && message . value && this . isAvailable ( maxConcurrentMessageProcessing ) ) {
226
226
const now = performance . now ( )
227
227
228
228
this . log . trace (
@@ -243,6 +243,11 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
243
243
} finally {
244
244
this . removeJob ( )
245
245
}
246
+ } else if (
247
+ this . isAvailable ( maxConcurrentMessageProcessing ) &&
248
+ ( ! message || ! message . value )
249
+ ) {
250
+ this . log . debug ( { message, topic } , 'Received empty message, skipping...' )
246
251
} else {
247
252
this . log . debug ( 'Processor is busy, skipping message...' )
248
253
}
You can’t perform that action at this time.
0 commit comments