Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ bcrypt>=4.0.0
# OIDC/OAuth2 authentication (Red Hat SSO / Keycloak)
PyJWT>=2.8.0
httpx>=0.27.0

# LDAP for Rover group lookups (RBAC)
pyasn1==0.6.2
ldap3>=2.9.0

# Logging and monitoring
python-json-logger>=2.0.0
Expand Down
19 changes: 17 additions & 2 deletions server/core/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from server.handlers.tool_handler import ToolHandler
from server.transport.base_transport import BaseTransport
from utils.logger import get_logger
from utils.rbac import AuthorizationService


class KonfluxDevLakeMCPServer:
Expand All @@ -39,11 +40,25 @@ def __init__(self, config, db_connection, tools_manager, security_manager):
self.db_connection = db_connection
self.tools_manager = tools_manager
self.security_manager = security_manager
self.logger = get_logger(f"{__name__}.KonfluxDevLakeMCPServer")

# Initialize RBAC - enabled when OIDC is enabled
rbac_enabled = False
if hasattr(config, "oidc") and config.oidc.enabled:
rbac_enabled = True
self.logger.info("RBAC enabled (OIDC authentication is active)")
else:
self.logger.info("RBAC disabled (OIDC authentication is not active)")

# Initialize core components
self.server = Server("konflux-devlake-mcp-server")
self.tool_handler = ToolHandler(tools_manager, security_manager)
self.logger = get_logger(f"{__name__}.KonfluxDevLakeMCPServer")
self.authorization_service = AuthorizationService() if rbac_enabled else None
self.tool_handler = ToolHandler(
tools_manager,
security_manager,
authorization_service=self.authorization_service,
rbac_enabled=rbac_enabled,
)

# Setup protocol handlers
self._setup_protocol_handlers()
Expand Down
99 changes: 93 additions & 6 deletions server/handlers/tool_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,89 @@
Tool Handler for MCP Server

This module handles tool execution requests, including security validation,
data masking, and error handling.
data masking, authorization, and error handling.
"""

import contextvars
import json
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from mcp.types import TextContent

from utils.logger import get_logger
from utils.db import DateTimeEncoder
from utils.security import SQLInjectionDetector, DataMasking
from utils.rbac import AuthorizationService

# Context variable to store current user info for the request
# This allows the tool handler to access user info set by the auth middleware
current_user_context: contextvars.ContextVar[Optional[Dict[str, Any]]] = contextvars.ContextVar(
"current_user_context", default=None
)


def set_user_context(user_info: Optional[Dict[str, Any]]) -> None:
"""
Set the current user context for the request.

Args:
user_info: User information from OIDC authentication (id, username, groups, etc.)
"""
current_user_context.set(user_info)


def get_user_context() -> Optional[Dict[str, Any]]:
"""
Get the current user context for the request.

Returns:
User information dict or None if not authenticated
"""
return current_user_context.get()


class ToolHandler:
"""
Handles tool execution requests with security validation and data masking.
Handles tool execution requests with security validation, authorization, and data masking.

This class provides a centralized way to execute tools while ensuring
proper security validation and data protection.
proper security validation, role-based access control, and data protection.
"""

def __init__(self, tools_manager, security_manager):
def __init__(
self,
tools_manager,
security_manager,
authorization_service: Optional[AuthorizationService] = None,
rbac_enabled: bool = True,
):
"""
Initialize the tool handler.

Args:
tools_manager: Tools management system
security_manager: Security validation system
authorization_service: RBAC authorization service (created if None and rbac_enabled)
rbac_enabled: Whether to enforce role-based access control
"""
self.tools_manager = tools_manager
self.security_manager = security_manager
self.sql_injection_detector = SQLInjectionDetector()
self.data_masking = DataMasking()
self.logger = get_logger(f"{__name__}.ToolHandler")

# RBAC configuration
self.rbac_enabled = rbac_enabled
if rbac_enabled:
self.authorization_service = authorization_service or AuthorizationService()
self.logger.info("RBAC enabled for tool handler")
else:
self.authorization_service = None
self.logger.info("RBAC disabled for tool handler")

async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""
Handle a tool execution request with full security validation.
Handle a tool execution request with authorization and security validation.

Args:
name: Name of the tool to execute
Expand All @@ -52,6 +97,15 @@ async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[T
try:
self.logger.info(f"Handling tool call request: {name}")

# Check authorization (RBAC) if enabled
if self.rbac_enabled and self.authorization_service:
auth_result = self._check_authorization(name)
if not auth_result["authorized"]:
self.logger.warning(
f"Authorization denied for tool '{name}': {auth_result['reason']}"
)
return self._create_error_response(auth_result["reason"])

# Perform security validation
validation_result = await self._validate_tool_request(name, arguments)
if not validation_result["valid"]:
Expand All @@ -69,6 +123,39 @@ async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[T
self.logger.error(f"Failed to handle tool call request: {e}")
return self._create_error_response(f"Tool call failed: {str(e)}", name, arguments)

def _check_authorization(self, tool_name: str) -> Dict[str, Any]:
"""
Check if the current user is authorized to call the specified tool.

Uses Rover group membership for role assignment.

Args:
tool_name: Name of the tool being called

Returns:
Dict with 'authorized' bool and 'reason' string if denied
"""
user_context = get_user_context()

# If no user context and RBAC is enabled, deny access
if user_context is None:
return {
"authorized": False,
"reason": "Access denied: authentication required for this tool",
}

# Extract username from context
username = user_context.get("username", "unknown")

# Check authorization
if self.authorization_service.is_authorized(tool_name, username):
self.logger.debug(f"User '{username}' authorized for tool '{tool_name}'")
return {"authorized": True}

# Get detailed denial reason
reason = self.authorization_service.get_denied_reason(tool_name, username)
return {"authorized": False, "reason": reason}

async def _validate_tool_request(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate tool request for security and correctness.
Expand Down
17 changes: 14 additions & 3 deletions server/middleware/auth_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
Authentication Middleware for MCP Server

This module provides ASGI middleware for authenticating requests
using OIDC tokens from Red Hat SSO / Keycloak.
using OIDC tokens from Red Hat SSO / Keycloak, and sets user context
for downstream authorization checks.
"""

from typing import Any, Callable, Dict, Optional

from starlette.responses import JSONResponse

from server.middleware.oidc_auth import OIDCAuthenticator, OIDCConfig
from server.handlers.tool_handler import set_user_context
from utils.logger import get_logger


Expand Down Expand Up @@ -98,25 +100,34 @@ async def __call__(self, scope: Dict[str, Any], receive: Callable, send: Callabl
return

# Add user info to scope for downstream handlers
scope["user"] = {
user_info = {
"id": result.user_id,
"username": result.username,
"email": result.email,
"groups": result.groups,
"scopes": result.scopes,
}
scope["user"] = user_info

# Set user context for RBAC authorization in tool handler
set_user_context(user_info)

self.logger.debug(
f"Authenticated request from user: {result.username}",
extra={
"path": path,
"user_id": result.user_id,
"username": result.username,
"groups": result.groups,
},
)

# Pass request to the application
await self.app(scope, receive, send)
try:
await self.app(scope, receive, send)
finally:
# Clear user context after request completes
set_user_context(None)


def create_auth_middleware(
Expand Down
Loading
Loading