Commit c0e27a6

Remove health check and "bad" node concept
A concern was raised that our samples should not be demonstrating any form of polling from within an entity workflow loop, even if the poll frequency is low. Instead, we should point to long-running activity patterns.
1 parent 35b476d commit c0e27a6
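
For reference, the "long-running activity pattern" mentioned in the description generally means having the workflow start one long-lived activity that does the polling and reports liveness via heartbeats, rather than looping and polling inside the workflow itself. The sketch below is illustrative only and is not part of this commit; the names monitor_cluster_health and MonitorInput, and the timing values, are made up for the example.

import asyncio
from dataclasses import dataclass

from temporalio import activity


@dataclass
class MonitorInput:
    poll_interval_seconds: float


@activity.defn
async def monitor_cluster_health(input: MonitorInput) -> None:
    # Poll from inside the activity, not the workflow loop, and heartbeat so the
    # server can detect a stuck or crashed worker via the heartbeat timeout.
    while True:
        # ... check node health against the real system here ...
        activity.heartbeat()
        await asyncio.sleep(input.poll_interval_seconds)


# A workflow would then start this activity once (e.g. with
# temporalio.workflow.start_activity, passing start_to_close_timeout and
# heartbeat_timeout) instead of calling a health-check activity on every
# iteration of its own loop.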

4 files changed: +10 -67 lines

Diff for: tests/updates_and_signals/safe_message_handlers/workflow_test.py

+5 -7

@@ -9,7 +9,6 @@
 
 from updates_and_signals.safe_message_handlers.activities import (
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 from updates_and_signals.safe_message_handlers.workflow import (
@@ -30,7 +29,7 @@ async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -82,7 +81,7 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -106,8 +105,7 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
                 total_num_nodes=5, job_name="jobby-job"
             ),
         )
-        # the second call should not assign more nodes (it may return fewer if the health check finds bad nodes
-        # in between the two signals.)
+        # the second call should not assign more nodes
         assert result_1.nodes_assigned >= result_2.nodes_assigned
 
 
@@ -121,7 +119,7 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -152,4 +150,4 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
         finally:
             await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
             result = await cluster_manager_handle.result()
-            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24
+            assert result.num_currently_assigned_nodes == 24

Diff for: updates_and_signals/safe_message_handlers/activities.py

+0 -16

@@ -27,19 +27,3 @@ class UnassignNodesForJobInput:
 async def unassign_nodes_for_job(input: UnassignNodesForJobInput) -> None:
     print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
     await asyncio.sleep(0.1)
-
-
-@dataclass
-class FindBadNodesInput:
-    nodes_to_check: Set[str]
-
-
-@activity.defn
-async def find_bad_nodes(input: FindBadNodesInput) -> Set[str]:
-    await asyncio.sleep(0.1)
-    bad_nodes = set([n for n in input.nodes_to_check if int(n) % 5 == 0])
-    if bad_nodes:
-        print(f"Found bad nodes: {bad_nodes}")
-    else:
-        print("No new bad nodes found.")
-    return bad_nodes

Diff for: updates_and_signals/safe_message_handlers/worker.py

+1 -2

@@ -7,7 +7,6 @@
 from updates_and_signals.safe_message_handlers.workflow import (
     ClusterManagerWorkflow,
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 
@@ -22,7 +21,7 @@ async def main():
         client,
         task_queue="safe-message-handlers-task-queue",
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         # Wait until interrupted
         logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")

Diff for: updates_and_signals/safe_message_handlers/workflow.py

+4 -42

@@ -5,15 +5,12 @@
 from typing import Dict, List, Optional, Set
 
 from temporalio import workflow
-from temporalio.common import RetryPolicy
 from temporalio.exceptions import ApplicationError
 
 from updates_and_signals.safe_message_handlers.activities import (
     AssignNodesToJobInput,
-    FindBadNodesInput,
     UnassignNodesForJobInput,
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 
@@ -37,7 +34,6 @@ class ClusterManagerInput:
 @dataclass
 class ClusterManagerResult:
     num_currently_assigned_nodes: int
-    num_bad_nodes: int
 
 
 # Be in the habit of storing message inputs and outputs in serializable structures.
@@ -116,7 +112,7 @@ async def assign_nodes_to_job(
                 )
             nodes_to_assign = unassigned_nodes[: input.total_num_nodes]
             # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with delete_job and perform_health_checks, which both touch self.state.nodes.
+            # with delete_job, which touches self.state.nodes.
             await self._assign_nodes_to_job(nodes_to_assign, input.job_name)
             return ClusterManagerAssignNodesToJobResult(
                 nodes_assigned=self.get_assigned_nodes(job_name=input.job_name)
@@ -150,7 +146,7 @@ async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
                 k for k, v in self.state.nodes.items() if v == input.job_name
             ]
             # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with assign_nodes_to_job and perform_health_checks, which all touch self.state.nodes.
+            # with assign_nodes_to_job, which touches self.state.nodes.
             await self._unassign_nodes_for_job(nodes_to_unassign, input.job_name)
 
     async def _unassign_nodes_for_job(
@@ -167,40 +163,11 @@ async def _unassign_nodes_for_job(
     def get_unassigned_nodes(self) -> List[str]:
         return [k for k, v in self.state.nodes.items() if v is None]
 
-    def get_bad_nodes(self) -> Set[str]:
-        return set([k for k, v in self.state.nodes.items() if v == "BAD!"])
-
     def get_assigned_nodes(self, *, job_name: Optional[str] = None) -> Set[str]:
         if job_name:
             return set([k for k, v in self.state.nodes.items() if v == job_name])
         else:
-            return set(
-                [
-                    k
-                    for k, v in self.state.nodes.items()
-                    if v is not None and v != "BAD!"
-                ]
-            )
-
-    async def perform_health_checks(self) -> None:
-        async with self.nodes_lock:
-            assigned_nodes = self.get_assigned_nodes()
-            try:
-                # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-                # with assign_nodes_to_job and delete_job, which both touch self.state.nodes.
-                bad_nodes = await workflow.execute_activity(
-                    find_bad_nodes,
-                    FindBadNodesInput(nodes_to_check=assigned_nodes),
-                    start_to_close_timeout=timedelta(seconds=10),
-                    # This health check is optional, and our lock would block the whole workflow if we let it retry forever.
-                    retry_policy=RetryPolicy(maximum_attempts=1),
-                )
-                for node in bad_nodes:
-                    self.state.nodes[node] = "BAD!"
-            except Exception as e:
-                workflow.logger.warn(
-                    f"Health check failed with error {type(e).__name__}:{e}"
-                )
+            return set([k for k, v in self.state.nodes.items() if v is not None])
 
     # The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
     # continue-as-new.
@@ -229,9 +196,7 @@ def should_continue_as_new(self) -> bool:
     async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
         self.init(input)
         await workflow.wait_condition(lambda: self.state.cluster_started)
-        # Perform health checks at intervals.
         while True:
-            await self.perform_health_checks()
             try:
                 await workflow.wait_condition(
                     lambda: self.state.cluster_shutdown
@@ -250,7 +215,4 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
                         test_continue_as_new=input.test_continue_as_new,
                     )
                 )
-        return ClusterManagerResult(
-            len(self.get_assigned_nodes()),
-            len(self.get_bad_nodes()),
-        )
+        return ClusterManagerResult(len(self.get_assigned_nodes()))
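
The comments kept in this diff carry the core reasoning: any await inside a handler yields control to other handlers, so reads and writes of self.state.nodes must happen under nodes_lock or two handlers could interleave mid-update. Below is a minimal plain-asyncio sketch of that guard, separate from the sample itself; the Registry class and its methods are hypothetical stand-ins.

import asyncio
from typing import Dict, Optional


class Registry:
    def __init__(self) -> None:
        self.nodes: Dict[str, Optional[str]] = {str(i): None for i in range(4)}
        self.nodes_lock = asyncio.Lock()

    async def assign(self, job_name: str, count: int) -> None:
        # The lock spans both the read of free nodes and the await that follows,
        # so no other coroutine can mutate self.nodes in between.
        async with self.nodes_lock:
            free = [k for k, v in self.nodes.items() if v is None][:count]
            await asyncio.sleep(0.1)  # stands in for the activity call
            for k in free:
                self.nodes[k] = job_name

    async def delete_job(self, job_name: str) -> None:
        async with self.nodes_lock:
            owned = [k for k, v in self.nodes.items() if v == job_name]
            await asyncio.sleep(0.1)  # stands in for the activity call
            for k in owned:
                self.nodes[k] = None

In the workflow itself, the awaited call under the lock is the activity invocation (_assign_nodes_to_job / _unassign_nodes_for_job), and the same interleaving concern applies there.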
