Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions lib/speedshop/cloudwatch/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ def report_job_metrics
begin
if enqueued_at
queue_time = Time.now.to_f - enqueued_at.to_f
dimensions = [{name: "JobClass", value: self.class.name}, {name: "QueueName", value: queue_name}]
Cloudwatch.reporter.report("QueueLatency", queue_time, integration: :active_job, unit: "Seconds", dimensions: dimensions)
Cloudwatch.reporter.report(metric: :QueueLatency, value: queue_time, dimensions: {JobClass: self.class.name, QueueName: queue_name})
end
rescue => e
Speedshop::Cloudwatch.log_error("Failed to collect ActiveJob metrics: #{e.message}", e)
Expand Down
12 changes: 11 additions & 1 deletion lib/speedshop/cloudwatch/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module Speedshop
module Cloudwatch
class Configuration
attr_accessor :interval, :client, :metrics, :namespaces, :logger, :sidekiq_queues, :dimensions
attr_accessor :interval, :client, :metrics, :namespaces, :logger, :sidekiq_queues, :dimensions, :units

def initialize
@interval = 60
Expand All @@ -19,6 +19,16 @@ def initialize
rack: [:RequestQueueTime],
active_job: [:QueueLatency]
}
@units = {
Workers: "Count", BootedWorkers: "Count", OldWorkers: "Count", Running: "Count",
Backlog: "Count", PoolCapacity: "Count", MaxThreads: "Count",
EnqueuedJobs: "Count", ProcessedJobs: "Count", FailedJobs: "Count",
ScheduledJobs: "Count", RetryJobs: "Count", DeadJobs: "Count",
Processes: "Count", Capacity: "Count", QueueSize: "Count",
DefaultQueueLatency: "Seconds", QueueLatency: "Seconds",
Utilization: "Percent",
RequestQueueTime: "Milliseconds"
}
@namespaces = {puma: "Puma", sidekiq: "Sidekiq", rack: "Rack", active_job: "ActiveJob"}
@sidekiq_queues = nil
@dimensions = {}
Expand Down
20 changes: 15 additions & 5 deletions lib/speedshop/cloudwatch/metric_reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,29 @@ def stop!
thread_to_join&.join
end

def report(metric_name, value, integration:, unit: "None", dimensions: [])
namespace = @config.namespaces[integration]
def report(metric:, value:, dimensions: {}, namespace: nil)
metric_name = metric.to_sym

integration = find_integration_for_metric(metric_name)
return unless integration

ns = namespace || @config.namespaces[integration]
unit = @config.units[metric_name] || "None"

if [:rack, :active_job].include?(integration)
@mutex.synchronize { @registered_integrations << integration }
end

return unless metric_allowed?(integration, metric_name)

all_dimensions = dimensions + custom_dimensions
dimensions_array = dimensions.map { |k, v| {name: k.to_s, value: v.to_s} }
all_dimensions = dimensions_array + custom_dimensions

