From 943c642406cd1ee4fe24c411304b3740a657ff73 Mon Sep 17 00:00:00 2001 From: Devansh Jain <86314060+devanshjainms@users.noreply.github.com> Date: Thu, 18 Dec 2025 20:53:04 -0800 Subject: [PATCH] Refactor agents to Semantic Kernel agent framework --- src/agents/agents/__init__.py | 4 +- src/agents/agents/action_executor_agent.py | 316 ++------------------- src/agents/agents/action_planner_agent.py | 64 +---- src/agents/agents/base.py | 294 ++----------------- src/agents/agents/echo_agent.py | 15 +- src/agents/agents/orchestrator.py | 265 ++++++----------- src/agents/agents/system_context_agent.py | 17 +- src/agents/agents/test_advisor_agent.py | 54 +--- src/agents/filters/__init__.py | 8 + src/agents/filters/approval_filter.py | 135 +++++++++ src/agents/prompts.py | 32 +-- src/agents/sk_kernel.py | 9 + 12 files changed, 308 insertions(+), 905 deletions(-) create mode 100644 src/agents/filters/__init__.py create mode 100644 src/agents/filters/approval_filter.py diff --git a/src/agents/agents/__init__.py b/src/agents/agents/__init__.py index 4fb86e78..18b502c7 100644 --- a/src/agents/agents/__init__.py +++ b/src/agents/agents/__init__.py @@ -1,6 +1,6 @@ """Agent implementations for SAP QA framework.""" -from src.agents.agents.base import Agent, AgentRegistry, create_default_agent_registry +from src.agents.agents.base import AgentRegistry, SAPAutomationAgent, create_default_agent_registry from src.agents.agents.echo_agent import EchoAgentSK from src.agents.agents.system_context_agent import SystemContextAgentSK from src.agents.agents.action_executor_agent import ActionExecutorAgent @@ -9,7 +9,7 @@ from src.agents.agents.test_advisor_agent import TestAdvisorAgentSK __all__ = [ - "Agent", + "SAPAutomationAgent", "AgentRegistry", "create_default_agent_registry", "EchoAgentSK", diff --git a/src/agents/agents/action_executor_agent.py b/src/agents/agents/action_executor_agent.py index 8019ccc7..f686881e 100644 --- a/src/agents/agents/action_executor_agent.py +++ b/src/agents/agents/action_executor_agent.py @@ -11,21 +11,16 @@ - Async execution with real-time status updates """ -import json -from typing import Any, Optional, TYPE_CHECKING +from typing import Optional, TYPE_CHECKING from semantic_kernel import Kernel -from semantic_kernel.contents import ChatHistory -from semantic_kernel.filters import FilterTypes -from src.agents.models.chat import ChatMessage, ChatResponse -from src.agents.agents.base import Agent +from src.agents.agents.base import SAPAutomationAgent from src.agents.workspace.workspace_store import WorkspaceStore from src.agents.plugins.execution import ExecutionPlugin from src.agents.plugins.workspace import WorkspacePlugin from src.agents.plugins.ssh import SSHPlugin -from src.agents.models.reasoning import sanitize_snapshot -from src.agents.execution import GuardLayer, GuardFilter +from src.agents.execution import GuardLayer from src.agents.observability import get_logger from src.agents.prompts import ACTION_EXECUTOR_SYSTEM_PROMPT @@ -35,12 +30,12 @@ logger = get_logger(__name__) -class ActionExecutorAgent(Agent): +class ActionExecutorAgent(SAPAutomationAgent): """Agent for executing SAP QA actions with strong safety and environment gating. Uses Semantic Kernel with: - ExecutionPlugin: Provides test execution tools as SK functions - - GuardFilter: Intercepts function calls to enforce safety constraints + - Kernel-level approval filters for safety constraints Supports two execution modes: 1. Synchronous (blocking): For quick tests or when streaming not needed @@ -57,8 +52,7 @@ def __init__( ): """Initialize ActionExecutorAgent. - Registers ExecutionPlugin with Semantic Kernel and adds GuardFilter - for safety enforcement. + Registers ExecutionPlugin with Semantic Kernel agents. :param kernel: Semantic Kernel instance :type kernel: Kernel @@ -71,13 +65,6 @@ def __init__( :param job_worker: Optional JobWorker for background execution :type job_worker: Optional[JobWorker] """ - super().__init__( - name="action_executor", - description="Executes SAP QA actions, runs playbooks, performs configuration checks, " - + "and runs functional tests (HA, crash, failover) using Ansible. " - + "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action.", - ) - self.kernel = kernel self.workspace_store = workspace_store self.execution_plugin = execution_plugin @@ -90,102 +77,28 @@ def __init__( workspace_store=workspace_store, ) - self._safe_add_plugin(execution_plugin, "execution") - self._safe_add_plugin(WorkspacePlugin(workspace_store), "workspace") - self._safe_add_plugin(SSHPlugin(), "ssh") + plugins: list[object] = [ + execution_plugin, + WorkspacePlugin(workspace_store), + SSHPlugin(), + ] if getattr(execution_plugin, "keyvault_plugin", None) is not None: - self._safe_add_plugin(execution_plugin.keyvault_plugin, "keyvault") - - guard_filter = GuardFilter(self.guard_layer) - self.kernel.add_filter( - filter_type=FilterTypes.FUNCTION_INVOCATION, - filter=guard_filter.on_function_invocation, + plugins.append(execution_plugin.keyvault_plugin) + super().__init__( + name="action_executor", + description="Executes SAP QA actions, runs playbooks, performs configuration checks, " + + "and runs functional tests (HA, crash, failover) using Ansible. " + + "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action.", + kernel=kernel, + instructions=ACTION_EXECUTOR_SYSTEM_PROMPT, + plugins=plugins, ) logger.info( - f"ActionExecutorAgent initialized with SK plugin and guard filter " + f"ActionExecutorAgent initialized with SK plugins " f"(async_enabled={self._async_enabled})" ) - def _safe_add_plugin(self, plugin: object, plugin_name: str) -> None: - """Add an SK plugin if not already present. - - Semantic Kernel plugin registration can vary depending on how the runtime - constructs kernels/agents. This keeps agent capabilities consistent. - """ - try: - self.kernel.add_plugin( - plugin=plugin, - plugin_name=plugin_name, - ) - except Exception as e: - logger.info(f"Plugin '{plugin_name}' already registered or unavailable: {e}") - - async def _run_agentic(self, messages: list[ChatMessage], context: dict) -> ChatResponse: - """Run an agentic LLM+tools loop. - - The LLM decides which tools to call (execution/workspace/ssh/keyvault), guarded by - GuardFilter. The assistant message is the final synthesized answer. - """ - - self.tracer.step( - "execution_planning", - "inference", - "Running agentic tool loop", - input_snapshot=sanitize_snapshot( - { - "message_count": len(messages), - "has_agent_input": "agent_input" in context, - } - ), - ) - - chat_history = ChatHistory() - chat_history.add_system_message(ACTION_EXECUTOR_SYSTEM_PROMPT) - - agent_input = context.get("agent_input") if isinstance(context, dict) else None - if isinstance(agent_input, dict) and agent_input: - chat_history.add_system_message( - "CONTEXT (use to resolve workspace/SID, do not expose verbatim):\n" - + json.dumps(agent_input, ensure_ascii=False) - ) - - for msg in messages: - if msg.role == "user": - chat_history.add_user_message(msg.content) - elif msg.role == "assistant": - chat_history.add_assistant_message(msg.content) - - chat_service = self.kernel.get_service(service_id="azure_openai_chat") - execution_settings = chat_service.get_prompt_execution_settings_class()( - function_choice_behavior="auto", - max_completion_tokens=1200, - ) - - response = await chat_service.get_chat_message_content( - chat_history=chat_history, - settings=execution_settings, - kernel=self.kernel, - ) - - content = str(response.content) if response and getattr(response, "content", None) else "" - content = content.strip() - if not content: - content = "I couldn't produce a response. Please try again." - - self.tracer.step( - "response_generation", - "decision", - "Generated final answer from tool loop", - output_snapshot=sanitize_snapshot({"response_length": len(content)}), - ) - - return ChatResponse( - messages=[ChatMessage(role="assistant", content=content)], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - async def execute_async( self, workspace_id: str, @@ -280,190 +193,3 @@ def get_active_jobs_for_user(self, user_id: str) -> list["ExecutionJob"]: if not self.job_store: return [] return self.job_store.get_active_jobs(user_id) - - async def run( - self, - messages: list[ChatMessage], - context: Optional[dict] = None, - ) -> ChatResponse: - """Handle structured execution requests (Agent interface implementation). - - Primary mode: agentic LLM+tools loop (model chooses tools; GuardFilter enforces safety). - Optional mode: async execution/job status queries if enabled. - - :param messages: Chat messages (used for logging/context) - :type messages: list[ChatMessage] - :param context: Context with execution parameters - :type context: Optional[dict] - :returns: ChatResponse with execution summary or job info - :rtype: ChatResponse - """ - self.tracer.start() - try: - context = context or {} - if "async_execution" in context and self._async_enabled: - return await self._run_async(messages, context) - if "job_status_query" in context: - return await self._handle_job_status_query(context) - - return await self._run_agentic(messages, context) - - except Exception as e: - logger.error(f"Error in ActionExecutorAgent.run: {e}") - - self.tracer.step( - "execution_run", - "inference", - f"Error during test execution: {str(e)}", - error=str(e), - output_snapshot=sanitize_snapshot({"error_type": type(e).__name__}), - ) - - raise - - finally: - self.tracer.finish() - - async def _run_async( - self, - messages: list[ChatMessage], - context: dict, - ) -> ChatResponse: - """Handle async execution request. - - :param messages: Chat messages - :type messages: list[ChatMessage] - :param context: Context with async_execution params - :type context: dict - :returns: ChatResponse with job info - :rtype: ChatResponse - """ - async_params = context["async_execution"] - workspace_id = async_params["workspace_id"] - test_ids = async_params.get("test_ids", []) - test_group = async_params.get("test_group", "CONFIG_CHECKS") - conversation_id = context.get("conversation_id") - user_id = context.get("user_id") - - self.tracer.step( - "execution_async", - "tool_call", - f"Starting async execution for {len(test_ids)} tests", - input_snapshot=sanitize_snapshot( - { - "workspace_id": workspace_id, - "test_count": len(test_ids), - "test_group": test_group, - } - ), - ) - - try: - job = await self.execute_async( - workspace_id=workspace_id, - test_ids=test_ids, - test_group=test_group, - conversation_id=conversation_id, - user_id=user_id, - ) - - test_list = ", ".join(test_ids[:3]) - if len(test_ids) > 3: - test_list += f" and {len(test_ids) - 3} more" - - response_content = ( - f"**Starting test execution**\n\n" - f"- **Workspace**: `{workspace_id}`\n" - f"- **Tests**: {test_list}\n" - f"- **Job ID**: `{job.id}`\n\n" - f"I'll provide real-time updates as the tests progress..." - ) - - self.tracer.step( - "execution_async", - "decision", - f"Job {job.id} submitted for execution", - output_snapshot=sanitize_snapshot( - { - "job_id": str(job.id), - "status": job.status.value, - } - ), - ) - - return ChatResponse( - messages=[ChatMessage(role="assistant", content=response_content)], - reasoning_trace=self.tracer.get_trace(), - metadata={"job_id": str(job.id), "streaming": True}, - ) - - except Exception as e: - logger.error(f"Failed to start async execution: {e}") - return ChatResponse( - messages=[ChatMessage(role="assistant", content=str(e))], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - - async def _handle_job_status_query(self, context: dict) -> ChatResponse: - """Handle job status query. - - :param context: Context with job_status_query params - :type context: dict - :returns: ChatResponse with job status - :rtype: ChatResponse - """ - query = context["job_status_query"] - job_id = query.get("job_id") - user_id = query.get("user_id") - - if job_id: - job = self.get_job_status(job_id) - if job: - from src.agents.execution.worker import JobEventEmitter - - summary = JobEventEmitter.format_job_summary(job) - return ChatResponse( - messages=[ChatMessage(role="assistant", content=summary)], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - else: - return ChatResponse( - messages=[ChatMessage(role="assistant", content=f"Job `{job_id}` not found.")], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - elif user_id: - jobs = self.get_active_jobs_for_user(user_id) - if jobs: - from src.agents.execution.worker import JobEventEmitter - - lines = ["**Your Active Jobs:**\n"] - for job in jobs: - lines.append(JobEventEmitter.format_job_summary(job)) - lines.append("") - return ChatResponse( - messages=[ChatMessage(role="assistant", content="\n".join(lines))], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - else: - return ChatResponse( - messages=[ - ChatMessage(role="assistant", content="You have no active test executions.") - ], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - else: - return ChatResponse( - messages=[ - ChatMessage( - role="assistant", - content="Please specify a job ID or ask about your active jobs.", - ) - ], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) diff --git a/src/agents/agents/action_planner_agent.py b/src/agents/agents/action_planner_agent.py index 4641e981..61dd9a3f 100644 --- a/src/agents/agents/action_planner_agent.py +++ b/src/agents/agents/action_planner_agent.py @@ -16,9 +16,7 @@ from semantic_kernel import Kernel -from src.agents.agents.base import BaseSKAgent, TracingPhase -from src.agents.models.chat import ChatMessage, ChatResponse -from src.agents.models.reasoning import sanitize_snapshot +from src.agents.agents.base import SAPAutomationAgent from src.agents.observability import get_logger from src.agents.plugins.action_planner import ActionPlannerPlugin from src.agents.plugins.test import TestPlannerPlugin @@ -29,7 +27,7 @@ logger = get_logger(__name__) -class ActionPlannerAgentSK(BaseSKAgent): +class ActionPlannerAgentSK(SAPAutomationAgent): """Plans work as an ActionPlan (jobs).""" def __init__( @@ -39,6 +37,11 @@ def __init__( action_planner_plugin: Optional[ActionPlannerPlugin] = None, test_planner_plugin: Optional[TestPlannerPlugin] = None, ) -> None: + self.workspace_store = workspace_store + + self.action_planner_plugin = action_planner_plugin or ActionPlannerPlugin() + self.test_planner_plugin = test_planner_plugin or TestPlannerPlugin() + workspace_plugin = WorkspacePlugin(workspace_store) super().__init__( name="action_planner", description=( @@ -46,57 +49,8 @@ def __init__( "Produces jobs but does not execute them." ), kernel=kernel, - system_prompt=ACTION_PLANNER_AGENT_SYSTEM_PROMPT, + instructions=ACTION_PLANNER_AGENT_SYSTEM_PROMPT, + plugins=[self.action_planner_plugin, self.test_planner_plugin, workspace_plugin], ) - self.workspace_store = workspace_store - - self.action_planner_plugin = action_planner_plugin or ActionPlannerPlugin() - self._safe_add_plugin(self.action_planner_plugin, "ActionPlannerPlugin") - - self.test_planner_plugin = test_planner_plugin or TestPlannerPlugin() - self._safe_add_plugin(self.test_planner_plugin, "TestPlannerPlugin") - - self._safe_add_plugin(WorkspacePlugin(workspace_store), "workspace") - logger.info("ActionPlannerAgentSK initialized") - - def _safe_add_plugin(self, plugin: object, plugin_name: str) -> None: - try: - self.kernel.add_plugin(plugin=plugin, plugin_name=plugin_name) - except Exception as e: - logger.info(f"Plugin '{plugin_name}' already registered or unavailable: {e}") - - def _get_tracing_phase(self) -> TracingPhase: - return "execution_planning" - - def _process_response( - self, - response_content: str, - context: Optional[dict] = None, - ) -> ChatResponse: - action_plan = None - - if self.action_planner_plugin._last_generated_plan: - action_plan = self.action_planner_plugin._last_generated_plan - action_plan_dict = action_plan.model_dump() - self.tracer.step( - "execution_planning", - "decision", - "ActionPlan generated", - output_snapshot=sanitize_snapshot( - { - "workspace_id": action_plan_dict.get("workspace_id"), - "intent": action_plan_dict.get("intent"), - "jobs": len(action_plan_dict.get("jobs", [])), - } - ), - ) - self.action_planner_plugin._last_generated_plan = None - - return ChatResponse( - messages=[ChatMessage(role="assistant", content=response_content)], - action_plan=action_plan, - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) diff --git a/src/agents/agents/base.py b/src/agents/agents/base.py index 8db6869a..3ffbf629 100644 --- a/src/agents/agents/base.py +++ b/src/agents/agents/base.py @@ -4,34 +4,19 @@ Agent abstraction for SAP QA backend. This module provides: -- Agent: Abstract base class for all agents -- BaseSKAgent: Base class for Semantic Kernel-powered agents with function calling +- SAPAutomationAgent: Base class for Semantic Kernel ChatCompletionAgent agents - AgentRegistry: Registry for managing and routing to available agents - AgentTracer: Wrapper for ReasoningTracer with agent context """ -from abc import ABC, abstractmethod from pathlib import Path -from typing import Optional -import time from typing import Optional, Literal, Any from semantic_kernel import Kernel -from semantic_kernel.contents import ChatHistory - -from src.agents.models.chat import ChatMessage, ChatResponse -from src.agents.models.reasoning import ( - ReasoningTracer, - TracingPhase, - ReasoningStep, - sanitize_snapshot, -) -from src.agents.observability import ( - get_logger, - AgentContext, - create_agent_event, - LogLevel, -) +from semantic_kernel.agents import Agent, ChatCompletionAgent + +from src.agents.models.reasoning import ReasoningTracer, ReasoningStep, TracingPhase +from src.agents.observability import get_logger logger = get_logger(__name__) @@ -120,262 +105,26 @@ def finish(self) -> None: self._tracer = None -class Agent(ABC): - """ - Abstract base class for agents. - - :param ABC: Abstract base class from abc module - :type ABC: ABC - """ - - def __init__(self, name: str, description: str) -> None: - """Initialize agent with name and description. - - :param name: Unique identifier for the agent - :param description: Human-readable description of agent capabilities - """ - self.name = name - self.description = description - self.tracer = AgentTracer(agent_name=name) - - @abstractmethod - async def run( - self, - messages: list[ChatMessage], - context: Optional[dict] = None, - ) -> ChatResponse: - """Execute agent logic and return response. - - :param messages: Full conversation history - :param context: Optional metadata (user, session, etc.) - :returns: ChatResponse with agent's reply - """ - pass - - -class BaseSKAgent(Agent): - """ - Base class for Semantic Kernel-powered agents with function calling. - - This class handles the common SK boilerplate: - - ChatHistory setup with system prompt - - SK chat completion with function calling - - Structured error handling and tracing - - Configurable model deployment per agent - - Subclasses should: - - Call super().__init__() with name, description, kernel, system_prompt - - Register plugins in __init__ after calling super() - - Override _get_tracing_phase() to return the phase name for tracing - - Optionally override _process_response() for custom response handling - """ +class SAPAutomationAgent(ChatCompletionAgent): + """Base class for SAP automation agents using Semantic Kernel's ChatCompletionAgent.""" def __init__( self, + *, name: str, description: str, kernel: Kernel, - system_prompt: str, - max_tokens: int = 2000, - service_id: str = "azure_openai_chat", + instructions: str, + plugins: Optional[list[object]] = None, ) -> None: - """Initialize BaseSKAgent with Semantic Kernel. - - :param name: Unique identifier for the agent - :type name: str - :param description: Human-readable description of agent capabilities - :type description: str - :param kernel: Configured Semantic Kernel instance - :type kernel: Kernel - :param system_prompt: System prompt for the agent - :type system_prompt: str - :param max_tokens: Maximum tokens for completion (default: 2000) - :type max_tokens: int - :param service_id: SK service ID for chat completion (default: azure_openai_chat) - :type service_id: str - """ - super().__init__(name=name, description=description) - self.kernel = kernel - self.system_prompt = system_prompt - self.max_tokens = max_tokens - self.service_id = service_id - - logger.info(f"{self.__class__.__name__} initialized with SK service '{service_id}'") - - def _get_tracing_phase(self) -> TracingPhase: - """Return the primary tracing phase for this agent. - - Override in subclasses to provide a specific phase name. - - :returns: Phase name for tracing (e.g., 'documentation_retrieval', 'workspace_resolution') - :rtype: TracingPhase - """ - return "response_generation" - - def _build_chat_history(self, messages: list[ChatMessage]) -> ChatHistory: - """Build SK ChatHistory from conversation messages. - - :param messages: List of ChatMessage objects from the conversation - :type messages: list[ChatMessage] - :returns: Configured ChatHistory with system prompt and messages - :rtype: ChatHistory - """ - chat_history = ChatHistory() - chat_history.add_system_message(self.system_prompt) - - for msg in messages: - if msg.role == "user": - chat_history.add_user_message(msg.content) - elif msg.role == "assistant": - chat_history.add_assistant_message(msg.content) - - return chat_history - - async def _get_sk_response(self, chat_history: ChatHistory) -> str: - """Execute SK chat completion with function calling. - - :param chat_history: Prepared ChatHistory - :type chat_history: ChatHistory - :returns: Response content string - :rtype: str - """ - chat_service = self.kernel.get_service(service_id=self.service_id) - execution_settings = chat_service.get_prompt_execution_settings_class()( - function_choice_behavior="auto", - max_completion_tokens=self.max_tokens, + super().__init__( + name=name, + description=description, + kernel=kernel, + instructions=instructions, + plugins=plugins, ) - - logger.info(f"Calling SK chat completion for {self.name}") - response = await chat_service.get_chat_message_content( - chat_history=chat_history, - settings=execution_settings, - kernel=self.kernel, - ) - - return str(response.content) if response and response.content else "" - - def _process_response( - self, - response_content: str, - context: Optional[dict] = None, - ) -> ChatResponse: - """Process SK response into ChatResponse. - - Override in subclasses for custom response handling (e.g., extracting test plans). - - :param response_content: Raw response content from SK - :type response_content: str - :param context: Optional context dictionary - :type context: Optional[dict] - :returns: Formatted ChatResponse - :rtype: ChatResponse - """ - return ChatResponse( - messages=[ChatMessage(role="assistant", content=response_content)], - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) - - async def run( - self, - messages: list[ChatMessage], - context: Optional[dict] = None, - ) -> ChatResponse: - """Execute the agent using Semantic Kernel with function calling. - - This method handles the common SK workflow: - 1. Build ChatHistory with system prompt - 2. Execute SK chat completion with function calling - 3. Process and return response - - :param messages: List of ChatMessage objects from the conversation - :type messages: list[ChatMessage] - :param context: Optional context dictionary - :type context: Optional[dict] - :returns: ChatResponse with the agent's response - :rtype: ChatResponse - """ - workspace_id = context.get("workspace_id") if context else None - start_time = time.perf_counter() - with AgentContext(agent_name=self.name, workspace_id=workspace_id): - logger.info(f"{self.__class__.__name__}.run called with {len(messages)} messages") - logger.event( - create_agent_event( - event="agent_start", - phase=self._get_tracing_phase(), - ) - ) - - self.tracer.start() - - try: - self.tracer.step( - self._get_tracing_phase(), - "tool_call", - f"Processing request with SK for {self.name}", - input_snapshot=sanitize_snapshot( - {"message_count": len(messages), "has_context": context is not None} - ), - ) - - chat_history = self._build_chat_history(messages) - logger.info(f"Chat history prepared with {len(chat_history.messages)} messages") - - response_content = await self._get_sk_response(chat_history) - - self.tracer.step( - "response_generation", - "inference", - f"Generated response for {self.name}", - output_snapshot=sanitize_snapshot( - { - "response_length": len(response_content), - "has_content": bool(response_content), - } - ), - ) - - duration_ms = int((time.perf_counter() - start_time) * 1000) - logger.event( - create_agent_event( - event="agent_end", - status="success", - duration_ms=duration_ms, - phase=self._get_tracing_phase(), - ) - ) - - return self._process_response(response_content, context) - - except Exception as e: - duration_ms = int((time.perf_counter() - start_time) * 1000) - - logger.error( - f"Error in {self.__class__.__name__}: {type(e).__name__}: {e}", - exc_info=e, - ) - logger.event( - create_agent_event( - event="agent_end", - level=LogLevel.ERROR, - status="error", - duration_ms=duration_ms, - error=str(e), - phase=self._get_tracing_phase(), - ) - ) - - self.tracer.step( - self._get_tracing_phase(), - "inference", - f"Error in {self.name}: {str(e)}", - error=str(e), - output_snapshot=sanitize_snapshot({"error_type": type(e).__name__}), - ) - raise - - finally: - self.tracer.finish() + logger.info(f"{self.__class__.__name__} initialized with Semantic Kernel agents framework") class AgentRegistry: @@ -413,6 +162,10 @@ def list_agents(self) -> list[dict[str, str]]: for agent in self._agents.values() ] + def all_agents(self) -> list[Agent]: + """Return all registered agent instances.""" + return list(self._agents.values()) + def __contains__(self, name: str) -> bool: """Check if agent is registered. @@ -438,7 +191,6 @@ def create_default_agent_registry(kernel: Optional[Kernel] = None) -> "AgentRegi from src.agents.workspace.workspace_store import WorkspaceStore from src.agents.plugins.execution import ExecutionPlugin from src.agents.plugins.keyvault import KeyVaultPlugin - from src.agents.plugins.ssh import SSHPlugin from src.agents.ansible_runner import AnsibleRunner from src.agents.sk_kernel import create_kernel @@ -448,16 +200,12 @@ def create_default_agent_registry(kernel: Optional[Kernel] = None) -> "AgentRegi src_dir = Path(__file__).parent.parent.parent ansible_runner = AnsibleRunner(base_dir=src_dir) keyvault_plugin = KeyVaultPlugin() - ssh_plugin = SSHPlugin() execution_plugin = ExecutionPlugin( workspace_store=workspace_store, ansible_runner=ansible_runner, keyvault_plugin=keyvault_plugin, ) - kernel.add_plugin(execution_plugin, plugin_name="execution") - kernel.add_plugin(keyvault_plugin, plugin_name="keyvault") - kernel.add_plugin(ssh_plugin, plugin_name="ssh") registry = AgentRegistry() registry.register(EchoAgentSK(kernel=kernel)) diff --git a/src/agents/agents/echo_agent.py b/src/agents/agents/echo_agent.py index 8e801dd2..667010cc 100644 --- a/src/agents/agents/echo_agent.py +++ b/src/agents/agents/echo_agent.py @@ -9,7 +9,7 @@ from semantic_kernel import Kernel -from src.agents.agents.base import BaseSKAgent, TracingPhase +from src.agents.agents.base import SAPAutomationAgent from src.agents.plugins.documentation import DocumentationPlugin from src.agents.prompts import ECHO_AGENT_SK_SYSTEM_PROMPT from src.agents.observability import get_logger @@ -17,7 +17,7 @@ logger = get_logger(__name__) -class EchoAgentSK(BaseSKAgent): +class EchoAgentSK(SAPAutomationAgent): """Agent for providing documentation-based help using Semantic Kernel. This agent uses SK's native function calling to interact with documentation, @@ -30,6 +30,7 @@ def __init__(self, kernel: Kernel) -> None: :param kernel: Configured Semantic Kernel instance :type kernel: Kernel """ + documentation_plugin = DocumentationPlugin() super().__init__( name="echo", description=( @@ -38,14 +39,8 @@ def __init__(self, kernel: Kernel) -> None: "NEVER use this agent for executing tests or running commands." ), kernel=kernel, - system_prompt=ECHO_AGENT_SK_SYSTEM_PROMPT, + instructions=ECHO_AGENT_SK_SYSTEM_PROMPT, + plugins=[documentation_plugin], ) - documentation_plugin = DocumentationPlugin() - self.kernel.add_plugin(plugin=documentation_plugin, plugin_name="Documentation") - logger.info("EchoAgentSK initialized with Documentation plugin") - - def _get_tracing_phase(self) -> TracingPhase: - """Return documentation_retrieval as the primary tracing phase.""" - return "documentation_retrieval" diff --git a/src/agents/agents/orchestrator.py b/src/agents/agents/orchestrator.py index ce8cd4a5..46aeb783 100644 --- a/src/agents/agents/orchestrator.py +++ b/src/agents/agents/orchestrator.py @@ -1,35 +1,27 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. """ -Orchestrator powered by Semantic Kernel for routing chat requests to agents. +Orchestrator powered by Semantic Kernel AgentGroupChat. -This orchestrator uses Semantic Kernel with AgentRoutingPlugin for intelligent -request routing using function calling. +This orchestrator uses the Semantic Kernel Agents framework for routing +and multi-agent coordination. """ -import json -import time from typing import Optional + from semantic_kernel import Kernel -from semantic_kernel.contents import ChatHistory -from src.agents.models.chat import ChatRequest, ChatResponse, ChatMessage -from src.agents.agents.base import AgentRegistry, AgentTracer -from src.agents.plugins.routing import AgentRoutingPlugin -from src.agents.prompts import ORCHESTRATOR_SK_SYSTEM_PROMPT -from src.agents.models.execution import ExecutionRequest -from src.agents.models.reasoning import sanitize_snapshot -from src.agents.models.streaming import ( - emit_thinking_start, - emit_thinking_step, - emit_thinking_end, -) -from src.agents.observability import get_logger +from semantic_kernel.agents.group_chat.agent_group_chat import AgentGroupChat +from semantic_kernel.agents.strategies.selection import kernel_function_selection_strategy +from semantic_kernel.agents.strategies.termination import default_termination_strategy +from semantic_kernel.contents import ChatHistory, AuthorRole -logger = get_logger(__name__) +from src.agents.models.chat import ChatRequest, ChatResponse, ChatMessage +from src.agents.agents.base import AgentRegistry +from src.agents.models.streaming import emit_thinking_start, emit_thinking_step, emit_thinking_end class OrchestratorSK: - """Routes chat requests to appropriate agents using Semantic Kernel.""" + """Routes chat requests to appropriate agents using AgentGroupChat.""" def __init__(self, registry: AgentRegistry, kernel: Kernel) -> None: """Initialize orchestrator with agent registry and Semantic Kernel. @@ -41,10 +33,52 @@ def __init__(self, registry: AgentRegistry, kernel: Kernel) -> None: """ self.registry = registry self.kernel = kernel - self.tracer = AgentTracer(agent_name="orchestrator") - routing_plugin = AgentRoutingPlugin(registry) - self.kernel.add_plugin(plugin=routing_plugin, plugin_name="AgentRouting") - logger.info("OrchestratorSK initialized with Semantic Kernel and AgentRouting plugin") + self.selection_strategy = self._build_selection_strategy() + self.termination_strategy = default_termination_strategy.DefaultTerminationStrategy( + maximum_iterations=10 + ) + logger.info("OrchestratorSK initialized with Semantic Kernel AgentGroupChat") + + def _build_selection_strategy(self) -> kernel_function_selection_strategy.KernelFunctionSelectionStrategy: + agents = self.registry.all_agents() + agent_summaries = [f"- {agent.name}: {agent.description}" for agent in agents] + available_names = {agent.name for agent in agents} + prompt = ( + "You are selecting the best agent to handle the user's latest request.\n" + "Choose ONLY from the available agent names listed below.\n\n" + "Available agents:\n" + + "\n".join(agent_summaries) + + "\n\nConversation history:\n{{$history}}\n\n" + "Available agent names (comma-separated): {{$agent}}\n\n" + "Return ONLY the agent name that should respond next." + ) + + selection_function = self.kernel.add_function( + plugin_name="agent_selection", + function_name="select_next_agent", + prompt=prompt, + description="Select the next agent to respond based on conversation history.", + ) + + def _parse_selection_result(result) -> str: + if result is None: + return next(iter(available_names)) + content = str(result.value if hasattr(result, "value") else result).strip() + if content in available_names: + return content + first_token = content.split()[0] if content else "" + if first_token in available_names: + return first_token + for name in available_names: + if name.lower() in content.lower(): + return name + return next(iter(available_names)) + + return kernel_function_selection_strategy.KernelFunctionSelectionStrategy( + kernel=self.kernel, + function=selection_function, + result_parser=_parse_selection_result, + ) async def handle_chat( self, @@ -53,8 +87,6 @@ async def handle_chat( ) -> ChatResponse: """Route chat request to appropriate agents and return response. - Supports multi-agent workflows by looping until a final answer is produced. - :param request: ChatRequest with conversation history :type request: ChatRequest :param context: Optional metadata for agent execution @@ -62,183 +94,52 @@ async def handle_chat( :returns: ChatResponse from the selected agent(s) :rtype: ChatResponse """ - from src.agents.models.streaming import emit_thinking_start - await emit_thinking_start() - self.tracer.start() - agent_chain = ["orchestrator"] - - parent_step = self.tracer.step( - "routing", - "inference", - "Starting multi-agent orchestration loop", - input_snapshot=sanitize_snapshot({"message_count": len(request.messages)}), - ) - parent_step_id = parent_step.id if parent_step else None - combined_trace = self.tracer.get_trace() or {"steps": []} - + agent_chain: list[str] = [] chat_history = ChatHistory() - chat_history.add_system_message(ORCHESTRATOR_SK_SYSTEM_PROMPT) for msg in request.messages: if msg.role == "user": chat_history.add_user_message(msg.content) elif msg.role == "assistant": chat_history.add_assistant_message(msg.content) - - chat_service = self.kernel.get_service(service_id="azure_openai_chat") - execution_settings = chat_service.get_prompt_execution_settings_class()( - function_choice_behavior="auto", - max_completion_tokens=1000, + elif msg.role == "system": + chat_history.add_system_message(msg.content) + + group_chat = AgentGroupChat( + agents=self.registry.all_agents(), + selection_strategy=self.selection_strategy, + termination_strategy=self.termination_strategy, + chat_history=chat_history, ) - max_iterations = 10 final_content = "" iteration_count = 0 - last_history_len = len(chat_history.messages) - for i in range(max_iterations): - iteration_count = i + 1 - logger.info(f"Orchestration iteration {iteration_count}/{max_iterations}") - - response = await chat_service.get_chat_message_content( - chat_history=chat_history, - settings=execution_settings, - kernel=self.kernel, + async for message in group_chat.invoke(): + if message.role != AuthorRole.ASSISTANT: + continue + iteration_count += 1 + final_content = message.content or "" + agent_name = message.name or "assistant" + agent_chain.append(agent_name) + await emit_thinking_step( + agent=agent_name, + action=f"{agent_name} responded", + status="complete", ) - new_messages = chat_history.messages[last_history_len:] - routing_decision = self._extract_routing_decision(response, new_messages) - last_history_len = len(chat_history.messages) - - if routing_decision: - agent_name = routing_decision.get("agent_name") - agent_input = routing_decision.get("agent_input", {}) - - if agent_name: - agent = self.registry.get(agent_name) - if agent: - logger.info(f"Executing agent: {agent_name}") - agent_chain.append(agent_name) - agent_step_id = await emit_thinking_step( - agent=agent_name, - action=f"Agent {agent_name} is working...", - status="in_progress", - parent_step_id=parent_step_id, - ) - - agent_start = time.time() - agent_context = (context or {}).copy() - agent_context["agent_input"] = agent_input - - agent_response = await agent.run( - messages=request.messages, context=agent_context - ) - - agent_duration = int((time.time() - agent_start) * 1000) - await emit_thinking_step( - agent=agent_name, - action=f"Agent {agent_name} completed", - status="complete", - step_id=agent_step_id, - duration_ms=agent_duration, - ) - if agent_response.reasoning_trace: - steps = agent_response.reasoning_trace.get("steps", []) - for step in steps: - if isinstance(step, dict): - if "parent_step_id" not in step or not step["parent_step_id"]: - step["parent_step_id"] = parent_step_id - elif hasattr(step, "parent_step_id"): - if not step.parent_step_id: - step.parent_step_id = parent_step_id - combined_trace["steps"].extend(steps) - agent_content = agent_response.messages[-1].content - chat_history.add_system_message( - f"OUTPUT FROM AGENT {agent_name}:\n{agent_content}" - ) - - continue - else: - logger.warning(f"Agent {agent_name} not found in registry") - else: - logger.warning("No agent_name specified in routing decision") - - if response and response.content: - final_content = str(response.content) - break - if not final_content: - final_content = "I've consulted the specialized agents but couldn't produce a final summary. Please try rephrasing your request." - - final_step = self.tracer.step( - "response_generation", - "inference", - "Orchestration complete", - output_snapshot=sanitize_snapshot({"final_content": final_content}), - parent_step_id=parent_step_id, - ) - if final_step: - combined_trace["steps"].append(final_step.model_dump()) + final_content = ( + "I've consulted the specialized agents but couldn't produce a final summary. " + "Please try rephrasing your request." + ) await emit_thinking_end() - self.tracer.finish() return ChatResponse( messages=[ChatMessage(role="assistant", content=final_content)], agent_chain=agent_chain, - reasoning_trace=combined_trace, + reasoning_trace=None, metadata={"iterations": iteration_count}, ) - - def _extract_routing_decision(self, response, new_messages) -> Optional[dict]: - """Helper to extract routing JSON from SK response or new messages.""" - routing_json = None - - logger.info("Extracting routing decision from response and new messages") - - if response and hasattr(response, "items"): - for item in response.items: - item_type = type(item).__name__ - if item_type == "FunctionResultContent": - result_str = str(item.result) if hasattr(item, "result") else str(item) - if "agent_name" in result_str: - routing_json = result_str - logger.info(f"Found routing JSON in response item: {routing_json}") - break - - if not routing_json: - for msg in reversed(new_messages): - if hasattr(msg, "items"): - for item in msg.items: - item_type = type(item).__name__ - if item_type == "FunctionResultContent": - result_str = str(item.result) if hasattr(item, "result") else str(item) - if "agent_name" in result_str: - routing_json = result_str - logger.info( - f"Found routing JSON in new message item: {routing_json}" - ) - break - if routing_json: - break - - msg_str = str(msg) - if "agent_name" in msg_str and "{" in msg_str: - start = msg_str.find("{") - end = msg_str.rfind("}") + 1 - if start >= 0 and end > start: - routing_json = msg_str[start:end] - logger.info(f"Found routing JSON in new message string: {routing_json}") - break - - if routing_json: - try: - decision = json.loads(routing_json) - if isinstance(decision, dict) and "agent_name" in decision: - logger.info(f"Successfully extracted routing decision: {decision}") - return decision - except Exception as e: - logger.error(f"Failed to parse routing JSON: {e}") - - return None diff --git a/src/agents/agents/system_context_agent.py b/src/agents/agents/system_context_agent.py index 70f28284..74f924d3 100644 --- a/src/agents/agents/system_context_agent.py +++ b/src/agents/agents/system_context_agent.py @@ -9,7 +9,7 @@ from semantic_kernel import Kernel -from src.agents.agents.base import BaseSKAgent, TracingPhase +from src.agents.agents.base import SAPAutomationAgent from src.agents.workspace.workspace_store import WorkspaceStore from src.agents.plugins.workspace import WorkspacePlugin from src.agents.prompts import SYSTEM_CONTEXT_AGENT_SYSTEM_PROMPT @@ -18,7 +18,7 @@ logger = get_logger(__name__) -class SystemContextAgentSK(BaseSKAgent): +class SystemContextAgentSK(SAPAutomationAgent): """Agent for managing SAP QA system workspaces using Semantic Kernel. This agent uses SK's native function calling to interact with workspaces, @@ -33,6 +33,8 @@ def __init__(self, kernel: Kernel, workspace_store: WorkspaceStore) -> None: :param workspace_store: WorkspaceStore instance for managing workspaces :type workspace_store: WorkspaceStore """ + self.workspace_store = workspace_store + workspace_plugin = WorkspacePlugin(workspace_store) super().__init__( name="system_context", description=( @@ -41,15 +43,8 @@ def __init__(self, kernel: Kernel, workspace_store: WorkspaceStore) -> None: "get workspace details, or create a new workspace for testing." ), kernel=kernel, - system_prompt=SYSTEM_CONTEXT_AGENT_SYSTEM_PROMPT, + instructions=SYSTEM_CONTEXT_AGENT_SYSTEM_PROMPT, + plugins=[workspace_plugin], ) - self.workspace_store = workspace_store - workspace_plugin = WorkspacePlugin(workspace_store) - self.kernel.add_plugin(plugin=workspace_plugin, plugin_name="Workspace") - logger.info("SystemContextAgentSK initialized with Workspace plugin") - - def _get_tracing_phase(self) -> TracingPhase: - """Return workspace_resolution as the primary tracing phase.""" - return "workspace_resolution" diff --git a/src/agents/agents/test_advisor_agent.py b/src/agents/agents/test_advisor_agent.py index ac60c416..99fd30ac 100644 --- a/src/agents/agents/test_advisor_agent.py +++ b/src/agents/agents/test_advisor_agent.py @@ -8,13 +8,9 @@ from __future__ import annotations -from typing import Optional - from semantic_kernel import Kernel -from src.agents.agents.base import BaseSKAgent, TracingPhase -from src.agents.models.chat import ChatMessage, ChatResponse -from src.agents.models.reasoning import sanitize_snapshot +from src.agents.agents.base import SAPAutomationAgent from src.agents.observability import get_logger from src.agents.plugins.test import TestPlannerPlugin from src.agents.prompts import TEST_ADVISOR_AGENT_SYSTEM_PROMPT @@ -23,10 +19,13 @@ logger = get_logger(__name__) -class TestAdvisorAgentSK(BaseSKAgent): +class TestAdvisorAgentSK(SAPAutomationAgent): """Recommends tests and generates TestPlan (no execution jobs).""" def __init__(self, kernel: Kernel, workspace_store: WorkspaceStore): + self.workspace_store = workspace_store + + self.test_planner_plugin = TestPlannerPlugin() super().__init__( name="test_advisor", description=( @@ -34,47 +33,8 @@ def __init__(self, kernel: Kernel, workspace_store: WorkspaceStore): "Does not execute and does not generate ActionPlan jobs." ), kernel=kernel, - system_prompt=TEST_ADVISOR_AGENT_SYSTEM_PROMPT, + instructions=TEST_ADVISOR_AGENT_SYSTEM_PROMPT, + plugins=[self.test_planner_plugin], ) - self.workspace_store = workspace_store - - self.test_planner_plugin = TestPlannerPlugin() - self.kernel.add_plugin(plugin=self.test_planner_plugin, plugin_name="TestPlannerPlugin") - logger.info("TestAdvisorAgentSK initialized") - - def _get_tracing_phase(self) -> TracingPhase: - return "test_selection" - - def _process_response( - self, - response_content: str, - context: Optional[dict] = None, - ) -> ChatResponse: - test_plan = None - - if self.test_planner_plugin._last_generated_plan: - test_plan = self.test_planner_plugin._last_generated_plan - test_plan_dict = test_plan.model_dump() - self.tracer.step( - "test_selection", - "decision", - "Test plan generated successfully", - output_snapshot=sanitize_snapshot( - { - "workspace_id": test_plan_dict.get("workspace_id"), - "total_tests": test_plan_dict.get("total_tests"), - "safe_tests": len(test_plan_dict.get("safe_tests", [])), - "destructive_tests": len(test_plan_dict.get("destructive_tests", [])), - } - ), - ) - self.test_planner_plugin._last_generated_plan = None - - return ChatResponse( - messages=[ChatMessage(role="assistant", content=response_content)], - test_plan=test_plan, - reasoning_trace=self.tracer.get_trace(), - metadata=None, - ) diff --git a/src/agents/filters/__init__.py b/src/agents/filters/__init__.py new file mode 100644 index 00000000..f6127e53 --- /dev/null +++ b/src/agents/filters/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Filters for Semantic Kernel execution.""" + +from src.agents.filters.approval_filter import ApprovalFilter + +__all__ = ["ApprovalFilter"] diff --git a/src/agents/filters/approval_filter.py b/src/agents/filters/approval_filter.py new file mode 100644 index 00000000..1734af98 --- /dev/null +++ b/src/agents/filters/approval_filter.py @@ -0,0 +1,135 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Kernel-level approval filter for execution safety. + +This filter blocks hallucinated or invalid execution requests before any +Ansible or SSH action is invoked. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Callable + +import yaml +from semantic_kernel.filters.functions.function_invocation_context import ( + FunctionInvocationContext, +) +from semantic_kernel.functions.function_result import FunctionResult + +from src.agents.constants import TEST_GROUP_PLAYBOOKS +from src.agents.observability import get_logger +from src.agents.plugins.command_validator import validate_readonly_command +from src.agents.workspace.workspace_store import WorkspaceStore + +logger = get_logger(__name__) + + +class ApprovalFilter: + """Kernel filter that validates execution requests before tool invocation.""" + + def __init__(self, test_catalog_path: str = "src/vars/input-api.yaml") -> None: + self.test_catalog_path = Path(test_catalog_path) + self.test_catalog = self._load_test_catalog() + workspace_root = Path(__file__).resolve().parents[3] / "WORKSPACES" / "SYSTEM" + self.workspace_store = WorkspaceStore(workspace_root) + + def _load_test_catalog(self) -> dict[str, dict[str, dict[str, bool]]]: + if not self.test_catalog_path.exists(): + logger.warning( + f"Test catalog not found at {self.test_catalog_path}. " + "Execution approvals will be limited to known playbook groups." + ) + return {} + + try: + with open(self.test_catalog_path, "r") as handle: + config = yaml.safe_load(handle) or {} + except Exception as exc: + logger.warning(f"Failed to load test catalog: {exc}") + return {} + + catalog: dict[str, dict[str, dict[str, bool]]] = {} + for group in config.get("test_groups", []): + group_name = group.get("name") + if not group_name: + continue + tests = {} + for test_case in group.get("test_cases", []): + task_name = test_case.get("task_name") + if not task_name or not test_case.get("enabled", False): + continue + tests[task_name] = { + "destructive": bool(test_case.get("destructive", False)), + } + if tests: + catalog[group_name] = tests + return catalog + + async def on_function_invocation( + self, + context: FunctionInvocationContext, + next: Callable[[FunctionInvocationContext], Any], + ) -> None: + """Validate execution-related tool calls before invocation.""" + function_name = context.function.name + plugin_name = context.function.plugin_name or "" + + if plugin_name != "execution": + await next(context) + return + + if function_name == "run_readonly_command": + command = str(context.arguments.get("command", "")) + try: + validate_readonly_command(command) + except ValueError as exc: + logger.warning(f"ApprovalFilter blocked command: {exc}") + context.result = FunctionResult( + function=context.function.metadata, + value=f"Command rejected by approval filter: {exc}", + ) + return + + if function_name == "run_test_by_id": + test_group = str(context.arguments.get("test_group", "")).strip() + test_id = str(context.arguments.get("test_id", "")).strip() + + if not test_group or test_group not in TEST_GROUP_PLAYBOOKS: + context.result = FunctionResult( + function=context.function.metadata, + value=( + f"Test group '{test_group}' is not recognized. " + f"Valid groups: {', '.join(sorted(TEST_GROUP_PLAYBOOKS.keys()))}." + ), + ) + return + + allowed_tests = self.test_catalog.get(test_group) + if allowed_tests is not None and test_id not in allowed_tests: + context.result = FunctionResult( + function=context.function.metadata, + value=( + f"Test ID '{test_id}' is not approved for group '{test_group}'. " + "Select a test from the configured catalog." + ), + ) + return + + if allowed_tests is not None: + workspace_id = str(context.arguments.get("workspace_id", "")).strip() + workspace = ( + self.workspace_store.get_workspace(workspace_id) if workspace_id else None + ) + if workspace and allowed_tests.get(test_id, {}).get("destructive") and workspace.env == "PRD": + context.result = FunctionResult( + function=context.function.metadata, + value=( + "Destructive tests are not allowed on production workspaces. " + "Choose a non-production workspace for destructive runs." + ), + ) + return + + await next(context) diff --git a/src/agents/prompts.py b/src/agents/prompts.py index 6592deaa..b120ac07 100644 --- a/src/agents/prompts.py +++ b/src/agents/prompts.py @@ -7,34 +7,6 @@ Don't encode workflows in prompts - let the LLM reason. """ -# ============================================================================= -# Orchestrator - Routes to the right agent -# ============================================================================= - -ORCHESTRATOR_SK_SYSTEM_PROMPT = """You are the lead orchestrator for SAP on Azure operations. -Your job is to solve the user's request by coordinating with specialized agents. - -AGENTS: -- route_to_echo(): Documentation, help, general questions, and TECHNICAL BEST PRACTICES. -- route_to_system_context(): Workspace management (create, list, configure). -- route_to_test_advisor(): Test recommendations and planning. -- route_to_action_executor(): Operational work (diagnostics, commands, run tests). - -MULTI-AGENT WORKFLOW: -1. You can call multiple agents in sequence if needed. -2. KNOWLEDGE-FIRST APPROACH: If a user asks for an operational task (e.g., "check cluster status") and the "best" command depends on the environment (OS, SAP version), you should FIRST call `route_to_echo()` to search for the latest Microsoft Learn or framework documentation on the correct syntax and best practices. -3. After receiving the technical guidance from `echo`, pass that context to `action_executor` so it can execute the correct, most up-to-date command autonomously. -4. If an agent asks for information that you believe another agent can provide, call that agent. -5. DO NOT get stuck in a loop. If you have the workspace ID and the technical best practice, route to `action_executor` immediately. -6. MANDATORY AUTONOMY: If an agent asks the user for a technical choice, you MUST instead instruct that agent to "use the documentation provided by the echo agent and the system configuration to pick the best command". - -RULES: -- SILENT ROUTING: When you call an agent tool, do NOT include any conversational text (e.g., "I'm sending this to..."). Just call the tool. -- Extract SID, workspace_id, or env from the message. -- NEVER answer technical questions yourself. Use `route_to_echo()` for latest knowledge. -- FINAL SUMMARY: Once you have the final output from the agents, provide a concise summary to the user. -""" - # ============================================================================= # System Context Agent - Workspace management # ============================================================================= @@ -174,8 +146,8 @@ AUTONOMY & KNOWLEDGE-BASED DECISIONS (CRITICAL): - You are an expert system. Do NOT ask the user for clarification on technical details. -- Use the technical context provided by the Orchestrator (sourced from Echo Agent/Microsoft Learn) to pick the most appropriate command for the environment. -- If you lack specific knowledge for a command (e.g., "how to check cluster status on RHEL 9.2"), report that you need documentation for that specific OS/version. The Orchestrator will then use the Echo Agent to find it for you. +- Use technical context from documentation and workspace configuration to pick the most appropriate command. +- If you lack specific knowledge for a command (e.g., "how to check cluster status on RHEL 9.2"), state that you need documentation for that specific OS/version. - Once you have the documentation and system properties (from sap-parameters.yaml), execute the command immediately. Do NOT present the user with a list of choices. WORKSPACE RESOLUTION (MANDATORY): diff --git a/src/agents/sk_kernel.py b/src/agents/sk_kernel.py index 7d9707c6..d39e68d2 100644 --- a/src/agents/sk_kernel.py +++ b/src/agents/sk_kernel.py @@ -13,8 +13,10 @@ from semantic_kernel import Kernel from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion +from semantic_kernel.filters import FilterTypes from src.agents.observability import get_logger +from src.agents.filters import ApprovalFilter logger = get_logger(__name__) @@ -64,6 +66,13 @@ def create_kernel( ) ) + approval_filter = ApprovalFilter() + kernel.add_filter( + filter_type=FilterTypes.FUNCTION_INVOCATION, + filter=approval_filter.on_function_invocation, + ) + logger.info("Registered kernel approval filter for execution safety") + logger.info("Semantic Kernel created successfully with Azure OpenAI service") return kernel