infra/baseline-v28: _log_extra() helper, ActiveMQ destination prefix validation #29
- Initialize pid, hostname, operational_state in __init__
- Set operational_state to READY after MQ subscription
- Set operational_state to EXITED before shutdown
- Include new fields in all heartbeat/status methods

Supports agent lifecycle management for workflow orchestration.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- run() now skips connect if mq_connected already True
  Enables subclasses to connect in __init__ for early messaging
- Use set_ready() instead of direct operational_state assignment
  Ensures consistent state transition logging

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Add validation in BaseAgent.__init__ and send_message to reject bare destination names (e.g., 'workflow_control'). All destinations must now use explicit /queue/name (anycast) or /topic/name (multicast) format. This prevents message routing failures caused by ambiguous addressing in Artemis, where bare names can be interpreted differently than prefixed names.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Only capture specific workflow context fields, not all record attributes. Removed cruft fields (pathname, filename, created, msecs).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- Store self.username from getpass.getuser()
- Add current_execution_id and current_run_id tracking
- Add _log_extra() helper for consistent logging context
- rest_logging: capture username field

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
This PR introduces breaking changes to enforce explicit ActiveMQ destination prefixes, adds workflow execution context to logging, and enhances agent lifecycle management with operational state tracking.
Changes:
- Added ActiveMQ destination prefix validation requiring `/queue/` or `/topic/` prefixes for all destinations
- Introduced `_log_extra()` helper method for consistent logging context with execution_id, run_id, and username
- Enhanced agent lifecycle with operational_state tracking (STARTING, READY, PROCESSING, EXITED) and process identification fields (pid, hostname)
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/swf_common_lib/rest_logging.py | Modified extra_data to capture execution context (execution_id, workflow_name, run_id, username) instead of technical record details |
| src/swf_common_lib/base_agent.py | Added destination prefix validation, _log_extra() helper, operational state management, and enhanced agent metadata tracking |
    extra_data = {}
    for key in ('execution_id', 'workflow_name', 'run_id', 'username'):
        if hasattr(record, key):
            extra_data[key] = getattr(record, key)
The tuple of keys is duplicated logic with the _log_extra() method in base_agent.py which includes 'execution_id', 'run_id', and 'username' but not 'workflow_name'. This inconsistency could lead to confusion. Consider defining a shared constant or ensuring both locations use the same set of keys. Note that 'workflow_name' is checked here but not explicitly set by _log_extra().
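One way to act on this suggestion is sketched below. It is only an illustration: the constant `LOG_CONTEXT_KEYS` and the helpers `extract_context` and `build_log_extra` are hypothetical names, not part of the PR, and dropping unset values in `build_log_extra` is an assumption rather than the behavior of the actual `_log_extra()`.

```python
import logging

# Hypothetical shared constant; its name and location are assumptions.
LOG_CONTEXT_KEYS = ('execution_id', 'workflow_name', 'run_id', 'username')


def extract_context(record: logging.LogRecord) -> dict:
    """Handler side: collect only the workflow context fields set on a record."""
    return {key: getattr(record, key) for key in LOG_CONTEXT_KEYS if hasattr(record, key)}


def build_log_extra(**context) -> dict:
    """Agent side: build an `extra` dict limited to the shared keys.

    Unset (None) values are dropped here; that choice is an assumption.
    """
    return {key: context[key] for key in LOG_CONTEXT_KEYS if context.get(key) is not None}
```

With both sides reading the same tuple, adding or removing a context field becomes a one-line change.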
    if not subscription_queue.startswith('/queue/') and not subscription_queue.startswith('/topic/'):
        raise ValueError(
            f"subscription_queue must start with '/queue/' or '/topic/', got: '{subscription_queue}'. "
            f"Use '/queue/{subscription_queue}' for anycast or '/topic/{subscription_queue}' for multicast."
        )
The destination prefix validation logic is duplicated between __init__ (line 139) and send_message (line 524). Consider extracting this into a private helper method like _validate_destination(destination) to reduce duplication and ensure consistent validation across both locations.
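A minimal sketch of such a helper follows, written as a module-level function so it runs on its own. The name `validate_destination`, the `name` parameter, and the `VALID_PREFIXES` constant are assumptions for illustration. The error text mirrors the message in the snippet above.

```python
VALID_PREFIXES = ('/queue/', '/topic/')


def validate_destination(destination: str, name: str = 'destination') -> str:
    """Return the destination unchanged if it is prefixed, else raise ValueError."""
    # str.startswith accepts a tuple, so both prefixes are checked in one call.
    if not destination.startswith(VALID_PREFIXES):
        raise ValueError(
            f"{name} must start with '/queue/' or '/topic/', got: '{destination}'. "
            f"Use '/queue/{destination}' for anycast or '/topic/{destination}' for multicast."
        )
    return destination
```

Both `__init__` and `send_message` could then call the same function, passing `name='subscription_queue'` where the original message wording should be preserved.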
    # Create unique agent name with username and sequential ID
    import getpass
    username = getpass.getuser()
    self.username = getpass.getuser()
The username is now stored as an instance variable (self.username) rather than in a local variable. This works correctly, but if getpass.getuser() raises an exception, initialization fails before self.username is set, which could cause issues in exception handlers that try to access it. Consider adding error handling or documenting this assumption.
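The error handling mentioned here could look like the sketch below. The helper name `resolve_username` and the `'unknown'` fallback are assumptions. The exception types are chosen defensively: recent Python versions document `OSError` for `getpass.getuser()`, while older versions could raise other exceptions.

```python
import getpass


def resolve_username(default: str = 'unknown') -> str:
    """Return the login name, falling back to a default if it cannot be determined."""
    try:
        return getpass.getuser()
    except (OSError, KeyError, ImportError):
        # Assumed fallback: keep the agent constructible even without a resolvable user.
        return default
```

An agent could then assign `self.username = resolve_username()` as the first statement of `__init__`, so later exception handlers can rely on the attribute existing.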
    logging.error(f"Failed to connect after {max_retries} attempts")
    raise
    # Connect if not already connected (some subclasses connect in __init__)
    if not getattr(self, 'mq_connected', False):
The comment states 'some subclasses connect in __init__', but this is a significant behavioral change. It should be documented in the class-level docstring or method docstring to inform subclass authors that they can optionally set mq_connected=True in their __init__ to skip the connection logic in run(). Without documentation, this pattern may be missed by developers creating new agent subclasses.
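The pattern under discussion can be illustrated with a toy class. This is a sketch, not the real BaseAgent: `SketchAgent`, `EarlyConnectAgent`, `connect()`, and `connect_calls` are hypothetical stand-ins, and only the `getattr(self, 'mq_connected', False)` guard comes from the diff.

```python
class SketchAgent:
    """Toy stand-in for BaseAgent.

    Subclasses may connect in __init__ (leaving mq_connected True);
    run() then skips its own connection step.
    """

    def __init__(self):
        self.mq_connected = False
        self.connect_calls = 0  # bookkeeping for the example only

    def connect(self):
        # Placeholder for the real MQ connection and retry logic.
        self.connect_calls += 1
        self.mq_connected = True

    def run(self):
        # Connect only if no earlier code path has already done so.
        if not getattr(self, 'mq_connected', False):
            self.connect()
        # ... subscribe, set READY, enter the main loop ...


class EarlyConnectAgent(SketchAgent):
    """Subclass that connects early so it can send messages before run()."""

    def __init__(self):
        super().__init__()
        self.connect()
```

Placing the contract in the class docstring, as in the sketch, is one way to make the optional early-connect path visible to subclass authors.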
    agent = self

    class ProcessingContext:
        def __enter__(self):
            agent.set_processing()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            agent.set_ready()
            return False  # Don't suppress exceptions

    return ProcessingContext()
The processing() context manager defines an inner class that captures agent via closure. While functional, this is an unconventional pattern. Consider using the contextlib.contextmanager decorator or making this a proper context manager class. The current approach creates a new class definition on every call to processing(), which is inefficient. For example: use @contextlib.contextmanager and yield between set_processing() and set_ready() calls.
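The suggested alternative might look like the following sketch. `SketchAgent` and its state-setting bodies are hypothetical stand-ins for BaseAgent, and only the `set_processing()` / `set_ready()` pairing comes from the diff. One difference to note: this version yields nothing, whereas the original `__enter__` returned the context object.

```python
import contextlib


class SketchAgent:
    """Toy stand-in for BaseAgent with only the state handling needed here."""

    def __init__(self):
        self.operational_state = 'READY'

    def set_processing(self):
        self.operational_state = 'PROCESSING'

    def set_ready(self):
        self.operational_state = 'READY'

    @contextlib.contextmanager
    def processing(self):
        """Mark the agent PROCESSING for the duration of a with-block."""
        self.set_processing()
        try:
            yield
        finally:
            # Runs on normal exit and on exceptions; exceptions still propagate,
            # matching the original __exit__ returning False.
            self.set_ready()
```

The `try`/`finally` is what preserves the original behavior of returning to READY when the block raises.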
v28 (2026-01-13)
ActiveMQ Destination Prefix Requirement (Breaking Change)
All ActiveMQ destinations now require explicit `/queue/` or `/topic/` prefix. This is a breaking change that affects all agent code sending messages.

Before (incorrect):

After (correct):

Why this matters:

- `/queue/` for anycast (one consumer) vs `/topic/` for multicast (all consumers)
- `ValueError` for bare names

Existing code using bare names will fail immediately with a clear error message explaining the required format. All example agents and workflow code have been updated.
MCP Workflow Control - AI-Driven Operations
The MCP service now provides full workflow control, enabling AI assistants to start, stop, and monitor workflows without requiring CLI access. This is the key enabler for AI-driven testbed operations.
New workflow control tools:
- `start_workflow` - Start a workflow by sending a command to the DAQ Simulator agent. All parameters are optional; defaults are read from the user's `testbed.toml`. Override specific parameters (e.g., `stf_count=5`) while inheriting others from config.
- `stop_workflow` - Stop a running workflow gracefully by execution_id. The workflow stops at the next checkpoint.
- `end_execution` - Mark a stuck execution as terminated in the database. Use this to clean up stale executions that the agent can no longer reach.

New agent management tools:

- `kill_agent` - Send SIGKILL to an agent process by instance name. Looks up the agent's PID and hostname, kills if on the same host, and always marks the agent as EXITED in the database.

New monitoring tools:

- `get_workflow_monitor` - Aggregated view of workflow execution: status, phase, STF count, key events, and errors (from both messages and logs). Single-call alternative to polling multiple tools.
- `list_workflow_monitors` - List recent executions (last 24h) that can be monitored.

The MCP tool count has grown from 20 to 27 tools. Documentation in `swf-monitor/docs/MCP.md` has been updated to reflect all tools.

User Agent Manager - Per-User Testbed Control via MCP
A new agent manager daemon enables MCP-driven control of per-user testbed agents. This allows AI assistants to start and stop a user's testbed without requiring SSH or terminal access.
Architecture:
- `testbed agent-manager` daemon in their swf-testbed directory
- `/queue/agent_control.<username>`) for commands

New MCP tools:

- `check_agent_manager(username)` - Check if a user's agent manager is alive. Returns heartbeat status, control queue name, and whether agents are running.
- `start_user_testbed(username, config_name)` - Send start command to agent manager. Agents start asynchronously.
- `stop_user_testbed(username)` - Send stop command to agent manager.

Usage:

Then an AI assistant can:

1. `check_agent_manager(username='wenauseic')`
2. `start_user_testbed(username='wenauseic')`
3. `start_workflow()`
4. `stop_user_testbed(username='wenauseic')`

Persistent WorkflowRunner with Message-Driven Execution
The WorkflowRunner agent has been redesigned as a persistent, message-driven service rather than a one-shot script.
Key changes:
- `/queue/workflow_control` for commands
- `run_workflow` (from MCP `start_workflow`) and `stop_workflow`
- `execution_id` (e.g., `stf_datataking-wenauseic-0044`)
- `stop_workflow` command targets a specific execution by ID, enabling graceful termination

Why this matters:
Enhanced get_system_state - User Context and Readiness
The `get_system_state` MCP tool now accepts a `username` parameter and provides user-specific context.

New fields returned:

- `user_context` - Namespace and workflow defaults from user's `testbed.toml`
- `agent_manager` - Status of user's agent manager daemon (healthy/unhealthy/missing/exited)
- `workflow_runner` - Status of DAQ Simulator in user's namespace
- `ready_to_run` - Boolean indicating if the user can start a workflow
- `last_execution` - Most recent workflow execution in user's namespace
- `errors_last_hour` - Count of ERROR logs in user's namespace

This enables AI assistants to answer questions like "Am I ready to run a workflow?" with a single call.
EXITED Status and Agent Lifecycle
Improved agent lifecycle management with explicit EXITED status handling.
Changes:
- `status='EXITED'` and `operational_state='EXITED'` on clean shutdown
- `list_agents` excludes EXITED agents by default - use `status='EXITED'` to see only exited, or `status='all'` to see all
- `kill_agent` always marks agents as EXITED, even if the kill fails

Migration: A database migration (`0014_systemagent_exited_status.py`) adds the EXITED choice to the status field.

Logging Context with execution_id
Improved log traceability with execution context in log records.
Changes:
- `_log_extra()` helper in BaseAgent returns consistent extra fields: `username`, `execution_id`, `run_id`
- `logger.info("message", extra=self._log_extra())`
- `list_logs` MCP tool now supports `execution_id` parameter to filter logs by workflow execution

Usage:
This enables tracing all log messages for a specific workflow execution, essential for debugging workflow failures.
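As a rough illustration of this flow, the sketch below attaches the context with `extra=` and then selects records by `execution_id`. The classes `SketchAgent` and `ListHandler` are hypothetical. The real agent sends records to the monitor through `rest_logging`, and filtering there is done by the `list_logs` tool rather than in process.

```python
import logging


class SketchAgent:
    """Toy stand-in for BaseAgent with only the logging context fields."""

    def __init__(self, username):
        self.username = username
        self.current_execution_id = None
        self.current_run_id = None
        self.logger = logging.getLogger('sketch_agent')

    def _log_extra(self):
        # Field names follow the release notes; the body is an assumption.
        return {
            'username': self.username,
            'execution_id': self.current_execution_id,
            'run_id': self.current_run_id,
        }


class ListHandler(logging.Handler):
    """Collects records in memory so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


agent = SketchAgent('wenauseic')
handler = ListHandler()
agent.logger.addHandler(handler)
agent.logger.setLevel(logging.INFO)

agent.logger.info("Agent idle", extra=agent._log_extra())
agent.current_execution_id = 'stf_datataking-wenauseic-0044'
agent.logger.info("Workflow started", extra=agent._log_extra())

# Keys passed via `extra` become attributes on the LogRecord.
matching = [r for r in handler.records
            if getattr(r, 'execution_id', None) == 'stf_datataking-wenauseic-0044']
```

Because every call passes the same helper's output, each record carries the same three fields, which is what makes filtering by execution possible.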
Monitor UI Improvements
Log detail page: The log detail view (`/logs/<id>/`) now displays the `extra_data` JSON field when present. This shows execution context (execution_id, run_id, namespace, username) that agents include via `_log_extra()`. Previously this context was captured but not visible in the UI.

Log list filtering: The log list now supports filtering by execution_id, complementing the existing app_name, instance_name, and level filters.
Documentation Updates