Skip to content

Commit da4a966

Browse files
committed
WIP
1 parent 61ce25d commit da4a966

5 files changed

Lines changed: 89 additions & 93 deletions

File tree

src/lsst/cmservice/common/daemon_v2.py

Lines changed: 62 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import pickle
2+
from asyncio import Task as AsyncTask
3+
from asyncio import TaskGroup
24
from collections.abc import Mapping
35

46
from sqlmodel import col, select
@@ -41,7 +43,7 @@ async def consider_campaigns(session: AsyncSession) -> None:
4143
# table should be to ensure the same *node* is not added twice if
4244
# the same campaign is "considered" by multiple daemons.
4345
for node in graph.processable_graph_nodes(campaign_graph):
44-
logger.info("Daemon considering node", id=node.id)
46+
logger.info("Daemon considering node", id=str(node.id))
4547
node_task = Task(
4648
namespace=campaign_id,
4749
node=node.id,
@@ -71,61 +73,59 @@ async def consider_nodes(session: AsyncSession) -> None:
7173

7274
# Using a TaskGroup context manager means all "tasks" added to the group
7375
# are awaited when the CM exits, giving us concurrency for all the nodes
74-
# being considered in the current iteration, but complicating the wrap-up
75-
# operations that involve repickling the node's machine for a future
76-
# iteration.
77-
# . async with TaskGroup() as tg:
78-
for task in tasks:
79-
node = await session.get_one(Node, task.node)
80-
# We expunge the node from *this* session because it will be added to
81-
# whatever session the node_machine acquires during its transition
82-
session.expunge(node)
83-
84-
node_machine: NodeMachine
85-
node_machine_pickle: Machine | None
86-
if node.machine is None:
87-
# create a new machine for the node
88-
node_machine = node_machine_factory(node.kind)(o=node)
89-
node_machine_pickle = None
90-
else:
91-
# unpickle the node's machine and rehydrate the Node Stateful Model
92-
node_machine_pickle = await session.get_one(Machine, node.machine)
93-
node_machine = (pickle.loads(node_machine_pickle.state)).model
94-
node_machine.node = node
95-
# discard the pickled machine from this session and context
96-
session.expunge(node_machine_pickle)
97-
del node_machine_pickle
98-
99-
# the task's status field is the target status for the node, so the
100-
# daemon intends to evolve the node machine to that state.
101-
try:
102-
assert node.status is task.previous_status
103-
except AssertionError:
104-
logger.error("Node status out of sync with Machine", id=node.id)
105-
continue
106-
107-
# check possible triggers for state
108-
# TODO how to pick the "best" trigger from multiple available?
109-
# - Add a caller-backed conditional to the triggers, to identify which
110-
# . triggers the daemon is "allowed" to use
111-
# - Determine the "desired" trigger from the (source, dest) task state
112-
# . tuple
113-
if (trigger := trigger_for_transition(task, node_machine.machine.events)) is None:
114-
logger.warning(
115-
"No trigger available for desired state transition",
116-
source=task.previous_status,
117-
dest=task.status,
118-
)
119-
continue
120-
121-
# Add the node transition trigger method to the task group
122-
_ = await node_machine.trigger(trigger)
123-
124-
# wrap up - the task is removed from the session
125-
# TODO if the node has been transitioned to a terminal state, we
126-
# should not need to keep its machine around.
127-
await session.delete(task)
128-
await session.commit()
76+
# being considered in the current iteration.
77+
async with TaskGroup() as tg:
78+
for task in tasks:
79+
node = await session.get_one(Node, task.node)
80+
81+
# the task's status field is the target status for the node, so the
82+
# daemon intends to evolve the node machine to that state.
83+
try:
84+
assert node.status is task.previous_status
85+
except AssertionError:
86+
logger.error("Node status out of sync with Machine", id=str(node.id))
87+
continue
88+
89+
# Expunge the node from *this* session because it will be added to
90+
# whatever session the node_machine acquires during its transition
91+
session.expunge(node)
92+
93+
node_machine: NodeMachine
94+
node_machine_pickle: Machine | None
95+
if node.machine is None:
96+
# create a new machine for the node
97+
node_machine = node_machine_factory(node.kind)(o=node)
98+
node_machine_pickle = None
99+
else:
100+
# unpickle the node's machine and rehydrate the Stateful Model
101+
node_machine_pickle = await session.get_one(Machine, node.machine)
102+
node_machine = (pickle.loads(node_machine_pickle.state)).model
103+
node_machine.node = node
104+
# discard the pickled machine from this session and context
105+
session.expunge(node_machine_pickle)
106+
del node_machine_pickle
107+
108+
# check possible triggers for state
109+
# TODO how to pick the "best" trigger from multiple available?
110+
# - Add a caller-backed conditional to the triggers, to identify
111+
# . triggers the daemon is "allowed" to use
112+
# - Determine the "desired" trigger from the task (source, dest)
113+
if (trigger := trigger_for_transition(task, node_machine.machine.events)) is None:
114+
logger.warning(
115+
"No trigger available for desired state transition",
116+
source=task.previous_status,
117+
dest=task.status,
118+
)
119+
continue
120+
121+
# Add the node transition trigger method to the task group
122+
# TODO give this a name and a callback
123+
task_ = tg.create_task(node_machine.trigger(trigger), name=str(node.id))
124+
task_.add_done_callback(task_runner_callback)
125+
126+
# wrap up - the task is removed from the db.
127+
await session.delete(task)
128+
await session.commit()
129129

130130

131131
async def daemon_iteration(session: AsyncSession) -> None:
@@ -138,6 +138,7 @@ async def daemon_iteration(session: AsyncSession) -> None:
138138
await consider_campaigns(session)
139139
if config.daemon.process_nodes:
140140
await consider_nodes(session)
141+
await session.close()
141142

142143

143144
def trigger_for_transition(task: Task, events: Mapping[str, Event]) -> str | None:
@@ -156,3 +157,8 @@ def trigger_for_transition(task: Task, events: Mapping[str, Event]) -> str | Non
156157
):
157158
return trigger
158159
return None
160+
161+
162+
def task_runner_callback(task: AsyncTask):
163+
"""Callback function for `asyncio.TaskGroup` tasks."""
164+
logger.info("Transition complete", id=task.get_name())

