Skip to content

Commit 479ab13

Browse files
brianstrauchclaude
andcommitted
Hold paused activities open until assertion observes paused=true
Replace the "treat gone as success" workaround with explicit synchronization between pause_and_assert and the test activities it pauses. The helper registers a threading.Event keyed by activity_id before calling pause_activity; heartbeat_activity and sync_heartbeat_activity wait on that event after catching their pause-induced cancellation. This keeps the activity in the pending list until the helper has observed paused=true, eliminating the race without weakening the assertion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fbeaeec commit 479ab13

2 files changed

Lines changed: 46 additions & 14 deletions

File tree

tests/helpers/__init__.py

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging.handlers
44
import queue
55
import socket
6+
import threading
67
import time
78
import uuid
89
from collections.abc import Awaitable, Callable, Iterator, Sequence
@@ -265,8 +266,35 @@ async def get_pending_activity_info(
265266
return None
266267

267268

269+
_wait_for_pause_events: dict[str, threading.Event] = {}
270+
271+
272+
def wait_for_pause_release(activity_id: str) -> None:
273+
"""Block a sync activity until pause_and_assert releases it.
274+
275+
No-op if no event is registered: pause_and_assert may finish (and clean up
276+
the event) before the activity catches its cancel, since the server has
277+
already committed paused=true by the time pause_activity returns.
278+
"""
279+
event = _wait_for_pause_events.get(activity_id)
280+
if event is not None:
281+
event.wait()
282+
283+
284+
async def async_wait_for_pause_release(activity_id: str) -> None:
285+
"""Block an async activity until pause_and_assert releases it. No-op if not registered."""
286+
event = _wait_for_pause_events.get(activity_id)
287+
if event is not None:
288+
await asyncio.get_running_loop().run_in_executor(None, event.wait)
289+
290+
268291
async def pause_and_assert(client: Client, handle: WorkflowHandle, activity_id: str):
269-
"""Pause the given activity and assert it becomes paused."""
292+
"""Pause the given activity and assert it becomes paused.
293+
294+
Registers an event before calling the pause API so cooperating test
295+
activities (those that catch the pause-induced cancel via
296+
wait_for_pause_release) hang until we have observed paused=true.
297+
"""
270298
desc = await handle.describe()
271299
req = PauseActivityRequest(
272300
namespace=client.namespace,
@@ -276,19 +304,19 @@ async def pause_and_assert(client: Client, handle: WorkflowHandle, activity_id:
276304
),
277305
id=activity_id,
278306
)
279-
await client.workflow_service.pause_activity(req)
280-
281-
# Assert eventually paused. If the activity catches the pause-induced
282-
# cancellation and returns, it leaves the pending list before we poll —
283-
# treat that as success since these test activities only stop running
284-
# because of the cancellation we triggered.
285-
async def check_paused() -> None:
286-
info = await get_pending_activity_info(handle, activity_id)
287-
if info is None:
288-
return
289-
assert info.paused, f"Activity {activity_id} not yet paused"
290-
291-
await assert_eventually(check_paused)
307+
308+
_wait_for_pause_events[activity_id] = threading.Event()
309+
try:
310+
await client.workflow_service.pause_activity(req)
311+
312+
async def check_paused() -> None:
313+
info = await assert_pending_activity_exists_eventually(handle, activity_id)
314+
assert info.paused, f"Activity {activity_id} not yet paused"
315+
316+
await assert_eventually(check_paused)
317+
finally:
318+
_wait_for_pause_events[activity_id].set()
319+
del _wait_for_pause_events[activity_id]
292320

293321

294322
async def unpause_and_assert(client: Client, handle: WorkflowHandle, activity_id: str):

tests/worker/test_workflow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,14 @@
130130
assert_pending_activity_exists_eventually,
131131
assert_task_fail_eventually,
132132
assert_workflow_exists_eventually,
133+
async_wait_for_pause_release,
133134
ensure_search_attributes_present,
134135
find_free_port,
135136
get_pending_activity_info,
136137
new_worker,
137138
pause_and_assert,
138139
unpause_and_assert,
140+
wait_for_pause_release,
139141
workflow_update_exists,
140142
)
141143
from tests.helpers.cache_eviction import (
@@ -7782,6 +7784,7 @@ async def heartbeat_activity(
77827784
except (CancelledError, asyncio.CancelledError) as err:
77837785
if not catch_err:
77847786
raise err
7787+
await async_wait_for_pause_release(activity.info().activity_id)
77857788
return activity.cancellation_details()
77867789
finally:
77877790
activity.heartbeat("finally-complete")
@@ -7801,6 +7804,7 @@ def sync_heartbeat_activity(
78017804
except (CancelledError, asyncio.CancelledError) as err:
78027805
if not catch_err:
78037806
raise err
7807+
wait_for_pause_release(activity.info().activity_id)
78047808
return activity.cancellation_details()
78057809
finally:
78067810
activity.heartbeat("finally-complete")

0 commit comments

Comments
 (0)