Skip to content

Add a workflow level check for the existence of a local activity #857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ To remove restrictions around a particular block of code, use `with temporalio.w
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.

To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining
it. This will run the entire workflow outside of the workflow which means it can share global state and other bad
it. This will run the entire workflow outside of the sandbox which means it can share global state and other bad
things.

To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
Expand Down
11 changes: 11 additions & 0 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,17 @@ async def _run_activity(
except Exception:
temporalio.activity.logger.exception("Failed completing activity task")

def assert_activity_valid(self, activity) -> None:
if self._dynamic_activity:
return
activity_def = self._activities.get(activity)
if not activity_def:
activity_names = ", ".join(sorted(self._activities.keys()))
raise ValueError(
f"Activity function {activity} "
f"is not registered on this worker, available activities: {activity_names}",
)


@dataclass
class _RunningActivity:
Expand Down
1 change: 1 addition & 0 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def on_eviction_hook(
disable_eager_activity_execution=False,
disable_safe_eviction=self._config["disable_safe_workflow_eviction"],
should_enforce_versioning_behavior=False,
assert_local_activity_valid=lambda a: None,
)
# Create bridge worker
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
Expand Down
10 changes: 10 additions & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ def __init__(
and deployment_config.default_versioning_behavior
== temporalio.common.VersioningBehavior.UNSPECIFIED
)

def check_activity(activity):
if self._activity_worker is None:
raise ValueError(
f"Activity function {activity} "
f"is not registered on this worker, no available activities.",
)
self._activity_worker.assert_activity_valid(activity)

self._workflow_worker = _WorkflowWorker(
bridge_worker=lambda: self._bridge_worker,
namespace=client.namespace,
Expand All @@ -418,6 +427,7 @@ def __init__(
on_eviction_hook=None,
disable_safe_eviction=disable_safe_workflow_eviction,
should_enforce_versioning_behavior=should_enforce_versioning_behavior,
assert_local_activity_valid=check_activity,
)

if tuner is not None:
Expand Down
7 changes: 6 additions & 1 deletion temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
],
disable_safe_eviction: bool,
should_enforce_versioning_behavior: bool,
assert_local_activity_valid: Callable[[str], None],
) -> None:
self._bridge_worker = bridge_worker
self._namespace = namespace
Expand All @@ -104,8 +105,12 @@ def __init__(
if interceptor_class:
self._interceptor_classes.append(interceptor_class)
self._extern_functions.update(
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter)
**_WorkflowExternFunctions(
__temporal_get_metric_meter=lambda: metric_meter,
__temporal_assert_local_activity_valid=assert_local_activity_valid,
)
)

self._workflow_failure_exception_types = workflow_failure_exception_types
self._running_workflows: Dict[str, _RunningWorkflow] = {}
self._disable_eager_activity_execution = disable_eager_activity_execution
Expand Down
5 changes: 5 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,10 @@ def workflow_start_local_activity(
else:
raise TypeError("Activity must be a string or callable")

cast(_WorkflowExternFunctions, self._extern_functions)[
"__temporal_assert_local_activity_valid"
](name)

return self._outbound.start_local_activity(
StartLocalActivityInput(
activity=name,
Expand Down Expand Up @@ -2859,6 +2863,7 @@ def _encode_search_attributes(

class _WorkflowExternFunctions(TypedDict):
__temporal_get_metric_meter: Callable[[], temporalio.common.MetricMeter]
__temporal_assert_local_activity_valid: Callable[[str], None]


class _ReplaySafeMetricMeter(temporalio.common.MetricMeter):
Expand Down
52 changes: 52 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7421,3 +7421,55 @@ async def test_workflow_dynamic_config_failure(client: Client):
await assert_task_fail_eventually(
handle, message_contains="Dynamic config failure"
)


async def test_workflow_missing_local_activity(client: Client):
async with new_worker(
client, SimpleLocalActivityWorkflow, activities=[custom_error_activity]
) as worker:
handle = await client.start_workflow(
SimpleLocalActivityWorkflow.run,
"Temporal",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

await assert_task_fail_eventually(
handle,
message_contains="Activity function say_hello is not registered on this worker, available activities: custom_error_activity",
)


async def test_workflow_missing_local_activity_but_dynamic(client: Client):
async with new_worker(
client,
SimpleLocalActivityWorkflow,
activities=[custom_error_activity, return_name_activity],
) as worker:
res = await client.execute_workflow(
SimpleLocalActivityWorkflow.run,
"Temporal",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

assert res == "say_hello"


async def test_workflow_missing_local_activity_no_activities(client: Client):
async with new_worker(
client,
SimpleLocalActivityWorkflow,
activities=[],
) as worker:
handle = await client.start_workflow(
SimpleLocalActivityWorkflow.run,
"Temporal",
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

await assert_task_fail_eventually(
handle,
message_contains="Activity function say_hello is not registered on this worker, no available activities",
)
Loading