forked from umbrellio/rabbit_messaging
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrabbit.rb
95 lines (74 loc) · 2.44 KB
/
rabbit.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# frozen_string_literal: true
require "tainbox"
require "rabbit/version"
require "rabbit/daemon"
require "rabbit/publishing"
require "rabbit/event_handler"
require "rabbit/extensions/bunny/channel"
module Rabbit
InvalidConfig = Class.new(StandardError)
MessageNotDelivered = Class.new(StandardError)
class Config
include Tainbox
attribute :group_id, Symbol
attribute :project_id, Symbol
attribute :hooks, default: {}
attribute :environment, Symbol, default: :production
attribute :queue_name_conversion
attribute :receiving_job_class_callable
attribute :handler_resolver_callable
attribute :exception_notifier
attribute :before_receiving_hooks, default: []
attribute :after_receiving_hooks, default: []
attribute :skip_publishing_in, default: %i[test development]
attribute :receive_logger, default: lambda {
Logger.new(Rails.root.join("log", "incoming_rabbit_messages.log"))
}
attribute :publish_logger, default: lambda {
Logger.new(Rails.root.join("log", "rabbit.log"))
}
attribute :malformed_logger, default: lambda {
Logger.new(Rails.root.join("log", "malformed_messages.log"))
}
def validate!
raise InvalidConfig, "missing project_id" unless project_id
raise InvalidConfig, "missing group_id" unless group_id
raise InvalidConfig, "missing exception_notifier" unless exception_notifier
unless environment.in? %i[test development production]
raise "environment should be one of (test, development, production)"
end
end
def skip_publish?
skip_publishing_in.include?(environment)
end
def app_name
[group_id, project_id].join(".")
end
alias_method :read_queue, :app_name
end
extend self
def config
@config ||= Config.new
yield(@config) if block_given?
@config
end
def configure
yield(config)
config.validate!
end
def publish(message_options)
message = Publishing::Message.new(message_options)
if message.realtime?
Publishing.publish(message)
else
Publishing::Job.set(queue: default_queue_name).perform_later(message.to_hash)
end
end
def queue_name(queue, ignore_conversion: false)
return queue if ignore_conversion
config.queue_name_conversion ? config.queue_name_conversion.call(queue) : queue
end
def default_queue_name(ignore_conversion: false)
queue_name(:default, ignore_conversion: ignore_conversion)
end
end