-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathpublisher.rb
More file actions
55 lines (44 loc) · 1.5 KB
/
publisher.rb
File metadata and controls
55 lines (44 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# frozen_string_literal: true
require 'multi_json'
module Travis
module Amqp
class Publisher
class << self
def channel
@channel ||= Amqp.connection.create_channel.tap do
Amqp.logger.debug 'Created AMQP channel.'
end
end
end
attr_reader :name, :type, :routing_key, :options
def initialize(routing_key, options = {})
@routing_key = routing_key
@options = options.dup
@name = @options.delete(:name) || ''
@type = @options.delete(:type) || 'direct'
end
def publish(data, options = {})
return unless data
Amqp.logger.warn "Queue #{routing_key} doesn't exist!" if ENV['AMQP_QUEUE_VALIDATION'] && !Amqp.connection.queue_exists?(routing_key)
data = MultiJson.encode(data)
exchange.publish(data, deep_merge(default_data, options))
debug "Published AMQP message to #{routing_key}."
end
protected
def default_data
{ key: routing_key, properties: { message_id: rand(100_000_000_000).to_s } }
end
def exchange
@exchange ||= self.class.channel.exchange(name, type: type.to_sym, durable: true, auto_delete: false)
end
def deep_merge(hash, other)
hash.merge(other, &(merger = proc { |_key, v1, v2|
v1.is_a?(Hash) && v2.is_a?(Hash) ? v1.merge(v2, &merger) : v2
}))
end
def debug(msg)
Amqp.logger.debug(msg)
end
end
end
end