Skip to content

Commit d441226

Browse files
committed
- Refactor metric reporting for improved cardinality and aggregation logic
This commit message was generated with the help of LLMs.
1 parent fbc7e60 commit d441226

File tree

5 files changed

+134
-10
lines changed

5 files changed

+134
-10
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ We report the following metrics:
168168
QueueLatency - Time job spent waiting in queue before execution (Seconds)
169169
```
170170

171-
This metric includes JobClass and QueueName dimensions.
171+
This metric includes the QueueName dimension and is aggregated per interval using CloudWatch StatisticSets.
172172

173173
### Rails
174174

@@ -218,3 +218,9 @@ If for some reason you want to disable an integration _after_ this registration,
218218
```ruby
219219

220220
```
221+
222+
## Comparisons and Design Notes
223+
224+
For a deep comparison with related libraries and concrete recommendations informed by their approaches, see:
225+
226+
- docs/cloudwatch_libs_comparison.md

docs/metrics.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,6 @@ This document explains each metric collected by Speedshop::Cloudwatch, how it's
132132

133133
### QueueLatency
134134
- **Unit:** Seconds
135-
- **Dimensions:** JobClass, QueueName
135+
- **Dimensions:** QueueName
136136
- **Source:** `Time.now.to_f - job.enqueued_at`
137-
- **Description:** Time a job spent waiting in the queue before execution started. Measured when the job begins executing. High latency indicates jobs are backing up for this job class and queue combination.
137+
- **Description:** Time a job spent waiting in the queue before execution started. Measured when the job begins executing. Values are aggregated per queue into CloudWatch StatisticSets for each reporting interval.

lib/speedshop/cloudwatch/active_job.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ def report_job_metrics
1111
begin
1212
if enqueued_at
1313
queue_time = Time.now.to_f - enqueued_at.to_f
14-
Reporter.instance.report(metric: :QueueLatency, value: queue_time, dimensions: {JobClass: self.class.name, QueueName: queue_name}, integration: :active_job)
14+
# Drop JobClass to reduce time series cardinality and allow aggregation into StatisticSets per queue
15+
Reporter.instance.report(metric: :QueueLatency, value: queue_time, dimensions: {QueueName: queue_name}, integration: :active_job)
1516
end
1617
rescue => e
1718
Speedshop::Cloudwatch.log_error("Failed to collect ActiveJob metrics: #{e.message}", e)

lib/speedshop/cloudwatch/puma.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def collect
1212
metric_name = m.to_s.split("_").map(&:capitalize).join.to_sym
1313
Reporter.instance.report(metric: metric_name, value: stats[m] || 0)
1414
end
15+
report_aggregate_worker_stats(stats)
1516
end
1617

1718
workers = stats[:worker_status] ? worker_statuses(stats) : [[stats, 0]]
@@ -32,6 +33,33 @@ def report_worker(stats, idx)
3233
Reporter.instance.report(metric: metric_name, value: stats[m] || 0, dimensions: {WorkerIndex: idx.to_s})
3334
end
3435
end
36+
37+
def report_aggregate_worker_stats(stats)
  # Rolls per-worker Puma stats up into a single StatisticSet per metric, so one
  # datum per interval covers the whole cluster instead of one series per worker.
  #
  # stats - the Hash returned by Puma's stats API. In clustered mode it contains
  #         :worker_status (an Array of per-worker hashes, each with :last_status);
  #         in single mode that key is absent, so we bail out early — mirroring
  #         the `stats[:worker_status] ? ... : ...` guard used elsewhere in this
  #         class. Without this guard, single-mode Puma raised NoMethodError here.
  worker_status = stats[:worker_status]
  return if worker_status.nil? || worker_status.empty?

  statuses = worker_status.map { |w| w[:last_status] || {} }
  metrics = %i[running backlog pool_capacity max_threads]

  metrics.each do |m|
    # A worker that has not reported yet may be missing a stat; skip nils.
    values = statuses.map { |s| s[m] }.compact
    next if values.empty?

    # :pool_capacity -> :PoolCapacity, matching the naming used by #collect.
    metric_name = m.to_s.split("_").map(&:capitalize).join.to_sym
    Reporter.instance.report(
      metric: metric_name,
      statistic_values: {
        sample_count: values.length,
        sum: values.sum(&:to_f),
        minimum: values.min.to_f,
        maximum: values.max.to_f
      },
      integration: :puma
    )
  end
end
3563
end
3664
end
3765
end

lib/speedshop/cloudwatch/reporter.rb

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def stop!
5454
thread_to_join&.join
5555
end
5656

57-
def report(metric:, value:, dimensions: {}, namespace: nil, integration: nil)
57+
def report(metric:, value: nil, statistic_values: nil, dimensions: {}, namespace: nil, integration: nil)
5858
metric_name = metric.to_sym
5959

6060
int = integration || find_integration_for_metric(metric_name)
@@ -68,11 +68,16 @@ def report(metric:, value:, dimensions: {}, namespace: nil, integration: nil)
6868
dimensions_array = dimensions.map { |k, v| {name: k.to_s, value: v.to_s} }
6969
all_dimensions = dimensions_array + custom_dimensions
7070

71-
@mutex.synchronize do
72-
@queue << {metric_name: metric_name.to_s, value: value, namespace: ns, unit: unit,
73-
dimensions: all_dimensions, timestamp: Time.now}
71+
datum = {metric_name: metric_name.to_s, namespace: ns, unit: unit,
72+
dimensions: all_dimensions, timestamp: Time.now}
73+
if statistic_values
74+
datum[:statistic_values] = statistic_values
75+
else
76+
datum[:value] = value
7477
end
7578

79+
@mutex.synchronize { @queue << datum }
80+
7681
start! unless started?
7782
end
7883

@@ -137,15 +142,99 @@ def flush_metrics
137142
metrics = @mutex.synchronize { @queue.empty? ? nil : @queue.dup.tap { @queue.clear } }
138143
return unless metrics
139144

145+
high_resolution = config.interval.to_i < 60
146+
140147
metrics.group_by { |m| m[:namespace] }.each do |namespace, ns_metrics|
141148
config.logger.debug "Speedshop::Cloudwatch: Sending #{ns_metrics.size} metrics to namespace #{namespace}"
142-
metric_data = ns_metrics.map { |m| m.slice(:metric_name, :value, :unit, :timestamp, :dimensions) }
143-
config.client.put_metric_data(namespace: namespace, metric_data: metric_data)
149+
150+
aggregated = aggregate_namespace_metrics(ns_metrics)
151+
152+
metric_data = aggregated.map do |m|
153+
datum = {
154+
metric_name: m[:metric_name],
155+
unit: m[:unit],
156+
timestamp: m[:timestamp],
157+
dimensions: m[:dimensions]
158+
}
159+
if m[:statistic_values]
160+
datum[:statistic_values] = m[:statistic_values]
161+
else
162+
datum[:value] = m[:value]
163+
end
164+
datum[:storage_resolution] = 1 if high_resolution
165+
datum
166+
end
167+
168+
metric_data.each_slice(20) do |batch|
169+
config.client.put_metric_data(namespace: namespace, metric_data: batch)
170+
end
144171
end
145172
rescue => e
146173
Speedshop::Cloudwatch.log_error("Failed to send metrics: #{e.message}", e)
147174
end
148175

176+
def aggregate_namespace_metrics(ns_metrics)
  # Collapses repeated observations of the same metric (same name, unit, and
  # dimension set, compared order-insensitively) within one flush window into a
  # single CloudWatch StatisticSet. A lone observation passes through untouched,
  # preserving whichever of :value / :statistic_values it already carries.
  grouped = ns_metrics.group_by do |m|
    [m[:metric_name], m[:unit], normalized_dimensions_key(m[:dimensions])]
  end

  grouped.values.map do |members|
    next members.first if members.size == 1

    # Fold every member — raw values and pre-aggregated StatisticSets alike —
    # into one combined sample_count / sum / minimum / maximum.
    count = 0.0
    total = 0.0
    lo = Float::INFINITY
    hi = -Float::INFINITY

    members.each do |entry|
      if (sv = entry[:statistic_values])
        count += sv[:sample_count].to_f
        total += sv[:sum].to_f
        lo = [lo, sv[:minimum].to_f].min
        hi = [hi, sv[:maximum].to_f].max
      elsif entry.key?(:value)
        v = entry[:value].to_f
        count += 1.0
        total += v
        lo = [lo, v].min
        hi = [hi, v].max
      end
    end

    template = members.first
    {
      metric_name: template[:metric_name],
      unit: template[:unit],
      dimensions: template[:dimensions],
      # Stamp the merged datum with flush time to represent the interval.
      timestamp: Time.now,
      statistic_values: {
        sample_count: count,
        sum: total,
        # Infinite bounds mean nothing was folded in; report 0.0 instead.
        minimum: lo.finite? ? lo : 0.0,
        maximum: hi.finite? ? hi : 0.0
      }
    }
  end
end

def normalized_dimensions_key(dims)
  # Order-insensitive fingerprint of a dimensions array, used as a grouping key.
  sorted = Array(dims).sort_by { |dim| dim[:name].to_s }
  sorted.map { |dim| "#{dim[:name]}=#{dim[:value]}" }.join("|")
end
237+
149238
def metric_allowed?(integration, metric_name)
150239
config.metrics[integration].include?(metric_name.to_sym)
151240
end

0 commit comments

Comments (0)