Skip to content

Add DLX backoff support #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ jobs:
- uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1
with:
ruby-version: "3.0"
ruby-version: "3.4"
bundler-cache: true
- name: Run Linter
run: bundle exec rubocop
- name: Run audit
run: bundle exec rake bundle:audit
- name: Run specs
run: bundle exec rspec
- name: Coveralls
Expand All @@ -34,7 +32,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: ["3.0", "3.1", "3.2", "3.3"]
ruby: ["3.1", "3.2", "3.3", "3.4"]

steps:
- uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ Style/Alias:
Style/HashConversion:
Exclude:
- spec/**/*

RSpec/ExampleLength:
Max: 50
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.2.0] - 2025-02-10
### Added
- Add `ExponentialBackoffHandler` for handling errors in rabbit messages
- Optional `queue_suffix` config for read queues

## [1.1.0] - 2024-12-06
### Added
- **Receiving**
Expand Down
4 changes: 3 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
source "https://rubygems.org"
gemspec

gem "benchmark"
gem "bundler"
gem "bundler-audit"
gem "ostruct"
gem "pry"
gem "rails"
gem "rake"
Expand All @@ -13,3 +14,4 @@ gem "rspec-its"
gem "rubocop-config-umbrellio"
gem "simplecov"
gem "simplecov-lcov"
gem "sneakers_handlers", github: "umbrellio/sneakers_handlers"
20 changes: 14 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
GIT
remote: https://github.com/umbrellio/sneakers_handlers.git
revision: d4948218a76f2e510a14a72437f4dbd1ecdbef79
specs:
sneakers_handlers (0.1.0)
kicks

PATH
remote: .
specs:
rabbit_messaging (1.1.0)
rabbit_messaging (1.2.0)
bunny (~> 2.0)
kicks (~> 3)
lamian
Expand Down Expand Up @@ -88,11 +95,9 @@ GEM
amq-protocol (2.3.2)
ast (2.4.2)
base64 (0.2.0)
benchmark (0.4.0)
bigdecimal (3.1.8)
builder (3.3.0)
bundler-audit (0.9.2)
bundler (>= 1.2.0, < 3)
thor (~> 1.0)
bunny (2.23.0)
amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2)
Expand Down Expand Up @@ -151,6 +156,7 @@ GEM
nokogiri (1.16.8)
mini_portile2 (~> 2.8.2)
racc (~> 1.4)
ostruct (0.6.1)
parallel (1.26.3)
parser (3.3.5.0)
ast (~> 2.4.1)
Expand Down Expand Up @@ -293,8 +299,9 @@ PLATFORMS
ruby

DEPENDENCIES
benchmark
bundler
bundler-audit
ostruct
pry
rabbit_messaging!
rails
Expand All @@ -304,6 +311,7 @@ DEPENDENCIES
rubocop-config-umbrellio
simplecov
simplecov-lcov
sneakers_handlers!

BUNDLED WITH
2.5.22
2.6.3
60 changes: 39 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,42 @@ require "rabbit_messaging"

- `Rabbit.config` provides setters for following options:

* `group_id` (`Symbol`), *required*
- `group_id` (`Symbol`), *required*

