Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class TracingInterceptor
# @param tracer [OpenTelemetry::Trace::Tracer] Tracer to use.
# @param header_key [String] Temporal header name to serialize spans to/from. Most users should not change this.
# @param propagator [Object] Propagator to use. Most users should not change this.
# @param always_create_workflow_spans [Boolean] When false, the default, spans are only created in workflows
# when an overarching span from the client is present. In cases of starting a workflow elsewhere, e.g. CLI or
# schedules, a client-created span is not present and workflow spans will not be created. Setting this to true
# will create spans in workflows no matter what, but there is a risk of them being orphans since they may not
# have a parent span after replaying.
def initialize(
tracer,
header_key: '_tracer-data',
Expand All @@ -31,11 +36,13 @@ def initialize(
::OpenTelemetry::Trace::Propagation::TraceContext::TextMapPropagator.new,
::OpenTelemetry::Baggage::Propagation::TextMapPropagator.new
]
)
),
always_create_workflow_spans: false
)
@tracer = tracer
@header_key = header_key
@propagator = propagator
@always_create_workflow_spans = always_create_workflow_spans
end

# @!visibility private
Expand Down Expand Up @@ -85,6 +92,11 @@ def _with_started_span(
end
end

# @!visibility private
def _always_create_workflow_spans
@always_create_workflow_spans
end

# @!visibility private
class ClientOutbound < Client::Interceptor::Outbound
def initialize(root, next_interceptor)
Expand Down Expand Up @@ -423,30 +435,34 @@ def self.completed_span(
even_during_replay: false
)
# Get root interceptor, which also checks if in workflow
root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor]
root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] #: TracingInterceptor?
raise 'Tracing interceptor not configured' unless root

# Do nothing if replaying and not wanted during replay
return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying?

# Do nothing if there is no span on the context. We do not want orphan spans coming from workflows, so we
# require a parent (i.e. a current).
# TODO(cretz): This matches Python behavior but not .NET behavior (which will create no matter what), is that
# ok?
return nil if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID
# If there is no span on the context and the user hasn't opted in to always creating, do not create. This
# prevents orphans if there was no span originally created from the client start-workflow call.
if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID &&
!root._always_create_workflow_spans
return nil
end

# Create attributes, adding user-defined ones
attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id,
'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)

# Create span
time = Temporalio::Workflow.now
timestamp = (time.to_i * 1_000_000_000) + time.nsec
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore
# Record exception if present
span.record_exception(exception) if exception
# Finish the span (returns self)
span.finish(end_timestamp: timestamp)
# Create span, which has to be done with illegal call disabling because OTel asks for full exception message
# which uses error highlighting and such which accesses File#path
Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do
time = Temporalio::Workflow.now
timestamp = (time.to_i * 1_000_000_000) + time.nsec
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore
# Record exception if present
span.record_exception(exception) if exception
# Finish the span (returns self)
span.finish(end_timestamp: timestamp)
end
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion temporalio/sig/temporalio/contrib/open_telemetry.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module Temporalio
def initialize: (
untyped tracer,
?header_key: String,
?propagator: untyped
?propagator: untyped,
?always_create_workflow_spans: bool
) -> void

def _apply_context_to_headers: (Hash[String, untyped] headers, ?context: untyped) -> void
Expand All @@ -23,6 +24,7 @@ module Temporalio
?attributes: Hash[untyped, untyped]?,
?outbound_input: untyped
) { () -> T } -> T
def _always_create_workflow_spans: -> bool

class WorkflowInbound < Worker::Interceptor::Workflow::Inbound
def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void
Expand Down
160 changes: 112 additions & 48 deletions temporalio/test/contrib/open_telemetry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ def update(scenario)
) do
Temporalio::Workflow.execute_local_activity(TestActivity, :fail_first_attempt, start_to_close_timeout: 30)
end
when :child_workflow
# Start a child, send child signal and external signal, finish
when :child_workflow_child_signal
handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal)
handle.signal(TestWorkflow.signal, :mark_finished)
[handle.id, handle.first_execution_run_id, handle.result]
when :child_workflow_external_signal
handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal)
handle.signal(TestWorkflow.signal, :complete)
Temporalio::Workflow.external_workflow_handle(handle.id).signal(TestWorkflow.signal, :mark_finished)
[handle.id, handle.first_execution_run_id, handle.result]
else
Expand Down Expand Up @@ -138,11 +140,18 @@ def init_tracer_and_exporter
[tracer, exporter]
end

