I was looking at the code and I see:
/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
  this._channel && this._channel.ack(message, allUpTo);
}
I see that `_channel` is undefined during reconnection.
Imagine that we get 1000 messages from the server and we want to consume them sequentially in a stream:
message<1>.ack() // OK
... reconnection starts
message<2>.ack() // falsely OK: no channel set, the call to `_channel.ack` is skipped
message<3>.ack() // falsely OK
... connected again, consumer restarts
message<2>.ack() // OK
Any message acked during reconnection is falsely acked (the call is silently dropped), and we can't know when to stop the stream.
Am I wrong?
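To make the scenario concrete, here is a small self-contained sketch of what I mean. It does not use the real library: `FakeChannel`, `WrapperSketch`, `tryAck`, and `simulate` are hypothetical names I made up for illustration, and `Message` is a stand-in for `amqplib.Message`. Only the body of `ack` mirrors the method quoted above. `tryAck` is one possible way a caller could find out that an ack was dropped; it is not something the library provides as far as I know.

```typescript
// Minimal stand-ins for illustration; these are NOT the real amqplib types.
interface Message {
  deliveryTag: number;
}

class FakeChannel {
  readonly acked: number[] = [];

  // Records which delivery tags actually reached the channel.
  ack(message: Message, _allUpTo?: boolean): void {
    this.acked.push(message.deliveryTag);
  }
}

class WrapperSketch {
  _channel: FakeChannel | undefined;

  // Same shape as the quoted method: a silent no-op when there is no channel.
  ack(message: Message, allUpTo?: boolean): void {
    this._channel && this._channel.ack(message, allUpTo);
  }

  // Hypothetical variant: reports whether the ack was forwarded.
  tryAck(message: Message, allUpTo?: boolean): boolean {
    if (!this._channel) {
      return false;
    }
    this._channel.ack(message, allUpTo);
    return true;
  }
}

function simulate(): { acked: number[]; results: boolean[] } {
  const wrapper = new WrapperSketch();
  const channel = new FakeChannel();
  const results: boolean[] = [];

  wrapper._channel = channel;
  results.push(wrapper.tryAck({ deliveryTag: 1 })); // forwarded

  wrapper._channel = undefined; // reconnection in progress
  wrapper.ack({ deliveryTag: 2 }); // dropped, and the caller cannot tell
  results.push(wrapper.tryAck({ deliveryTag: 3 })); // dropped, but reported

  // Reconnected. The same fake channel is reused only to keep the sketch short.
  wrapper._channel = channel;
  results.push(wrapper.tryAck({ deliveryTag: 2 })); // forwarded

  return { acked: channel.acked, results };
}
```

In this sketch the plain `ack` call for message 2 during reconnection leaves no trace at all, while the boolean from `tryAck` at least lets the consumer pause or stop the stream.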