Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 87 additions & 18 deletions acapy_agent/admin/decorators/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,33 @@ def admin_authentication(handler):
async def admin_auth(request):
context: AdminRequestContext = request["context"]
profile = context.profile

if request.method == "OPTIONS":
return await handler(request)

# OAuth path: token was validated by setup_context middleware.
# Require acapy:admin scope for admin-only routes.
if context.metadata and "scopes" in context.metadata:
if "acapy:admin" in context.metadata["scopes"]:
return await handler(request)
raise web.HTTPForbidden(
reason="acapy:admin scope required",
text="acapy:admin scope required",
)

header_admin_api_key = request.headers.get("x-api-key")
valid_key = general_utils.const_compare(
profile.settings.get("admin.admin_api_key"), header_admin_api_key
)
insecure_mode = bool(profile.settings.get("admin.admin_insecure_mode"))

# We have to allow OPTIONS method access to paths without a key since
# browsers performing CORS requests will never include the original
# x-api-key header from the method that triggered the preflight
# OPTIONS check.
if insecure_mode or valid_key or (request.method == "OPTIONS"):
if insecure_mode or valid_key:
return await handler(request)
else:
raise web.HTTPUnauthorized(
reason="API Key invalid or missing",
text="API Key invalid or missing",
)

raise web.HTTPUnauthorized(
reason="API Key invalid or missing",
text="API Key invalid or missing",
)

return admin_auth

Expand All @@ -58,6 +68,29 @@ def tenant_authentication(handler):
async def tenant_auth(request):
context: AdminRequestContext = request["context"]
profile = context.profile

if request.method == "OPTIONS":
return await handler(request)

# OAuth path: token was validated by setup_context middleware.
# acapy:tenant or acapy:admin both grant tenant-level access.
# acapy:tenant:read grants read-only access (safe HTTP methods only).
if context.metadata and "scopes" in context.metadata:
scopes = context.metadata["scopes"]
if scopes & {"acapy:tenant", "acapy:admin"}:
return await handler(request)
if "acapy:tenant:read" in scopes:
if request.method in ("GET", "HEAD"):
return await handler(request)
raise web.HTTPForbidden(
reason="acapy:tenant:read scope does not permit write operations",
text="acapy:tenant:read scope does not permit write operations",
)
raise web.HTTPForbidden(
reason="acapy:tenant scope required",
text="acapy:tenant scope required",
)

authorization_header = request.headers.get("Authorization")
header_admin_api_key = request.headers.get("x-api-key")
valid_key = general_utils.const_compare(
Expand All @@ -73,25 +106,61 @@ async def tenant_auth(request):
request.path,
)

# CORS fix: allow OPTIONS method access to paths without a token
if (
(multitenant_enabled and authorization_header)
or (not multitenant_enabled and valid_key)
or (multitenant_enabled and valid_key and base_wallet_allowed_route)
or (insecure_mode and not multitenant_enabled)
or request.method == "OPTIONS"
):
return await handler(request)
else:
auth_mode = "Authorization token" if multitenant_enabled else "API key"
raise web.HTTPUnauthorized(
reason=f"{auth_mode} missing or invalid",
text=f"{auth_mode} missing or invalid",
)

auth_mode = "Authorization token" if multitenant_enabled else "API key"
raise web.HTTPUnauthorized(
reason=f"{auth_mode} missing or invalid",
text=f"{auth_mode} missing or invalid",
)

return tenant_auth


def require_scope(*required_scopes: str):
"""Require at least one of the given OAuth2 scopes on the request token.

No-op when OAuth mode is not enabled (admin.oauth_enabled is not True), so
routes decorated with require_scope continue to work with API key / insecure
mode without any changes.

Must be stacked inside tenant_authentication or admin_authentication so that
authentication is checked before scope enforcement.

Example::

@tenant_authentication
@require_scope("acapy:tenant", "acapy:admin")
async def my_handler(request): ...
"""

def decorator(handler):
@functools.wraps(handler)
async def scope_check(request):
if request.method == "OPTIONS":
return await handler(request)
context: AdminRequestContext = request["context"]
if not context.profile.settings.get("admin.oauth_enabled"):
return await handler(request)
token_scopes: set = (context.metadata or {}).get("scopes", set())
if not token_scopes.intersection(required_scopes):
raise web.HTTPForbidden(
reason="Insufficient scope",
text="Insufficient scope",
)
return await handler(request)

