diff --git a/temporalio/lib/temporalio/contrib/open_telemetry.rb b/temporalio/lib/temporalio/contrib/open_telemetry.rb index 5838b5cf..a894e300 100644 --- a/temporalio/lib/temporalio/contrib/open_telemetry.rb +++ b/temporalio/lib/temporalio/contrib/open_telemetry.rb @@ -23,6 +23,11 @@ class TracingInterceptor # @param tracer [OpenTelemetry::Trace::Tracer] Tracer to use. # @param header_key [String] Temporal header name to serialize spans to/from. Most users should not change this. # @param propagator [Object] Propagator to use. Most users should not change this. + # @param always_create_workflow_spans [Boolean] When false, the default, spans are only created in workflows + # when an overarching span from the client is present. In cases of starting a workflow elsewhere, e.g. CLI or + # schedules, a client-created span is not present and workflow spans will not be created. Setting this to true + # will create spans in workflows no matter what, but there is a risk of them being orphans since they may not + # have a parent span after replaying. 
def initialize( tracer, header_key: '_tracer-data', @@ -31,11 +36,13 @@ def initialize( ::OpenTelemetry::Trace::Propagation::TraceContext::TextMapPropagator.new, ::OpenTelemetry::Baggage::Propagation::TextMapPropagator.new ] - ) + ), + always_create_workflow_spans: false ) @tracer = tracer @header_key = header_key @propagator = propagator + @always_create_workflow_spans = always_create_workflow_spans end # @!visibility private @@ -85,6 +92,11 @@ def _with_started_span( end end + # @!visibility private + def _always_create_workflow_spans + @always_create_workflow_spans + end + # @!visibility private class ClientOutbound < Client::Interceptor::Outbound def initialize(root, next_interceptor) @@ -423,30 +435,34 @@ def self.completed_span( even_during_replay: false ) # Get root interceptor, which also checks if in workflow - root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] + root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] #: TracingInterceptor? raise 'Tracing interceptor not configured' unless root # Do nothing if replaying and not wanted during replay return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying? - # Do nothing if there is no span on the context. We do not want orphan spans coming from workflows, so we - # require a parent (i.e. a current). - # TODO(cretz): This matches Python behavior but not .NET behavior (which will create no matter what), is that - # ok? - return nil if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID + # If there is no span on the context and the user hasn't opted in to always creating, do not create. This + # prevents orphans if there was no span originally created from the client start-workflow call. 
+ if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID && + !root._always_create_workflow_spans + return nil + end # Create attributes, adding user-defined ones attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id, 'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes) - # Create span - time = Temporalio::Workflow.now - timestamp = (time.to_i * 1_000_000_000) + time.nsec - span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore - # Record exception if present - span.record_exception(exception) if exception - # Finish the span (returns self) - span.finish(end_timestamp: timestamp) + # Create span, which has to be done with illegal call disabling because OTel asks for full exception message + # which uses error highlighting and such which accesses File#path + Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do + time = Temporalio::Workflow.now + timestamp = (time.to_i * 1_000_000_000) + time.nsec + span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore + # Record exception if present + span.record_exception(exception) if exception + # Finish the span (returns self) + span.finish(end_timestamp: timestamp) + end end end end diff --git a/temporalio/sig/temporalio/contrib/open_telemetry.rbs b/temporalio/sig/temporalio/contrib/open_telemetry.rbs index a0c60416..de2158cf 100644 --- a/temporalio/sig/temporalio/contrib/open_telemetry.rbs +++ b/temporalio/sig/temporalio/contrib/open_telemetry.rbs @@ -11,7 +11,8 @@ module Temporalio def initialize: ( untyped tracer, ?header_key: String, - ?propagator: untyped + ?propagator: untyped, + ?always_create_workflow_spans: bool ) -> void def _apply_context_to_headers: (Hash[String, untyped] headers, ?context: untyped) -> void @@ -23,6 +24,7 @@ module Temporalio ?attributes: Hash[untyped, untyped]?, ?outbound_input: untyped ) { () -> T } -> T + def 
_always_create_workflow_spans: -> bool class WorkflowInbound < Worker::Interceptor::Workflow::Inbound def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void diff --git a/temporalio/test/contrib/open_telemetry_test.rb b/temporalio/test/contrib/open_telemetry_test.rb index 0c64e83c..e8dd7a57 100644 --- a/temporalio/test/contrib/open_telemetry_test.rb +++ b/temporalio/test/contrib/open_telemetry_test.rb @@ -96,10 +96,12 @@ def update(scenario) ) do Temporalio::Workflow.execute_local_activity(TestActivity, :fail_first_attempt, start_to_close_timeout: 30) end - when :child_workflow - # Start a child, send child signal and external signal, finish + when :child_workflow_child_signal + handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal) + handle.signal(TestWorkflow.signal, :mark_finished) + [handle.id, handle.first_execution_run_id, handle.result] + when :child_workflow_external_signal handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal) - handle.signal(TestWorkflow.signal, :complete) Temporalio::Workflow.external_workflow_handle(handle.id).signal(TestWorkflow.signal, :mark_finished) [handle.id, handle.first_execution_run_id, handle.result] else @@ -138,11 +140,18 @@ def init_tracer_and_exporter [tracer, exporter] end - def trace(tracer_and_exporter: init_tracer_and_exporter, &) + def trace( + tracer_and_exporter: init_tracer_and_exporter, + always_create_workflow_spans: false, + check_root: true, + & + ) tracer, exporter = tracer_and_exporter # Make client with interceptors - interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(tracer) + interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new( + tracer, always_create_workflow_spans: + ) new_options = env.client.options.with(interceptors: [interceptor]) client = Temporalio::Client.new(**new_options.to_h) # steep:ignore @@ -153,13 +162,22 @@ def trace(tracer_and_exporter: 
init_tracer_and_exporter, &) # Convert spans, confirm there is only the outer, and return children spans = ExpectedSpan.from_span_data(exporter.finished_spans) - assert_equal 1, spans.size - assert_equal 'root', spans.first&.name + if check_root + assert_equal 1, spans.size + assert_equal 'root', spans.first&.name + end spans.first end - def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &) - trace(tracer_and_exporter:) do |client| + def trace_workflow( + scenario, + tracer_and_exporter: init_tracer_and_exporter, + start_with_untraced_client: false, + always_create_workflow_spans: false, + check_root: true, + & + ) + trace(tracer_and_exporter:, always_create_workflow_spans:, check_root:) do |client| # Must capture and attach outer context outer_context = OpenTelemetry::Context.current attach_token = nil @@ -169,7 +187,8 @@ def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &) client:, activities: [TestActivity.new(tracer_and_exporter.first)], # Have to reattach outer context inside worker run to check outer span - on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) } + on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) }, + start_workflow_client: start_with_untraced_client ? 
env.client : client ) do |handle| yield handle ensure @@ -398,42 +417,61 @@ def test_client_fail end def test_child_and_external - exp_root = ExpectedSpan.new(name: 'root') - act_root = trace_workflow(:wait_on_signal) do |handle| - exp_cl_attrs = { 'temporalWorkflowID' => handle.id } - exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) - exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) - exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) - - # Wait for task completion so update isn't accidentally first before run - assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } - - # Update calls child and sends signals to it in two ways - child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update, - :child_workflow, id: 'my-update-id') - exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', - attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) - # Expected span for update - exp_hnd_update = exp_start_wf.add_child( - name: 'HandleUpdate:update', - attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), - links: [exp_update] - ) - # Expected for children - exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id } - exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs) - exp_child_start - .add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs) - .add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs) - # Two signals we send to the child - exp_sig_child = exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs) - exp_sig_ext = exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs) - exp_child_start.add_child(name: 'HandleSignal:signal', 
attributes: exp_child_run_attrs, links: [exp_sig_child]) - exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_ext]) - - assert_equal 'workflow-done', child_result + # We have to test child signal and external signal separately because sending both back-to-back can result in + # rare cases where one is delivered before the other (yes, even if you wait on the first to get an initiated + # event) + %i[child_workflow_child_signal child_workflow_external_signal].each do |scenario| + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace_workflow(:wait_on_signal) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Wait for task completion so update isn't accidentally first before run + assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } + + # Update calls child and sends signals to it in two ways + child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update, + scenario, id: 'my-update-id') + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) + # Expected span for update + exp_hnd_update = exp_start_wf.add_child( + name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + links: [exp_update] + ) + # Expected for children + exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id } + exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs) + exp_child_start + .add_child(name: 'RunWorkflow:TestWorkflow', attributes: 
exp_child_run_attrs) + .add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs) + + # There are cases where signal comes _before_ start and cases where signal comes _after_, and server gives us + # no way of knowing that a child _actually_ began running, so we check whether task completed comes before + # signal + assert_equal 'workflow-done', child_result + child_events = env.client.workflow_handle(child_id.to_s).fetch_history_events.to_a + signal_comes_first = child_events.index(&:workflow_execution_signaled_event_attributes).to_i < + child_events.index(&:workflow_task_completed_event_attributes).to_i + # Signal we send to the child + exp_sig = if scenario == :child_workflow_child_signal + exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs) + else + exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs) + end + exp_child_start.add_child( + name: 'HandleSignal:signal', + attributes: exp_child_run_attrs, + links: [exp_sig], + insert_at: signal_comes_first ? 
0 : 1 + ) + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented, + "Expected:\n#{exp_root.to_s_indented}\nActual:#{act_root.to_s_indented}" end - assert_equal exp_root.to_s_indented, act_root.to_s_indented end def test_continue_as_new @@ -458,6 +496,29 @@ def test_continue_as_new assert_equal exp_root.to_s_indented, act_root.to_s_indented end + def test_always_create_workflow_spans + # Untraced client has no spans by default + act = trace_workflow(:complete, start_with_untraced_client: true, check_root: false) do |handle| + assert_equal 'workflow-done', handle.result + end + assert_empty act.children + + # Untraced client still gets workflow spans when always_create_workflow_spans is true + exp_root = ExpectedSpan.new(name: 'root') + act = trace_workflow( + :complete, + start_with_untraced_client: true, + always_create_workflow_spans: true, + check_root: false + ) do |handle| + exp_attrs = { 'temporalWorkflowID' => handle.id, 'temporalRunID' => handle.result_run_id } + exp_run_wf = exp_root.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_attrs) + exp_run_wf.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_attrs) + assert_equal 'workflow-done', handle.result + end + assert_equal exp_root.children.first&.to_s_indented, act.to_s_indented + end + ExpectedSpan = Data.define(:name, :children, :attributes, :links, :exception_message) # rubocop:disable Layout/ClassStructure class ExpectedSpan @@ -493,13 +554,16 @@ def self.from_span_data(all_spans) end def initialize(name:, children: [], attributes: {}, links: [], exception_message: nil) - children = children.to_set super end - def add_child(name:, attributes: {}, links: [], exception_message: nil) + def add_child(name:, attributes: {}, links: [], exception_message: nil, insert_at: nil) span = ExpectedSpan.new(name:, attributes:, links:, exception_message:) - children << span + if insert_at.nil? 
+ children << span + else + children.insert(insert_at, span) + end span end diff --git a/temporalio/test/sig/contrib/open_telemetry_test.rbs b/temporalio/test/sig/contrib/open_telemetry_test.rbs index d7d32bd5..9043264d 100644 --- a/temporalio/test/sig/contrib/open_telemetry_test.rbs +++ b/temporalio/test/sig/contrib/open_telemetry_test.rbs @@ -2,11 +2,16 @@ module Contrib class OpenTelemetryTest < Test def init_tracer_and_exporter: -> [untyped, untyped] def trace: ( - ?tracer_and_exporter: [untyped, untyped] + ?tracer_and_exporter: [untyped, untyped], + ?always_create_workflow_spans: bool, + ?check_root: bool ) { (Temporalio::Client) -> void } -> untyped def trace_workflow: ( Symbol scenario, - ?tracer_and_exporter: [untyped, untyped] + ?tracer_and_exporter: [untyped, untyped], + ?start_with_untraced_client: bool, + ?always_create_workflow_spans: bool, + ?check_root: bool ) { (Temporalio::Client::WorkflowHandle) -> void } -> untyped class ExpectedSpan @@ -30,7 +35,8 @@ module Contrib name: String, ?attributes: Hash[untyped, untyped], ?links: Array[ExpectedSpan], - ?exception_message: String? + ?exception_message: String?, + ?insert_at: Integer? ) -> ExpectedSpan def to_s_indented: (?indent: String) -> String diff --git a/temporalio/test/sig/workflow_utils.rbs b/temporalio/test/sig/workflow_utils.rbs index 3b7144fa..42e2853d 100644 --- a/temporalio/test/sig/workflow_utils.rbs +++ b/temporalio/test/sig/workflow_utils.rbs @@ -17,7 +17,8 @@ module WorkflowUtils ?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum, ?max_heartbeat_throttle_interval: Float, ?task_timeout: duration?, - ?on_worker_run: Proc? + ?on_worker_run: Proc?, + ?start_workflow_client: Temporalio::Client ) -> Object? | [T] ( singleton(Temporalio::Workflow::Definition) workflow, @@ -37,7 +38,8 @@ module WorkflowUtils ?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum, ?max_heartbeat_throttle_interval: Float, ?task_timeout: duration?, - ?on_worker_run: Proc? 
+ ?on_worker_run: Proc?, + ?start_workflow_client: Temporalio::Client ) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T def assert_eventually_task_fail: ( diff --git a/temporalio/test/workflow_utils.rb b/temporalio/test/workflow_utils.rb index ea0c914b..973e1ecf 100644 --- a/temporalio/test/workflow_utils.rb +++ b/temporalio/test/workflow_utils.rb @@ -29,7 +29,8 @@ def execute_workflow( max_heartbeat_throttle_interval: 60.0, task_timeout: nil, interceptors: [], - on_worker_run: nil + on_worker_run: nil, + start_workflow_client: client ) worker = Temporalio::Worker.new( client:, @@ -45,7 +46,7 @@ def execute_workflow( ) worker.run do on_worker_run&.call - handle = client.start_workflow( + handle = start_workflow_client.start_workflow( workflow, *args, id:,