Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions charts/mcp-stack/templates/configmap-nginx-proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ data:
keepalive 32;
}

# A2A API: prefer Rust sidecar when enabled, fallback to Python gateway.
upstream a2a_upstream {
server {{ $gatewayServiceName }}:8790;
server {{ $gatewayServiceName }}:{{ $gatewayServicePort }} backup;
keepalive 32;
}

server {
listen 80;

Expand Down Expand Up @@ -78,6 +85,25 @@ data:
proxy_pass http://gateway_upstream;
}

location ~ ^/a2a(/|$) {
proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $forwarded_proto;
proxy_set_header X-Forwarded-Host $http_host;
proxy_set_header Connection "";

proxy_cache off;

proxy_connect_timeout 30s;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
send_timeout {{ .Values.nginxProxy.config.sendTimeout }};

proxy_pass http://a2a_upstream;
}

location / {
proxy_http_version 1.1;
proxy_set_header Host $http_host;
Expand Down
88 changes: 88 additions & 0 deletions docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ MCP_RUST_AFFINITY_CORE_ENABLED="${MCP_RUST_AFFINITY_CORE_ENABLED:-}"
MCP_RUST_SESSION_AUTH_REUSE_ENABLED="${MCP_RUST_SESSION_AUTH_REUSE_ENABLED:-}"
MCP_RUST_SESSION_AUTH_REUSE_TTL_SECONDS="${MCP_RUST_SESSION_AUTH_REUSE_TTL_SECONDS:-}"

# Rust A2A sidecar mode (mirrors MCP runtime approach).
RUST_A2A_MODE="${RUST_A2A_MODE:-off}"
CONTEXTFORGE_ENABLE_RUST_A2A_BUILD="${CONTEXTFORGE_ENABLE_RUST_A2A_BUILD:-${CONTEXTFORGE_ENABLE_RUST_BUILD:-false}}"
A2A_RUST_LISTEN_HTTP="${A2A_RUST_LISTEN_HTTP:-}"
A2A_RUST_BACKEND_BASE_URL="${A2A_RUST_BACKEND_BASE_URL:-}"
A2A_RUST_AUTH_SECRET="${A2A_RUST_AUTH_SECRET:-}"
A2A_RUST_MAX_CONCURRENT="${A2A_RUST_MAX_CONCURRENT:-}"
A2A_RUST_MAX_QUEUED="${A2A_RUST_MAX_QUEUED:-}"
A2A_RUST_INVOKE_TIMEOUT_SECS="${A2A_RUST_INVOKE_TIMEOUT_SECS:-}"

RUST_A2A_PID=""

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "${SCRIPT_DIR}" || {
echo "ERROR: Cannot change to script directory: ${SCRIPT_DIR}"
Expand Down Expand Up @@ -191,12 +203,74 @@ cleanup() {
pids+=("${RUST_MCP_PID}")
fi

if [[ -n "${RUST_A2A_PID}" ]] && kill -0 "${RUST_A2A_PID}" 2>/dev/null; then
pids+=("${RUST_A2A_PID}")
fi

if [[ ${#pids[@]} -gt 0 ]]; then
kill "${pids[@]}" 2>/dev/null || true
wait "${pids[@]}" 2>/dev/null || true
fi
}

start_managed_rust_a2a_service() {
local runtime_bin="/app/bin/a2a-service"
local rust_listen_http="${A2A_RUST_LISTEN_HTTP:-127.0.0.1:8790}"
local backend_base_url="${A2A_RUST_BACKEND_BASE_URL:-http://127.0.0.1:${PORT:-4444}}"

if [[ "${CONTEXTFORGE_ENABLE_RUST_A2A_BUILD}" != "true" ]]; then
echo "ERROR: RUST_A2A_MODE enabled but this image was built without Rust A2A artifacts."
echo "Rebuild with the Rust A2A binary included, or set RUST_A2A_MODE=off."
exit 1
fi

if [[ ! -x "${runtime_bin}" ]]; then
echo "ERROR: Rust A2A service binary not found at ${runtime_bin}"
exit 1
fi

export A2A_RUST_LISTEN_HTTP="${rust_listen_http}"
export A2A_RUST_BACKEND_BASE_URL="${backend_base_url}"
if [[ -n "${A2A_RUST_AUTH_SECRET}" ]]; then
export A2A_RUST_AUTH_SECRET="${A2A_RUST_AUTH_SECRET}"
fi
if [[ -n "${A2A_RUST_MAX_CONCURRENT}" ]]; then
export A2A_RUST_MAX_CONCURRENT="${A2A_RUST_MAX_CONCURRENT}"
fi
if [[ -n "${A2A_RUST_MAX_QUEUED}" ]]; then
export A2A_RUST_MAX_QUEUED="${A2A_RUST_MAX_QUEUED}"
fi
if [[ -n "${A2A_RUST_INVOKE_TIMEOUT_SECS}" ]]; then
export A2A_RUST_INVOKE_TIMEOUT_SECS="${A2A_RUST_INVOKE_TIMEOUT_SECS}"
fi

echo "Starting Rust A2A service on ${A2A_RUST_LISTEN_HTTP} (backend: ${A2A_RUST_BACKEND_BASE_URL})..."
"${runtime_bin}" &
RUST_A2A_PID=$!

python3 - <<'PY'
import os
import sys
import time
import urllib.error
import urllib.request

listen = os.environ.get("A2A_RUST_LISTEN_HTTP", "127.0.0.1:8790")
health_url = f"http://{listen}/health"

for _ in range(60):
try:
with urllib.request.urlopen(health_url, timeout=2) as response:
if response.status == 200:
sys.exit(0)
except (OSError, urllib.error.URLError):
time.sleep(0.5)

print(f\"ERROR: Rust A2A service failed health check at {health_url}\", file=sys.stderr)
sys.exit(1)
PY
}

print_mcp_runtime_mode() {
local runtime_mode="python"
local upstream_client_mode="native"
Expand Down Expand Up @@ -386,6 +460,20 @@ apply_rust_mcp_mode_defaults
build_server_command "$@"
print_mcp_runtime_mode

case "${RUST_A2A_MODE,,}" in
""|off)
;;
shadow|edge)
trap cleanup EXIT INT TERM
start_managed_rust_a2a_service
;;
*)
echo "ERROR: Unknown RUST_A2A_MODE value: ${RUST_A2A_MODE}"
echo "Valid options: off, shadow, edge"
exit 1
;;
esac

if [[ "${EXPERIMENTAL_RUST_MCP_RUNTIME_ENABLED}" = "true" && "${EXPERIMENTAL_RUST_MCP_RUNTIME_MANAGED}" = "true" ]]; then
trap cleanup EXIT INT TERM
start_managed_rust_mcp_runtime
Expand Down
32 changes: 32 additions & 0 deletions infra/nginx/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ http {
keepalive_timeout 60s;
}

# A2A HTTP API: prefer Rust sidecar when enabled, fallback to Python gateway.
# Mirrors the MCP transport pattern above.
upstream a2a_backend {
least_conn;
server gateway:8790 max_fails=0;
server gateway:4444 max_fails=0 backup;
keepalive 512;
keepalive_requests 100000;
keepalive_timeout 60s;
}

# ============================================================
# SSL Backend Configuration (for HTTPS gateway backend)
# ============================================================
Expand Down Expand Up @@ -592,6 +603,27 @@ http {
proxy_read_timeout 1h;
}

# A2A HTTP API: route to Rust sidecar (when enabled), fallback to Python gateway.
location ~ ^/a2a(/|$) {
proxy_pass http://a2a_backend;

proxy_http_version 1.1;
proxy_set_header Connection '';

# Proxy headers
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $forwarded_proto;
proxy_set_header X-Forwarded-Host $http_host;

proxy_connect_timeout 30s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;

proxy_cache off;
}

# ============================================================
# JSON-RPC Endpoint - No Caching
# ============================================================
Expand Down
78 changes: 78 additions & 0 deletions mcpgateway/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8344,6 +8344,84 @@ async def handle_internal_mcp_prompts_get_authz(request: Request):
)


async def _authorize_internal_a2a_method(
request: Request,
*,
permission: str,
method: str,
) -> Response:
"""Authorize a trusted internal A2A method for Rust module execution.

A2A is not server-scoped like MCP. We still enforce both token-scope caps and RBAC
via the same core authorization machinery used by internal MCP authz endpoints.

Args:
request: Trusted internal authz request.
permission: Permission required for the operation.
method: Method label used for permission error reporting.

Returns:
Empty 204 response when authorized; otherwise a JSON error response.
"""
db = SessionLocal()
try:
await _authorize_internal_mcp_request(
request,
db,
permission=permission,
method=method,
)
if db.is_active and db.in_transaction() is not None:
db.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
except JSONRPCError as exc:
return ORJSONResponse(status_code=403, content={"code": exc.code, "message": exc.message, "data": exc.data})
except Exception:
try:
db.rollback()
except Exception:
try:
db.invalidate()
except Exception:
pass # nosec B110 - Best effort cleanup on connection failure
raise
finally:
db.close()


@utility_router.post("/_internal/a2a/list/authz/")
@utility_router.post("/_internal/a2a/list/authz")
async def handle_internal_a2a_list_authz(request: Request):
"""Authorize trusted A2A list requests for Rust module execution."""
return await _authorize_internal_a2a_method(
request,
permission="a2a.read",
method="a2a/list",
)


@utility_router.post("/_internal/a2a/get/authz/")
@utility_router.post("/_internal/a2a/get/authz")
async def handle_internal_a2a_get_authz(request: Request):
"""Authorize trusted A2A get requests for Rust module execution."""
return await _authorize_internal_a2a_method(
request,
permission="a2a.read",
method="a2a/get",
)


@utility_router.post("/_internal/a2a/invoke/authz/")
@utility_router.post("/_internal/a2a/invoke/authz")
async def handle_internal_a2a_invoke_authz(request: Request):
"""Authorize trusted A2A invoke requests for Rust module execution."""
return await _authorize_internal_a2a_method(
request,
permission="a2a.invoke",
method="a2a/invoke",
)


async def _maybe_forward_affinitized_rpc_request(
request: Request,
*,
Expand Down
Loading