@mutex.synchronize do
@queue << {metric_name: metric_name, value: value, namespace: namespace, unit: unit,
@queue << {metric_name: metric_name.to_s, value: value, namespace: ns, unit: unit,
dimensions: all_dimensions, timestamp: Time.now}
end

# Lazy-init of the reporter thread
start! unless started?
end

Expand Down Expand Up @@ -134,6 +140,10 @@ def metric_allowed?(integration, metric_name)
def custom_dimensions
@config.dimensions.map { |name, value| {name: name.to_s, value: value.to_s} }
end

def find_integration_for_metric(metric_name)
@config.metrics.find { |int, metrics| metrics.include?(metric_name.to_sym) }&.first
end
end
end
end
9 changes: 4 additions & 5 deletions lib/speedshop/cloudwatch/puma.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def collect_metrics
if stats[:worker_status]
%i[workers booted_workers old_workers].each do |m|
# Submit to SnakeCase tyranny
metric_name = m.to_s.split("_").map(&:capitalize).join
@reporter.report(metric_name, stats[m] || 0, integration: :puma, unit: "Count")
metric_name = m.to_s.split("_").map(&:capitalize).join.to_sym
@reporter.report(metric: metric_name, value: stats[m] || 0)
end
end

Expand All @@ -34,10 +34,9 @@ def worker_statuses(stats)
end

def report_worker(stats, idx)
dims = [{name: "WorkerIndex", value: idx.to_s}]
%i[running backlog pool_capacity max_threads].each do |m|
metric_name = m.to_s.split("_").map(&:capitalize).join
@reporter.report(metric_name, stats[m] || 0, integration: :puma, unit: "Count", dimensions: dims)
metric_name = m.to_s.split("_").map(&:capitalize).join.to_sym
@reporter.report(metric: metric_name, value: stats[m] || 0, dimensions: {WorkerIndex: idx.to_s})
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/speedshop/cloudwatch/rack_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def call(env)
# Header contains milliseconds since epoch (with optional "t=" prefix).
if (header = env["HTTP_X_REQUEST_START"] || env["HTTP_X_QUEUE_START"])
queue_time = (Time.now.to_f * 1000) - header.gsub("t=", "").to_f
Speedshop::Cloudwatch.reporter.report("RequestQueueTime", queue_time, integration: :rack, unit: "Milliseconds")
Speedshop::Cloudwatch.reporter.report(metric: :RequestQueueTime, value: queue_time)
end
rescue => e
Speedshop::Cloudwatch.log_error("Failed to collect Rack metrics: #{e.message}", e)
Expand Down
24 changes: 11 additions & 13 deletions lib/speedshop/cloudwatch/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,33 @@ def report_stats(stats)
EnqueuedJobs: stats.enqueued, ProcessedJobs: stats.processed, FailedJobs: stats.failed,
ScheduledJobs: stats.scheduled_size, RetryJobs: stats.retry_size, DeadJobs: stats.dead_size,
Workers: stats.workers_size, Processes: stats.processes_size
}.each { |m, v| @reporter.report(m.to_s, v, integration: :sidekiq, unit: "Count") }
@reporter.report("DefaultQueueLatency", stats.default_queue_latency, integration: :sidekiq, unit: "Seconds")
}.each { |m, v| @reporter.report(metric: m, value: v) }
@reporter.report(metric: :DefaultQueueLatency, value: stats.default_queue_latency)
end

def report_utilization(processes)
capacity = processes.sum { |p| p["concurrency"] }
@reporter.report("Capacity", capacity, integration: :sidekiq, unit: "Count")
@reporter.report(metric: :Capacity, value: capacity)

utilization = avg_utilization(processes) * 100.0
@reporter.report("Utilization", utilization, integration: :sidekiq, unit: "Percent") unless utilization.nan?
@reporter.report(metric: :Utilization, value: utilization) unless utilization.nan?

processes.group_by { |p| p["tag"] }.each do |tag, procs|
next unless tag
dims = [{name: "Tag", value: tag}]
capacity = procs.sum { |p| p["concurrency"] }
@reporter.report("Capacity", capacity, integration: :sidekiq, unit: "Count", dimensions: dims)
@reporter.report(metric: :Capacity, value: capacity, dimensions: {Tag: tag})
util = avg_utilization(procs) * 100.0
@reporter.report("Utilization", util, integration: :sidekiq, unit: "Percent", dimensions: dims) unless util.nan?
@reporter.report(metric: :Utilization, value: util, dimensions: {Tag: tag}) unless util.nan?
end
end

def report_process_metrics(processes)
processes.each do |p|
next if p["concurrency"].zero?
util = p["busy"] / p["concurrency"].to_f * 100.0
dims = [{name: "Hostname", value: p["hostname"]}]
dims << {name: "Tag", value: p["tag"]} if p["tag"] && !p["tag"].to_s.empty?
@reporter.report("Utilization", util, integration: :sidekiq, unit: "Percent", dimensions: dims)
dims = {Hostname: p["hostname"]}
dims[:Tag] = p["tag"] if p["tag"] && !p["tag"].to_s.empty?
@reporter.report(metric: :Utilization, value: util, dimensions: dims)
end
end

Expand All @@ -86,9 +85,8 @@ def report_queue_metrics
all_queues = ::Sidekiq::Queue.all
queues = (configured.nil? || configured.empty?) ? all_queues : all_queues.select { |q| configured.include?(q.name) }
queues.each do |q|
dims = [{name: "QueueName", value: q.name}]
@reporter.report("QueueLatency", q.latency, integration: :sidekiq, unit: "Seconds", dimensions: dims)
@reporter.report("QueueSize", q.size, integration: :sidekiq, unit: "Count", dimensions: dims)
@reporter.report(metric: :QueueLatency, value: q.latency, dimensions: {QueueName: q.name})
@reporter.report(metric: :QueueSize, value: q.size, dimensions: {QueueName: q.name})
end
end

