1111- Async execution with real-time status updates
1212"""
1313
14- import json
15- from typing import Any , Optional , TYPE_CHECKING
14+ from typing import Optional , TYPE_CHECKING
1615
1716from semantic_kernel import Kernel
18- from semantic_kernel .contents import ChatHistory
19- from semantic_kernel .filters import FilterTypes
2017
21- from src .agents .models .chat import ChatMessage , ChatResponse
22- from src .agents .agents .base import Agent
18+ from src .agents .agents .base import SAPAutomationAgent
2319from src .agents .workspace .workspace_store import WorkspaceStore
2420from src .agents .plugins .execution import ExecutionPlugin
2521from src .agents .plugins .workspace import WorkspacePlugin
2622from src .agents .plugins .ssh import SSHPlugin
27- from src .agents .models .reasoning import sanitize_snapshot
28- from src .agents .execution import GuardLayer , GuardFilter
23+ from src .agents .execution import GuardLayer
2924from src .agents .observability import get_logger
3025from src .agents .prompts import ACTION_EXECUTOR_SYSTEM_PROMPT
3126
3530logger = get_logger (__name__ )
3631
3732
38- class ActionExecutorAgent (Agent ):
33+ class ActionExecutorAgent (SAPAutomationAgent ):
3934 """Agent for executing SAP QA actions with strong safety and environment gating.
4035
4136 Uses Semantic Kernel with:
4237 - ExecutionPlugin: Provides test execution tools as SK functions
43- - GuardFilter: Intercepts function calls to enforce safety constraints
38+ - Kernel-level approval filters for safety constraints
4439
4540 Supports two execution modes:
4641 1. Synchronous (blocking): For quick tests or when streaming not needed
@@ -57,8 +52,7 @@ def __init__(
5752 ):
5853 """Initialize ActionExecutorAgent.
5954
60- Registers ExecutionPlugin with Semantic Kernel and adds GuardFilter
61- for safety enforcement.
55+ Registers ExecutionPlugin with Semantic Kernel agents.
6256
6357 :param kernel: Semantic Kernel instance
6458 :type kernel: Kernel
@@ -71,13 +65,6 @@ def __init__(
7165 :param job_worker: Optional JobWorker for background execution
7266 :type job_worker: Optional[JobWorker]
7367 """
74- super ().__init__ (
75- name = "action_executor" ,
76- description = "Executes SAP QA actions, runs playbooks, performs configuration checks, "
77- + "and runs functional tests (HA, crash, failover) using Ansible. "
78- + "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action." ,
79- )
80-
8168 self .kernel = kernel
8269 self .workspace_store = workspace_store
8370 self .execution_plugin = execution_plugin
@@ -90,102 +77,28 @@ def __init__(
9077 workspace_store = workspace_store ,
9178 )
9279
93- self ._safe_add_plugin (execution_plugin , "execution" )
94- self ._safe_add_plugin (WorkspacePlugin (workspace_store ), "workspace" )
95- self ._safe_add_plugin (SSHPlugin (), "ssh" )
80+ plugins : list [object ] = [
81+ execution_plugin ,
82+ WorkspacePlugin (workspace_store ),
83+ SSHPlugin (),
84+ ]
9685 if getattr (execution_plugin , "keyvault_plugin" , None ) is not None :
97- self ._safe_add_plugin (execution_plugin .keyvault_plugin , "keyvault" )
98-
99- guard_filter = GuardFilter (self .guard_layer )
100- self .kernel .add_filter (
101- filter_type = FilterTypes .FUNCTION_INVOCATION ,
102- filter = guard_filter .on_function_invocation ,
86+ plugins .append (execution_plugin .keyvault_plugin )
87+ super ().__init__ (
88+ name = "action_executor" ,
89+ description = "Executes SAP QA actions, runs playbooks, performs configuration checks, "
90+ + "and runs functional tests (HA, crash, failover) using Ansible. "
91+ + "Use this agent whenever the user asks to 'run', 'execute', 'perform', or 'start' a test or action." ,
92+ kernel = kernel ,
93+ instructions = ACTION_EXECUTOR_SYSTEM_PROMPT ,
94+ plugins = plugins ,
10395 )
10496
10597 logger .info (
106- f"ActionExecutorAgent initialized with SK plugin and guard filter "
98+ f"ActionExecutorAgent initialized with SK plugins "
10799 f"(async_enabled={ self ._async_enabled } )"
108100 )
109101
110- def _safe_add_plugin (self , plugin : object , plugin_name : str ) -> None :
111- """Add an SK plugin if not already present.
112-
113- Semantic Kernel plugin registration can vary depending on how the runtime
114- constructs kernels/agents. This keeps agent capabilities consistent.
115- """
116- try :
117- self .kernel .add_plugin (
118- plugin = plugin ,
119- plugin_name = plugin_name ,
120- )
121- except Exception as e :
122- logger .info (f"Plugin '{ plugin_name } ' already registered or unavailable: { e } " )
123-
124- async def _run_agentic (self , messages : list [ChatMessage ], context : dict ) -> ChatResponse :
125- """Run an agentic LLM+tools loop.
126-
127- The LLM decides which tools to call (execution/workspace/ssh/keyvault), guarded by
128- GuardFilter. The assistant message is the final synthesized answer.
129- """
130-
131- self .tracer .step (
132- "execution_planning" ,
133- "inference" ,
134- "Running agentic tool loop" ,
135- input_snapshot = sanitize_snapshot (
136- {
137- "message_count" : len (messages ),
138- "has_agent_input" : "agent_input" in context ,
139- }
140- ),
141- )
142-
143- chat_history = ChatHistory ()
144- chat_history .add_system_message (ACTION_EXECUTOR_SYSTEM_PROMPT )
145-
146- agent_input = context .get ("agent_input" ) if isinstance (context , dict ) else None
147- if isinstance (agent_input , dict ) and agent_input :
148- chat_history .add_system_message (
149- "CONTEXT (use to resolve workspace/SID, do not expose verbatim):\n "
150- + json .dumps (agent_input , ensure_ascii = False )
151- )
152-
153- for msg in messages :
154- if msg .role == "user" :
155- chat_history .add_user_message (msg .content )
156- elif msg .role == "assistant" :
157- chat_history .add_assistant_message (msg .content )
158-
159- chat_service = self .kernel .get_service (service_id = "azure_openai_chat" )
160- execution_settings = chat_service .get_prompt_execution_settings_class ()(
161- function_choice_behavior = "auto" ,
162- max_completion_tokens = 1200 ,
163- )
164-
165- response = await chat_service .get_chat_message_content (
166- chat_history = chat_history ,
167- settings = execution_settings ,
168- kernel = self .kernel ,
169- )
170-
171- content = str (response .content ) if response and getattr (response , "content" , None ) else ""
172- content = content .strip ()
173- if not content :
174- content = "I couldn't produce a response. Please try again."
175-
176- self .tracer .step (
177- "response_generation" ,
178- "decision" ,
179- "Generated final answer from tool loop" ,
180- output_snapshot = sanitize_snapshot ({"response_length" : len (content )}),
181- )
182-
183- return ChatResponse (
184- messages = [ChatMessage (role = "assistant" , content = content )],
185- reasoning_trace = self .tracer .get_trace (),
186- metadata = None ,
187- )
188-
189102 async def execute_async (
190103 self ,
191104 workspace_id : str ,
@@ -280,190 +193,3 @@ def get_active_jobs_for_user(self, user_id: str) -> list["ExecutionJob"]:
280193 if not self .job_store :
281194 return []
282195 return self .job_store .get_active_jobs (user_id )
283-
284- async def run (
285- self ,
286- messages : list [ChatMessage ],
287- context : Optional [dict ] = None ,
288- ) -> ChatResponse :
289- """Handle structured execution requests (Agent interface implementation).
290-
291- Primary mode: agentic LLM+tools loop (model chooses tools; GuardFilter enforces safety).
292- Optional mode: async execution/job status queries if enabled.
293-
294- :param messages: Chat messages (used for logging/context)
295- :type messages: list[ChatMessage]
296- :param context: Context with execution parameters
297- :type context: Optional[dict]
298- :returns: ChatResponse with execution summary or job info
299- :rtype: ChatResponse
300- """
301- self .tracer .start ()
302- try :
303- context = context or {}
304- if "async_execution" in context and self ._async_enabled :
305- return await self ._run_async (messages , context )
306- if "job_status_query" in context :
307- return await self ._handle_job_status_query (context )
308-
309- return await self ._run_agentic (messages , context )
310-
311- except Exception as e :
312- logger .error (f"Error in ActionExecutorAgent.run: { e } " )
313-
314- self .tracer .step (
315- "execution_run" ,
316- "inference" ,
317- f"Error during test execution: { str (e )} " ,
318- error = str (e ),
319- output_snapshot = sanitize_snapshot ({"error_type" : type (e ).__name__ }),
320- )
321-
322- raise
323-
324- finally :
325- self .tracer .finish ()
326-
327- async def _run_async (
328- self ,
329- messages : list [ChatMessage ],
330- context : dict ,
331- ) -> ChatResponse :
332- """Handle async execution request.
333-
334- :param messages: Chat messages
335- :type messages: list[ChatMessage]
336- :param context: Context with async_execution params
337- :type context: dict
338- :returns: ChatResponse with job info
339- :rtype: ChatResponse
340- """
341- async_params = context ["async_execution" ]
342- workspace_id = async_params ["workspace_id" ]
343- test_ids = async_params .get ("test_ids" , [])
344- test_group = async_params .get ("test_group" , "CONFIG_CHECKS" )
345- conversation_id = context .get ("conversation_id" )
346- user_id = context .get ("user_id" )
347-
348- self .tracer .step (
349- "execution_async" ,
350- "tool_call" ,
351- f"Starting async execution for { len (test_ids )} tests" ,
352- input_snapshot = sanitize_snapshot (
353- {
354- "workspace_id" : workspace_id ,
355- "test_count" : len (test_ids ),
356- "test_group" : test_group ,
357- }
358- ),
359- )
360-
361- try :
362- job = await self .execute_async (
363- workspace_id = workspace_id ,
364- test_ids = test_ids ,
365- test_group = test_group ,
366- conversation_id = conversation_id ,
367- user_id = user_id ,
368- )
369-
370- test_list = ", " .join (test_ids [:3 ])
371- if len (test_ids ) > 3 :
372- test_list += f" and { len (test_ids ) - 3 } more"
373-
374- response_content = (
375- f"**Starting test execution**\n \n "
376- f"- **Workspace**: `{ workspace_id } `\n "
377- f"- **Tests**: { test_list } \n "
378- f"- **Job ID**: `{ job .id } `\n \n "
379- f"I'll provide real-time updates as the tests progress..."
380- )
381-
382- self .tracer .step (
383- "execution_async" ,
384- "decision" ,
385- f"Job { job .id } submitted for execution" ,
386- output_snapshot = sanitize_snapshot (
387- {
388- "job_id" : str (job .id ),
389- "status" : job .status .value ,
390- }
391- ),
392- )
393-
394- return ChatResponse (
395- messages = [ChatMessage (role = "assistant" , content = response_content )],
396- reasoning_trace = self .tracer .get_trace (),
397- metadata = {"job_id" : str (job .id ), "streaming" : True },
398- )
399-
400- except Exception as e :
401- logger .error (f"Failed to start async execution: { e } " )
402- return ChatResponse (
403- messages = [ChatMessage (role = "assistant" , content = str (e ))],
404- reasoning_trace = self .tracer .get_trace (),
405- metadata = None ,
406- )
407-
408- async def _handle_job_status_query (self , context : dict ) -> ChatResponse :
409- """Handle job status query.
410-
411- :param context: Context with job_status_query params
412- :type context: dict
413- :returns: ChatResponse with job status
414- :rtype: ChatResponse
415- """
416- query = context ["job_status_query" ]
417- job_id = query .get ("job_id" )
418- user_id = query .get ("user_id" )
419-
420- if job_id :
421- job = self .get_job_status (job_id )
422- if job :
423- from src .agents .execution .worker import JobEventEmitter
424-
425- summary = JobEventEmitter .format_job_summary (job )
426- return ChatResponse (
427- messages = [ChatMessage (role = "assistant" , content = summary )],
428- reasoning_trace = self .tracer .get_trace (),
429- metadata = None ,
430- )
431- else :
432- return ChatResponse (
433- messages = [ChatMessage (role = "assistant" , content = f"Job `{ job_id } ` not found." )],
434- reasoning_trace = self .tracer .get_trace (),
435- metadata = None ,
436- )
437- elif user_id :
438- jobs = self .get_active_jobs_for_user (user_id )
439- if jobs :
440- from src .agents .execution .worker import JobEventEmitter
441-
442- lines = ["**Your Active Jobs:**\n " ]
443- for job in jobs :
444- lines .append (JobEventEmitter .format_job_summary (job ))
445- lines .append ("" )
446- return ChatResponse (
447- messages = [ChatMessage (role = "assistant" , content = "\n " .join (lines ))],
448- reasoning_trace = self .tracer .get_trace (),
449- metadata = None ,
450- )
451- else :
452- return ChatResponse (
453- messages = [
454- ChatMessage (role = "assistant" , content = "You have no active test executions." )
455- ],
456- reasoning_trace = self .tracer .get_trace (),
457- metadata = None ,
458- )
459- else :
460- return ChatResponse (
461- messages = [
462- ChatMessage (
463- role = "assistant" ,
464- content = "Please specify a job ID or ask about your active jobs." ,
465- )
466- ],
467- reasoning_trace = self .tracer .get_trace (),
468- metadata = None ,
469- )
0 commit comments