
Panic on concurrent queue declaration #145

@ShaddyDC

Description


We recently ran into an issue where some of our tests using amqprs 1.7.0 started failing. I honestly don't know why they passed before, since we didn't change anything relevant, but for about the last five days we have been getting a panic whenever the same queue is declared repeatedly within a short time frame.
I managed to create this minimal sample from the pub-sub example:

use amqprs::{
    callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
    channel::{
        BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
    },
    connection::{Connection, OpenConnectionArguments},
    consumer::DefaultConsumer,
    BasicProperties,
};
use futures_util::future::join_all;
use tokio::time;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // construct a subscriber that prints formatted traces to stdout
    // global subscriber with log level according to RUST_LOG
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();

    // open a connection to RabbitMQ server
    let connection = Connection::open(&OpenConnectionArguments::new(
        "localhost", 5672, "guest", "guest",
    ))
    .await
    .unwrap();
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();

    // open a channel on the connection
    let channel = connection.open_channel(None).await.unwrap();
    channel
        .register_callback(DefaultChannelCallback)
        .await
        .unwrap();

    // declare the same durable queue twice, concurrently, on one channel (this triggers the panic)
    let futures = (0..2).map(|_| {
        channel.queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
    });

    let results = join_all(futures).await;

    for result in results {
        result.unwrap().unwrap();
    }
    let (queue_name, _, _) = channel
        .queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
        .await
        .unwrap()
        .unwrap();

    // bind the queue to exchange
    let routing_key = "amqprs.example";
    let exchange_name = "amq.topic";
    channel
        .queue_bind(QueueBindArguments::new(
            &queue_name,
            exchange_name,
            routing_key,
        ))
        .await
        .unwrap();

    //////////////////////////////////////////////////////////////////////////////
    // start consumer with given name
    let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub");

    channel
        .basic_consume(DefaultConsumer::new(args.no_ack), args)
        .await
        .unwrap();

    //////////////////////////////////////////////////////////////////////////////
    // publish message
    let content = String::from(
        r#"
            {
                "publisher": "example",
                "data": "Hello, amqprs!"
            }
        "#,
    )
    .into_bytes();

    // create arguments for basic_publish
    let args = BasicPublishArguments::new(exchange_name, routing_key);

    channel
        .basic_publish(BasicProperties::default(), content, args)
        .await
        .unwrap();

    // keep the `channel` and `connection` objects from being dropped before pub/sub is done;
    // the channel and connection are closed when they are dropped.
    time::sleep(time::Duration::from_secs(1)).await;

    // explicitly close
    channel.close().await.unwrap();
    connection.close().await.unwrap();
}

The log looks like this:

2024-09-04T10:21:14.226083Z TRACE amqprs::net::split_connection: 521 bytes read from network
2024-09-04T10:21:14.226184Z TRACE amqprs::net::split_connection: RECV on channel 0: Start(MethodHeader { class_id: 10, method_id: 10 }, Start { version_major: 0, version_minor: 9, server_properties: FieldTable(476, {ShortStr(12, "capabilities"): F(FieldTable(199, {ShortStr(18, "publisher_confirms"): t(true), ShortStr(28, "authentication_failure_close"): t(true), ShortStr(18, "connection.blocked"): t(true), ShortStr(22, "consumer_cancel_notify"): t(true), ShortStr(15, "direct_reply_to"): t(true), ShortStr(26, "exchange_exchange_bindings"): t(true), ShortStr(16, "per_consumer_qos"): t(true), ShortStr(19, "consumer_priorities"): t(true), ShortStr(10, "basic.nack"): t(true)})), ShortStr(12, "cluster_name"): S(LongStr(19, "rabbit@f65d2c47302b")), ShortStr(9, "copyright"): S(LongStr(60, "Copyright (c) 2007-2024 Broadcom Inc and/or its subsidiaries")), ShortStr(11, "information"): S(LongStr(57, "Licensed under the MPL 2.0. Website: https://rabbitmq.com")), ShortStr(8, "platform"): S(LongStr(19, "Erlang/OTP 26.2.5.2")), ShortStr(7, "product"): S(LongStr(8, "RabbitMQ")), ShortStr(7, "version"): S(LongStr(6, "3.13.7"))}), mechanisms: LongStr(14, "PLAIN AMQPLAIN"), locales: LongStr(5, "en_US") })
2024-09-04T10:21:14.226245Z TRACE amqprs::net::split_connection: SENT on channel 0: StartOk(MethodHeader { class_id: 10, method_id: 11 }, StartOk { client_properties: FieldTable(142, {ShortStr(7, "product"): S(LongStr(6, "AMQPRS")), ShortStr(8, "platform"): S(LongStr(4, "Rust")), ShortStr(15, "connection_name"): S(LongStr(25, "AMQPRS000@localhost:5672/")), ShortStr(7, "version"): S(LongStr(3, "0.1")), ShortStr(12, "capabilities"): F(FieldTable(25, {ShortStr(22, "consumer_cancel_notify"): t(true)}))}), machanisms: ShortStr(5, "PLAIN"), response: LongStr(12, "\0guest\0guest"), locale: ShortStr(5, "en_US") })
2024-09-04T10:21:14.226615Z TRACE amqprs::net::split_connection: 20 bytes read from network
2024-09-04T10:21:14.226627Z TRACE amqprs::net::split_connection: RECV on channel 0: Tune(MethodHeader { class_id: 10, method_id: 30 }, Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-09-04T10:21:14.226637Z TRACE amqprs::net::split_connection: SENT on channel 0: TuneOk(MethodHeader { class_id: 10, method_id: 31 }, TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-09-04T10:21:14.226665Z TRACE amqprs::net::split_connection: SENT on channel 0: Open(MethodHeader { class_id: 10, method_id: 40 }, Open { virtual_host: ShortStr(1, "/"), capabilities: ShortStr(0, ""), insist: 0 })
2024-09-04T10:21:14.269047Z TRACE amqprs::net::split_connection: 13 bytes read from network
2024-09-04T10:21:14.269127Z TRACE amqprs::net::split_connection: RECV on channel 0: OpenOk(MethodHeader { class_id: 10, method_id: 41 }, OpenOk { know_hosts: ShortStr(0, "") })
2024-09-04T10:21:14.269574Z  INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2024-09-04T10:21:14.269593Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269671Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269713Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269829Z TRACE amqprs::net::split_connection: SENT on channel 1: OpenChannel(MethodHeader { class_id: 20, method_id: 10 }, OpenChannel { out_of_band: ShortStr(0, "") })
2024-09-04T10:21:14.269957Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 692594584 }
2024-09-04T10:21:14.270771Z TRACE amqprs::net::split_connection: 16 bytes read from network
2024-09-04T10:21:14.270860Z TRACE amqprs::net::split_connection: RECV on channel 1: OpenChannelOk(MethodHeader { class_id: 20, method_id: 11 }, OpenChannelOk { channel_id: LongStr(0, "") })
2024-09-04T10:21:14.270909Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 693544892 }
2024-09-04T10:21:14.271004Z  INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.271038Z TRACE amqprs::api::channel::dispatcher: starts up dispatcher task of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.272320Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.272470Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), bits: 2, arguments: FieldTable(0, {}) })
2024-09-04T10:21:14.272579Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 695221079 }
2024-09-04T10:21:14.272616Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), bits: 2, arguments: FieldTable(0, {}) })
2024-09-04T10:21:14.272665Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 695308609 }
2024-09-04T10:21:14.273137Z TRACE amqprs::net::split_connection: 84 bytes read from network
2024-09-04T10:21:14.273196Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 })
2024-09-04T10:21:14.273251Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 695894645 }
2024-09-04T10:21:14.273303Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 })
2024-09-04T10:21:14.273332Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 695976560 }
thread 'tokio-runtime-worker' panicked at /home/space/.cargo/registry/src/index.crates.io-6f17d22bba15001f/amqprs-1.7.0/src/api/channel/dispatcher.rs:490:45:
internal error: entered unreachable code: responder must be registered for DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 }) on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
thread 'main' panicked at src/main.rs:53:16:
called `Result::unwrap()` on an `Err` value: InternalChannelError("channel closed")

note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2024-09-04T10:21:14.273603Z TRACE amqprs::api::channel: drop channel 1
2024-09-04T10:21:14.273669Z  INFO amqprs::api::channel: try to close channel 1 at drop
2024-09-04T10:21:14.

I'm not sure whether this is an unintended way to use the library, but even if it is, I don't think it should end in an "unreachable code" panic.
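Looking at the log, both `DeclareQueue` requests go out before either `DeclareQueueOk` comes back, and the panic fires when the second `DeclareQueueOk` arrives. One way this could happen, assuming (not verified against the amqprs source) that the channel dispatcher keeps at most one pending responder per expected response method, is sketched below as a toy std-only model. `Dispatcher`, `register` and `dispatch` are hypothetical names, not amqprs APIs, and the model only reproduces the "responder must be registered" part; the `channel closed` error seen by `main` could come either from a dropped responder or from the dispatcher task dying after its panic.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy model of a per-channel dispatcher (hypothetical; NOT the amqprs source).
/// Assumption: pending responders are keyed only by the expected response
/// method, so at most one caller can wait for a given method at a time.
struct Dispatcher {
    responders: HashMap<&'static str, Sender<String>>,
}

impl Dispatcher {
    fn new() -> Self {
        Dispatcher { responders: HashMap::new() }
    }

    /// Register interest in the next `method` frame. A second registration
    /// for the same method replaces the first; the replaced sender is dropped
    /// here, so the first waiter can never receive a reply.
    fn register(&mut self, method: &'static str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.responders.insert(method, tx);
        rx
    }

    /// Deliver an incoming frame to whoever is waiting for `method`.
    /// Returns an error where a real dispatcher might hit `unreachable!()`.
    fn dispatch(&mut self, method: &'static str, frame: String) -> Result<(), String> {
        match self.responders.remove(method) {
            Some(tx) => {
                // Ignore send errors: the waiter may already have gone away.
                let _ = tx.send(frame);
                Ok(())
            }
            None => Err(format!("responder must be registered for {method}")),
        }
    }
}

fn main() {
    let mut dispatcher = Dispatcher::new();

    // Two concurrent declarations both register before any reply arrives.
    let first = dispatcher.register("DeclareQueueOk");
    let second = dispatcher.register("DeclareQueueOk");

    // The first waiter's sender was dropped by the overwrite.
    assert!(first.recv().is_err());

    // The first DeclareQueueOk is handed to the second waiter...
    dispatcher.dispatch("DeclareQueueOk", "reply #1".to_string()).unwrap();
    assert_eq!(second.recv().unwrap(), "reply #1");

    // ...and the second DeclareQueueOk finds no responder at all.
    let err = dispatcher
        .dispatch("DeclareQueueOk", "reply #2".to_string())
        .unwrap_err();
    println!("{err}"); // prints "responder must be registered for DeclareQueueOk"
}
```

If this assumption holds, the problem would be about overlapping requests on one channel rather than about the queue name itself.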

The reason we're declaring a bunch of queues at once in the first place is that we have a job system where jobs are put on arbitrary queues, and the workers that listen to those queues are only created on demand, once jobs are actually waiting in them.
In practice we don't usually re-declare the same queue multiple times within a short time frame, but we can't rule it out either. For now I guess we can manually track whether we have already declared a given queue recently. Declaring sequentially also works: if, instead of using `join_all`, we declare in a loop and await each call individually, there is no panic:

    for _ in 0..5 {
        let (queue_name, _, _) = channel
            .queue_declare(QueueDeclareArguments::durable_client_named(
                "amqprs.examples.basic",
            ))
            .await
            .unwrap()
            .unwrap();
    }
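The "manually check if we already declared a specific queue within a certain time frame" idea could look roughly like the following std-only sketch. `RecentDeclarations` and `should_declare` are hypothetical names, not part of amqprs, and the time window is an arbitrary assumption. Caveats: across tasks it would need to be shared behind a mutex so that check-and-record stays atomic; it records the declaration before the real `queue_declare` completes, so a failed declaration should be forgotten again (not handled here); and if the underlying problem is per channel rather than per queue name, concurrent declarations of *different* queues on one channel might still collide, in which case awaiting them one at a time, as in the loop above, is the safer workaround.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of amqprs): remembers when each queue name
/// was last declared so that redundant declarations can be skipped.
struct RecentDeclarations {
    ttl: Duration,
    last_declared: HashMap<String, Instant>,
}

impl RecentDeclarations {
    fn new(ttl: Duration) -> Self {
        RecentDeclarations { ttl, last_declared: HashMap::new() }
    }

    /// Returns `true` (and records `now`) if `queue` has not been declared
    /// within the last `ttl`; returns `false` if the declaration can be skipped.
    fn should_declare(&mut self, queue: &str, now: Instant) -> bool {
        let fresh = self
            .last_declared
            .get(queue)
            .map_or(false, |&at| now.duration_since(at) < self.ttl);
        if fresh {
            return false;
        }
        self.last_declared.insert(queue.to_string(), now);
        true
    }
}

fn main() {
    let mut recent = RecentDeclarations::new(Duration::from_secs(30));
    let start = Instant::now();

    for i in 0..5u64 {
        let now = start + Duration::from_millis(10 * i);
        if recent.should_declare("amqprs.examples.basic", now) {
            // In the real code, `channel.queue_declare(...)` would be awaited here.
            println!("declaring at +{}ms", 10 * i);
        }
    }
}
```

Passing `now` in explicitly keeps the helper deterministic and easy to test; with a 30-second window, the loop above only prints `declaring at +0ms`.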

Metadata


Assignees

No one assigned

    Labels

    documentation (Improvements or additions to documentation)

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
