diff --git a/README.md b/README.md index 4c58f676..ccc1b6e5 100644 --- a/README.md +++ b/README.md @@ -991,7 +991,7 @@ To remove restrictions around a particular block of code, use `with temporalio.w The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied. To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining -it. This will run the entire workflow outside of the workflow which means it can share global state and other bad +it. This will run the entire workflow outside of the sandbox which means it can share global state and other bad things. To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 413dee13..3abe85ec 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -532,6 +532,17 @@ async def _run_activity( except Exception: temporalio.activity.logger.exception("Failed completing activity task") + def assert_activity_valid(self, activity) -> None: + if self._dynamic_activity: + return + activity_def = self._activities.get(activity) + if not activity_def: + activity_names = ", ".join(sorted(self._activities.keys())) + raise ValueError( + f"Activity function {activity} " + f"is not registered on this worker, available activities: {activity_names}", + ) + @dataclass class _RunningActivity: diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 49fce619..823a2881 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -216,6 +216,7 @@ def on_eviction_hook( disable_eager_activity_execution=False, disable_safe_eviction=self._config["disable_safe_workflow_eviction"], should_enforce_versioning_behavior=False, + assert_local_activity_valid=lambda a: None, ) # Create bridge worker bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index ae46e8c6..b061dfae 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -400,6 +400,15 @@ def __init__( and deployment_config.default_versioning_behavior == temporalio.common.VersioningBehavior.UNSPECIFIED ) + + def check_activity(activity): + if self._activity_worker is None: + raise ValueError( + f"Activity function {activity} " + f"is not registered on this worker, no available activities.", + ) + self._activity_worker.assert_activity_valid(activity) + self._workflow_worker = _WorkflowWorker( bridge_worker=lambda: self._bridge_worker, namespace=client.namespace, @@ -418,6 +427,7 @@ def __init__( on_eviction_hook=None, disable_safe_eviction=disable_safe_workflow_eviction, should_enforce_versioning_behavior=should_enforce_versioning_behavior, + assert_local_activity_valid=check_activity, ) if tuner is not None: diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 6606c6b6..ade0560f 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -78,6 +78,7 @@ def __init__( ], disable_safe_eviction: bool, should_enforce_versioning_behavior: bool, + assert_local_activity_valid: Callable[[str], None], ) -> None: self._bridge_worker = bridge_worker self._namespace = namespace @@ -104,8 +105,12 @@ def __init__( if interceptor_class: self._interceptor_classes.append(interceptor_class) self._extern_functions.update( - **_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter) + **_WorkflowExternFunctions( + __temporal_get_metric_meter=lambda: metric_meter, + __temporal_assert_local_activity_valid=assert_local_activity_valid, + ) ) + self._workflow_failure_exception_types = workflow_failure_exception_types self._running_workflows: Dict[str, _RunningWorkflow] = {} self._disable_eager_activity_execution = disable_eager_activity_execution diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 4c9e69dd..3d2c5ab6 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1350,6 +1350,10 @@ def workflow_start_local_activity( else: raise TypeError("Activity must be a string or callable") + cast(_WorkflowExternFunctions, self._extern_functions)[ + "__temporal_assert_local_activity_valid" + ](name) + return self._outbound.start_local_activity( StartLocalActivityInput( activity=name, @@ -2859,6 +2863,7 @@ def _encode_search_attributes( class _WorkflowExternFunctions(TypedDict): __temporal_get_metric_meter: Callable[[], temporalio.common.MetricMeter] + __temporal_assert_local_activity_valid: Callable[[str], None] class _ReplaySafeMetricMeter(temporalio.common.MetricMeter): diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 98e2bba2..7373f33b 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -7421,3 +7421,55 @@ async def test_workflow_dynamic_config_failure(client: Client): await assert_task_fail_eventually( handle, message_contains="Dynamic config failure" ) + + +async def test_workflow_missing_local_activity(client: Client): + async with new_worker( + client, SimpleLocalActivityWorkflow, activities=[custom_error_activity] + ) as worker: + handle = await client.start_workflow( + SimpleLocalActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + await assert_task_fail_eventually( + handle, + message_contains="Activity function say_hello is not registered on this worker, available activities: custom_error_activity", + ) + + +async def test_workflow_missing_local_activity_but_dynamic(client: Client): + async with new_worker( + client, + SimpleLocalActivityWorkflow, + activities=[custom_error_activity, return_name_activity], + ) as worker: + res = await client.execute_workflow( + SimpleLocalActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + assert res == "say_hello" + + +async def test_workflow_missing_local_activity_no_activities(client: Client): + async with new_worker( + client, + SimpleLocalActivityWorkflow, + activities=[], + ) as worker: + handle = await client.start_workflow( + SimpleLocalActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + await assert_task_fail_eventually( + handle, + message_contains="Activity function say_hello is not registered on this worker, no available activities", + )