src/lsst/cmservice/machines/campaign.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,7 @@ async def update_persistent_status(self, event: EventData) -> None:
213213
self.activity_log_entry.detail["finished_at"] = timestamp.element_time()
214214

215215
# Ensure database record for transitioned object is updated
216-
self.session.add(self.campaign)
217-
await self.session.refresh(self.campaign)
216+
self.campaign = await self.session.merge(self.campaign, load=False)
218217
self.campaign.status = self.state
219218
await self.session.commit()
220219

src/lsst/cmservice/machines/node.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ async def error_handler(self, event: EventData) -> None:
124124
...
125125

126126
async def prepare_session(self, event: EventData) -> None:
127-
"""Prepares the machine by acquiring a database session."""
127+
"""Prepares the machine by acquiring a database session and merging
128+
the reference object.
129+
"""
128130
# This positive assertion concerning the Node member will prevent
129131
# any callback from proceeding if no such member is defined, but type
130132
# checkers don't know this, which is why it repeated in a TYPE_CHECKING
@@ -172,8 +174,7 @@ async def update_persistent_status(self, event: EventData) -> None:
172174
self.activity_log_entry.detail["finished_at"] = timestamp.element_time()
173175

174176
# Ensure database record for transitioned object is updated
175-
self.session.add(self.node)
176-
await self.session.refresh(self.node)
177+
self.node = await self.session.merge(self.node, load=False)
177178
self.node.status = self.state
178179
await self.session.commit()
179180

tests/v2/conftest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ async def testdb(rawdb: DatabaseSessionDependency) -> AsyncGenerator[DatabaseSes
101101
namespace=str(NAMESPACE_DNS),
102102
name="DEFAULT",
103103
owner="root",
104+
status="accepted",
105+
metadata={},
106+
configuration={},
104107
)
105108
)
106109
await aconn.commit()

tests/v2/test_machines.py

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,31 @@ async def test_campaign_machine(test_campaign: str, session: AsyncSession) -> No
2929
x = campaign.status
3030
assert x is StatusEnum.waiting
3131

32-
# This instance is going to be "owned" by the session managed by the State-
33-
# ful Model, so we expunge it from the session managed by this test
34-
session.expunge(campaign)
3532
campaign_machine = CampaignMachine(o=campaign)
36-
3733
assert campaign_machine.is_waiting()
3834
assert await campaign_machine.may_prepare()
3935

4036
did_prepare = await campaign_machine.prepare()
4137
assert did_prepare
4238
assert await campaign_machine.may_start()
4339

44-
ref_campaign = await session.get_one(Campaign, campaign_id)
45-
await session.refresh(ref_campaign, attribute_names=["status"])
46-
x = ref_campaign.status
40+
await session.refresh(campaign, attribute_names=["status"])
41+
x = campaign.status
4742
assert x is StatusEnum.ready
4843

4944
did_start = await campaign_machine.start()
5045
assert did_start
5146
assert await campaign_machine.may_pause()
5247
assert await campaign_machine.may_finish()
5348

54-
await session.refresh(ref_campaign, attribute_names=["status"])
55-
x = ref_campaign.status
49+
await session.refresh(campaign, attribute_names=["status"])
50+
x = campaign.status
5651
assert x is StatusEnum.running
5752

