Skip to content

Commit 631f689

Browse files
committed
feat(otel): remove span creation for saa operations that do not currently propagate tracing headers.
1 parent 8a6d0e0 commit 631f689

4 files changed

Lines changed: 28 additions & 147 deletions

File tree

temporalio/contrib/opentelemetry/_interceptor.py

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -355,46 +355,6 @@ async def start_activity(
355355
):
356356
return await super().start_activity(input)
357357

358-
async def cancel_activity(
359-
self, input: temporalio.client.CancelActivityInput
360-
) -> None:
361-
with self.root._start_as_current_span(
362-
"CancelActivity",
363-
attributes={"temporalActivityID": input.activity_id},
364-
kind=opentelemetry.trace.SpanKind.CLIENT,
365-
):
366-
return await super().cancel_activity(input)
367-
368-
async def terminate_activity(
369-
self, input: temporalio.client.TerminateActivityInput
370-
) -> None:
371-
with self.root._start_as_current_span(
372-
"TerminateActivity",
373-
attributes={"temporalActivityID": input.activity_id},
374-
kind=opentelemetry.trace.SpanKind.CLIENT,
375-
):
376-
return await super().terminate_activity(input)
377-
378-
async def describe_activity(
379-
self, input: temporalio.client.DescribeActivityInput
380-
) -> temporalio.client.ActivityExecutionDescription:
381-
with self.root._start_as_current_span(
382-
"DescribeActivity",
383-
attributes={"temporalActivityID": input.activity_id},
384-
kind=opentelemetry.trace.SpanKind.CLIENT,
385-
):
386-
return await super().describe_activity(input)
387-
388-
async def count_activities(
389-
self, input: temporalio.client.CountActivitiesInput
390-
) -> temporalio.client.ActivityExecutionCount:
391-
with self.root._start_as_current_span(
392-
"CountActivities",
393-
attributes={},
394-
kind=opentelemetry.trace.SpanKind.CLIENT,
395-
):
396-
return await super().count_activities(input)
397-
398358

