Skip to content

Commit 4df348a

Browse files
Demonstrate workflow.all_handlers_finished (#139)
* Demonstrate workflow.all_handlers_finished * Document all_handlers_finished in the README
1 parent ccf0945 commit 4df348a

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

Diff for: updates_and_signals/safe_message_handlers/README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ This sample shows off important techniques for handling signals and updates, aka
44

55
* Here, using workflow.wait_condition, signal and update handlers will only operate when the workflow is within a certain state--between cluster_started and cluster_shutdown.
66
* You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
7-
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
8-
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
9-
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
7+
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access.
8+
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time.
9+
* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `await workflow.wait_condition(lambda: workflow.all_handlers_finished())` to achieve this.
1010
* Message handlers can be made idempotent. See update `ClusterManager.assign_nodes_to_job`.
1111

1212
To run, first see [README.md](../../README.md) for prerequisites.

Diff for: updates_and_signals/safe_message_handlers/workflow.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
212212
self.sleep_interval_seconds = 1
213213

214214
def should_continue_as_new(self) -> bool:
215-
# We don't want to continue-as-new if we're in the middle of an update
216-
if self.nodes_lock.locked():
217-
return False
218215
if workflow.info().is_continue_as_new_suggested():
219216
return True
220217
# This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
@@ -243,13 +240,17 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
243240
if self.state.cluster_shutdown:
244241
break
245242
if self.should_continue_as_new():
243+
# We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
244+
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
246245
workflow.logger.info("Continuing as new")
247246
workflow.continue_as_new(
248247
ClusterManagerInput(
249248
state=self.state,
250249
test_continue_as_new=input.test_continue_as_new,
251250
)
252251
)
252+
# Make sure we finish off handlers such as deleting jobs before we complete the workflow.
253+
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
253254
return ClusterManagerResult(
254255
len(self.get_assigned_nodes()),
255256
len(self.get_bad_nodes()),

0 commit comments

Comments
 (0)