-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathnotify.rb
More file actions
126 lines (101 loc) · 3.06 KB
/
notify.rb
File metadata and controls
126 lines (101 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# frozen_string_literal: true
require 'travis/scheduler/helper/coder'
require 'travis/scheduler/serialize/live'
require 'travis/scheduler/serialize/worker'
module Travis
module Scheduler
module Service
class Notify < Struct.new(:context, :data)
include Helper::Runner
include Helper::Metrics
include Helper::Logging
include Helper::Context
include Helper::Coder
include Registry
register :service, :notify
MSGS = {
publish: 'Publishing worker payload for job id=%s queue=%s to %s.',
redirect: 'Found job.queue: %s. Redirecting to: %s.'
}.freeze
def run
# fail('kaputt. testing exception tracking.') if job.repository.owner_name == 'svenfuchs'
set_queue
notify_workers
notify_live
end
private
def set_queue
inline :set_queue, job, jid:, src:
end
def notify_workers
info "Publishing worker payload for job=#{job.id} queue=#{job.queue}"
Travis::Honeycomb.context.add('job_id', job.id)
Travis::Honeycomb.context.add('queue', job.queue)
rollout? ? notify_job_board : notify_rabbitmq
end
def notify_job_board
info :publish, job.id, job.queue, 'job board'
JobBoard.post(job.id, worker_payload)
end
def notify_rabbitmq
info :publish, job.id, job.queue, 'rabbitmq'
w = worker_payload
amqp.publish(w, properties: { type: 'test', persistent: true })
end
def notify_live
# we need to always make sure that the data is fresh, because Active
# Record doesn't always refresh the updated_at column
job.reload
Live.push(live_payload, live_params)
end
def rollout?
Rollout.matches?(:job_board, uid: owner.id, owner: owner.login, redis:)
end
def worker_payload
deep_clean(Serialize::Worker.new(job, config).data)
end
time :worker_payload
def live_payload
Serialize::Live.new(job).data
end
time :live_payload
def live_params
{ event: 'job:queued', user_ids: }
end
time :live_params
def user_ids
job.repository.permissions.pluck(:user_id)
end
def owner
job.owner
end
def job
@job ||= Job.find(job_id)
end
def job_id
data[:job] && data[:job][:id] or raise("No job id given: #{data}")
end
def amqp
Amqp::Publisher.new(job.queue)
end
def redirect_queue
queue = redirections[job.queue] or return
info format(MSGS[:redirect], job.queue, queue)
job.update!(queue:)
end
def redirections
config[:queue_redirections] || {}
end
def amqp
Amqp::Publisher.new(job.queue)
end
def jid
data[:jid]
end
def src
data[:src]
end
end
end
end
end