Shared identifier which used to select api. As usual, it should be same as default project_id
(I.e. we have project 'support', which runs only one application in production.
So on, it's group_id should be :support)

* `project_id` (`Symbol`), *required*
- `project_id` (`Symbol`), *required*

Personal identifier which used to select exact service.
As usual, it should be same as default project_id with optional stage_id.
(I.e. we have project 'support', in production it's project_id is :support,
but in staging it uses :support1 and :support2 ids for corresponding stages)

- `queue_suffix` (`String`)

* `exception_notifier` (`Proc`)
Optional suffix added to the read queue name. For example, in case of `group_id = "grp"`, `project_id = "prj"` and
`queue_suffix = "sfx"`, Rabbit will read from queue named `"grp.prj.sfx"`.

- `exception_notifier` (`Proc`)
You must provide your own notifier like this to notify about exceptions:

```ruby
config.exception_notifier = proc { |e| MyCoolNotifier.notify!(e) }
```

* `hooks` (`Hash`)
- `hooks` (`Hash`)

:before_fork and :after_fork hooks, used same way as in unicorn / puma / que / etc

* `environment` (one of `:test`, `:development`, `:production`), *default:* `:production`
- `environment` (one of `:test`, `:development`, `:production`), *default:- `:production`

Internal environment of gem.

* `:test` environment stubs publishing and does not suppress errors
* `:development` environment auto-creates queues and uses default exchange
* `:production` environment enables handlers caching and gets maximum strictness
- `:test` environment stubs publishing and does not suppress errors
- `:development` environment auto-creates queues and uses default exchange
- `:production` environment enables handlers caching and gets maximum strictness

By default gem skips publishing in test and development environments.
If you want to change that then manually set `Rabbit.skip_publishing_in` with an array of environments.
Expand All @@ -73,13 +77,13 @@ require "rabbit_messaging"
Rabbit.skip_publishing_in = %i[test]
```

* `receiving_job_class_callable` (`Proc`)
- `receiving_job_class_callable` (`Proc`)

Custom ActiveJob subclass to work with received messages. Receives the following attributes as `kwarg`-arguments:

* `:arguments` - information about message type (`type`), application id (`app_id`), message id (`message_id`);
* `:delivery_info` - information about `exchange`, `routing_key`, etc;
* `:message` - received RabbitMQ message (often in a `string` format);
- `:arguments` - information about message type (`type`), application id (`app_id`), message id (`message_id`);
- `:delivery_info` - information about `exchange`, `routing_key`, etc;
- `:message` - received RabbitMQ message (often in a `string` format);

```ruby
{
Expand All @@ -93,7 +97,7 @@ require "rabbit_messaging"
}
```

* `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`)
- `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`)

Before and after hooks with message processing in the middle. Where `before_receiving_hooks` and `after_receiving_hooks` are empty arrays by default.

Expand All @@ -107,8 +111,22 @@ require "rabbit_messaging"

config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_3 })
config.after_receiving_hooks.append(proc { |message, arguments| do_stuff_4 })
```

- `use_backoff_handler` (`Boolean`)

If set to `true`, use `ExponentialBackoffHandler`. You will also need add the following line to your Gemfile:

```ruby
gem "sneakers_handlers", github: "umbrellio/sneakers_handlers"
```

See https://github.com/umbrellio/sneakers_handlers for more details.


- `backoff_handler_max_retries` (`Integer`)

Number of retries that `ExponentialBackoffHandler` will use before sending job to the error queue. 5 by default.
---

### Client
Expand All @@ -127,16 +145,16 @@ Rabbit.publish(

- This code sends messages via basic_publish with following parameters:

* `routing_key`: `"support"`
* `exchange`: `"group_id.project_id.fanout"` (default is `"group_id.poject_id"`)
* `mandatory`: `true` (same as confirm_select)
- `routing_key`: `"support"`
- `exchange`: `"group_id.project_id.fanout"` (default is `"group_id.poject_id"`)
- `mandatory`: `true` (same as confirm_select)

It is set to raise error if routing failed

* `persistent`: `true`
* `type`: `"ping"`
* `content_type`: `"application/json"` (always)
* `app_id`: `"group_id.project_id"`
- `persistent`: `true`
- `type`: `"ping"`
- `content_type`: `"application/json"` (always)
- `app_id`: `"group_id.project_id"`

- Messages are logged to `/log/rabbit.log`

Expand Down
2 changes: 0 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require "bundler/gem_tasks"
require "bundler/audit/task"
require "rspec/core/rake_task"
require "rubocop"
require "rubocop-rspec"
Expand All @@ -17,6 +16,5 @@ RuboCop::RakeTask.new(:rubocop) do |t|
end

RSpec::Core::RakeTask.new(:rspec)
Bundler::Audit::Task.new

task default: :rspec
4 changes: 4 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/usr/bin/env ruby

# Required for tainbox :(
require "active_support/deprecation"
require "active_support/deprecator"

require "bundler/setup"
require "rabbit_messaging"

Expand Down
13 changes: 9 additions & 4 deletions lib/rabbit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ module Rabbit
class Config
include Tainbox

attribute :group_id, Symbol
attribute :project_id, Symbol
attribute :group_id, :Symbol
attribute :project_id, :Symbol
attribute :queue_suffix, :String
attribute :hooks, default: {}
attribute :environment, Symbol, default: :production
attribute :environment, :Symbol, default: :production
attribute :queue_name_conversion
attribute :receiving_job_class_callable
attribute :handler_resolver_callable
attribute :exception_notifier
attribute :before_receiving_hooks, default: []
attribute :after_receiving_hooks, default: []
attribute :skip_publishing_in, default: %i[test development]
attribute :use_backoff_handler, :Boolean, default: false
attribute :backoff_handler_max_retries, Integer, default: 6

attribute :receive_logger, default: lambda {
Logger.new(Rails.root.join("log", "incoming_rabbit_messages.log"))
Expand Down Expand Up @@ -58,7 +61,9 @@ def app_name
[group_id, project_id].join(".")
end

alias_method :read_queue, :app_name
def read_queue
[app_name, queue_suffix].reject { |x| x.nil? || x.empty? }.join(".")
end
end

extend self
Expand Down
32 changes: 28 additions & 4 deletions lib/rabbit/daemon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,34 @@ def run(logger: Sneakers.logger)
Lamian.extend_logger(logger)
end

self.logger = logger

Sneakers.configure(**sneakers_config(logger: logger))
Sneakers.server = true

Rabbit.config.validate!
Receiving::Worker.from_queue(Rabbit.config.read_queue)

Receiving::Worker.from_queue(Rabbit.config.read_queue, **worker_options)
Sneakers::Runner.new([Receiving::Worker]).run
end

def config
Rails.application.config_for("sneakers").symbolize_keys
@config ||= Rails.application.config_for("sneakers").symbolize_keys
end

def connection
bunny_config = config.delete(:bunny_options).to_h.symbolize_keys
Bunny.new(bunny_config)
@connection ||= begin
bunny_config = config.delete(:bunny_options).to_h.symbolize_keys
bunny_logger = logger.dup
bunny_logger.level = bunny_config.delete(:log_level) || :info
Bunny.new(**bunny_config, logger: bunny_logger)
end
end

private

attr_accessor :logger

def sneakers_config(logger:)
{
connection: connection,
Expand All @@ -51,5 +60,20 @@ def sneakers_config(logger:)
**config,
}
end

def worker_options
return {} unless Rabbit.config.use_backoff_handler

require "sneakers_handlers"

{
handler: SneakersHandlers::ExponentialBackoffHandler,
max_retries: Rabbit.config.backoff_handler_max_retries,
arguments: {
"x-dead-letter-exchange" => "#{Rabbit.config.read_queue}.dlx",
"x-dead-letter-routing-key" => "#{Rabbit.config.read_queue}.dlx",
},
}
end
end
end
4 changes: 2 additions & 2 deletions lib/rabbit/publishing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ def log(message)
@logger ||= Rabbit.config.publish_logger

metadata = [
message.real_exchange_name, message.routing_key, message.headers,
message.real_exchange_name, message.routing_key, JSON.dump(message.headers),
message.event, message.confirm_select? ? "confirm" : "no-confirm"
]

@logger.debug "#{metadata.join ' / '}: #{message.data}"
@logger.debug "#{metadata.join ' / '}: #{JSON.dump(message.data)}"
end
end
end
2 changes: 1 addition & 1 deletion lib/rabbit/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Rabbit
VERSION = "1.1.0"
VERSION = "1.2.0"
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
SimpleCov.add_filter "spec"
SimpleCov.start

# Required for tainbox :(
require "active_support/deprecation"
require "active_support/deprecator"

Expand Down Expand Up @@ -48,4 +49,7 @@

config.default_formatter = "doc" if config.files_to_run.one?
config.expose_dsl_globally = true

config.order = :random
Kernel.srand(config.seed)
end
Loading