Skip to content

Do not allow prefetch increase beyond configured value on channel #1812

@antondalgren

Description

@antondalgren

Describe the bug
Allow changing the prefetch to a value higher than the configured value on the client channel can cause a blockage effectively DOSing other consumers on the same connection. Added in #1033

Describe your setup
Please describe the environment you are using by including the build info. Please also include any relevant information about the client software or amqp libraries that you may be using.

To retrieve the build info: bin/lavinmq --build-info

How to reproduce

Reproducible lavinmq spec
require "./spec_helper"

describe LavinMQ::Server do
  describe "Changing prefetch via API should not block other consumers" do
    it "should not let a slow consumer block a fast consumer after prefetch increase (same channel)" do
      with_http_server do |http, s|
        with_channel(s) do |ch|
          ch.prefetch 1
          slow_q = ch.queue("prefetch-bug-slow")
          fast_q = ch.queue("prefetch-bug-fast")

          slow_msgs = [] of AMQP::Client::DeliverMessage
          fast_msgs = [] of AMQP::Client::DeliverMessage

          # Consumer 1: slow consumer - takes 200ms per message, then acks
          slow_q.subscribe(no_ack: false) do |msg|
            slow_msgs << msg
            sleep 200.milliseconds
            msg.ack
          end

          # Consumer 2: fast consumer - acks immediately
          fast_q.subscribe(no_ack: false) do |msg|
            fast_msgs << msg
            msg.ack
          end

          # Publish a couple to confirm consumers work with prefetch=1
          slow_q.publish "msg"
          fast_q.publish "msg"
          wait_for { slow_msgs.size >= 1 && fast_msgs.size >= 1 }

          # Now increase prefetch to 5 via the API
          response = http.get("/api/channels")
          body = JSON.parse(response.body)
          channel = body.as_a.first
          url = "/api/channels/#{URI.encode_path(channel["name"].to_s)}"
          response = http.put(url, body: {"prefetch" => 5}.to_json)
          response.status_code.should eq 204

          # Publish a batch - server will send up to 5 to each consumer at once
          20.times { slow_q.publish "msg" }
          20.times { fast_q.publish "msg" }
          # Bug: The server sends up to 5 messages to the slow consumer.
          # Since both consumers share the same channel, the client processes
          # messages sequentially. The slow consumer's callbacks block the
          # fast consumer from receiving messages.
          fast_before = fast_msgs.size
          sleep 500.milliseconds
          fast_msgs.size.should be > fast_before
        end
      end
    end

    it "should not let a slow consumer block a fast consumer after prefetch increase (different channels)" do
      with_http_server do |http, s|
        conn = AMQP::Client.new(port: amqp_port(s), name: "prefetch-bug-spec").connect
        begin
          slow_ch = conn.channel
          slow_ch.prefetch 1
          fast_ch = conn.channel
          fast_ch.prefetch 1

          slow_q = slow_ch.queue("prefetch-bug-slow-2ch")
          fast_q = fast_ch.queue("prefetch-bug-fast-2ch")

          slow_msgs = [] of AMQP::Client::DeliverMessage
          fast_msgs = [] of AMQP::Client::DeliverMessage

          slow_q.subscribe(no_ack: false) do |msg|
            slow_msgs << msg
            sleep 200.milliseconds
            msg.ack
          end

          fast_q.subscribe(no_ack: false) do |msg|
            fast_msgs << msg
            msg.ack
          end

          slow_q.publish "msg"
          fast_q.publish "msg"
          wait_for { slow_msgs.size >= 1 && fast_msgs.size >= 1 }

          # Increase prefetch on both channels via the API
          response = http.get("/api/channels")
          body = JSON.parse(response.body)
          body.as_a.each do |channel|
            url = "/api/channels/#{URI.encode_path(channel["name"].to_s)}"
            response = http.put(url, body: {"prefetch" => 5}.to_json)
            response.status_code.should eq 204
          end

          20.times { slow_q.publish "msg" }
          20.times { fast_q.publish "msg" }
          fast_before = fast_msgs.size
          sleep 500.milliseconds
          fast_msgs.size.should be > fast_before
        ensure
          conn.close(no_wait: false)
        end
      end
    end
  end
end

Expected behavior
It should not be possible to increase it beyond what is configured on ch.prefetch

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions