Description
I have been using RabbitMQ for some time, and recently there has been multiple issues with stuck messages in my queue.
I have a RabbitMQ container deployed in AWS (not managed), and in my node backend service I am using the npm packages amqplib and amqp-connection-manager to connect to the RabbitMQ service, through a custom module that I created.
Every few weeks, I have a situation where messages are stuck in my queue, and no consumer is up, altough I set it when the backend service initializing.
I do suspect that it is related to the ack logic (dup ack), since I have seen the error code 406 multiple times, but I can't pin it exactly on that.
The only thing I see for sure is that in the stuck messages the redelivered property is true, I am attaching some examples below:
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 191,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 190,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 189,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 188,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 187,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 186,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 185,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
},
{
"payload_bytes": 309,
"redelivered": true,
"exchange": "logExchange",
"routing_key": "data",
"message_count": 184,
"properties": {
"headers": {}
},
"payload": // some message (can't show it),
"payload_encoding": "string"
}
I am attaching the code for the consumer and producer below, plus some images describing the problem.
Producer Class
import {
ContactEvents,
DataEvents,
SocketAgent,
CentralEvents,
ExposureEvents,
} from "./events";
import { exchanges } from "./constants";
import { Logger } from "@private/logger";
import amqp, {
Channel,
ChannelWrapper,
AmqpConnectionManager,
} from "amqp-connection-manager";
import { checkEnv } from "@private/shared-utilities";
import { SocketUser } from "./events/SocketUser";
import { IAMServiceEvents } from "./events/IAMServiceEvents";
import { NotificationEvents } from "./events/notification_events";
const logger = new Logger("AMQP Producer");
const exchangeName = exchanges.log;
export class Producer {
connection: AmqpConnectionManager;
channel: ChannelWrapper;
contact: ContactEvents;
central: CentralEvents;
kbrainData: DataEvents;
socketAgent: SocketAgent;
socketUser: SocketUser;
iamService: IAMServiceEvents;
exposure: ExposureEvents;
notificationEvents: NotificationEvents;
constructor() {
checkEnv(["RABBITMQ_URL"]);
this.connection = amqp.connect(process.env.RABBITMQ_URL);
this.connection.on("connect", ({ connection, url }) => {
logger.logInfo(
`Producer connection established, URL: ${url}, Connection: ${connection.connection.serverProperties}`
);
});
this.connection.on("disconnect", (connectionError) => {
logger.logError(
`Producer connection disconnected: ${JSON.stringify(
connectionError.err
)}`
);
});
this.connection.on("connectFailed", (error) => {
logger.logError(
`Connection to ${error.url} failed: ${JSON.stringify(error.err)}`
);
});
this.connection.on("error", (error, info) => {
logger.logError(`Error in producer connection: ${JSON.stringify(error)}`);
});
this.connection.on("blocked", ({ reason }) => {
logger.logError(`Producer connection blocked: ${JSON.stringify(reason)}`);
});
this.connection.on("unblocked", () => {
logger.logError(`Producer connection unblocked`);
});
this.channel = this.connection.createChannel({
json: true,
setup: (channel: Channel) => {
return channel.assertExchange(exchangeName, "direct");
},
});
this.channel.on("connect", () => {
logger.logInfo("Producer channel established");
});
this.channel.on("error", (error, info) => {
logger.logError(
`Error in producer channel: ${JSON.stringify(
error
)}, Info: ${JSON.stringify(info)}`
);
});
this.channel.on("close", () => {
logger.logInfo("Producer channel closed");
});
this.contact = new ContactEvents(this);
this.central = new CentralEvents(this);
this.kbrainData = new DataEvents(this);
this.socketAgent = new SocketAgent(this);
this.socketUser = new SocketUser(this);
this.iamService = new IAMServiceEvents(this);
this.exposure = new ExposureEvents(this);
this.notificationEvents = new NotificationEvents(this);
}
async publishMessage(routingKey: string, message: Object) {
try {
await this.channel.publish(exchangeName, routingKey, {
logType: routingKey,
message: message,
dateTime: new Date(),
});
logger.logInfo(
`The message ${JSON.stringify(
message
)} is send to exchange ${exchangeName} for queue ${routingKey}`
);
} catch (error) {
logger.logError(
`Error: ${error}, while publishing message: ${JSON.stringify(message)}`
);
}
}
public checkConnection() {
return this.connection.isConnected();
}
}
export const producerInstance = new Producer();
Consumer Class
import { Logger } from "@private/logger";
import { exchanges } from "./constants";
import amqp, {
AmqpConnectionManager,
Channel,
ChannelWrapper,
} from "amqp-connection-manager";
import { ConsumeMessage } from "amqplib";
import { checkEnv } from "@private/shared-utilities";
const logger = new Logger("AMQP Consumer");
const exchangeName = exchanges.log;
export class Consumer {
private static instance: Consumer | null = null;
private connection: AmqpConnectionManager;
private channel: ChannelWrapper;
private handleMessageCB: Function;
private constructor(queueName: string, callback: Function) {
checkEnv(["RABBITMQ_URL"]);
this.connection = amqp.connect(process.env.RABBITMQ_URL);
this.setupConnectionListeners();
this.handleMessageCB = callback;
this.channel = this.setupChannel(queueName);
}
public static getInstance(queueName: string, callback: Function): Consumer {
if (!Consumer.instance) {
Consumer.instance = new Consumer(queueName, callback);
}
return Consumer.instance;
}
public isConnectionAlive(): boolean {
return this.connection.isConnected();
}
private setupConnectionListeners(): void {
this.connection.on("connect", ({ connection, url }) => {
logger.logInfo(
`Consumer connection established, URL: ${url}, Connection: ${connection.connection.serverProperties}`
);
});
this.connection.on("disconnect", (connectionError) => {
logger.logError(
`Consumer connection disconnected: ${connectionError.err}`
);
});
this.connection.on("connectFailed", (error) => {
logger.logError(`Connection to ${error.url} failed: ${error.err}`);
});
this.connection.on("error", (error) => {
logger.logError(`Error in consumer connection: ${error}`);
});
this.connection.on("blocked", ({ reason }) => {
logger.logError(`Consumer connection blocked: ${reason}`);
});
this.connection.on("unblocked", () => {
logger.logError(`Consumer connection unblocked`);
});
}
private setupChannel(queueName: string): ChannelWrapper {
const channel = this.connection.createChannel({
json: true,
setup: async (channel: Channel) => {
try {
logger.logInfo(`Asserting queue: ${queueName}`);
await channel.assertQueue(queueName, {
durable: true,
exclusive: false,
});
logger.logInfo(`Asserting exchange: ${exchangeName}`);
await channel.assertExchange(exchangeName, "direct");
logger.logInfo(
`Binding queue: ${queueName} to exchange: ${exchangeName}`
);
await channel.bindQueue(queueName, exchangeName, queueName);
logger.logInfo(`Starting consumer for queue: ${queueName}`);
await channel.consume(queueName, this.onMessage);
} catch (error) {
logger.logError(`Error setting up channel: ${error}`);
}
},
});
channel.on("connect", () => {
logger.logInfo("Consumer channel established");
});
channel.on("error", (error, info) => {
logger.logError(`Error in consumer channel: ${error}, Info: ${info}`);
});
channel.on("close", () => {
logger.logInfo("Consumer channel closed");
});
return channel;
}
private onMessage = async (data: ConsumeMessage | null) => {
logger.logInfo(
`Consumer channel received message: ${
data ? data.content.toString() : "null"
}`
);
if (!data) return logger.logError("Empty message");
let message = JSON.parse(data.content.toString()).message;
logger.logInfo("Consumer got event: " + message.eventType);
try {
const message = JSON.parse(data.content.toString()).message;
logger.logInfo(`Consumer got event: ${message.eventType}`);
if (this.handleMessageCB) {
await this.handleMessageCB(message);
this.channel?.ack(data);
logger.logInfo("Message acknowledged");
} else {
logger.logError("Message handler callback is not set");
}
} catch (e) {
logger.logError(
`Error handling message: ${
message.eventType
} in consumer: ${JSON.stringify(e)}`
);
}
};
}
In my backend service I use the module as such:
import { BadRequestError } from "@private/errors";
import { Consumer, queues } from "@private/event-bus";
import { handleAttackDataEvents } from "./events";
export let consumer: Consumer | null = null;
const initAMQP = async () => {
try {
consumer = Consumer.getInstance(queues.DATA, handleAttackDataEvents);
} catch (error) {
if (error instanceof Error) {
throw new BadRequestError(error.message);
}
}
};
export { initAMQP };
const main = async () => {
try {
// ... prev code
await initAMQP();
} catch (error) {
if (error instanceof Error) {
return logger.logError(error.message);
}
}
};
I have tried to replicate the problem with overloading the service, to no success, disconnecting the channel and the connection to the amqp, but it reconnects just fine.
Any help would be appreciated!!