return scope_check

return decorator


def _base_wallet_route_access(additional_routes: List[str], request_path: str) -> bool:
"""Check if request path matches additional routes."""
additional_routes_pattern = (
Expand Down
122 changes: 122 additions & 0 deletions acapy_agent/admin/oauth_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""OAuth2 token validator for ACA-Py acting as a Resource Server."""

import logging
from typing import Optional

import aiohttp
import jwt
from aiohttp import web

LOGGER = logging.getLogger(__name__)

_SUPPORTED_ALGORITHMS = [
"RS256", "RS384", "RS512",
"ES256", "ES384", "ES512",
"PS256", "PS384", "PS512",
]


class OAuthTokenValidator:
"""Validate OAuth2 bearer tokens from an external Authorization Server.

Supports JWT access tokens validated locally via JWKS, with opaque token
fallback via RFC 7662 token introspection.
"""

def __init__(self, settings):
"""Initialize the validator from application settings."""
self.jwks_uri: Optional[str] = settings.get("oauth.jwks_uri")
self.issuer: Optional[str] = settings.get("oauth.issuer")
self.audience: Optional[str] = settings.get("oauth.audience")
self.introspection_endpoint: Optional[str] = settings.get(
"oauth.introspection_endpoint"
)
self.introspection_client_id: Optional[str] = settings.get(
"oauth.introspection_client_id"
)
self.introspection_client_secret: Optional[str] = settings.get(
"oauth.introspection_client_secret"
)

# PyJWKClient handles JWKS fetching and caching internally.
self._jwks_client = (
jwt.PyJWKClient(self.jwks_uri, cache_keys=True) if self.jwks_uri else None
)

async def validate(self, token: str) -> dict:
"""Validate an access token and return its claims.

Tries JWT validation via JWKS first; falls back to introspection if the
token is opaque or if JWKS is not configured.

Raises:
web.HTTPUnauthorized: If the token is invalid or inactive.

Returns:
dict: Validated token claims.
"""
if self._jwks_client:
try:
return self._validate_jwt(token)
except jwt.exceptions.PyJWKClientError as exc:
# JWKS infrastructure error — only fall through if introspection is
# configured as a fallback, otherwise the token cannot be validated.
if not self.introspection_endpoint:
raise web.HTTPUnauthorized(reason="Token validation failed") from exc
LOGGER.debug("JWKS lookup failed, falling back to introspection: %s", exc)
except jwt.DecodeError:
# Token is not a JWT — only meaningful if introspection is available.
if not self.introspection_endpoint:
raise web.HTTPUnauthorized(reason="Invalid token")
LOGGER.debug("JWT decode failed, falling back to introspection")
except jwt.InvalidTokenError as exc:
raise web.HTTPUnauthorized(reason=str(exc)) from exc

if self.introspection_endpoint:
return await self._introspect(token)

raise web.HTTPUnauthorized(
reason="No token validation method available — configure oauth.jwks_uri or "
"oauth.introspection_endpoint"
)

def _validate_jwt(self, token: str) -> dict:
"""Validate a JWT access token using the configured JWKS endpoint."""
signing_key = self._jwks_client.get_signing_key_from_jwt(token)

decode_kwargs = dict(
algorithms=_SUPPORTED_ALGORITHMS,
options={"require": ["exp", "iss", "sub"]},
)
if self.issuer:
decode_kwargs["issuer"] = self.issuer
if self.audience:
decode_kwargs["audience"] = self.audience

return jwt.decode(token, signing_key.key, **decode_kwargs)

async def _introspect(self, token: str) -> dict:
"""Validate an opaque token via RFC 7662 introspection."""
auth = None
if self.introspection_client_id:
auth = aiohttp.BasicAuth(
self.introspection_client_id,
self.introspection_client_secret or "",
)

async with aiohttp.ClientSession() as session:
resp = await session.post(
self.introspection_endpoint,
data={"token": token, "token_type_hint": "access_token"},
auth=auth,
)
if resp.status != 200:
raise web.HTTPUnauthorized(
reason=f"Token introspection returned HTTP {resp.status}"
)
body = await resp.json()

if not body.get("active"):
raise web.HTTPUnauthorized(reason="Token is not active")

return body
Loading
Loading