def trace(tracer_and_exporter: init_tracer_and_exporter, &)
def trace(
tracer_and_exporter: init_tracer_and_exporter,
always_create_workflow_spans: false,
check_root: true,
&
)
tracer, exporter = tracer_and_exporter

# Make client with interceptors
interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(tracer)
interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(
tracer, always_create_workflow_spans:
)
new_options = env.client.options.with(interceptors: [interceptor])
client = Temporalio::Client.new(**new_options.to_h) # steep:ignore

Expand All @@ -153,13 +162,22 @@ def trace(tracer_and_exporter: init_tracer_and_exporter, &)

# Convert spans, confirm there is only the outer, and return children
spans = ExpectedSpan.from_span_data(exporter.finished_spans)
assert_equal 1, spans.size
assert_equal 'root', spans.first&.name
if check_root
assert_equal 1, spans.size
assert_equal 'root', spans.first&.name
end
spans.first
end

def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &)
trace(tracer_and_exporter:) do |client|
def trace_workflow(
scenario,
tracer_and_exporter: init_tracer_and_exporter,
start_with_untraced_client: false,
always_create_workflow_spans: false,
check_root: true,
&
)
trace(tracer_and_exporter:, always_create_workflow_spans:, check_root:) do |client|
# Must capture and attach outer context
outer_context = OpenTelemetry::Context.current
attach_token = nil
Expand All @@ -169,7 +187,8 @@ def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &)
client:,
activities: [TestActivity.new(tracer_and_exporter.first)],
# Have to reattach outer context inside worker run to check outer span
on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) }
on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) },
start_workflow_client: start_with_untraced_client ? env.client : client
) do |handle|
yield handle
ensure
Expand Down Expand Up @@ -398,42 +417,61 @@ def test_client_fail
end

def test_child_and_external
exp_root = ExpectedSpan.new(name: 'root')
act_root = trace_workflow(:wait_on_signal) do |handle|
exp_cl_attrs = { 'temporalWorkflowID' => handle.id }
exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id })
exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs)
exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs)

# Wait for task completion so update isn't accidentally first before run
assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) }

# Update calls child and sends signals to it in two ways
child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update,
:child_workflow, id: 'my-update-id')
exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update',
attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }))
# Expected span for update
exp_hnd_update = exp_start_wf.add_child(
name: 'HandleUpdate:update',
attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }),
links: [exp_update]
)
# Expected for children
exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id }
exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs)
exp_child_start
.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
# Two signals we send to the child
exp_sig_child = exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs)
exp_sig_ext = exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs)
exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_child])
exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_ext])

assert_equal 'workflow-done', child_result
# We have to test child signal and external signal separately because sending both back-to-back can result in
# rare cases where one is delivered before the other (yes, even if you wait on the first to get an initiated
# event)
%i[child_workflow_child_signal child_workflow_external_signal].each do |scenario|
exp_root = ExpectedSpan.new(name: 'root')
act_root = trace_workflow(:wait_on_signal) do |handle|
exp_cl_attrs = { 'temporalWorkflowID' => handle.id }
exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id })
exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs)
exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs)

# Wait for task completion so update isn't accidentally first before run
assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) }

# Update calls child and sends signals to it in two ways
child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update,
scenario, id: 'my-update-id')
exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update',
attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }))
# Expected span for update
exp_hnd_update = exp_start_wf.add_child(
name: 'HandleUpdate:update',
attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }),
links: [exp_update]
)
# Expected for children
exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id }
exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs)
exp_child_start
.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs)

# There are cases where signal comes _before_ start and cases where signal _comes_ after and server gives us
# no way of knowing that a child _actually_ began running, so we check whether task completed comes before
# signal
assert_equal 'workflow-done', child_result
child_events = env.client.workflow_handle(child_id.to_s).fetch_history_events.to_a
signal_comes_first = child_events.index(&:workflow_execution_signaled_event_attributes).to_i <
child_events.index(&:workflow_task_completed_event_attributes).to_i
# Signal we send to the child
exp_sig = if scenario == :child_workflow_child_signal
exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs)
else
exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs)
end
exp_child_start.add_child(
name: 'HandleSignal:signal',
attributes: exp_child_run_attrs,
links: [exp_sig],
insert_at: signal_comes_first ? 0 : 1
)
end
assert_equal exp_root.to_s_indented, act_root.to_s_indented,
"Expected:\n#{exp_root.to_s_indented}\nActual:#{act_root.to_s_indented}"
end
assert_equal exp_root.to_s_indented, act_root.to_s_indented
end