Expand Down
33 changes: 12 additions & 21 deletions test/active_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,20 @@ def test_reports_queue_time_when_job_executes
job = TestJob.new("test_arg")
job.enqueued_at = Time.now.to_f - 2.5

reported_metric = nil
reported_value = nil
reported_integration = nil
reported_dimensions = nil
reported_kwargs = nil

reporter = Speedshop::Cloudwatch.reporter
reporter.stub :report, ->(metric, value, integration:, unit: nil, dimensions: nil) {
reported_metric = metric
reported_value = value
reported_integration = integration
reported_dimensions = dimensions
reporter.stub :report, ->(**kwargs) {
reported_kwargs = kwargs
} do
job.perform_now
end

assert_equal "QueueLatency", reported_metric
assert_operator reported_value, :>=, 2.5
assert_equal :active_job, reported_integration
assert_equal 2, reported_dimensions.size
assert_equal "JobClass", reported_dimensions[0][:name]
assert_equal "TestJob", reported_dimensions[0][:value]
assert_equal "QueueName", reported_dimensions[1][:name]
assert_equal "default", reported_dimensions[1][:value]
assert_equal :QueueLatency, reported_kwargs[:metric]
assert_operator reported_kwargs[:value], :>=, 2.5
assert_equal 2, reported_kwargs[:dimensions].size
assert_equal "TestJob", reported_kwargs[:dimensions][:JobClass]
assert_equal "default", reported_kwargs[:dimensions][:QueueName]
end

def test_does_not_report_when_enqueued_at_is_nil
Expand Down Expand Up @@ -98,15 +89,15 @@ def test_uses_configured_namespace
job = TestJob.new("test_arg")
job.enqueued_at = Time.now.to_f - 1.0

reported_integration = nil
reported_kwargs = nil
reporter = Speedshop::Cloudwatch.reporter
reporter.stub :report, ->(metric, value, integration:, **kwargs) {
reported_integration = integration
reporter.stub :report, ->(**kwargs) {
reported_kwargs = kwargs
} do
job.perform_now
end

assert_equal :active_job, reported_integration
assert_equal :QueueLatency, reported_kwargs[:metric]
end

def test_respects_active_job_metrics_whitelist
Expand Down
40 changes: 20 additions & 20 deletions test/metric_reporter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def teardown
end

def test_queues_metrics
@reporter.report("test_metric", 42, integration: :test)
@reporter.report("another_metric", 100, integration: :test, unit: "Count")
@reporter.report(metric: :test_metric, value: 42)
@reporter.report(metric: :another_metric, value: 100)
end

def test_can_start_and_stop
Expand All @@ -27,7 +27,7 @@ def test_can_start_and_stop
end

def test_filters_unregistered_puma_metrics
@reporter.report("Workers", 4, integration: :puma)
@reporter.report(metric: :Workers, value: 4)

queue = @reporter.queue
assert_empty queue
Expand All @@ -36,16 +36,16 @@ def test_filters_unregistered_puma_metrics
def test_respects_puma_metrics_whitelist
@reporter.register_collector(:puma) {}
@config.metrics[:puma] = [:Workers]
@reporter.report("Workers", 4, integration: :puma)
@reporter.report("BootedWorkers", 4, integration: :puma)
@reporter.report(metric: :Workers, value: 4)
@reporter.report(metric: :BootedWorkers, value: 4)

queue = @reporter.queue
assert_equal 1, queue.size
assert_equal "Workers", queue.first[:metric_name]
end

def test_filters_unregistered_sidekiq_metrics
@reporter.report("EnqueuedJobs", 10, integration: :sidekiq)
@reporter.report(metric: :EnqueuedJobs, value: 10)

queue = @reporter.queue
assert_empty queue
Expand All @@ -54,9 +54,9 @@ def test_filters_unregistered_sidekiq_metrics
def test_respects_sidekiq_metrics_whitelist
@reporter.register_collector(:sidekiq) {}
@config.metrics[:sidekiq] = [:EnqueuedJobs, :QueueLatency]
@reporter.report("EnqueuedJobs", 10, integration: :sidekiq)
@reporter.report("ProcessedJobs", 100, integration: :sidekiq)
@reporter.report("QueueLatency", 5.2, integration: :sidekiq)
@reporter.report(metric: :EnqueuedJobs, value: 10)
@reporter.report(metric: :ProcessedJobs, value: 100)
@reporter.report(metric: :QueueLatency, value: 5.2)

queue = @reporter.queue
assert_equal 2, queue.size
Expand All @@ -68,9 +68,9 @@ def test_respects_sidekiq_metrics_whitelist

def test_allows_unknown_namespaces
@config.namespaces[:custom] = "CustomNamespace"
@config.metrics[:custom] = [:custom_metric]
@config.metrics[:custom] = [:my_custom_metric]
@reporter.register_collector(:custom) {}
@reporter.report("custom_metric", 42, integration: :custom)
@reporter.report(metric: :my_custom_metric, value: 42)

queue = @reporter.queue
assert_equal 1, queue.size
Expand Down Expand Up @@ -133,7 +133,7 @@ def test_started_detects_dead_thread
def test_adds_custom_dimensions_to_metrics
@config.dimensions = {ServiceName: "myservice-api", Environment: "production"}
@reporter.register_collector(:test) {}
@reporter.report("test_metric", 42, integration: :test, dimensions: [{name: "Region", value: "us-east-1"}])
@reporter.report(metric: :test_metric, value: 42, dimensions: {Region: "us-east-1"})

queue = @reporter.queue
assert_equal 1, queue.size
Expand All @@ -154,7 +154,7 @@ def test_adds_custom_dimensions_to_metrics

def test_works_without_custom_dimensions
@reporter.register_collector(:test) {}
@reporter.report("test_metric", 42, integration: :test, dimensions: [{name: "Region", value: "us-east-1"}])
@reporter.report(metric: :test_metric, value: 42, dimensions: {Region: "us-east-1"})

queue = @reporter.queue
assert_equal 1, queue.size
Expand All @@ -167,7 +167,7 @@ def test_works_without_custom_dimensions
def test_custom_dimensions_with_no_metric_dimensions
@config.dimensions = {ServiceName: "myservice-api"}
@reporter.register_collector(:test) {}
@reporter.report("test_metric", 42, integration: :test)
@reporter.report(metric: :test_metric, value: 42)

queue = @reporter.queue
assert_equal 1, queue.size
Expand All @@ -181,7 +181,7 @@ def test_lazy_startup_on_first_report
@reporter.register_collector(:test) {}
refute @reporter.started?

@reporter.report("test_metric", 42, integration: :test)
@reporter.report(metric: :test_metric, value: 42)

assert @reporter.started?
assert @reporter.thread.alive?
Expand All @@ -191,31 +191,31 @@ def test_lazy_startup_does_not_double_start
@reporter.register_collector(:test) {}
refute @reporter.started?

@reporter.report("metric1", 1, integration: :test)
@reporter.report(metric: :metric1, value: 1)
thread1 = @reporter.thread

@reporter.report("metric2", 2, integration: :test)
@reporter.report(metric: :metric2, value: 2)
thread2 = @reporter.thread

assert_same thread1, thread2
end

def test_lazy_startup_restarts_after_stop
@reporter.register_collector(:test) {}
@reporter.report("metric1", 1, integration: :test)
@reporter.report(metric: :metric1, value: 1)
assert @reporter.started?

@reporter.stop!
refute @reporter.started?

@reporter.report("metric2", 2, integration: :test)
@reporter.report(metric: :metric2, value: 2)
assert @reporter.started?
end

def test_lazy_startup_with_unregistered_integration
refute @reporter.started?

@reporter.report("Workers", 4, integration: :puma)
@reporter.report(metric: :Workers, value: 4)

refute @reporter.started?
assert_empty @reporter.queue
Expand Down
6 changes: 3 additions & 3 deletions test/rack_middleware_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ def test_uses_configured_namespace

reporter = Speedshop::Cloudwatch.reporter

reported_integration = nil
reporter.stub :report, ->(metric_name, value, integration:, **kwargs) { reported_integration = integration } do
reported_kwargs = nil
reporter.stub :report, ->(**kwargs) { reported_kwargs = kwargs } do
call_middleware_with_header("HTTP_X_REQUEST_START")
end

assert_equal :rack, reported_integration
assert_equal :RequestQueueTime, reported_kwargs[:metric]
end

def test_logs_error_when_collection_fails
Expand Down
5 changes: 3 additions & 2 deletions test/support/doubles.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ def initialize
@metrics_collected = []
end

def report(metric_name, value, **options)
@metrics_collected << {name: metric_name, value: value, **options}
def report(metric:, value:, dimensions: {}, namespace: nil)
dims = dimensions.map { |k, v| {name: k.to_s, value: v.to_s} }
@metrics_collected << {name: metric.to_s, value: value, dimensions: dims, namespace: namespace}
end

def register_collector(integration, &block)
Expand Down
Loading