399359
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
400360
def __init__(

temporalio/contrib/opentelemetry/_otel_interceptor.py

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -319,55 +319,6 @@ async def start_activity(
319319
input.headers = _context_to_headers(input.headers)
320320
return await super().start_activity(input)
321321

322-
async def cancel_activity(
323-
self, input: temporalio.client.CancelActivityInput
324-
) -> None:
325-
with _maybe_span(
326-
get_tracer(__name__),
327-
"CancelActivity",
328-
add_temporal_spans=self._add_temporal_spans,
329-
attributes={"temporalActivityID": input.activity_id},
330-
kind=opentelemetry.trace.SpanKind.CLIENT,
331-
):
332-
return await super().cancel_activity(input)
333-
334-
async def terminate_activity(
335-
self, input: temporalio.client.TerminateActivityInput
336-
) -> None:
337-
with _maybe_span(
338-
get_tracer(__name__),
339-
"TerminateActivity",
340-
add_temporal_spans=self._add_temporal_spans,
341-
attributes={"temporalActivityID": input.activity_id},
342-
kind=opentelemetry.trace.SpanKind.CLIENT,
343-
):
344-
return await super().terminate_activity(input)
345-
346-
async def describe_activity(
347-
self, input: temporalio.client.DescribeActivityInput
348-
) -> temporalio.client.ActivityExecutionDescription:
349-
with _maybe_span(
350-
get_tracer(__name__),
351-
"DescribeActivity",
352-
add_temporal_spans=self._add_temporal_spans,
353-
attributes={"temporalActivityID": input.activity_id},
354-
kind=opentelemetry.trace.SpanKind.CLIENT,
355-
):
356-
return await super().describe_activity(input)
357-
358-
async def count_activities(
359-
self, input: temporalio.client.CountActivitiesInput
360-
) -> temporalio.client.ActivityExecutionCount:
361-
with _maybe_span(
362-
get_tracer(__name__),
363-
"CountActivities",
364-
add_temporal_spans=self._add_temporal_spans,
365-
attributes={},
366-
kind=opentelemetry.trace.SpanKind.CLIENT,
367-
):
368-
return await super().count_activities(input)
369-
370-
371322
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
372323
def __init__(
373324
self,

tests/contrib/opentelemetry/test_opentelemetry.py

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,7 @@ async def test_opentelemetry_standalone_activity_tracing(
960960
client = Client(**client_config)
961961

962962
task_queue = f"task_queue_{uuid.uuid4()}"
963+
activity_id = f"activity_{uuid.uuid4()}"
963964
async with Worker(
964965
client,
965966
task_queue=task_queue,
@@ -968,44 +969,25 @@ async def test_opentelemetry_standalone_activity_tracing(
968969
handle = await client.start_activity(
969970
tracing_activity,
970971
TracingActivityParam(heartbeat=False),
971-
id=f"activity_{uuid.uuid4()}",
972+
id=activity_id,
972973
task_queue=task_queue,
973974
schedule_to_close_timeout=timedelta(seconds=10),
974975
)
975976
await handle.result()
976977

977-
# Use a queue with no worker so activities stay in SCHEDULED state,
978-
# allowing describe/cancel/terminate to be called without a race.
979-
no_worker_queue = f"task_queue_{uuid.uuid4()}"
980-
981-
cancel_handle = await client.start_activity(
982-
tracing_activity,
983-
TracingActivityParam(heartbeat=False),
984-
id=f"activity_{uuid.uuid4()}",
985-
task_queue=no_worker_queue,
986-
schedule_to_close_timeout=timedelta(seconds=30),
987-
)
988-
await cancel_handle.describe()
989-
await cancel_handle.cancel()
990-
991-
terminate_handle = await client.start_activity(
992-
tracing_activity,
993-
TracingActivityParam(heartbeat=False),
994-
id=f"activity_{uuid.uuid4()}",
995-
task_queue=no_worker_queue,
996-
schedule_to_close_timeout=timedelta(seconds=30),
997-
)
998-
await terminate_handle.terminate()
999-
1000-
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
978+
finished_spans = exporter.get_finished_spans()
979+
assert dump_spans(finished_spans, with_attributes=False) == [
1001980
"StartActivity:tracing_activity",
1002981
" RunActivity:tracing_activity",
1003-
"StartActivity:tracing_activity",
1004-
"DescribeActivity",
1005-
"CancelActivity",
1006-
"StartActivity:tracing_activity",
1007-
"TerminateActivity",
1008982
]
983+
start_activity_span = next(
984+
s for s in finished_spans if s.name == "StartActivity:tracing_activity"
985+
)
986+
assert start_activity_span.attributes is not None
987+
assert start_activity_span.attributes["temporalActivityID"] == activity_id
988+
assert (
989+
start_activity_span.attributes["temporalActivityType"] == "tracing_activity"
990+
)
1009991

1010992

1011993
def test_opentelemetry_safe_detach():

tests/contrib/opentelemetry/test_opentelemetry_plugin.py

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -585,49 +585,37 @@ async def test_otel_standalone_activity_tracing(
585585
new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)]
586586
new_client = Client(**new_config)
587587

588+
activity_id = f"activity_{uuid.uuid4()}"
588589
async with new_worker(
589590
new_client,
590591
activities=[simple_no_context_activity],
591592
) as worker:
592593
handle = await new_client.start_activity(
593594
simple_no_context_activity,
594-
id=f"activity_{uuid.uuid4()}",
595+
id=activity_id,
595596
task_queue=worker.task_queue,
596597
schedule_to_close_timeout=timedelta(seconds=10),
597598
)
598599
await handle.result()
599600

600-
# Use a queue with no worker so activities stay in SCHEDULED state,
601-
# allowing describe/cancel/terminate to be called without a race.
602-
no_worker_queue = f"task_queue_{uuid.uuid4()}"
603-
604-
cancel_handle = await new_client.start_activity(
605-
simple_no_context_activity,
606-
id=f"activity_{uuid.uuid4()}",
607-
task_queue=no_worker_queue,
608-
schedule_to_close_timeout=timedelta(seconds=30),
609-
)
610-
await cancel_handle.describe()
611-
await cancel_handle.cancel()
612-
613-
terminate_handle = await new_client.start_activity(
614-
simple_no_context_activity,
615-
id=f"activity_{uuid.uuid4()}",
616-
task_queue=no_worker_queue,
617-
schedule_to_close_timeout=timedelta(seconds=30),
618-
)
619-
await terminate_handle.terminate()
620-
621-
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
601+
finished_spans = exporter.get_finished_spans()
602+
assert dump_spans(finished_spans, with_attributes=False) == [
622603
"StartActivity:simple_no_context_activity",
623604
" RunActivity:simple_no_context_activity",
624605
" Activity",
625-
"StartActivity:simple_no_context_activity",
626-
"DescribeActivity",
627-
"CancelActivity",
628-
"StartActivity:simple_no_context_activity",
629-
"TerminateActivity",
630606
]
607+
start_activity_span = next(
608+
s
609+
for s in finished_spans
610+
if s.name == "StartActivity:simple_no_context_activity"
611+
and s.attributes is not None
612+
and s.attributes.get("temporalActivityID") == activity_id
613+
)
614+
assert start_activity_span.attributes is not None
615+
assert (
616+
start_activity_span.attributes["temporalActivityType"]
617+
== "simple_no_context_activity"
618+
)
631619

632620

633621
def test_replay_safe_span_delegates_extra_attributes():

0 commit comments

Comments
 (0)