Skip to content

Commit 5ff4e35

Browse files
authored
Add OTel tracing for standalone activities (#1471)
* feat(otel): add otel tracing for standalone activities for both legacy and new interceptors. * docs: update README section on testing to reflect current implementation. * style: remove unnecessary comments in tests.
1 parent 4b69cd4 commit 5ff4e35

6 files changed

Lines changed: 258 additions & 7 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
__pycache__
33
/build
44
/dist
5+
temporalio/bridge/libtemporal_sdk_bridge.dylib.dSYM/
56
temporalio/bridge/target/
67
temporalio/bridge/temporal_sdk_bridge*
78
/tests/helpers/golangserver/golangserver

README.md

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2059,28 +2059,33 @@ The environment is now ready to develop in.
20592059

20602060
#### Testing
20612061

2062-
To execute tests:
2062+
To execute tests (in parallel if possible):
20632063

20642064
```bash
20652065
poe test
20662066
```
20672067

2068-
`poe test` spreads tests across multiple worker processes by default. If you
2069-
need a serial run for debugging, invoke pytest directly:
2068+
To execute tests serially:
20702069

20712070
```bash
20722071
uv run pytest
20732072
```
20742073

2075-
This runs against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test
2076-
server, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running
2077-
server, pass the `host:port` to `--workflow-environment`. Can also use regular pytest arguments. For example, here's how
2078-
to run a single test with debug logs on the console:
2074+
To execute a single test:
20792075

20802076
```bash
20812077
poe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught
20822078
```
20832079

2080+
**Temporal Server**
2081+
2082+
- Tests that use the workflow test environment run against the [Temporal CLI dev server](https://docs.temporal.io/cli#start-dev-server).
2083+
- By default, workflow-environment tests automatically start a local dev server.
2084+
- On first run, the dev server binary may be downloaded so network access is required if no server is currently running.
2085+
- To run workflow-environment tests against the time-skipping test server, pass `--workflow-environment time-skipping`.
2086+
- To run workflow-environment tests against the `default` namespace of an already-running server, pass the `host:port` to `--workflow-environment`.
2087+
- Unit tests that do not use the workflow environment do not start a dev server.
2088+
20842089
#### Proto Generation and Testing
20852090

20862091
If you have docker available, run

temporalio/contrib/opentelemetry/_interceptor.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,60 @@ async def start_update_with_start_workflow(
341341

342342
return await super().start_update_with_start_workflow(input)
343343

344+
async def start_activity(
345+
self, input: temporalio.client.StartActivityInput
346+
) -> temporalio.client.ActivityHandle[Any]:
347+
with self.root._start_as_current_span(
348+
f"StartActivity:{input.activity_type}",
349+
attributes={
350+
"temporalActivityID": input.id,
351+
"temporalActivityType": input.activity_type,
352+
},
353+
input_with_headers=input,
354+
kind=opentelemetry.trace.SpanKind.CLIENT,
355+
):
356+
return await super().start_activity(input)
357+
358+
async def cancel_activity(
359+
self, input: temporalio.client.CancelActivityInput
360+
) -> None:
361+
with self.root._start_as_current_span(
362+
"CancelActivity",
363+
attributes={"temporalActivityID": input.activity_id},
364+
kind=opentelemetry.trace.SpanKind.CLIENT,
365+
):
366+
return await super().cancel_activity(input)
367+
368+
async def terminate_activity(
369+
self, input: temporalio.client.TerminateActivityInput
370+
) -> None:
371+
with self.root._start_as_current_span(
372+
"TerminateActivity",
373+
attributes={"temporalActivityID": input.activity_id},
374+
kind=opentelemetry.trace.SpanKind.CLIENT,
375+
):
376+
return await super().terminate_activity(input)
377+
378+
async def describe_activity(
379+
self, input: temporalio.client.DescribeActivityInput
380+
) -> temporalio.client.ActivityExecutionDescription:
381+
with self.root._start_as_current_span(
382+
"DescribeActivity",
383+
attributes={"temporalActivityID": input.activity_id},
384+
kind=opentelemetry.trace.SpanKind.CLIENT,
385+
):
386+
return await super().describe_activity(input)
387+
388+
async def count_activities(
389+
self, input: temporalio.client.CountActivitiesInput
390+
) -> temporalio.client.ActivityExecutionCount:
391+
with self.root._start_as_current_span(
392+
"CountActivities",
393+
attributes={},
394+
kind=opentelemetry.trace.SpanKind.CLIENT,
395+
):
396+
return await super().count_activities(input)
397+
344398

345399
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
346400
def __init__(

temporalio/contrib/opentelemetry/_otel_interceptor.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,70 @@ async def start_update_with_start_workflow(
303303
)
304304
return await super().start_update_with_start_workflow(input)
305305

306+
async def start_activity(
307+
self, input: temporalio.client.StartActivityInput
308+
) -> temporalio.client.ActivityHandle[Any]:
309+
with _maybe_span(
310+
get_tracer(__name__),
311+
f"StartActivity:{input.activity_type}",
312+
add_temporal_spans=self._add_temporal_spans,
313+
attributes={
314+
"temporalActivityID": input.id,
315+
"temporalActivityType": input.activity_type,
316+
},
317+
kind=opentelemetry.trace.SpanKind.CLIENT,
318+
):
319+
input.headers = _context_to_headers(input.headers)
320+
return await super().start_activity(input)
321+
322+
async def cancel_activity(
323+
self, input: temporalio.client.CancelActivityInput
324+
) -> None:
325+
with _maybe_span(
326+
get_tracer(__name__),
327+
"CancelActivity",
328+
add_temporal_spans=self._add_temporal_spans,
329+
attributes={"temporalActivityID": input.activity_id},
330+
kind=opentelemetry.trace.SpanKind.CLIENT,
331+
):
332+
return await super().cancel_activity(input)
333+
334+
async def terminate_activity(
335+
self, input: temporalio.client.TerminateActivityInput
336+
) -> None:
337+
with _maybe_span(
338+
get_tracer(__name__),
339+
"TerminateActivity",
340+
add_temporal_spans=self._add_temporal_spans,
341+
attributes={"temporalActivityID": input.activity_id},
342+
kind=opentelemetry.trace.SpanKind.CLIENT,
343+
):
344+
return await super().terminate_activity(input)
345+
346+
async def describe_activity(
347+
self, input: temporalio.client.DescribeActivityInput
348+
) -> temporalio.client.ActivityExecutionDescription:
349+
with _maybe_span(
350+
get_tracer(__name__),
351+
"DescribeActivity",
352+
add_temporal_spans=self._add_temporal_spans,
353+
attributes={"temporalActivityID": input.activity_id},
354+
kind=opentelemetry.trace.SpanKind.CLIENT,
355+
):
356+
return await super().describe_activity(input)
357+
358+
async def count_activities(
359+
self, input: temporalio.client.CountActivitiesInput
360+
) -> temporalio.client.ActivityExecutionCount:
361+
with _maybe_span(
362+
get_tracer(__name__),
363+
"CountActivities",
364+
add_temporal_spans=self._add_temporal_spans,
365+
attributes={},
366+
kind=opentelemetry.trace.SpanKind.CLIENT,
367+
):
368+
return await super().count_activities(input)
369+
306370

307371
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
308372
def __init__(

tests/contrib/opentelemetry/test_opentelemetry.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,70 @@ async def test_opentelemetry_interceptor_works_if_no_context(
944944
# * signal failure and wft failure from signal
945945

946946

947+
async def test_opentelemetry_standalone_activity_tracing(
948+
client: Client, env: WorkflowEnvironment
949+
):
950+
if env.supports_time_skipping:
951+
pytest.skip(
952+
"Java test server: https://github.com/temporalio/sdk-java/issues/2741"
953+
)
954+
exporter = InMemorySpanExporter()
955+
provider = TracerProvider()
956+
provider.add_span_processor(SimpleSpanProcessor(exporter))
957+
tracer = get_tracer(__name__, tracer_provider=provider)
958+
client_config = client.config()
959+
client_config["interceptors"] = [TracingInterceptor(tracer)]
960+
client = Client(**client_config)
961+
962+
task_queue = f"task_queue_{uuid.uuid4()}"
963+
async with Worker(
964+
client,
965+
task_queue=task_queue,
966+
activities=[tracing_activity],
967+
):
968+
handle = await client.start_activity(
969+
tracing_activity,
970+
TracingActivityParam(heartbeat=False),
971+
id=f"activity_{uuid.uuid4()}",
972+
task_queue=task_queue,
973+
schedule_to_close_timeout=timedelta(seconds=10),
974+
)
975+
await handle.result()
976+
977+
# Use a queue with no worker so activities stay in SCHEDULED state,
978+
# allowing describe/cancel/terminate to be called without a race.
979+
no_worker_queue = f"task_queue_{uuid.uuid4()}"
980+
981+
cancel_handle = await client.start_activity(
982+
tracing_activity,
983+
TracingActivityParam(heartbeat=False),
984+
id=f"activity_{uuid.uuid4()}",
985+
task_queue=no_worker_queue,
986+
schedule_to_close_timeout=timedelta(seconds=30),
987+
)
988+
await cancel_handle.describe()
989+
await cancel_handle.cancel()
990+
991+
terminate_handle = await client.start_activity(
992+
tracing_activity,
993+
TracingActivityParam(heartbeat=False),
994+
id=f"activity_{uuid.uuid4()}",
995+
task_queue=no_worker_queue,
996+
schedule_to_close_timeout=timedelta(seconds=30),
997+
)
998+
await terminate_handle.terminate()
999+
1000+
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
1001+
"StartActivity:tracing_activity",
1002+
" RunActivity:tracing_activity",
1003+
"StartActivity:tracing_activity",
1004+
"DescribeActivity",
1005+
"CancelActivity",
1006+
"StartActivity:tracing_activity",
1007+
"TerminateActivity",
1008+
]
1009+
1010+
9471011
def test_opentelemetry_safe_detach():
9481012
class _fake_self:
9491013
def _load_workflow_context_carrier(*_args):

tests/contrib/opentelemetry/test_opentelemetry_plugin.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,69 @@ async def test_otel_tracing_workflow_failure(
567567
), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}"
568568

569569

570+
async def test_otel_standalone_activity_tracing(
571+
client: Client,
572+
env: WorkflowEnvironment,
573+
reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter]
574+
):
575+
if env.supports_time_skipping:
576+
pytest.skip(
577+
"Java test server: https://github.com/temporalio/sdk-java/issues/2741"
578+
)
579+
exporter = InMemorySpanExporter()
580+
provider = create_tracer_provider()
581+
provider.add_span_processor(SimpleSpanProcessor(exporter))
582+
opentelemetry.trace.set_tracer_provider(provider)
583+
584+
new_config = client.config()
585+
new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)]
586+
new_client = Client(**new_config)
587+
588+
async with new_worker(
589+
new_client,
590+
activities=[simple_no_context_activity],
591+
) as worker:
592+
handle = await new_client.start_activity(
593+
simple_no_context_activity,
594+
id=f"activity_{uuid.uuid4()}",
595+
task_queue=worker.task_queue,
596+
schedule_to_close_timeout=timedelta(seconds=10),
597+
)
598+
await handle.result()
599+
600+
# Use a queue with no worker so activities stay in SCHEDULED state,
601+
# allowing describe/cancel/terminate to be called without a race.
602+
no_worker_queue = f"task_queue_{uuid.uuid4()}"
603+
604+
cancel_handle = await new_client.start_activity(
605+
simple_no_context_activity,
606+
id=f"activity_{uuid.uuid4()}",
607+
task_queue=no_worker_queue,
608+
schedule_to_close_timeout=timedelta(seconds=30),
609+
)
610+
await cancel_handle.describe()
611+
await cancel_handle.cancel()
612+
613+
terminate_handle = await new_client.start_activity(
614+
simple_no_context_activity,
615+
id=f"activity_{uuid.uuid4()}",
616+
task_queue=no_worker_queue,
617+
schedule_to_close_timeout=timedelta(seconds=30),
618+
)
619+
await terminate_handle.terminate()
620+
621+
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
622+
"StartActivity:simple_no_context_activity",
623+
" RunActivity:simple_no_context_activity",
624+
" Activity",
625+
"StartActivity:simple_no_context_activity",
626+
"DescribeActivity",
627+
"CancelActivity",
628+
"StartActivity:simple_no_context_activity",
629+
"TerminateActivity",
630+
]
631+
632+
570633
def test_replay_safe_span_delegates_extra_attributes():
571634
"""Test that _ReplaySafeSpan delegates attribute access to the underlying span.
572635

0 commit comments

Comments
 (0)