diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb index 35593185f..30036f318 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb @@ -16,6 +16,7 @@ module Metrics require 'opentelemetry/sdk/metrics/aggregation' require 'opentelemetry/sdk/metrics/configuration_patch' require 'opentelemetry/sdk/metrics/export' +require 'opentelemetry/sdk/metrics/fork_hooks' require 'opentelemetry/sdk/metrics/instrument' require 'opentelemetry/sdk/metrics/meter' require 'opentelemetry/sdk/metrics/meter_provider' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb index f9ee2c07f..567ea4fdb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb @@ -25,6 +25,7 @@ def initialize def metrics_configuration_hook OpenTelemetry.meter_provider = Metrics::MeterProvider.new(resource: @resource) configure_metric_readers + attach_fork_hooks! end def configure_metric_readers @@ -52,6 +53,10 @@ def wrapped_metric_exporters_from_env end end end + + def attach_fork_hooks! + ForkHooks.attach! + end end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 33551f135..cc779c413 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -78,6 +78,13 @@ def force_flush(timeout: nil) Export::FAILURE end + def after_fork + @exporter.reset if @exporter.respond_to?(:reset) + collect # move past previously reported metrics from parent process + @thread = nil + start + end + # Check both @thread and @continue object to determine if current # PeriodicMetricReader is still alive. If one of them is true/alive, # then PeriodicMetricReader is determined as alive diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb new file mode 100644 index 000000000..0f3f29ce9 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + # ForkHooks implements methods to run callbacks before and after forking a Process by overriding Process::_fork + # This is used to ensure that the PeriodicMetricReader is restarted after forking + module ForkHooks + def self.attach! + return if @fork_hooks_attached + + Process.singleton_class.prepend(ForkHooks) + @fork_hooks_attached = true + end + + def self.after_fork + ::OpenTelemetry.meter_provider.metric_readers.each do |reader| + reader.after_fork if reader.respond_to?(:after_fork) + end + end + + def _fork + parent_pid = Process.pid + super.tap do + ForkHooks.after_fork unless Process.pid == parent_pid + end + end + end + end + end +end diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 194d8eed1..2d61bc4bd 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -5,6 +5,7 @@ # SPDX-License-Identifier: Apache-2.0 require 'test_helper' +require 'json' describe OpenTelemetry::SDK do describe '#periodic_metric_reader' do @@ -82,6 +83,46 @@ _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end + unless Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes + it 'is restarted after forking' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + read, write = IO.pipe + + pid = fork do + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + sleep(8) + snapshot = metric_exporter.metric_snapshots + + json = snapshot.map { |reading| { name: reading.name } }.to_json + write.puts json + end + + Timeout.timeout(10) do + Process.waitpid(pid) + end + + periodic_metric_reader.shutdown + snapshot = JSON.parse(read.gets.chomp) + _(snapshot.size).must_equal(1) + _(snapshot[0]).must_equal('name' => 'counter') + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + end + end + it 'shutdown break the export interval cycle' do OpenTelemetry::SDK.configure diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb new file mode 100644 index 000000000..a4f129ffd --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +return if Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes + +require 'test_helper' +require 'json' + +describe OpenTelemetry::SDK::Metrics::ForkHooks do + def fork_with_fork_hooks(after_fork_lambda) + with_pipe do |inner_read_io, inner_write_io| + child_pid = fork do # fork twice to avoid prepending fork in the parent process + setup_fork_hooks(after_fork_lambda) do + grandchild_pid = fork {} + Timeout.timeout(5) { Process.waitpid(grandchild_pid) } + message = { 'child_pid' => Process.pid, 'grandchild_pid' => grandchild_pid }.to_json + inner_write_io.puts message + rescue StandardError => e + message = { 'error' => e.message }.to_json + inner_write_io.puts message + end + end + Timeout.timeout(10) { Process.waitpid(child_pid) } + received_from_child = JSON.parse(inner_read_io.gets.chomp) + refute_includes(received_from_child, 'error') + grandchild_pid = received_from_child['grandchild_pid'] + refute_equal(child_pid, Process.pid) + refute_equal(child_pid, grandchild_pid) + [child_pid, grandchild_pid] + end + end + + def setup_fork_hooks(after_hook) + OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do + Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks) + yield if block_given? + end + end + + def with_pipe + read_io, write_io = IO.pipe + yield(read_io, write_io) + ensure + read_io.close unless read_io.closed? + write_io.close unless write_io.closed? + end + + it 'runs the after_hook after forking' do + with_pipe do |after_fork_read_io, after_fork_write_io| + after_fork_lambda = proc do + message = { 'after_fork_pid' => Process.pid }.to_json + after_fork_write_io.puts message + end + + forking_pid, forked_pid = fork_with_fork_hooks(after_fork_lambda) + pid_from_after_fork = JSON.parse(after_fork_read_io.gets.chomp)['after_fork_pid'].to_i + + refute_equal(pid_from_after_fork, Process.pid) + refute_equal(pid_from_after_fork, forking_pid) + assert_equal(forked_pid, pid_from_after_fork) + end + end + + it 'calls after_fork on metric readers' do + reader1 = Class.new do + attr_reader :after_fork_called + + def after_fork + @after_fork_called = true + end + end.new + + reader2 = OpenStruct.new + + meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new + meter_provider.add_metric_reader(reader1) + meter_provider.add_metric_reader(reader2) + ::OpenTelemetry.stub(:meter_provider, meter_provider) do + OpenTelemetry::SDK::Metrics::ForkHooks.after_fork + end + assert(reader1.after_fork_called) + end +end diff --git a/metrics_sdk/test/test_helper.rb b/metrics_sdk/test/test_helper.rb index 11076bde1..cd2775c75 100644 --- a/metrics_sdk/test/test_helper.rb +++ b/metrics_sdk/test/test_helper.rb @@ -20,7 +20,7 @@ def reset_metrics_sdk :@meter_provider, OpenTelemetry::Internal::ProxyMeterProvider.new ) - + OpenTelemetry::SDK::Metrics::ForkHooks.instance_variable_set(:@fork_hooks_attached, false) OpenTelemetry.logger = Logger.new(File::NULL) OpenTelemetry.error_handler = nil end