diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 461657e..bf45c1c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,12 +13,10 @@ jobs: - uses: actions/checkout@v2 - uses: ruby/setup-ruby@v1 with: - ruby-version: "3.0" + ruby-version: "3.4" bundler-cache: true - name: Run Linter run: bundle exec rubocop - - name: Run audit - run: bundle exec rake bundle:audit - name: Run specs run: bundle exec rspec - name: Coveralls @@ -34,7 +32,7 @@ jobs: strategy: fail-fast: false matrix: - ruby: ["3.0", "3.1", "3.2", "3.3"] + ruby: ["3.1", "3.2", "3.3", "3.4"] steps: - uses: actions/checkout@v2 diff --git a/.rubocop.yml b/.rubocop.yml index 5efbf6d..48662ac 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -26,3 +26,6 @@ Style/Alias: Style/HashConversion: Exclude: - spec/**/* + +RSpec/ExampleLength: + Max: 50 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c43867..e317c25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog All notable changes to this project will be documented in this file. +## [1.2.0] - 2025-02-10 +### Added +- Add `ExponentialBackoffHandler` for handling errors in rabbit messages +- Optional `queue_suffix` config for read queues + ## [1.1.0] - 2024-12-06 ### Added - **Receiving** diff --git a/Gemfile b/Gemfile index 60d8562..8c4e4b1 100644 --- a/Gemfile +++ b/Gemfile @@ -3,8 +3,9 @@ source "https://rubygems.org" gemspec +gem "benchmark" gem "bundler" -gem "bundler-audit" +gem "ostruct" gem "pry" gem "rails" gem "rake" @@ -13,3 +14,4 @@ gem "rspec-its" gem "rubocop-config-umbrellio" gem "simplecov" gem "simplecov-lcov" +gem "sneakers_handlers", github: "umbrellio/sneakers_handlers" diff --git a/Gemfile.lock b/Gemfile.lock index 8d3783b..2585a55 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,14 @@ +GIT + remote: https://github.com/umbrellio/sneakers_handlers.git + revision: d4948218a76f2e510a14a72437f4dbd1ecdbef79 + specs: + sneakers_handlers (0.1.0) + kicks + PATH remote: . specs: - rabbit_messaging (1.1.0) + rabbit_messaging (1.2.0) bunny (~> 2.0) kicks (~> 3) lamian @@ -88,11 +95,9 @@ GEM amq-protocol (2.3.2) ast (2.4.2) base64 (0.2.0) + benchmark (0.4.0) bigdecimal (3.1.8) builder (3.3.0) - bundler-audit (0.9.2) - bundler (>= 1.2.0, < 3) - thor (~> 1.0) bunny (2.23.0) amq-protocol (~> 2.3, >= 2.3.1) sorted_set (~> 1, >= 1.0.2) @@ -151,6 +156,7 @@ GEM nokogiri (1.16.8) mini_portile2 (~> 2.8.2) racc (~> 1.4) + ostruct (0.6.1) parallel (1.26.3) parser (3.3.5.0) ast (~> 2.4.1) @@ -293,8 +299,9 @@ PLATFORMS ruby DEPENDENCIES + benchmark bundler - bundler-audit + ostruct pry rabbit_messaging! rails @@ -304,6 +311,7 @@ DEPENDENCIES rubocop-config-umbrellio simplecov simplecov-lcov + sneakers_handlers! BUNDLED WITH - 2.5.22 + 2.6.3 diff --git a/README.md b/README.md index 232c7d6..5cac3c4 100644 --- a/README.md +++ b/README.md @@ -33,38 +33,42 @@ require "rabbit_messaging" - `Rabbit.config` provides setters for following options: - * `group_id` (`Symbol`), *required* + - `group_id` (`Symbol`), *required* Shared identifier which used to select api. As usual, it should be same as default project_id (I.e. we have project 'support', which runs only one application in production. So on, it's group_id should be :support) - * `project_id` (`Symbol`), *required* + - `project_id` (`Symbol`), *required* Personal identifier which used to select exact service. As usual, it should be same as default project_id with optional stage_id. (I.e. we have project 'support', in production it's project_id is :support, but in staging it uses :support1 and :support2 ids for corresponding stages) + - `queue_suffix` (`String`) - * `exception_notifier` (`Proc`) + Optional suffix added to the read queue name. For example, in case of `group_id = "grp"`, `project_id = "prj"` and + `queue_suffix = "sfx"`, Rabbit will read from queue named `"grp.prj.sfx"`. + + - `exception_notifier` (`Proc`) You must provide your own notifier like this to notify about exceptions: - + ```ruby config.exception_notifier = proc { |e| MyCoolNotifier.notify!(e) } ``` - * `hooks` (`Hash`) + - `hooks` (`Hash`) :before_fork and :after_fork hooks, used same way as in unicorn / puma / que / etc - * `environment` (one of `:test`, `:development`, `:production`), *default:* `:production` + - `environment` (one of `:test`, `:development`, `:production`), *default:- `:production` Internal environment of gem. - * `:test` environment stubs publishing and does not suppress errors - * `:development` environment auto-creates queues and uses default exchange - * `:production` environment enables handlers caching and gets maximum strictness + - `:test` environment stubs publishing and does not suppress errors + - `:development` environment auto-creates queues and uses default exchange + - `:production` environment enables handlers caching and gets maximum strictness By default gem skips publishing in test and development environments. If you want to change that then manually set `Rabbit.skip_publishing_in` with an array of environments. @@ -73,13 +77,13 @@ require "rabbit_messaging" Rabbit.skip_publishing_in = %i[test] ``` - * `receiving_job_class_callable` (`Proc`) + - `receiving_job_class_callable` (`Proc`) Custom ActiveJob subclass to work with received messages. Receives the following attributes as `kwarg`-arguments: - * `:arguments` - information about message type (`type`), application id (`app_id`), message id (`message_id`); - * `:delivery_info` - information about `exchange`, `routing_key`, etc; - * `:message` - received RabbitMQ message (often in a `string` format); + - `:arguments` - information about message type (`type`), application id (`app_id`), message id (`message_id`); + - `:delivery_info` - information about `exchange`, `routing_key`, etc; + - `:message` - received RabbitMQ message (often in a `string` format); ```ruby { @@ -93,7 +97,7 @@ require "rabbit_messaging" } ``` - * `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`) + - `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`) Before and after hooks with message processing in the middle. Where `before_receiving_hooks` and `after_receiving_hooks` are empty arrays by default. @@ -107,8 +111,22 @@ require "rabbit_messaging" config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_3 }) config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_4 }) + ``` + + - `use_backoff_handler` (`Boolean`) + + If set to `true`, use `ExponentialBackoffHandler`. You will also need add the following line to your Gemfile: + ```ruby + gem "sneakers_handlers", github: "umbrellio/sneakers_handlers" ``` + + See https://github.com/umbrellio/sneakers_handlers for more details. + + + - `backoff_handler_max_retries` (`Integer`) + + Number of retries that `ExponentialBackoffHandler` will use before sending job to the error queue. 5 by default. --- ### Client @@ -127,16 +145,16 @@ Rabbit.publish( - This code sends messages via basic_publish with following parameters: - * `routing_key`: `"support"` - * `exchange`: `"group_id.project_id.fanout"` (default is `"group_id.poject_id"`) - * `mandatory`: `true` (same as confirm_select) + - `routing_key`: `"support"` + - `exchange`: `"group_id.project_id.fanout"` (default is `"group_id.poject_id"`) + - `mandatory`: `true` (same as confirm_select) It is set to raise error if routing failed - * `persistent`: `true` - * `type`: `"ping"` - * `content_type`: `"application/json"` (always) - * `app_id`: `"group_id.project_id"` + - `persistent`: `true` + - `type`: `"ping"` + - `content_type`: `"application/json"` (always) + - `app_id`: `"group_id.project_id"` - Messages are logged to `/log/rabbit.log` diff --git a/Rakefile b/Rakefile index c22c1eb..39ad954 100644 --- a/Rakefile +++ b/Rakefile @@ -1,7 +1,6 @@ # frozen_string_literal: true require "bundler/gem_tasks" -require "bundler/audit/task" require "rspec/core/rake_task" require "rubocop" require "rubocop-rspec" @@ -17,6 +16,5 @@ RuboCop::RakeTask.new(:rubocop) do |t| end RSpec::Core::RakeTask.new(:rspec) -Bundler::Audit::Task.new task default: :rspec diff --git a/bin/console b/bin/console index d1571c9..4d37116 100755 --- a/bin/console +++ b/bin/console @@ -1,5 +1,9 @@ #!/usr/bin/env ruby +# Required for tainbox :( +require "active_support/deprecation" +require "active_support/deprecator" + require "bundler/setup" require "rabbit_messaging" diff --git a/lib/rabbit.rb b/lib/rabbit.rb index d1dc755..e24a896 100644 --- a/lib/rabbit.rb +++ b/lib/rabbit.rb @@ -16,10 +16,11 @@ module Rabbit class Config include Tainbox - attribute :group_id, Symbol - attribute :project_id, Symbol + attribute :group_id, :Symbol + attribute :project_id, :Symbol + attribute :queue_suffix, :String attribute :hooks, default: {} - attribute :environment, Symbol, default: :production + attribute :environment, :Symbol, default: :production attribute :queue_name_conversion attribute :receiving_job_class_callable attribute :handler_resolver_callable @@ -27,6 +28,8 @@ class Config attribute :before_receiving_hooks, default: [] attribute :after_receiving_hooks, default: [] attribute :skip_publishing_in, default: %i[test development] + attribute :use_backoff_handler, :Boolean, default: false + attribute :backoff_handler_max_retries, Integer, default: 6 attribute :receive_logger, default: lambda { Logger.new(Rails.root.join("log", "incoming_rabbit_messages.log")) @@ -58,7 +61,9 @@ def app_name [group_id, project_id].join(".") end - alias_method :read_queue, :app_name + def read_queue + [app_name, queue_suffix].reject { |x| x.nil? || x.empty? }.join(".") + end end extend self diff --git a/lib/rabbit/daemon.rb b/lib/rabbit/daemon.rb index 3605b7c..60eb462 100644 --- a/lib/rabbit/daemon.rb +++ b/lib/rabbit/daemon.rb @@ -18,25 +18,34 @@ def run(logger: Sneakers.logger) Lamian.extend_logger(logger) end + self.logger = logger + Sneakers.configure(**sneakers_config(logger: logger)) Sneakers.server = true Rabbit.config.validate! - Receiving::Worker.from_queue(Rabbit.config.read_queue) + + Receiving::Worker.from_queue(Rabbit.config.read_queue, **worker_options) Sneakers::Runner.new([Receiving::Worker]).run end def config - Rails.application.config_for("sneakers").symbolize_keys + @config ||= Rails.application.config_for("sneakers").symbolize_keys end def connection - bunny_config = config.delete(:bunny_options).to_h.symbolize_keys - Bunny.new(bunny_config) + @connection ||= begin + bunny_config = config.delete(:bunny_options).to_h.symbolize_keys + bunny_logger = logger.dup + bunny_logger.level = bunny_config.delete(:log_level) || :info + Bunny.new(**bunny_config, logger: bunny_logger) + end end private + attr_accessor :logger + def sneakers_config(logger:) { connection: connection, @@ -51,5 +60,20 @@ def sneakers_config(logger:) **config, } end + + def worker_options + return {} unless Rabbit.config.use_backoff_handler + + require "sneakers_handlers" + + { + handler: SneakersHandlers::ExponentialBackoffHandler, + max_retries: Rabbit.config.backoff_handler_max_retries, + arguments: { + "x-dead-letter-exchange" => "#{Rabbit.config.read_queue}.dlx", + "x-dead-letter-routing-key" => "#{Rabbit.config.read_queue}.dlx", + }, + } + end end end diff --git a/lib/rabbit/publishing.rb b/lib/rabbit/publishing.rb index 219454b..6090d21 100644 --- a/lib/rabbit/publishing.rb +++ b/lib/rabbit/publishing.rb @@ -50,11 +50,11 @@ def log(message) @logger ||= Rabbit.config.publish_logger metadata = [ - message.real_exchange_name, message.routing_key, message.headers, + message.real_exchange_name, message.routing_key, JSON.dump(message.headers), message.event, message.confirm_select? ? "confirm" : "no-confirm" ] - @logger.debug "#{metadata.join ' / '}: #{message.data}" + @logger.debug "#{metadata.join ' / '}: #{JSON.dump(message.data)}" end end end diff --git a/lib/rabbit/version.rb b/lib/rabbit/version.rb index 46ad804..384cb84 100644 --- a/lib/rabbit/version.rb +++ b/lib/rabbit/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Rabbit - VERSION = "1.1.0" + VERSION = "1.2.0" end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 67adfbf..21ba005 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,6 +20,7 @@ SimpleCov.add_filter "spec" SimpleCov.start +# Required for tainbox :( require "active_support/deprecation" require "active_support/deprecator" @@ -48,4 +49,7 @@ config.default_formatter = "doc" if config.files_to_run.one? config.expose_dsl_globally = true + + config.order = :random + Kernel.srand(config.seed) end diff --git a/spec/support/spec_support.rb b/spec/support/spec_support.rb index 37ca2b0..2d584f5 100644 --- a/spec/support/spec_support.rb +++ b/spec/support/spec_support.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true RSpec.configure do |config| - rabbit_original_config = Rabbit.config.dup - config.after { Rabbit.instance_variable_set(:@config, rabbit_original_config) } + rabbit_original_config = Rabbit.config.deep_dup + config.before { Rabbit.instance_variable_set(:@config, rabbit_original_config.deep_dup) } end diff --git a/spec/units/rabbit/daemon_spec.rb b/spec/units/rabbit/daemon_spec.rb new file mode 100644 index 0000000..6c94ff4 --- /dev/null +++ b/spec/units/rabbit/daemon_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +RSpec.describe Rabbit::Daemon do + before do + allow(app_double).to receive(:config_for).with("sneakers").and_return(sneakers_config) + allow(runner_double).to receive(:run) + allow(logger_double).to receive(:dup).and_return(sneaker_logger_double) + allow(sneaker_logger_double).to receive(:level=) + + allow(Bunny).to receive(:new).and_return(bunny_double) + allow(Rails).to receive(:application).and_return(app_double) + allow(Rails).to receive(:env).and_return("test") + allow(Sneakers).to receive(:configure) + + allow(Sneakers::Runner).to receive(:new).and_return(runner_double) + allow(Rabbit::Receiving::Worker).to receive(:from_queue) + end + + let(:app_double) { double(:rails_app) } + let(:worker_double) { double(:receiving_worker) } + let(:runner_double) { double(:sneakers_runner) } + let(:sneaker_logger_double) { double(:sneaker_logger) } + let(:logger_double) { double(:logger) } + let(:bunny_double) { double(:bunny) } + + let(:sneakers_config) do + { + foo: 1, + bunny_options: { + bar: 2, + log_level: "warn", + }, + } + end + + it "setups sneakers properly and runs daemon" do + Rabbit::Daemon.run(logger: logger_double) + + expect(Sneakers).to have_received(:configure).with( + connection: bunny_double, + env: "test", + exchange_type: :direct, + exchange: "test_group_id.test_project_id", + hooks: {}, + supervisor: true, + daemonize: false, + exit_on_detach: true, + log: logger_double, + foo: 1, + ) + + expect(Rabbit::Receiving::Worker) + .to have_received(:from_queue).with("test_group_id.test_project_id") + + expect(Bunny).to have_received(:new).with(logger: sneaker_logger_double, bar: 2) + + expect(sneaker_logger_double).to have_received(:level=).with("warn") + expect(runner_double).to have_received(:run) + end + + context "backoff handler enabled" do + before { Rabbit.config.use_backoff_handler = true } + before { Rabbit.config.backoff_handler_max_retries = 25 } + before { Rabbit.config.queue_suffix = "v2" } + + it "uses handler" do + Rabbit::Daemon.run(logger: logger_double) + + expect(Rabbit::Receiving::Worker).to have_received(:from_queue).with( + "test_group_id.test_project_id.v2", + handler: SneakersHandlers::ExponentialBackoffHandler, + max_retries: 25, + arguments: { + "x-dead-letter-exchange" => "test_group_id.test_project_id.v2.dlx", + "x-dead-letter-routing-key" => "test_group_id.test_project_id.v2.dlx", + }, + ) + end + end +end diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index 7dd2f22..a6f89dc 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -49,11 +49,8 @@ ) end - it "publishes" do # rubocop:disable RSpec/ExampleLength + it "publishes" do if expect_to_use_job - log_line = 'test_group_id.test_project_id.some_exchange / some_queue / ' \ - '{"foo"=>"bar"} / some_event / confirm: {"hello"=>"world"}' - set_params = { queue: "default_prepared" } expect(Rabbit::Publishing::Job).to receive(:set).with(set_params).and_call_original perform_params = { @@ -66,17 +63,17 @@ headers: { "foo" => "bar" }, message_id: "uuid", } - expect_any_instance_of(ActiveJob::ConfiguredJob).to receive(:perform_later) - .with(perform_params) - .and_call_original + expect_any_instance_of(ActiveJob::ConfiguredJob) + .to receive(:perform_later).with(perform_params).and_call_original else - log_line = 'test_group_id.test_project_id.some_exchange / some_queue / ' \ - '{"foo"=>"bar"} / some_event / confirm: {:hello=>:world}' expect(Rabbit::Publishing::Job).not_to receive(:perform_later) end - expect(publish_logger).to receive(:debug).with(log_line).once + expect(publish_logger).to receive(:debug).with(<<~MSG.strip).once + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ + confirm: {"hello":"world"} + MSG described_class.publish(message_options) end @@ -100,4 +97,28 @@ include_examples "publishes" end + + describe "config" do + describe "#read_queue" do + specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id") } + + context "with nil suffix provided" do + before { Rabbit.config.queue_suffix = nil } + + specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id") } + end + + context "with blank suffix provided" do + before { Rabbit.config.queue_suffix = "" } + + specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id") } + end + + context "with suffix provided" do + before { Rabbit.config.queue_suffix = "smth" } + + specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id.smth") } + end + end + end end