
RabbitMQ output plugin does not engage backpressure when queue is full #58

@IvanRibakov

Description

Logstash information:

  1. Logstash version:
    # bin/logstash --version
    Using bundled JDK: /usr/share/logstash/jdk
    logstash 8.12.2
    
  2. Logstash installation source: logstash:8.12.2 Docker image
  3. How is Logstash being run: as a service managed by Docker Compose
  4. How was the Logstash Plugin installed: I did not install it explicitly, so I assume the default version shipped with the Docker image is used

OS version

Docker host:

Linux pop-os 6.6.10-76060610-generic #202401051437~1709085277~22.04~31d73d8 SMP PREEMPT_DYNAMIC Wed F x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

I have configured the queue with the following arguments:

"arguments": {
    "x-queue-type": "classic",
    "x-max-length": 1000,
    "x-overflow": "reject-publish"
},
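For context, `x-overflow: reject-publish` makes RabbitMQ refuse new publishes once the queue holds `x-max-length` messages (the publisher is told via a basic.nack only when publisher confirms are in use; otherwise the message is silently dropped). A toy in-process model of that policy, for illustration only (plain Python, not the RabbitMQ client API):

```python
from queue import Queue, Full

class RejectPublishQueue:
    """Toy model of a classic queue declared with x-max-length and
    x-overflow: reject-publish: publishes beyond the limit are nacked."""

    def __init__(self, max_length: int):
        self._q = Queue(maxsize=max_length)

    def publish(self, msg) -> bool:
        """Return True (ack) if enqueued, False (nack) if the queue is full."""
        try:
            self._q.put_nowait(msg)
            return True
        except Full:
            return False

    def consume(self):
        """Remove and return the oldest message (frees one slot)."""
        return self._q.get_nowait()

q = RejectPublishQueue(max_length=3)
acks = [q.publish(n) for n in range(5)]
# acks == [True, True, True, False, False]: only the first three fit
```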

Expected behaviour:

  1. Upon reaching the queue limit of 1000 enqueued messages, Logstash stops publishing
  2. Once space on the queue is freed up, Logstash resumes publishing

Observed behaviour:

  1. Logstash ignores the full queue and publishes all messages anyway, leading to data loss
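The expected behaviour amounts to the publisher treating a rejected publish as backpressure: wait until the consumer frees space, then retry, rather than fire-and-forget. A minimal in-process sketch of that pattern (hypothetical plain Python, not the plugin's actual publishing code):

```python
import threading
import time
from queue import Queue, Full

def publish_with_backpressure(q: Queue, messages, retry_delay=0.001):
    """Block-and-retry publisher: a full queue (the analogue of a broker
    nack under reject-publish) makes the publisher wait, not drop."""
    for msg in messages:
        while True:
            try:
                q.put_nowait(msg)
                break
            except Full:
                time.sleep(retry_delay)  # wait for the consumer to free space

q = Queue(maxsize=100)  # stands in for x-max-length, scaled down
received = []

# Consumer drains the queue concurrently, freeing slots as it goes.
consumer = threading.Thread(
    target=lambda: [received.append(q.get()) for _ in range(1000)])
consumer.start()
publish_with_backpressure(q, range(1000))
consumer.join()
# Every message arrives in order; nothing is dropped when the
# publisher honours backpressure.
```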

Related materials:

Steps to reproduce:

  1. docker compose up
  2. Wait for services to start up
  3. Navigate to RabbitMQ management console (http://<container_ip>:15672/#/queues/%2F/i3logs), browse i3logs Queue stats, confirm that queue has 1000 messages in the "Ready" state
  4. Use "Get messages" section at the bottom of the queue page to manually remove some messages (Ack mode: Reject requeue false, Messages: 100)
  5. Observe that the number of "Ready" messages drops to 900 and does NOT go back up to 1000 again

Docker compose service definition:

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: rabbitmq
    volumes:
      - ./rabbitmq_conf/definitions.json:/etc/rabbitmq/definitions.json
      - ./rabbitmq_conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 10s
      retries: 10

  logstash:
    depends_on:
      rabbitmq:
        condition: service_healthy
    image: logstash:8.12.2
    user: root
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logs:/var/log
    environment:
      - xpack.monitoring.enabled=false
      - LOG_LEVEL=info
rabbitmq_conf/definitions.json
{
    "rabbit_version": "3.6.15",
    "users": [
        {
            "name": "guest",
            "password_hash": "roeR8CMxbpWbDUzBwN7eQ+rdnnG6UfwICGG1smu1GdssyyQ/",
            "hashing_algorithm": "rabbit_password_hashing_sha256",
            "tags": "administrator"
        }
    ],
    "vhosts": [
        {
            "name": "/"
        }
    ],
    "permissions": [
        {
            "user": "guest",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "parameters": [],
    "global_parameters": [],
    "policies": [],
    "queues": [
        {
            "arguments": {
                "x-queue-type": "classic",
                "x-max-length": 1000,
                "x-overflow": "reject-publish"
            },
            "auto_delete": false,
            "durable": true,
            "name": "i3logs",
            "type": "classic",
            "vhost": "/"
        }
    ],
    "exchanges": [
        {
            "arguments": {},
            "auto_delete": false,
            "durable": true,
            "name": "i3logs",
            "type": "fanout",
            "vhost": "/"
        }
    ],
    "bindings": [
        {
            "arguments": {},
            "destination": "i3logs",
            "destination_type": "queue",
            "routing_key": "logstash",
            "source": "i3logs",
            "vhost": "/"
        }
    ]
}
rabbitmq_conf/rabbitmq.conf
loopback_users.guest = false
listeners.tcp.default = 5672
management.listener.port = 15672
management.listener.ssl = false
management.load_definitions = /etc/rabbitmq/definitions.json
log.console.level = info
logstash.conf
input {
  file {
    path => "/var/log/myproduct.stdout.log"
    start_position => "beginning"
  }
}

output {
  rabbitmq {
    exchange => "i3logs"
    exchange_type => "fanout"
    host => "rabbitmq"
    port => 5672
    persistent => true
    user => "guest"
    password => "guest"
    vhost => "/"
    codec => "json"
  }
}

Provide logs (if relevant):

INFO logs
logstash-1  | 2024/03/20 06:51:51 Setting 'xpack.monitoring.enabled' from environment.
logstash-1  | 2024/03/20 06:51:51 Setting 'log.level' from environment.
logstash-1  | Using bundled JDK: /usr/share/logstash/jdk
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
logstash-1  | Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
logstash-1  | [2024-03-20T06:52:03,576][WARN ][deprecation.logstash.runner] NOTICE: Running Logstash as superuser is not recommended and won't be allowed in the future. Set 'allow_superuser' to 'false' to avoid startup errors in future releases.
logstash-1  | [2024-03-20T06:52:03,583][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
logstash-1  | [2024-03-20T06:52:03,584][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
logstash-1  | [2024-03-20T06:52:03,585][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, -Dls.cgroup.cpuacct.path.override=/, -Dls.cgroup.cpu.path.override=/, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
logstash-1  | [2024-03-20T06:52:03,593][INFO ][logstash.settings        ] Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
logstash-1  | [2024-03-20T06:52:03,595][INFO ][logstash.settings        ] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
logstash-1  | [2024-03-20T06:52:03,769][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"0dbd11ae-c44d-4c81-b50c-796472497f17", :path=>"/usr/share/logstash/data/uuid"}
logstash-1  | [2024-03-20T06:52:04,282][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
rabbitmq-1  | 2024-03-20 06:52:04.394571+00:00 [info] <0.886.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
rabbitmq-1  | 2024-03-20 06:52:04.394839+00:00 [info] <0.886.0> Successfully synced tables from a peer
logstash-1  | [2024-03-20T06:52:04,620][INFO ][org.reflections.Reflections] Reflections took 91 ms to scan 1 urls, producing 132 keys and 468 values
logstash-1  | [2024-03-20T06:52:04,807][INFO ][logstash.codecs.json     ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
logstash-1  | [2024-03-20T06:52:04,831][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
rabbitmq-1  | 2024-03-20 06:52:04.877477+00:00 [info] <0.894.0> accepting AMQP connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672)
rabbitmq-1  | 2024-03-20 06:52:04.898726+00:00 [info] <0.894.0> connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672): user 'guest' authenticated and granted access to vhost '/'
logstash-1  | [2024-03-20T06:52:04,913][INFO ][logstash.outputs.rabbitmq][main] Connected to RabbitMQ {:url=>"amqp://guest:XXXXXX@localhost:5672/"}
logstash-1  | [2024-03-20T06:52:04,945][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x61765f71 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
logstash-1  | [2024-03-20T06:52:05,472][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.53}
logstash-1  | [2024-03-20T06:52:05,480][INFO ][logstash.inputs.file     ][main] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/usr/share/logstash/data/plugins/inputs/file/.sincedb_08cfe5e821a4884a8b77971020dcc599", :path=>["/var/log/myproduct.log"]}
logstash-1  | [2024-03-20T06:52:05,481][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
logstash-1  | [2024-03-20T06:52:05,486][INFO ][filewatch.observingtail  ][main][7ad47ad9b8977afed9528ba0b335f1a77be695b9c7380d30afa97c0b7c37656b] START, creating Discoverer, Watch with file and sincedb collections
logstash-1  | [2024-03-20T06:52:05,489][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

Labels: bug