Skip to content

Commit 344421a

Browse files
committed
WIP
1 parent 61ce25d commit 344421a

5 files changed

Lines changed: 215 additions & 479 deletions

File tree

Lines changed: 68 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import pickle
2+
from asyncio import Task as AsyncTask
3+
from asyncio import TaskGroup
24
from collections.abc import Mapping
35

46
from sqlmodel import col, select
@@ -26,7 +28,11 @@ async def consider_campaigns(session: AsyncSession) -> None:
2628
until a Node is found that requires attention. Each Node found is added to
2729
the Tasks table as a queue item.
2830
"""
29-
c_statement = select(Campaign.id).where(col(Campaign.status).in_((StatusEnum.ready, StatusEnum.running)))
31+
c_statement = (
32+
select(Campaign.id)
33+
.where(col(Campaign.status).in_((StatusEnum.ready, StatusEnum.running)))
34+
.with_for_update(key_share=True, skip_locked=True)
35+
)
3036
campaigns = (await session.exec(c_statement)).all()
3137

3238
for campaign_id in campaigns:
@@ -37,11 +43,8 @@ async def consider_campaigns(session: AsyncSession) -> None:
3743
edges = (await session.exec(e_statement)).all()
3844
campaign_graph = await graph.graph_from_edge_list_v2(edges=edges, session=session)
3945

40-
# TODO consider a multi-daemon deployment, and what the PK of the Task
41-
# table should be to ensure the same *node* is not added twice if
42-
# the same campaign is "considered" by multiple daemons.
4346
for node in graph.processable_graph_nodes(campaign_graph):
44-
logger.info("Daemon considering node", id=node.id)
47+
logger.info("Daemon considering node", id=str(node.id))
4548
node_task = Task(
4649
namespace=campaign_id,
4750
node=node.id,
@@ -65,67 +68,65 @@ async def consider_nodes(session: AsyncSession) -> None:
6568
After handling, the Node's FSM is serialized and the Node is updated with
6669
new values as necessary. The Task is not returned to the Task table.
6770
"""
68-
statement = select(Task)
71+
statement = select(Task).with_for_update(skip_locked=True)
6972
# TODO add filter criteria for priority and site affinity
7073
tasks = (await session.exec(statement)).all()
7174

7275
# Using a TaskGroup context manager means all "tasks" added to the group
7376
# are awaited when the CM exits, giving us concurrency for all the nodes
74-
# being considered in the current iteration, but complicating the wrap-up
75-
# operations that involve repickling the node's machine for a future
76-
# iteration.
77-
# . async with TaskGroup() as tg:
78-
for task in tasks:
79-
node = await session.get_one(Node, task.node)
80-
# We expunge the node from *this* session because it will be added to
81-
# whatever session the node_machine acquires during its transition
82-
session.expunge(node)
83-
84-
node_machine: NodeMachine
85-
node_machine_pickle: Machine | None
86-
if node.machine is None:
87-
# create a new machine for the node
88-
node_machine = node_machine_factory(node.kind)(o=node)
89-
node_machine_pickle = None
90-
else:
91-
# unpickle the node's machine and rehydrate the Node Stateful Model
92-
node_machine_pickle = await session.get_one(Machine, node.machine)
93-
node_machine = (pickle.loads(node_machine_pickle.state)).model
94-
node_machine.node = node
95-
# discard the pickled machine from this session and context
96-
session.expunge(node_machine_pickle)
97-
del node_machine_pickle
98-
99-
# the task's status field is the target status for the node, so the
100-
# daemon intends to evolve the node machine to that state.
101-
try:
102-
assert node.status is task.previous_status
103-
except AssertionError:
104-
logger.error("Node status out of sync with Machine", id=node.id)
105-
continue
106-
107-
# check possible triggers for state
108-
# TODO how to pick the "best" trigger from multiple available?
109-
# - Add a caller-backed conditional to the triggers, to identify which
110-
# . triggers the daemon is "allowed" to use
111-
# - Determine the "desired" trigger from the (source, dest) task state
112-
# . tuple
113-
if (trigger := trigger_for_transition(task, node_machine.machine.events)) is None:
114-
logger.warning(
115-
"No trigger available for desired state transition",
116-
source=task.previous_status,
117-
dest=task.status,
118-
)
119-
continue
120-
121-
# Add the node transition trigger method to the task group
122-
_ = await node_machine.trigger(trigger)
123-
124-
# wrap up - the task is removed from the session
125-
# TODO if the node has been transitioned to a terminal state, we
126-
# should not need to keep its machine around.
127-
await session.delete(task)
128-
await session.commit()
77+
# being considered in the current iteration.
78+
async with TaskGroup() as tg:
79+
for task in tasks:
80+
node = await session.get_one(Node, task.node)
81+
82+
# the task's status field is the target status for the node, so the
83+
# daemon intends to evolve the node machine to that state.
84+
try:
85+
assert node.status is task.previous_status
86+
except AssertionError:
87+
logger.error("Node status out of sync with Machine", id=str(node.id))
88+
continue
89+
90+
# Expunge the node from *this* session because it will be added to
91+
# whatever session the node_machine acquires during its transition
92+
session.expunge(node)
93+
94+
node_machine: NodeMachine
95+
node_machine_pickle: Machine | None
96+
if node.machine is None:
97+
# create a new machine for the node
98+
node_machine = node_machine_factory(node.kind)(o=node)
99+
node_machine_pickle = None
100+
else:
101+
# unpickle the node's machine and rehydrate the Stateful Model
102+
node_machine_pickle = await session.get_one(Machine, node.machine)
103+
node_machine = (pickle.loads(node_machine_pickle.state)).model
104+
node_machine.node = node
105+
# discard the pickled machine from this session and context
106+
session.expunge(node_machine_pickle)
107+
del node_machine_pickle
108+
109+
# check possible triggers for state
110+
# TODO how to pick the "best" trigger from multiple available?
111+
# - Add a caller-backed conditional to the triggers, to identify
112+
# . triggers the daemon is "allowed" to use
113+
# - Determine the "desired" trigger from the task (source, dest)
114+
if (trigger := trigger_for_transition(task, node_machine.machine.events)) is None:
115+
logger.warning(
116+
"No trigger available for desired state transition",
117+
source=task.previous_status,
118+
dest=task.status,
119+
)
120+
continue
121+
122+
# Add the node transition trigger method to the task group
123+
# TODO give this a name and a callback
124+
task_ = tg.create_task(node_machine.trigger(trigger), name=str(node.id))
125+
task_.add_done_callback(task_runner_callback)
126+
127+
# wrap up - the task is removed from the db.
128+
await session.delete(task)
129+
await session.commit()
129130

130131

131132
async def daemon_iteration(session: AsyncSession) -> None:
@@ -138,6 +139,7 @@ async def daemon_iteration(session: AsyncSession) -> None:
138139
await consider_campaigns(session)
139140
if config.daemon.process_nodes:
140141
await consider_nodes(session)
142+
await session.close()
141143

142144

143145
def trigger_for_transition(task: Task, events: Mapping[str, Event]) -> str | None:
@@ -156,3 +158,8 @@ def trigger_for_transition(task: Task, events: Mapping[str, Event]) -> str | Non
156158
):
157159
return trigger
158160
return None
161+
162+
163+
def task_runner_callback(task: AsyncTask):
164+
"""Callback function for `asyncio.TaskGroup` tasks."""
165+
logger.info("Transition complete", id=task.get_name())

0 commit comments

Comments
 (0)