def test_continue_as_new
Expand All @@ -458,6 +496,29 @@ def test_continue_as_new
assert_equal exp_root.to_s_indented, act_root.to_s_indented
end

def test_always_create_workflow_spans
# Untraced client has no spans by default
act = trace_workflow(:complete, start_with_untraced_client: true, check_root: false) do |handle|
assert_equal 'workflow-done', handle.result
end
assert_empty act.children

# Untraced client has no spans by default
exp_root = ExpectedSpan.new(name: 'root')
act = trace_workflow(
:complete,
start_with_untraced_client: true,
always_create_workflow_spans: true,
check_root: false
) do |handle|
exp_attrs = { 'temporalWorkflowID' => handle.id, 'temporalRunID' => handle.result_run_id }
exp_run_wf = exp_root.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_attrs)
exp_run_wf.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_attrs)
assert_equal 'workflow-done', handle.result
end
assert_equal exp_root.children.first&.to_s_indented, act.to_s_indented
end

ExpectedSpan = Data.define(:name, :children, :attributes, :links, :exception_message) # rubocop:disable Layout/ClassStructure

class ExpectedSpan
Expand Down Expand Up @@ -493,13 +554,16 @@ def self.from_span_data(all_spans)
end

def initialize(name:, children: [], attributes: {}, links: [], exception_message: nil)
children = children.to_set
super
end

def add_child(name:, attributes: {}, links: [], exception_message: nil)
def add_child(name:, attributes: {}, links: [], exception_message: nil, insert_at: nil)
span = ExpectedSpan.new(name:, attributes:, links:, exception_message:)
children << span
if insert_at.nil?
children << span
else
children.insert(insert_at, span)
end
span
end

Expand Down
12 changes: 9 additions & 3 deletions temporalio/test/sig/contrib/open_telemetry_test.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ module Contrib
class OpenTelemetryTest < Test
def init_tracer_and_exporter: -> [untyped, untyped]
def trace: (
?tracer_and_exporter: [untyped, untyped]
?tracer_and_exporter: [untyped, untyped],
?always_create_workflow_spans: bool,
?check_root: bool
) { (Temporalio::Client) -> void } -> untyped
def trace_workflow: (
Symbol scenario,
?tracer_and_exporter: [untyped, untyped]
?tracer_and_exporter: [untyped, untyped],
?start_with_untraced_client: bool,
?always_create_workflow_spans: bool,
?check_root: bool
) { (Temporalio::Client::WorkflowHandle) -> void } -> untyped

class ExpectedSpan
Expand All @@ -30,7 +35,8 @@ module Contrib
name: String,
?attributes: Hash[untyped, untyped],
?links: Array[ExpectedSpan],
?exception_message: String?
?exception_message: String?,
?insert_at: Integer?
) -> ExpectedSpan

def to_s_indented: (?indent: String) -> String
Expand Down
6 changes: 4 additions & 2 deletions temporalio/test/sig/workflow_utils.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ module WorkflowUtils
?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum,
?max_heartbeat_throttle_interval: Float,
?task_timeout: duration?,
?on_worker_run: Proc?
?on_worker_run: Proc?,
?start_workflow_client: Temporalio::Client
) -> Object? |
[T] (
singleton(Temporalio::Workflow::Definition) workflow,
Expand All @@ -37,7 +38,8 @@ module WorkflowUtils
?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum,
?max_heartbeat_throttle_interval: Float,
?task_timeout: duration?,
?on_worker_run: Proc?
?on_worker_run: Proc?,
?start_workflow_client: Temporalio::Client
) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T

def assert_eventually_task_fail: (
Expand Down
5 changes: 3 additions & 2 deletions temporalio/test/workflow_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def execute_workflow(
max_heartbeat_throttle_interval: 60.0,
task_timeout: nil,
interceptors: [],
on_worker_run: nil
on_worker_run: nil,
start_workflow_client: client
)
worker = Temporalio::Worker.new(
client:,
Expand All @@ -45,7 +46,7 @@ def execute_workflow(
)
worker.run do
on_worker_run&.call
handle = client.start_workflow(
handle = start_workflow_client.start_workflow(
workflow,
*args,
id:,
Expand Down
Loading