Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/agents/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Agent implementations for SAP QA framework."""

from src.agents.agents.base import Agent, AgentRegistry, create_default_agent_registry
from src.agents.agents.base import AgentRegistry, SAPAutomationAgent, create_default_agent_registry
from src.agents.agents.echo_agent import EchoAgentSK
from src.agents.agents.system_context_agent import SystemContextAgentSK
from src.agents.agents.action_executor_agent import ActionExecutorAgent
Expand All @@ -9,7 +9,7 @@
from src.agents.agents.test_advisor_agent import TestAdvisorAgentSK

__all__ = [
"Agent",
"SAPAutomationAgent",
"AgentRegistry",
"create_default_agent_registry",
"EchoAgentSK",
Expand Down
316 changes: 21 additions & 295 deletions src/agents/agents/action_executor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@
- Async execution with real-time status updates
"""

import json
from typing import Any, Optional, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING

from semantic_kernel import Kernel
from semantic_kernel.contents import ChatHistory
from semantic_kernel.filters import FilterTypes

from src.agents.models.chat import ChatMessage, ChatResponse
from src.agents.agents.base import Agent
from src.agents.agents.base import SAPAutomationAgent
from src.agents.workspace.workspace_store import WorkspaceStore
from src.agents.plugins.execution import ExecutionPlugin
from src.agents.plugins.workspace import WorkspacePlugin
from src.agents.plugins.ssh import SSHPlugin
from src.agents.models.reasoning import sanitize_snapshot
from src.agents.execution import GuardLayer, GuardFilter
from src.agents.execution import GuardLayer
from src.agents.observability import get_logger
from src.agents.prompts import ACTION_EXECUTOR_SYSTEM_PROMPT

Expand All @@ -35,12 +30,12 @@
logger = get_logger(__name__)


class ActionExecutorAgent(Agent):
class ActionExecutorAgent(SAPAutomationAgent):
"""Agent for executing SAP QA actions with strong safety and environment gating.

Uses Semantic Kernel with:
- ExecutionPlugin: Provides test execution tools as SK functions
- GuardFilter: Intercepts function calls to enforce safety constraints
- Kernel-level approval filters for safety constraints

Supports two execution modes:
1. Synchronous (blocking): For quick tests or when streaming not needed
Expand All @@ -57,8 +52,7 @@ def __init__(
):
"""Initialize ActionExecutorAgent.

Registers ExecutionPlugin with Semantic Kernel and adds GuardFilter
for safety enforcement.
Registers ExecutionPlugin with Semantic Kernel agents.

