Skip to content

DM-51738 : Implement Campaign and Node FSM#198

Merged
tcjennings merged 7 commits intotickets/DM-51337/releasefrom
tickets/DM-51738
Aug 6, 2025
Merged

DM-51738 : Implement Campaign and Node FSM#198
tcjennings merged 7 commits intotickets/DM-51337/releasefrom
tickets/DM-51738

Conversation

@tcjennings
Copy link
Copy Markdown
Collaborator

Implements core logic for campaign and node evolution through an FSM implemented using the pytransitions package.

Provides scaffolding for iterative implementation of business logic (i.e., translating v1 "scripts" to v2 "triggers") with ABCs and E2E tests.

@tcjennings tcjennings force-pushed the tickets/DM-51505/v2_nodes_edges branch from 00b4769 to 99b7b13 Compare July 8, 2025 19:55
@tcjennings tcjennings force-pushed the tickets/DM-51738 branch 8 times, most recently from 410006a to 4caf5fb Compare July 10, 2025 22:08
@tcjennings tcjennings marked this pull request as ready for review July 11, 2025 19:09
Base automatically changed from tickets/DM-51505/v2_nodes_edges to tickets/DM-51488/v2_api July 15, 2025 13:49
Base automatically changed from tickets/DM-51488/v2_api to tickets/DM-51337/release July 15, 2025 13:50
- move campaign test fixture to module conftest
- scope campaign test fixture to function
- add auto fixture for patched config
- Implement Campaign/Node FSM using pytransitions
- Establish daemon v2 iteration loop
- Establish E2E FSM and Daemon tests
- machines get their own sessions
- add config flags for daemon v2 campaign/node processing
- validate graph and affect status update in background task
- update fastapi session override in tests
- add activity log route for campaigns
- implement campaign status updates via PATCH and FSM
- chore: refactor test fixtures
- fix: improve session hygiene in tests
- chore: denormalize timestamp columns for activitylog
- fix: FSM finalize based on activitylog finished_at instead of detail
- fix: handle full lifecycle of session in session factory
- feat(daemon): refactor task callbacks as modular callables
- fix(daemon): set submitted_at and finished_at for tasks instead of deleting
- fix(graph): use session.get() instead of select()
- fix: allow null node in activty log
- rename "session manager" class as "database manager"
- replace "get_async_session" function with calls to sessionmaker().
@tcjennings tcjennings force-pushed the tickets/DM-51738 branch 2 times, most recently from a9dbeda to 1a1305c Compare July 15, 2025 20:36
- use "uuid" instead of "id" in graph node simple mode
- require namespace when GET node by name
- include additional links in campaign response header
- campaign post returns existing campaign on duplicate
- improve consistency in route parameter naming
- delegate more route typing to pydantic models
- use sorting in activity log route
Copy link
Copy Markdown
Member

@ctslater ctslater left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impressive amount of substantive code here, really fun to read through (and sorry it took me forever!) Just a few questions on things I didn't understand, no real disagreements. Going to be fun to start trying it out.

# TODO: notification callback
finalizer = create_task(finalize_runner_callback(context))
finalizer.add_done_callback(callbacks.discard)
callbacks.add(finalizer)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't something need to await callbacks?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. This is a bit of a rube goldberg sequence of async tasks that starts with the primary task that the daemon is adding to a task group, which is awaited at the end of the TaskGroup context manager. Each of these tasks has a callback coro (task_runner_callback) which is awaited when the original task completes; and that coro itself has a callback coro to be awaited when it finishes, and this last bit is where callbacks comes in. callbacks is a set collection that is holding a strong reference to the callbacks set up by task_runner_callback so they don't get lost in the shuffle. The last callback coro (callbacks.discard) cleans up the collection and discards the strong reference to coros as they complete. A simpler example of this pattern is in the python docs.

happy_path = [StatusEnum.waiting, StatusEnum.ready, StatusEnum.running, StatusEnum.accepted]
if self in happy_path:
i = happy_path.index(self)
return happy_path[i + 1]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope nobody calls this on StatusEnum.accepted?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true by fiat but in uncontrolled hands it could be an IndexError waiting to happen. I'll make a note to guard against that.

class InvalidCampaignGraphError(Exception): ...


class CampaignMachine(NodeMachine):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I understanding right that the implementation of "start", "finish" and "resume" triggers will go in here?

assert self.db_model is not None

if self.activity_log_entry is not None:
return None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this gives up here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a guard against the case where a log entry object already exists when this callback is invoked. It doesn't "give up" exactly, it returns early instead of clobbering an existing instance attribute.

async def do_finish(self, event: EventData) -> None: ...

async def is_successful(self, event: EventData) -> bool:
"""Checks whether the WMS job is finished or not based on the result of
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is a placeholder for future-work but this only makes sense for Groups.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the organization of callbacks and docstrings will be changed substantially as we implement specific types of Node machines in the next iteration.

Comment thread tests/v2/test_daemon.py
while end_node.status is not StatusEnum.accepted:
i -= 1
await consider_campaigns(session)
await consider_nodes(session)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not understanding how the daemon reaches the nodes beyond the first iteration in this loop? Don't the nodes need to advance to some "finished" state before the next layer in the graph can be reached?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop is watching the status of the end node of the campaign. Because this is a test, it is a contrived example where the number of iterations is "known" (hence the countdown variable i) but each node in turn is advanced by one status along the happy path for each execution of this loop (the daemon "task" for this being generated by consider_campaigns and the resolution of that task by consider_nodes). After the first iteration the start node will have advanced from waiting to ready, and after the second iteration from ready to running, etc. The loop only ends when the end_node's status reaches accepted which can only happen when the entire graph has been "considered".

@tcjennings tcjennings merged commit 04d6d7a into tickets/DM-51337/release Aug 6, 2025
7 checks passed
@tcjennings tcjennings deleted the tickets/DM-51738 branch August 6, 2025 14:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants