Create a test harness called exgentic_a2a_runner that integrates Exgentic benchmarks with Kagenti agents using the A2A protocol. This harness will follow the execution model defined in GitHub Issue #963.
┌─────────────────────────────────────────────────────────────────┐
│                       Exgentic A2A Runner                       │
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │    Runner    │───▶│   Exgentic   │───▶│  MCP Client  │       │
│  │              │    │   Adapter    │    │              │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                   │                   │               │
│         │                   │                   ▼               │
│         │                   │            ┌──────────────┐       │
│         │                   │            │ Exgentic MCP │       │
│         │                   │            │    Server    │       │
│         │                   │            └──────────────┘       │
│         │                   │                                   │
│         ▼                   ▼                                   │
│  ┌──────────────┐    ┌──────────────┐                           │
│  │  A2A Client  │    │    Prompt    │                           │
│  │              │    │   Builder    │                           │
│  └──────────────┘    └──────────────┘                           │
│         │                                                       │
│         ▼                                                       │
│  ┌──────────────┐                                               │
│  │ Kagenti Agent│                                               │
│  │  (via A2A)   │                                               │
│  └──────────────┘                                               │
└─────────────────────────────────────────────────────────────────┘
For each task:
- Create Session:
  (session_id, task) = benchmark_mcp.call("create_session")
- Record Start Time:
  start_time = time()
- Invoke Agent:
  agent.invoke_agent("{task}. Use session id {session_id} in all accesses")
- Record Completion Time:
  completion_time = time() - start_time
- Evaluate Session:
  success = benchmark_mcp.call("evaluate_session", {"session_id": session_id})
- Store Statistics:
  stats[session_id] = (completion_time, success)
- Close Session:
  benchmark_mcp.call("close_session", {"session_id": session_id})
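The steps above can be sketched as a small Python loop. This is a minimal illustration, not the harness itself: `run_tasks`, `FakeBenchmark`, and the lambda agent are hypothetical stand-ins for the MCP server and the Kagenti agent, and the `try`/`finally` around the session is an added assumption so that a session is closed even when the agent call fails.

```python
import time
from typing import Any, Callable, Dict, Tuple


def run_tasks(
    call_benchmark: Callable[[str, Dict[str, Any]], Any],
    invoke_agent: Callable[[str], str],
    num_tasks: int,
) -> Dict[str, Tuple[float, bool]]:
    """Run num_tasks sessions sequentially and return per-session stats."""
    stats: Dict[str, Tuple[float, bool]] = {}
    for _ in range(num_tasks):
        session_id, task = call_benchmark("create_session", {})
        try:
            start_time = time.monotonic()
            invoke_agent(f"{task}. Use session id {session_id} in all accesses")
            completion_time = time.monotonic() - start_time
            success = bool(
                call_benchmark("evaluate_session", {"session_id": session_id})
            )
            stats[session_id] = (completion_time, success)
        finally:
            # Assumption: always close the session, even if the agent call raised.
            call_benchmark("close_session", {"session_id": session_id})
    return stats


class FakeBenchmark:
    """Hypothetical in-memory stand-in for the benchmark MCP server."""

    def __init__(self) -> None:
        self.counter = 0
        self.closed = []

    def call(self, tool: str, args: Dict[str, Any]) -> Any:
        if tool == "create_session":
            self.counter += 1
            return (f"s{self.counter}", f"task {self.counter}")
        if tool == "evaluate_session":
            return True
        if tool == "close_session":
            self.closed.append(args["session_id"])
            return None
        raise ValueError(f"unknown tool: {tool}")


benchmark = FakeBenchmark()
stats = run_tasks(benchmark.call, lambda prompt: "done", num_tasks=2)
print(stats)
```

Only the agent invocation is timed, which matches the order of the steps in the execution model; session creation, evaluation, and cleanup fall outside the measured interval.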
exgentic_a2a_runner/
├── pyproject.toml              # Project configuration and dependencies
├── README.md                   # Documentation
├── example.env                 # Example environment configuration
├── .gitignore                  # Git ignore patterns
└── exgentic_a2a_runner/
    ├── __init__.py             # Package initialization
    ├── runner.py               # Main orchestration logic
    ├── config.py               # Configuration management
    ├── exgentic_adapter.py     # Exgentic MCP server adapter
    ├── mcp_client.py           # MCP protocol client
    ├── a2a_client.py           # A2A protocol client (reused from appworld)
    ├── prompt.py               # Prompt construction
    └── otel.py                 # OpenTelemetry instrumentation
ExgenticConfig
- mcp_server_url: URL of the Exgentic MCP server (required)
- mcp_timeout_seconds: Timeout for MCP operations (default: 60)
- max_tasks: Maximum number of tasks to process (optional)
- abort_on_failure: Stop on first failure (default: false)
A2AConfig (reused from appworld_a2a_runner)
- base_url: A2A endpoint base URL
- timeout_seconds: Request timeout
- auth_token: Bearer token for authentication
- verify_tls: TLS verification flag
- endpoint_path: Endpoint path
OTELConfig (reused from appworld_a2a_runner)
- Standard OpenTelemetry configuration
DebugConfig (reused from appworld_a2a_runner)
- log_prompt: Log prompt details
- log_response: Log response details
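As an illustration of how ExgenticConfig might be populated, the sketch below reads the fields from a mapping of environment variables. The variable names follow the example environment file in this plan; the `from_env` classmethod, the `_parse_bool` helper, and the accepted truthy strings are assumptions made for this example only.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _parse_bool(value: str) -> bool:
    # Assumption: these strings count as "true"; anything else is false.
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ExgenticConfig:
    mcp_server_url: str
    mcp_timeout_seconds: int = 60
    max_tasks: Optional[int] = None
    abort_on_failure: bool = False

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "ExgenticConfig":
        url = env.get("EXGENTIC_MCP_SERVER_URL", "").strip()
        if not url:
            raise ValueError("EXGENTIC_MCP_SERVER_URL is required")
        max_tasks = env.get("MAX_TASKS", "").strip()
        return cls(
            mcp_server_url=url,
            mcp_timeout_seconds=int(env.get("EXGENTIC_MCP_TIMEOUT_SECONDS", "60")),
            max_tasks=int(max_tasks) if max_tasks else None,
            abort_on_failure=_parse_bool(env.get("ABORT_ON_FAILURE", "false")),
        )


# Passing a plain dict keeps the example independent of the real environment.
config = ExgenticConfig.from_env(
    {"EXGENTIC_MCP_SERVER_URL": "http://localhost:3000", "MAX_TASKS": "10"}
)
print(config)
```

Failing fast on a missing server URL keeps the "required" contract visible at startup rather than at the first MCP call.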
The MCP client uses the official MCP Python SDK to communicate with the Exgentic MCP server.
Key Methods:
- create_session() -> (session_id: str, task: str): Create a new benchmark session
- evaluate_session(session_id: str) -> bool: Evaluate session success
- close_session(session_id: str) -> None: Close and clean up the session
Implementation Notes:
- Use the mcp Python package for MCP protocol communication
- Handle the connection lifecycle properly
- Implement proper error handling and timeouts
- Support both stdio and SSE transport modes
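One way to approach the error handling and timeout note is to wrap every tool call in a single helper. The sketch below uses only the standard library: `call_tool` is a hypothetical async callable standing in for whatever the MCP SDK session exposes, and `MCPCallError` and `call_with_timeout` are names invented for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class MCPCallError(RuntimeError):
    """Raised when a tool call fails or exceeds its timeout."""


async def call_with_timeout(
    call_tool: Callable[[str, Dict[str, Any]], Awaitable[Any]],
    name: str,
    arguments: Dict[str, Any],
    timeout_seconds: float,
) -> Any:
    """Await one tool call, converting timeouts and errors to MCPCallError."""
    try:
        return await asyncio.wait_for(
            call_tool(name, arguments), timeout=timeout_seconds
        )
    except asyncio.TimeoutError as exc:
        raise MCPCallError(f"{name} timed out after {timeout_seconds}s") from exc
    except Exception as exc:
        raise MCPCallError(f"{name} failed: {exc}") from exc


async def fast_tool(name: str, arguments: Dict[str, Any]) -> Any:
    # Hypothetical server response for create_session.
    return {"session_id": "s1", "task": "demo"}


async def slow_tool(name: str, arguments: Dict[str, Any]) -> Any:
    await asyncio.sleep(1.0)  # longer than the timeout used below
    return True


async def demo():
    result = await call_with_timeout(fast_tool, "create_session", {}, 1.0)
    try:
        await call_with_timeout(
            slow_tool, "evaluate_session", {"session_id": "s1"}, 0.05
        )
    except MCPCallError as exc:
        return result, str(exc)
    return result, None


result, error = asyncio.run(demo())
print(result, error)
```

Funnelling all calls through one helper means the configured mcp_timeout_seconds only has to be applied in one place.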
The Exgentic adapter provides a high-level interface to Exgentic MCP server operations.
SessionData Class:
from dataclasses import dataclass

@dataclass
class SessionData:
    session_id: str
    task: str
    created_at: float
ExgenticAdapter Class:
- initialize(): Initialize MCP client connection
- create_session() -> SessionData: Create a new session and get its task
- evaluate_session(session_id: str) -> bool: Evaluate session
- close_session(session_id: str): Close session
- iterate_sessions(): Iterator for sequential session processing
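A possible shape for the adapter is sketched below. Several details are assumptions rather than part of the plan: `call_tool` is a hypothetical synchronous callable that hides the MCP client, `create_session` is assumed to return a dict with `session_id` and `task` keys (or `None` when no tasks remain), and `initialize()` is omitted because it depends on the chosen transport.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, Optional


@dataclass
class SessionData:
    session_id: str
    task: str
    created_at: float


class ExgenticAdapter:
    """Sketch only; call_tool is a hypothetical synchronous tool caller."""

    def __init__(
        self,
        call_tool: Callable[[str, Dict[str, Any]], Any],
        max_tasks: Optional[int] = None,
    ) -> None:
        self._call_tool = call_tool
        self._max_tasks = max_tasks

    def create_session(self) -> Optional[SessionData]:
        # Assumption: the tool returns {"session_id": ..., "task": ...},
        # or None once the benchmark has no tasks left.
        payload = self._call_tool("create_session", {})
        if payload is None:
            return None
        return SessionData(payload["session_id"], payload["task"], time.time())

    def evaluate_session(self, session_id: str) -> bool:
        return bool(self._call_tool("evaluate_session", {"session_id": session_id}))

    def close_session(self, session_id: str) -> None:
        self._call_tool("close_session", {"session_id": session_id})

    def iterate_sessions(self) -> Iterator[SessionData]:
        """Lazily create sessions until max_tasks is reached or none remain."""
        produced = 0
        while self._max_tasks is None or produced < self._max_tasks:
            session = self.create_session()
            if session is None:
                return
            yield session
            produced += 1


def make_fake_tool(tasks):
    """In-memory stand-in for the Exgentic MCP server (illustration only)."""
    remaining = list(tasks)
    issued = 0

    def call_tool(name: str, arguments: Dict[str, Any]) -> Any:
        nonlocal issued
        if name == "create_session":
            if not remaining:
                return None
            issued += 1
            return {"session_id": f"s{issued}", "task": remaining.pop(0)}
        return True  # evaluate_session and close_session always succeed here

    return call_tool


adapter = ExgenticAdapter(make_fake_tool(["task A", "task B", "task C"]), max_tasks=2)
session_ids = [s.session_id for s in adapter.iterate_sessions()]
print(session_ids)
```

Because `iterate_sessions` is a generator, each session is created only when the runner asks for the next one, which keeps at most one session open at a time during sequential processing.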
The prompt builder constructs prompts that include the session_id for the agent.
Format:
The task you are to complete is:
{task}
IMPORTANT: Use session id "{session_id}" in all your interactions with the benchmark tools.
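A minimal sketch of a builder for this format follows. The function name `build_prompt` matches the one used in the process flow of this plan; the exact line breaks, the stripping of the task text, and the empty-session check are assumptions.

```python
PROMPT_TEMPLATE = (
    "The task you are to complete is:\n"
    "{task}\n"
    'IMPORTANT: Use session id "{session_id}" in all your interactions '
    "with the benchmark tools."
)


def build_prompt(task: str, session_id: str) -> str:
    """Render the agent prompt for one benchmark session."""
    if not session_id:
        # Assumption: a prompt without a session id is never useful.
        raise ValueError("session_id must be non-empty")
    return PROMPT_TEMPLATE.format(task=task.strip(), session_id=session_id)


prompt = build_prompt("Book a flight to Boston", "abc-123")
print(prompt)
```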
For the A2A client, reuse the existing implementation from appworld_a2a_runner with minimal modifications.
The runner contains the main orchestration logic and follows the execution model.
SessionResult Class:
from dataclasses import dataclass
from typing import Optional

@dataclass
class SessionResult:
    session_id: str
    success: bool
    latency_ms: float
    error: Optional[str] = None
    response_chars: Optional[int] = None
Runner Class:
- initialize(): Initialize all components
- process_session(session_data: SessionData) -> SessionResult: Process a single session
- run() -> int: Main execution loop
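The main loop could look roughly like the sketch below. It is written as a free function so it stays self-contained: `process` is a hypothetical callable standing in for process_session, and the exit-code convention (0 when every session succeeds, 1 otherwise) is an assumption, since the plan only states that run() returns an int.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional


@dataclass
class SessionResult:
    session_id: str
    success: bool
    latency_ms: float
    error: Optional[str] = None
    response_chars: Optional[int] = None


def run(
    session_ids: Iterable[str],
    process: Callable[[str], SessionResult],
    abort_on_failure: bool = False,
) -> int:
    """Process sessions in order; return 0 if all succeeded, else 1."""
    results: List[SessionResult] = []
    for session_id in session_ids:
        try:
            result = process(session_id)
        except Exception as exc:
            # Record the failure instead of crashing the whole run.
            result = SessionResult(session_id, False, 0.0, error=str(exc))
        results.append(result)
        if abort_on_failure and not result.success:
            break
    failed = sum(1 for r in results if not r.success)
    print(f"processed={len(results)} failed={failed}")
    return 0 if failed == 0 else 1


seen: List[str] = []


def fake_process(session_id: str) -> SessionResult:
    """Hypothetical processor that fails on the second session."""
    seen.append(session_id)
    if session_id == "s2":
        raise RuntimeError("agent unreachable")
    return SessionResult(session_id, True, 12.5, response_chars=4)


exit_code = run(["s1", "s2", "s3"], fake_process, abort_on_failure=True)
print(exit_code)
```

With abort_on_failure enabled the loop stops after the first failed session, so the third session in the example is never started.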
Process Flow:
import time

def process_session(session_data):
    # Build prompt with session_id
    prompt = build_prompt(session_data.task, session_data.session_id)
    # Send to agent via A2A, timing only the agent call as in the execution model
    start_time = time.time()
    response = a2a_client.send_prompt(prompt)
    completion_time = time.time() - start_time
    # Evaluate session
    success = exgentic_adapter.evaluate_session(session_data.session_id)
    # Close session
    exgentic_adapter.close_session(session_data.session_id)
    return SessionResult(
        session_id=session_data.session_id,
        success=success,
        latency_ms=completion_time * 1000,
        error=None,
        response_chars=len(response),
    )
The OpenTelemetry instrumentation is extended from appworld_a2a_runner with additional spans, attributes, and metrics.
Additional Spans:
- exgentic_a2a.session: Overall session processing
- exgentic_a2a.mcp.create_session: Session creation
- exgentic_a2a.mcp.evaluate_session: Session evaluation
- exgentic_a2a.mcp.close_session: Session cleanup
Additional Attributes:
- exgentic.session_id: Session identifier
- exgentic.mcp_server_url: MCP server URL
- exgentic.evaluation_result: Success/failure of evaluation
Additional Metrics:
- exgentic_a2a_sessions_total{status=success|failed}: Total sessions processed
- exgentic_a2a_session_latency_ms: End-to-end session latency
- exgentic_a2a_evaluation_latency_ms: Evaluation operation latency
- exgentic_a2a_session_creation_latency_ms: Session creation latency
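To make the session counter and latency metrics concrete, the sketch below aggregates per-session outcomes in plain Python. It does not use the OpenTelemetry API and is not how the metrics would be exported; `summarize` and its output keys are hypothetical names chosen to mirror the status label and the latency metric listed above.

```python
from collections import Counter
from statistics import mean, median
from typing import Dict, Iterable, Tuple


def summarize(results: Iterable[Tuple[bool, float]]) -> Dict[str, object]:
    """Aggregate (success, latency_ms) pairs into counts and latency stats."""
    pairs = list(results)
    status_counts = Counter("success" if ok else "failed" for ok, _ in pairs)
    latencies = [latency_ms for _, latency_ms in pairs]
    return {
        # Mirrors exgentic_a2a_sessions_total{status=success|failed}
        "sessions_total": {
            "success": status_counts["success"],
            "failed": status_counts["failed"],
        },
        # Mirrors exgentic_a2a_session_latency_ms
        "latency_ms_mean": mean(latencies) if latencies else None,
        "latency_ms_median": median(latencies) if latencies else None,
    }


summary = summarize([(True, 1200.0), (False, 3400.0), (True, 800.0)])
print(summary)
```

The same aggregation can back the end-of-run summary report, so the console output and the exported metrics are derived from one set of results.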
[project]
name = "exgentic-a2a-runner"
version = "0.1.0"
description = "Exgentic Benchmark A2A Runner for Kagenti"
requires-python = ">=3.11"
dependencies = [
    "mcp>=0.9.0",
    "requests>=2.28.0",
    "opentelemetry-api>=1.20.0",
    "opentelemetry-sdk>=1.20.0",
    "opentelemetry-exporter-otlp>=1.20.0",
    "opentelemetry-instrumentation-requests>=0.41b0",
]
[project.scripts]
exgentic-a2a-runner = "exgentic_a2a_runner.runner:main"
# REQUIRED CONFIGURATION
EXGENTIC_MCP_SERVER_URL=http://localhost:3000
A2A_BASE_URL=http://localhost:8000
# OPTIONAL CONFIGURATION
EXGENTIC_MCP_TIMEOUT_SECONDS=60
MAX_TASKS=10
ABORT_ON_FAILURE=false
# A2A Configuration
A2A_TIMEOUT_SECONDS=300
A2A_AUTH_TOKEN=
A2A_VERIFY_TLS=true
A2A_ENDPOINT_PATH=/v1/chat
# OpenTelemetry Configuration
OTEL_SERVICE_NAME=exgentic-a2a-runner
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
OTEL_INSTRUMENT_REQUESTS=true
# Debug Configuration
LOG_PROMPT=0
LOG_RESPONSE=0
Key differences from appworld_a2a_runner:
- MCP Integration: Uses the MCP protocol instead of direct AppWorld API calls
- Session Management: Explicit session lifecycle (create → use → evaluate → close)
- Task Source: Tasks come from the MCP server's create_session call, not from dataset enumeration
- Evaluation: Uses the MCP server's evaluate_session instead of AppWorld's evaluation system
- Prompt Format: Includes session_id in the prompt for the agent to use
- Dependencies: Adds the MCP Python SDK, removes the AppWorld package
- Create directory structure
- Set up configuration management
- Create basic project files (pyproject.toml, README.md, example.env)
- Implement MCPClient using official MCP SDK
- Implement ExgenticAdapter with session lifecycle
- Add proper error handling and timeouts
- Implement main Runner class
- Implement session processing flow
- Add summary statistics and reporting
- Reuse/adapt A2A client from appworld_a2a_runner
- Implement prompt builder with session_id
- Add OpenTelemetry instrumentation
- Test with actual Exgentic MCP server
- Complete README with usage examples
- Add error handling and edge cases
- ✅ Sequential execution of benchmark tasks via MCP server
- ✅ Proper session lifecycle management (create → evaluate → close)
- ✅ Integration with Kagenti agents via A2A protocol
- ✅ Session_id included in prompts for agent use
- ✅ Comprehensive OpenTelemetry instrumentation
- ✅ Configuration via environment variables
- ✅ Summary statistics and reporting
- ✅ Proper error handling and logging
- Review and approve this plan
- Switch to Code mode to implement the solution
- Test with actual Exgentic MCP server and Kagenti agent
- Iterate based on testing results