Skip to content

Commit 2864297

Browse files
tconley1428cretz
andauthored
Add a workflow level check for the existence of a local activity (#857)
* Add a workflow level check for the existence of a local activity * Ruff linting * Fix lambda to check optional activity worker * Add a test for no activities * Move assert function to extern functions. Add local to name * Fix minor typo * Update temporalio/worker/_activity.py Co-authored-by: Chad Retz <[email protected]> * Change return type to none --------- Co-authored-by: Chad Retz <[email protected]>
1 parent 0e08ff8 commit 2864297

File tree

7 files changed

+86
-2
lines changed

7 files changed

+86
-2
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -991,7 +991,7 @@ To remove restrictions around a particular block of code, use `with temporalio.w
991991
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.
992992

993993
To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining
994-
it. This will run the entire workflow outside of the workflow which means it can share global state and other bad
994+
it. This will run the entire workflow outside of the sandbox which means it can share global state and other bad
995995
things.
996996

997997
To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to

temporalio/worker/_activity.py

+11
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,17 @@ async def _run_activity(
532532
except Exception:
533533
temporalio.activity.logger.exception("Failed completing activity task")
534534

535+
def assert_activity_valid(self, activity) -> None:
536+
if self._dynamic_activity:
537+
return
538+
activity_def = self._activities.get(activity)
539+
if not activity_def:
540+
activity_names = ", ".join(sorted(self._activities.keys()))
541+
raise ValueError(
542+
f"Activity function {activity} "
543+
f"is not registered on this worker, available activities: {activity_names}",
544+
)
545+
535546

536547
@dataclass
537548
class _RunningActivity:

temporalio/worker/_replayer.py

+1
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ def on_eviction_hook(
216216
disable_eager_activity_execution=False,
217217
disable_safe_eviction=self._config["disable_safe_workflow_eviction"],
218218
should_enforce_versioning_behavior=False,
219+
assert_local_activity_valid=lambda a: None,
219220
)
220221
# Create bridge worker
221222
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(

temporalio/worker/_worker.py

+10
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,15 @@ def __init__(
400400
and deployment_config.default_versioning_behavior
401401
== temporalio.common.VersioningBehavior.UNSPECIFIED
402402
)
403+
404+
def check_activity(activity):
405+
if self._activity_worker is None:
406+
raise ValueError(
407+
f"Activity function {activity} "
408+
f"is not registered on this worker, no available activities.",
409+
)
410+
self._activity_worker.assert_activity_valid(activity)
411+
403412
self._workflow_worker = _WorkflowWorker(
404413
bridge_worker=lambda: self._bridge_worker,
405414
namespace=client.namespace,
@@ -418,6 +427,7 @@ def __init__(
418427
on_eviction_hook=None,
419428
disable_safe_eviction=disable_safe_workflow_eviction,
420429
should_enforce_versioning_behavior=should_enforce_versioning_behavior,
430+
assert_local_activity_valid=check_activity,
421431
)
422432

423433
if tuner is not None:

temporalio/worker/_workflow.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def __init__(
7878
],
7979
disable_safe_eviction: bool,
8080
should_enforce_versioning_behavior: bool,
81+
assert_local_activity_valid: Callable[[str], None],
8182
) -> None:
8283
self._bridge_worker = bridge_worker
8384
self._namespace = namespace
@@ -104,8 +105,12 @@ def __init__(
104105
if interceptor_class:
105106
self._interceptor_classes.append(interceptor_class)
106107
self._extern_functions.update(
107-
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter)
108+
**_WorkflowExternFunctions(
109+
__temporal_get_metric_meter=lambda: metric_meter,
110+
__temporal_assert_local_activity_valid=assert_local_activity_valid,
111+
)
108112
)
113+
109114
self._workflow_failure_exception_types = workflow_failure_exception_types
110115
self._running_workflows: Dict[str, _RunningWorkflow] = {}
111116
self._disable_eager_activity_execution = disable_eager_activity_execution

temporalio/worker/_workflow_instance.py

+5
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,10 @@ def workflow_start_local_activity(
13501350
else:
13511351
raise TypeError("Activity must be a string or callable")
13521352

1353+
cast(_WorkflowExternFunctions, self._extern_functions)[
1354+
"__temporal_assert_local_activity_valid"
1355+
](name)
1356+
13531357
return self._outbound.start_local_activity(
13541358
StartLocalActivityInput(
13551359
activity=name,
@@ -2859,6 +2863,7 @@ def _encode_search_attributes(
28592863

28602864
class _WorkflowExternFunctions(TypedDict):
28612865
__temporal_get_metric_meter: Callable[[], temporalio.common.MetricMeter]
2866+
__temporal_assert_local_activity_valid: Callable[[str], None]
28622867

28632868

28642869
class _ReplaySafeMetricMeter(temporalio.common.MetricMeter):

tests/worker/test_workflow.py

+52
Original file line numberDiff line numberDiff line change
@@ -7421,3 +7421,55 @@ async def test_workflow_dynamic_config_failure(client: Client):
74217421
await assert_task_fail_eventually(
74227422
handle, message_contains="Dynamic config failure"
74237423
)
7424+
7425+
7426+
async def test_workflow_missing_local_activity(client: Client):
7427+
async with new_worker(
7428+
client, SimpleLocalActivityWorkflow, activities=[custom_error_activity]
7429+
) as worker:
7430+
handle = await client.start_workflow(
7431+
SimpleLocalActivityWorkflow.run,
7432+
"Temporal",
7433+
id=f"workflow-{uuid.uuid4()}",
7434+
task_queue=worker.task_queue,
7435+
)
7436+
7437+
await assert_task_fail_eventually(
7438+
handle,
7439+
message_contains="Activity function say_hello is not registered on this worker, available activities: custom_error_activity",
7440+
)
7441+
7442+
7443+
async def test_workflow_missing_local_activity_but_dynamic(client: Client):
7444+
async with new_worker(
7445+
client,
7446+
SimpleLocalActivityWorkflow,
7447+
activities=[custom_error_activity, return_name_activity],
7448+
) as worker:
7449+
res = await client.execute_workflow(
7450+
SimpleLocalActivityWorkflow.run,
7451+
"Temporal",
7452+
id=f"workflow-{uuid.uuid4()}",
7453+
task_queue=worker.task_queue,
7454+
)
7455+
7456+
assert res == "say_hello"
7457+
7458+
7459+
async def test_workflow_missing_local_activity_no_activities(client: Client):
7460+
async with new_worker(
7461+
client,
7462+
SimpleLocalActivityWorkflow,
7463+
activities=[],
7464+
) as worker:
7465+
handle = await client.start_workflow(
7466+
SimpleLocalActivityWorkflow.run,
7467+
"Temporal",
7468+
id=f"workflow-{uuid.uuid4()}",
7469+
task_queue=worker.task_queue,
7470+
)
7471+
7472+
await assert_task_fail_eventually(
7473+
handle,
7474+
message_contains="Activity function say_hello is not registered on this worker, no available activities",
7475+
)

0 commit comments

Comments
 (0)