-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathworkflow_test.py
143 lines (126 loc) · 5.21 KB
/
workflow_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import asyncio
import uuid
import pytest
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from message_passing.safe_message_handlers.activities import (
assign_nodes_to_job,
find_bad_nodes,
unassign_nodes_for_job,
)
from message_passing.safe_message_handlers.workflow import (
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
ClusterManagerWorkflow,
)
async def test_safe_message_handlers(client: Client):
    """Exercise concurrent updates against ClusterManagerWorkflow.

    Starts the cluster, issues six parallel assign-nodes updates (2 nodes
    each), then six parallel delete-job updates, shuts the cluster down, and
    verifies that no nodes remain assigned at the end.
    """
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await handle.signal(ClusterManagerWorkflow.start_cluster)
        # Fire all six assignment updates concurrently.
        allocation_updates = [
            handle.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=2, job_name=f"task-{i}"
                ),
            )
            for i in range(6)
        ]
        for outcome in await asyncio.gather(*allocation_updates):
            assert len(outcome.nodes_assigned) == 2
        await asyncio.sleep(1)
        # Now delete every job, again concurrently.
        deletion_updates = [
            handle.execute_update(
                ClusterManagerWorkflow.delete_job,
                ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
            )
            for i in range(6)
        ]
        await asyncio.gather(*deletion_updates)
        await handle.signal(ClusterManagerWorkflow.shutdown_cluster)
        result = await handle.result()
        assert result.num_currently_assigned_nodes == 0
async def test_update_idempotency(client: Client):
    """Verify that repeating an assign-nodes update does not assign more nodes.

    Sends the same assignment request twice; the second call must be a no-op
    with respect to additional node allocation.
    """
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await handle.signal(ClusterManagerWorkflow.start_cluster)
        request = ClusterManagerAssignNodesToJobInput(
            total_num_nodes=5, job_name="jobby-job"
        )
        first = await handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job, request
        )
        # simulate that in calling it twice, the operation is idempotent
        second = await handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=5, job_name="jobby-job"
            ),
        )
        # the second call should not assign more nodes (it may return fewer if the health check finds bad nodes
        # in between the two signals.)
        assert first.nodes_assigned >= second.nodes_assigned
async def test_update_failure(client: Client):
    """Verify that over-allocating nodes fails the update with a clear error.

    Assigns 24 of the cluster's nodes to one job, then attempts to assign 3
    more; that second update must be rejected with an ApplicationError.
    Finally shuts the cluster down and checks node accounting.
    """
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)
        await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=24, job_name="big-task"
            ),
        )
        try:
            # Try to assign too many nodes
            await cluster_manager_handle.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=3, job_name="little-task"
                ),
            )
            # BUG FIX: the original silently passed when the update succeeded,
            # because the assertions lived only in the except arm. If no
            # WorkflowUpdateFailedError is raised, the test must fail.
            # AssertionError is not caught by the except clause below.
            raise AssertionError(
                "Expected assign_nodes_to_job to fail with WorkflowUpdateFailedError"
            )
        except WorkflowUpdateFailedError as e:
            assert isinstance(e.cause, ApplicationError)
            assert e.cause.message == "Cannot assign 3 nodes; have only 1 available"
        finally:
            # Always shut the cluster down so the workflow can complete.
            await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
            result = await cluster_manager_handle.result()
            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24