-
Notifications
You must be signed in to change notification settings - Fork 284
Expand file tree
/
Copy pathasynchronous_metric_stream.rb
More file actions
114 lines (98 loc) · 4.12 KB
/
Copy pathasynchronous_metric_stream.rb
File metadata and controls
114 lines (98 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# frozen_string_literal: true
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
module OpenTelemetry
module SDK
module Metrics
module State
# @api private
#
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
# public API. It extends MetricStream to support asynchronous instruments.
class AsynchronousMetricStream < MetricStream
DEFAULT_TIMEOUT = 30
def initialize(
name,
description,
unit,
instrument_kind,
meter_provider,
instrumentation_scope,
aggregation,
callback,
timeout,
attributes,
exemplar_filter,
exemplar_reservoir
)
# Call parent constructor with common parameters
super(name, description, unit, instrument_kind, meter_provider, instrumentation_scope, aggregation, exemplar_filter, exemplar_reservoir)
# Initialize asynchronous-specific attributes
@callback = callback
@start_time = OpenTelemetry::Common::Utilities.time_in_nanoseconds
@timeout = timeout
@attributes = attributes
end
# When collect, if there are asynchronous SDK Instruments involved, their callback functions will be triggered.
# Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#collect
# invoke_callback will update the data_points in aggregation
def collect(start_time, end_time)
invoke_callback(@timeout, @attributes)
# Call parent collect method for the core collection logic
super
end
def invoke_callback(timeout, attributes)
if @registered_views.empty?
resolved_cardinality_limit = @cardinality_limit || DEFAULT_CARDINALITY_LIMIT
@mutex.synchronize do
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
if value.is_a?(Numeric)
exemplar_offer = should_offer_exemplar?(value, attributes)
@default_aggregation.update(value, attributes, @data_points, resolved_cardinality_limit, exemplar_offer: exemplar_offer)
end
end
end
else
@registered_views.each do |view, data_points|
resolved_cardinality_limit = resolve_cardinality_limit(view)
@mutex.synchronize do
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
next unless value.is_a?(Numeric) # ignore if value is not valid number
merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
if view.valid_aggregation?
exemplar_offer = should_offer_exemplar?(value, merged_attributes)
view.aggregation.update(value, attributes, data_points, resolved_cardinality_limit, exemplar_offer: exemplar_offer)
end
end
end
end
end
end
private
def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
result = nil
thread = Thread.new do
result = callback.call
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Error invoking callback.')
result = :error
end
unless thread.join(timeout)
thread.kill
OpenTelemetry.handle_error(message: "Timeout while invoking callback after #{timeout} seconds")
return nil
end
result == :error ? nil : result
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Unexpected error in callback execution.')
nil
end
end
end
end
end
end