Skip to content

Commit de948fe

Browse files
Merge branch 'main' into amazzeo/sano
2 parents 3ccf191 + c4f1371 commit de948fe

8 files changed

Lines changed: 41 additions & 154 deletions

File tree

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,6 +2103,11 @@ tests.
21032103

21042104
### Style
21052105

2106+
```
2107+
# runs ruff + cargo fmt
2108+
poe format
2109+
```
2110+
21062111
* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
21072112
* We use [ruff](https://docs.astral.sh/ruff/) for formatting, so that takes precedence
21082113
* In tests and example code, can import individual classes/functions to make it more readable. Can also do this for

temporalio/contrib/opentelemetry/_interceptor.py

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -355,46 +355,6 @@ async def start_activity(
355355
):
356356
return await super().start_activity(input)
357357

358-
async def cancel_activity(
359-
self, input: temporalio.client.CancelActivityInput
360-
) -> None:
361-
with self.root._start_as_current_span(
362-
"CancelActivity",
363-
attributes={"temporalActivityID": input.activity_id},
364-
kind=opentelemetry.trace.SpanKind.CLIENT,
365-
):
366-
return await super().cancel_activity(input)
367-
368-
async def terminate_activity(
369-
self, input: temporalio.client.TerminateActivityInput
370-
) -> None:
371-
with self.root._start_as_current_span(
372-
"TerminateActivity",
373-
attributes={"temporalActivityID": input.activity_id},
374-
kind=opentelemetry.trace.SpanKind.CLIENT,
375-
):
376-
return await super().terminate_activity(input)
377-
378-
async def describe_activity(
379-
self, input: temporalio.client.DescribeActivityInput
380-
) -> temporalio.client.ActivityExecutionDescription:
381-
with self.root._start_as_current_span(
382-
"DescribeActivity",
383-
attributes={"temporalActivityID": input.activity_id},
384-
kind=opentelemetry.trace.SpanKind.CLIENT,
385-
):
386-
return await super().describe_activity(input)
387-
388-
async def count_activities(
389-
self, input: temporalio.client.CountActivitiesInput
390-
) -> temporalio.client.ActivityExecutionCount:
391-
with self.root._start_as_current_span(
392-
"CountActivities",
393-
attributes={},
394-
kind=opentelemetry.trace.SpanKind.CLIENT,
395-
):
396-
return await super().count_activities(input)
397-
398358

399359
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
400360
def __init__(

temporalio/contrib/opentelemetry/_otel_interceptor.py

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -319,54 +319,6 @@ async def start_activity(
319319
input.headers = _context_to_headers(input.headers)
320320
return await super().start_activity(input)
321321

322-
async def cancel_activity(
323-
self, input: temporalio.client.CancelActivityInput
324-
) -> None:
325-
with _maybe_span(
326-
get_tracer(__name__),
327-
"CancelActivity",
328-
add_temporal_spans=self._add_temporal_spans,
329-
attributes={"temporalActivityID": input.activity_id},
330-
kind=opentelemetry.trace.SpanKind.CLIENT,
331-
):
332-
return await super().cancel_activity(input)
333-
334-
async def terminate_activity(
335-
self, input: temporalio.client.TerminateActivityInput
336-
) -> None:
337-
with _maybe_span(
338-
get_tracer(__name__),
339-
"TerminateActivity",
340-
add_temporal_spans=self._add_temporal_spans,
341-
attributes={"temporalActivityID": input.activity_id},
342-
kind=opentelemetry.trace.SpanKind.CLIENT,
343-
):
344-
return await super().terminate_activity(input)
345-
346-
async def describe_activity(
347-
self, input: temporalio.client.DescribeActivityInput
348-
) -> temporalio.client.ActivityExecutionDescription:
349-
with _maybe_span(
350-
get_tracer(__name__),
351-
"DescribeActivity",
352-
add_temporal_spans=self._add_temporal_spans,
353-
attributes={"temporalActivityID": input.activity_id},
354-
kind=opentelemetry.trace.SpanKind.CLIENT,
355-
):
356-
return await super().describe_activity(input)
357-
358-
async def count_activities(
359-
self, input: temporalio.client.CountActivitiesInput
360-
) -> temporalio.client.ActivityExecutionCount:
361-
with _maybe_span(
362-
get_tracer(__name__),
363-
"CountActivities",
364-
add_temporal_spans=self._add_temporal_spans,
365-
attributes={},
366-
kind=opentelemetry.trace.SpanKind.CLIENT,
367-
):
368-
return await super().count_activities(input)
369-
370322

371323
class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
372324
def __init__(

tests/contrib/langgraph/e2e_functional_entrypoints.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,13 @@ async def interrupt_entrypoint(value: str) -> dict:
133133

134134

135135
@task
136-
async def slow_task(x: int) -> int:
137-
await asyncio.sleep(1)
136+
async def waiting_task(x: int) -> int:
137+
# Wait (forever) until start_to_close_timeout or worker shutdown cancellation
138+
await asyncio.Event().wait()
138139
return x
139140

140141

141142
@entrypoint()
142143
async def slow_entrypoint(value: int) -> dict:
143-
result = await slow_task(value)
144+
result = await waiting_task(value)
144145
return {"result": result}

tests/contrib/langgraph/test_e2e_functional.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@
4646
reset_task_execution_counts,
4747
simple_functional_entrypoint,
4848
slow_entrypoint,
49-
slow_task,
5049
step_1,
5150
step_2,
5251
step_3,
5352
step_4,
5453
step_5,
54+
waiting_task,
5555
)
5656
from tests.contrib.langgraph.e2e_functional_workflows import (
5757
ContinueAsNewFunctionalWorkflow,
@@ -316,10 +316,10 @@ async def test_per_task_activity_options_override(self, client: Client) -> None:
316316
plugins=[
317317
LangGraphPlugin(
318318
entrypoints={"e2e_slow_functional": slow_entrypoint},
319-
tasks=[slow_task],
319+
tasks=[waiting_task],
320320
default_activity_options=_DEFAULT_ACTIVITY_OPTIONS,
321321
activity_options={
322-
"slow_task": {
322+
"waiting_task": {
323323
"execute_in": "activity",
324324
"start_to_close_timeout": timedelta(milliseconds=100),
325325
"retry_policy": RetryPolicy(maximum_attempts=1),

tests/contrib/langgraph/test_timeout.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from asyncio import sleep
1+
import asyncio
22
from datetime import timedelta
33
from typing import Any
44
from uuid import uuid4
@@ -19,7 +19,8 @@ class State(TypedDict):
1919

2020

2121
async def node(state: State) -> dict[str, str]: # pyright: ignore[reportUnusedParameter]
22-
await sleep(1) # 1 second
22+
# Wait (forever) until start_to_close_timeout or worker shutdown cancellation
23+
await asyncio.Event().wait()
2324
return {"value": "done"}
2425

2526

tests/contrib/opentelemetry/test_opentelemetry.py

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,7 @@ async def test_opentelemetry_standalone_activity_tracing(
960960
client = Client(**client_config)
961961

962962
task_queue = f"task_queue_{uuid.uuid4()}"
963+
activity_id = f"activity_{uuid.uuid4()}"
963964
async with Worker(
964965
client,
965966
task_queue=task_queue,
@@ -968,44 +969,23 @@ async def test_opentelemetry_standalone_activity_tracing(
968969
handle = await client.start_activity(
969970
tracing_activity,
970971
TracingActivityParam(heartbeat=False),
971-
id=f"activity_{uuid.uuid4()}",
972+
id=activity_id,
972973
task_queue=task_queue,
973974
schedule_to_close_timeout=timedelta(seconds=10),
974975
)
975976
await handle.result()
976977

977-
# Use a queue with no worker so activities stay in SCHEDULED state,
978-
# allowing describe/cancel/terminate to be called without a race.
979-
no_worker_queue = f"task_queue_{uuid.uuid4()}"
980-
981-
cancel_handle = await client.start_activity(
982-
tracing_activity,
983-
TracingActivityParam(heartbeat=False),
984-
id=f"activity_{uuid.uuid4()}",
985-
task_queue=no_worker_queue,
986-
schedule_to_close_timeout=timedelta(seconds=30),
987-
)
988-
await cancel_handle.describe()
989-
await cancel_handle.cancel()
990-
991-
terminate_handle = await client.start_activity(
992-
tracing_activity,
993-
TracingActivityParam(heartbeat=False),
994-
id=f"activity_{uuid.uuid4()}",
995-
task_queue=no_worker_queue,
996-
schedule_to_close_timeout=timedelta(seconds=30),
997-
)
998-
await terminate_handle.terminate()
999-
1000-
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
978+
finished_spans = exporter.get_finished_spans()
979+
assert dump_spans(finished_spans, with_attributes=False) == [
1001980
"StartActivity:tracing_activity",
1002981
" RunActivity:tracing_activity",
1003-
"StartActivity:tracing_activity",
1004-
"DescribeActivity",
1005-
"CancelActivity",
1006-
"StartActivity:tracing_activity",
1007-
"TerminateActivity",
1008982
]
983+
start_activity_span = next(
984+
s for s in finished_spans if s.name == "StartActivity:tracing_activity"
985+
)
986+
assert start_activity_span.attributes is not None
987+
assert start_activity_span.attributes["temporalActivityID"] == activity_id
988+
assert start_activity_span.attributes["temporalActivityType"] == "tracing_activity"
1009989

1010990

1011991
def test_opentelemetry_safe_detach():

tests/contrib/opentelemetry/test_opentelemetry_plugin.py

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -585,49 +585,37 @@ async def test_otel_standalone_activity_tracing(
585585
new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)]
586586
new_client = Client(**new_config)
587587

588+
activity_id = f"activity_{uuid.uuid4()}"
588589
async with new_worker(
589590
new_client,
590591
activities=[simple_no_context_activity],
591592
) as worker:
592593
handle = await new_client.start_activity(
593594
simple_no_context_activity,
594-
id=f"activity_{uuid.uuid4()}",
595+
id=activity_id,
595596
task_queue=worker.task_queue,
596597
schedule_to_close_timeout=timedelta(seconds=10),
597598
)
598599
await handle.result()
599600

600-
# Use a queue with no worker so activities stay in SCHEDULED state,
601-
# allowing describe/cancel/terminate to be called without a race.
602-
no_worker_queue = f"task_queue_{uuid.uuid4()}"
603-
604-
cancel_handle = await new_client.start_activity(
605-
simple_no_context_activity,
606-
id=f"activity_{uuid.uuid4()}",
607-
task_queue=no_worker_queue,
608-
schedule_to_close_timeout=timedelta(seconds=30),
609-
)
610-
await cancel_handle.describe()
611-
await cancel_handle.cancel()
612-
613-
terminate_handle = await new_client.start_activity(
614-
simple_no_context_activity,
615-
id=f"activity_{uuid.uuid4()}",
616-
task_queue=no_worker_queue,
617-
schedule_to_close_timeout=timedelta(seconds=30),
618-
)
619-
await terminate_handle.terminate()
620-
621-
assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
601+
finished_spans = exporter.get_finished_spans()
602+
assert dump_spans(finished_spans, with_attributes=False) == [
622603
"StartActivity:simple_no_context_activity",
623604
" RunActivity:simple_no_context_activity",
624605
" Activity",
625-
"StartActivity:simple_no_context_activity",
626-
"DescribeActivity",
627-
"CancelActivity",
628-
"StartActivity:simple_no_context_activity",
629-
"TerminateActivity",
630606
]
607+
start_activity_span = next(
608+
s
609+
for s in finished_spans
610+
if s.name == "StartActivity:simple_no_context_activity"
611+
and s.attributes is not None
612+
and s.attributes.get("temporalActivityID") == activity_id
613+
)
614+
assert start_activity_span.attributes is not None
615+
assert (
616+
start_activity_span.attributes["temporalActivityType"]
617+
== "simple_no_context_activity"
618+
)
631619

632620

633621
def test_replay_safe_span_delegates_extra_attributes():

0 commit comments

Comments
 (0)