:param kernel: Semantic Kernel instance
:type kernel: Kernel
Expand All @@ -71,13 +65,6 @@ def __init__(
:param job_worker: Optional JobWorker for background execution
:type job_worker: Optional[JobWorker]
"""
super().__init__(
name="action_executor",
description="Executes SAP QA actions, runs playbooks, performs configuration checks, "
+ "and runs functional tests (HA, crash, failover) using Ansible. "
+ "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action.",
)

self.kernel = kernel
self.workspace_store = workspace_store
self.execution_plugin = execution_plugin
Expand All @@ -90,102 +77,28 @@ def __init__(
workspace_store=workspace_store,
)

self._safe_add_plugin(execution_plugin, "execution")
self._safe_add_plugin(WorkspacePlugin(workspace_store), "workspace")
self._safe_add_plugin(SSHPlugin(), "ssh")
plugins: list[object] = [
execution_plugin,
WorkspacePlugin(workspace_store),
SSHPlugin(),
]
if getattr(execution_plugin, "keyvault_plugin", None) is not None:
self._safe_add_plugin(execution_plugin.keyvault_plugin, "keyvault")

guard_filter = GuardFilter(self.guard_layer)
self.kernel.add_filter(
filter_type=FilterTypes.FUNCTION_INVOCATION,
filter=guard_filter.on_function_invocation,
plugins.append(execution_plugin.keyvault_plugin)
super().__init__(
name="action_executor",
description="Executes SAP QA actions, runs playbooks, performs configuration checks, "
+ "and runs functional tests (HA, crash, failover) using Ansible. "
+ "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action.",
kernel=kernel,
instructions=ACTION_EXECUTOR_SYSTEM_PROMPT,
plugins=plugins,
)

logger.info(
f"ActionExecutorAgent initialized with SK plugin and guard filter "
f"ActionExecutorAgent initialized with SK plugins "
f"(async_enabled={self._async_enabled})"
)

def _safe_add_plugin(self, plugin: object, plugin_name: str) -> None:
"""Add an SK plugin if not already present.

Semantic Kernel plugin registration can vary depending on how the runtime
constructs kernels/agents. This keeps agent capabilities consistent.
"""
try:
self.kernel.add_plugin(
plugin=plugin,
plugin_name=plugin_name,
)
except Exception as e:
logger.info(f"Plugin '{plugin_name}' already registered or unavailable: {e}")

async def _run_agentic(self, messages: list[ChatMessage], context: dict) -> ChatResponse:
"""Run an agentic LLM+tools loop.

The LLM decides which tools to call (execution/workspace/ssh/keyvault), guarded by
GuardFilter. The assistant message is the final synthesized answer.
"""

self.tracer.step(
"execution_planning",
"inference",
"Running agentic tool loop",
input_snapshot=sanitize_snapshot(
{
"message_count": len(messages),
"has_agent_input": "agent_input" in context,
}
),
)

chat_history = ChatHistory()
chat_history.add_system_message(ACTION_EXECUTOR_SYSTEM_PROMPT)

agent_input = context.get("agent_input") if isinstance(context, dict) else None
if isinstance(agent_input, dict) and agent_input:
chat_history.add_system_message(
"CONTEXT (use to resolve workspace/SID, do not expose verbatim):\n"
+ json.dumps(agent_input, ensure_ascii=False)
)

for msg in messages:
if msg.role == "user":
chat_history.add_user_message(msg.content)
elif msg.role == "assistant":
chat_history.add_assistant_message(msg.content)

chat_service = self.kernel.get_service(service_id="azure_openai_chat")
execution_settings = chat_service.get_prompt_execution_settings_class()(
function_choice_behavior="auto",
max_completion_tokens=1200,
)

response = await chat_service.get_chat_message_content(
chat_history=chat_history,
settings=execution_settings,
kernel=self.kernel,
)

content = str(response.content) if response and getattr(response, "content", None) else ""
content = content.strip()
if not content:
content = "I couldn't produce a response. Please try again."

self.tracer.step(
"response_generation",
"decision",
"Generated final answer from tool loop",
output_snapshot=sanitize_snapshot({"response_length": len(content)}),
)

return ChatResponse(
messages=[ChatMessage(role="assistant", content=content)],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)

async def execute_async(
self,
workspace_id: str,
Expand Down Expand Up @@ -280,190 +193,3 @@ def get_active_jobs_for_user(self, user_id: str) -> list["ExecutionJob"]:
if not self.job_store:
return []
return self.job_store.get_active_jobs(user_id)

async def run(
self,
messages: list[ChatMessage],
context: Optional[dict] = None,
) -> ChatResponse:
"""Handle structured execution requests (Agent interface implementation).

Primary mode: agentic LLM+tools loop (model chooses tools; GuardFilter enforces safety).
Optional mode: async execution/job status queries if enabled.

:param messages: Chat messages (used for logging/context)
:type messages: list[ChatMessage]
:param context: Context with execution parameters
:type context: Optional[dict]
:returns: ChatResponse with execution summary or job info
:rtype: ChatResponse
"""
self.tracer.start()
try:
context = context or {}
if "async_execution" in context and self._async_enabled:
return await self._run_async(messages, context)
if "job_status_query" in context:
return await self._handle_job_status_query(context)

return await self._run_agentic(messages, context)

except Exception as e:
logger.error(f"Error in ActionExecutorAgent.run: {e}")

self.tracer.step(
"execution_run",
"inference",
f"Error during test execution: {str(e)}",
error=str(e),
output_snapshot=sanitize_snapshot({"error_type": type(e).__name__}),
)

raise

finally:
self.tracer.finish()

async def _run_async(
self,
messages: list[ChatMessage],
context: dict,
) -> ChatResponse:
"""Handle async execution request.

:param messages: Chat messages
:type messages: list[ChatMessage]
:param context: Context with async_execution params
:type context: dict
:returns: ChatResponse with job info
:rtype: ChatResponse
"""
async_params = context["async_execution"]
workspace_id = async_params["workspace_id"]
test_ids = async_params.get("test_ids", [])
test_group = async_params.get("test_group", "CONFIG_CHECKS")
conversation_id = context.get("conversation_id")
user_id = context.get("user_id")

self.tracer.step(
"execution_async",
"tool_call",
f"Starting async execution for {len(test_ids)} tests",
input_snapshot=sanitize_snapshot(
{
"workspace_id": workspace_id,
"test_count": len(test_ids),
"test_group": test_group,
}
),
)

try:
job = await self.execute_async(
workspace_id=workspace_id,
test_ids=test_ids,
test_group=test_group,
conversation_id=conversation_id,
user_id=user_id,
)

test_list = ", ".join(test_ids[:3])
if len(test_ids) > 3:
test_list += f" and {len(test_ids) - 3} more"

response_content = (
f"**Starting test execution**\n\n"
f"- **Workspace**: `{workspace_id}`\n"
f"- **Tests**: {test_list}\n"
f"- **Job ID**: `{job.id}`\n\n"
f"I'll provide real-time updates as the tests progress..."
)

self.tracer.step(
"execution_async",
"decision",
f"Job {job.id} submitted for execution",
output_snapshot=sanitize_snapshot(
{
"job_id": str(job.id),
"status": job.status.value,
}
),
)

return ChatResponse(
messages=[ChatMessage(role="assistant", content=response_content)],
reasoning_trace=self.tracer.get_trace(),
metadata={"job_id": str(job.id), "streaming": True},
)

except Exception as e:
logger.error(f"Failed to start async execution: {e}")
return ChatResponse(
messages=[ChatMessage(role="assistant", content=str(e))],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)

async def _handle_job_status_query(self, context: dict) -> ChatResponse:
"""Handle job status query.

:param context: Context with job_status_query params
:type context: dict
:returns: ChatResponse with job status
:rtype: ChatResponse
"""
query = context["job_status_query"]
job_id = query.get("job_id")
user_id = query.get("user_id")

if job_id:
job = self.get_job_status(job_id)
if job:
from src.agents.execution.worker import JobEventEmitter

summary = JobEventEmitter.format_job_summary(job)
return ChatResponse(
messages=[ChatMessage(role="assistant", content=summary)],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)
else:
return ChatResponse(
messages=[ChatMessage(role="assistant", content=f"Job `{job_id}` not found.")],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)
elif user_id:
jobs = self.get_active_jobs_for_user(user_id)
if jobs:
from src.agents.execution.worker import JobEventEmitter

lines = ["**Your Active Jobs:**\n"]
for job in jobs:
lines.append(JobEventEmitter.format_job_summary(job))
lines.append("")
return ChatResponse(
messages=[ChatMessage(role="assistant", content="\n".join(lines))],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)
else:
return ChatResponse(
messages=[
ChatMessage(role="assistant", content="You have no active test executions.")
],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)
else:
return ChatResponse(
messages=[
ChatMessage(
role="assistant",
content="Please specify a job ID or ask about your active jobs.",
)
],
reasoning_trace=self.tracer.get_trace(),
metadata=None,
)
Loading
Loading