5853
did_finish = await campaign_machine.trigger("finish")
5954
assert did_finish
60-
await session.refresh(ref_campaign, attribute_names=["status"])
61-
x = ref_campaign.status
55+
await session.refresh(campaign, attribute_names=["status"])
56+
x = campaign.status
6257
assert x is StatusEnum.accepted
6358

6459
await session.close()
@@ -74,7 +69,6 @@ async def test_bad_transition(test_campaign: str, session: AsyncSession) -> None
7469
campaign = await session.get_one(Campaign, campaign_id)
7570
x = campaign.status
7671
assert x is StatusEnum.waiting
77-
session.expunge(campaign)
7872
campaign_machine = CampaignMachine(o=campaign)
7973

8074
with patch(
@@ -97,26 +91,25 @@ async def test_bad_transition(test_campaign: str, session: AsyncSession) -> None
9791

9892
# Both the state machine and db object should reflect the failed state
9993
assert campaign_machine.is_failed()
100-
ref_campaign = await session.get_one(Campaign, campaign_id)
101-
await session.refresh(ref_campaign, attribute_names=["status"])
102-
x = ref_campaign.status
94+
await session.refresh(campaign, attribute_names=["status"])
95+
x = campaign.status
10396
assert x is StatusEnum.failed
10497

10598
# retry by rolling back to ready
10699
assert await campaign_machine.may_retry()
107100
await campaign_machine.trigger("retry")
108101

109102
assert campaign_machine.is_ready()
110-
await session.refresh(ref_campaign, attribute_names=["status"])
111-
x = ref_campaign.status
103+
await session.refresh(campaign, attribute_names=["status"])
104+
x = campaign.status
112105
assert x is StatusEnum.ready
113106

114107
# start and finish without further error
115108
assert await campaign_machine.start()
116109
assert await campaign_machine.finish()
117110

118-
await session.refresh(ref_campaign, attribute_names=["status"])
119-
x = ref_campaign.status
111+
await session.refresh(campaign, attribute_names=["status"])
112+
x = campaign.status
120113
assert x is StatusEnum.accepted
121114
await session.close()
122115

@@ -130,8 +123,6 @@ async def test_machine_pickle(test_campaign: str, session: AsyncSession) -> None
130123
x = campaign.status
131124
assert x is StatusEnum.waiting
132125
campaign_machine = CampaignMachine(o=campaign)
133-
session.expunge(campaign)
134-
ref_campaign = await session.get_one(Campaign, campaign_id)
135126

136127
# evolve the machine to "prepared"
137128
await campaign_machine.prepare()
@@ -145,15 +136,13 @@ async def test_machine_pickle(test_campaign: str, session: AsyncSession) -> None
145136
# not necessarily deterministic (i.e., they do not have to be namespaced.)
146137
session.add(machine_db)
147138
await session.commit()
148-
ref_campaign = await session.get_one(Campaign, campaign_id)
149-
ref_campaign.machine = machine_db.id
139+
campaign.machine = machine_db.id
150140
await session.commit()
151-
await session.refresh(ref_campaign, attribute_names=["status", "machine"])
141+
await session.refresh(campaign, attribute_names=["status", "machine"])
152142

153143
await session.close()
154144
del campaign_machine
155145
del campaign
156-
del ref_campaign
157146
del machine_db
158147
del machine_pickle
159148

@@ -169,18 +158,16 @@ async def test_machine_pickle(test_campaign: str, session: AsyncSession) -> None
169158
assert machine_unpickle.state is not None
170159
new_campaign_machine: CampaignMachine = (pickle.loads(machine_unpickle.state)).model
171160
new_campaign_machine.campaign = new_campaign
172-
session.expunge(new_campaign)
173-
new_ref_campaign = await session.get_one(Campaign, campaign_id)
174161

175162
# test the rehydrated machine and continue to evolve it
176-
assert new_campaign_machine.state == new_ref_campaign.status
163+
assert new_campaign_machine.state == new_campaign.status
177164

178165
await new_campaign_machine.start()
179-
await session.refresh(new_ref_campaign, attribute_names=["status"])
180-
assert new_campaign_machine.state == new_ref_campaign.status
166+
await session.refresh(new_campaign, attribute_names=["status"])
167+
assert new_campaign_machine.state == new_campaign.status
181168

182169
await new_campaign_machine.finish()
183-
await session.refresh(new_ref_campaign, attribute_names=["status"])
184-
assert new_campaign_machine.state == new_ref_campaign.status
170+
await session.refresh(new_campaign, attribute_names=["status"])
171+
assert new_campaign_machine.state == new_campaign.status
185172

186173
await session.close()

0 commit comments

Comments
 (0)