Skip to content
Merged
1 change: 1 addition & 0 deletions cypress/src/smoke/fixtures/configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"showRagLibrary": true,
"showPromptTemplateLibrary": true,
"mcpConnections": true,
"bedrockAgents": true,
"awsSessions": false,
"showMcpWorkbench": true,
"projectOrganization": true
Expand Down
4 changes: 4 additions & 0 deletions lambda/chat_assistant_stacks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class ChatAssistantStackModel(BaseModel):
collectionIds: list[str] = Field(default_factory=list)
mcpServerIds: list[str] = Field(default_factory=list)
mcpToolIds: list[str] = Field(default_factory=list)
bedrockAgentIds: list[str] = Field(
default_factory=list,
description="Bedrock agent IDs allowed when using this stack; users must still opt in under Agent connections.",
)
personaPromptId: str | None = None
directivePromptIds: list[str] = Field(default_factory=list)
allowedGroups: list[str] = Field(default_factory=list)
Expand Down
225 changes: 224 additions & 1 deletion lambda/mcp_server/lambda_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import uuid
from decimal import Decimal
from functools import reduce
from typing import Any
from typing import Any, cast

import boto3
from boto3.dynamodb.conditions import Attr, Key
from botocore.exceptions import ClientError
from models.domain_objects import InvokeBedrockAgentRequest
from utilities.auth import admin_only, get_user_context
from utilities.bedrock_agent_discovery import discover_bedrock_agents
from utilities.common_functions import api_wrapper, get_bearer_token, get_item, retry_config
from utilities.exceptions import (
BadRequestException,
Expand All @@ -35,8 +38,11 @@
InternalServerErrorException,
NotFoundException,
)
from utilities.time import iso_string
from utilities.validation import ValidationError

from .models import (
BedrockAgentApprovalPut,
HostedMcpServerModel,
HostedMcpServerStatus,
McpServerModel,
Expand All @@ -52,6 +58,58 @@
stepfunctions = boto3.client("stepfunctions", region_name=os.environ["AWS_REGION"], config=retry_config)


def _bedrock_approvals_table() -> Any:
name = os.environ.get("BEDROCK_AGENT_APPROVALS_TABLE_NAME")
if not name:
raise InternalServerErrorException("BEDROCK_AGENT_APPROVALS_TABLE_NAME is not configured")
return dynamodb.Table(name)


def _normalize_catalog_groups(groups: list[str] | None) -> list[str]:
if not groups:
return []
out: list[str] = []
for g in groups:
s = str(g).strip()
if not s:
continue
out.append(s if s.startswith("group:") else f"group:{s}")
return out


def _approval_visible_to_user(user_groups: list[str], approval_groups: list[str] | None) -> bool:
"""True if catalog row is global (no groups) or user matches a listed group: token."""
if not approval_groups:
return True
formatted = [f"group:{g}" for g in user_groups]
return _is_member(formatted, list(approval_groups))


def _scan_bedrock_agent_approvals() -> list[dict[str, Any]]:
t = _bedrock_approvals_table()
items: list[dict[str, Any]] = []
kwargs: dict[str, Any] = {}
while True:
resp = t.scan(**kwargs)
items.extend(resp.get("Items", []))
lek = resp.get("LastEvaluatedKey")
if not lek:
break
kwargs["ExclusiveStartKey"] = lek
return items


def _serialize_dynamo_item(item: dict[str, Any]) -> dict[str, Any]:
return cast(dict[str, Any], json.loads(json.dumps(item, default=str)))


def _path_bedrock_agent_id(event: dict) -> str:
aid = (event.get("pathParameters") or {}).get("agentId")
if not aid:
raise BadRequestException("agentId is required")
return str(aid)


def _normalize_server_name(name: str) -> str:
"""Normalize server name to match CDK resource naming (alphanumeric only)."""
return re.sub(r"[^a-zA-Z0-9]", "", name)
Expand Down Expand Up @@ -557,3 +615,168 @@ def update_hosted_mcp_server(event: dict, context: dict) -> Any:

# Return current server config (status will be updated by state machine)
return item


@api_wrapper
def list_bedrock_agents(event: dict, context: dict) -> dict[str, Any]:
"""
List admin-approved Bedrock agents visible to this user, merged with live AWS discovery.
"""
_user_id, _is_admin, groups = get_user_context(event)
logger.info("Listing approved Bedrock agents for catalog")

approvals = _scan_bedrock_agent_approvals()
visible = [a for a in approvals if _approval_visible_to_user(groups, a.get("groups"))]
Comment thread
estohlmann marked this conversation as resolved.
Outdated

aws_region = os.environ["AWS_REGION"]
bedrock_agent_client = boto3.client("bedrock-agent", aws_region, config=retry_config)
discovered = discover_bedrock_agents(bedrock_agent_client)
by_id = {d.agentId: d for d in discovered}

merged: list[dict[str, Any]] = []
for appr in visible:
agent_id = str(appr["agentId"])
alias = appr.get("agentAliasId")
disc = by_id.get(agent_id)
raw_groups = appr.get("groups")
ag_groups = raw_groups if isinstance(raw_groups, list) else []
if disc:
row = disc.model_dump(mode="json")
if alias:
row["suggestedAliasId"] = str(alias)
row["catalogGroups"] = [str(g) for g in ag_groups]
row["inAccount"] = True
else:
row = {
"agentId": agent_id,
"agentName": str(appr.get("agentName", agent_id)),
"agentStatus": "NOT_IN_ACCOUNT",
"description": "",
"suggestedAliasId": str(alias) if alias else None,
"aliases": [],
"invokeReady": bool(alias),
"actionTools": [],
"inAccount": False,
"catalogGroups": [str(g) for g in ag_groups],
}
merged.append(row)

return {"agents": merged, "totalAgents": len(merged)}


@api_wrapper
@admin_only
def list_bedrock_agents_discovery(event: dict, context: dict) -> dict[str, Any]:
"""Full account scan (admin only) for the management UI."""
get_user_context(event)
aws_region = os.environ["AWS_REGION"]
bedrock_agent_client = boto3.client("bedrock-agent", aws_region, config=retry_config)
agents = discover_bedrock_agents(bedrock_agent_client)
return {"agents": [a.model_dump(mode="json") for a in agents], "totalAgents": len(agents)}


@api_wrapper
@admin_only
def list_bedrock_agent_approvals(event: dict, context: dict) -> dict[str, Any]:
"""All catalog rows (admin)."""
get_user_context(event)
raw = _scan_bedrock_agent_approvals()
return {"approvals": [_serialize_dynamo_item(x) for x in raw]}


@api_wrapper
@admin_only
def put_bedrock_agent_approval(event: dict, context: dict) -> dict[str, Any]:
"""Upsert a catalog row."""
user_id, _, _ = get_user_context(event)
agent_id = _path_bedrock_agent_id(event)
body = json.loads(event.get("body") or "{}", parse_float=Decimal)
model = BedrockAgentApprovalPut(**body)
groups = _normalize_catalog_groups(model.groups)
item = {
"agentId": agent_id,
"agentAliasId": model.agentAliasId,
"agentName": model.agentName,
"groups": groups,
"updatedAt": iso_string(),
"updatedBy": user_id,
}
_bedrock_approvals_table().put_item(Item=item)
return _serialize_dynamo_item(item)


@api_wrapper
@admin_only
def delete_bedrock_agent_approval(event: dict, context: dict) -> dict[str, str]:
get_user_context(event)
agent_id = _path_bedrock_agent_id(event)
_bedrock_approvals_table().delete_item(Key={"agentId": agent_id})
return {"status": "ok"}


@api_wrapper
def invoke_bedrock_agent(event: dict, context: dict) -> dict[str, Any]:
"""
Invoke a Bedrock Agent via bedrock-agent-runtime and return aggregated text output.
"""
user_id, is_admin_user, groups = get_user_context(event)
Comment thread
estohlmann marked this conversation as resolved.
body = json.loads(event.get("body") or "{}")
request = InvokeBedrockAgentRequest(**body)

appr = _bedrock_approvals_table().get_item(Key={"agentId": request.agentId}).get("Item")
if not appr:
raise ForbiddenException("This Bedrock agent is not approved for use in LISA.")
if not is_admin_user and not _approval_visible_to_user(groups, appr.get("groups")):
raise ForbiddenException("You are not allowed to invoke this Bedrock agent.")
approved_alias = appr.get("agentAliasId")
if not approved_alias or str(approved_alias) != str(request.agentAliasId):
raise ValidationError("Agent alias does not match the approved catalog entry for this agent.")

session_id = request.sessionId or str(uuid.uuid4())

if request.functionName:
params = dict(request.parameters or {})
params_json = json.dumps(params, ensure_ascii=False)
ag_label = (request.actionGroupName or "").strip()
ag_hint = f", action group name {ag_label!r}" if ag_label else ""
input_text = (
f"You must invoke the action group function {request.functionName!r} "
f"(action group id {request.actionGroupId}{ag_hint}) "
f"with these parameter values: {params_json}. "
"Execute the function and respond with the outcome."
)
else:
input_text = str(request.inputText or "")

aws_region = os.environ["AWS_REGION"]
runtime = boto3.client("bedrock-agent-runtime", aws_region, config=retry_config)
try:
response = runtime.invoke_agent(
agentId=request.agentId,
agentAliasId=request.agentAliasId,
sessionId=session_id,
inputText=input_text,
)
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
if error_code == "AccessDeniedException":
raise ValidationError(
"Access denied invoking Bedrock Agent. Check IAM for bedrock:InvokeAgent on the agent alias."
) from e
raise ValidationError(f"Failed to invoke Bedrock Agent: {e!s}") from e

completion_parts: list[str] = []
for stream_event in response.get("completion", []):
if "chunk" in stream_event:
chunk = stream_event["chunk"]
raw = chunk.get("bytes")
if raw is not None:
if isinstance(raw, bytes):
completion_parts.append(raw.decode("utf-8", errors="replace"))
else:
completion_parts.append(str(raw))

return {
"outputText": "".join(completion_parts),
"sessionId": session_id,
}
11 changes: 11 additions & 0 deletions lambda/mcp_server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,14 @@ def validate_memory(cls, memory: int | None) -> int | None:
if memory > 30720:
raise ValueError("Memory limit must be at most 30720 MiB")
return memory


class BedrockAgentApprovalPut(BaseModel):
"""Admin catalog entry for a Bedrock agent (DynamoDB)."""

agentAliasId: str = Field(min_length=1, description="Alias ID used for InvokeAgent")
agentName: str = Field(min_length=1, description="Display name snapshot")
groups: list[str] | None = Field(
default=None,
description="group:... tokens; omit or empty = all authenticated users",
)
77 changes: 77 additions & 0 deletions lambda/models/domain_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,3 +1498,80 @@ class DataSourceSelection(BaseModel):
dataSourceName: str = Field(description="Data Source name")
s3Bucket: str = Field(description="S3 bucket for the data source")
s3Prefix: str = Field(default="", description="S3 prefix for the data source")


class BedrockAgentAliasSummary(BaseModel):
"""Summary row from bedrock-agent ListAgentAliases."""

model_config = ConfigDict(extra="ignore")

agentAliasId: str
agentAliasName: str | None = None
agentAliasStatus: str | None = None
description: str | None = None


class BedrockAgentActionTool(BaseModel):
"""One action-group function exposed as an OpenAI tool in LISA chat."""

openAiToolName: str = Field(description="Unique tool name for the chat LLM")
functionName: str
actionGroupId: str
actionGroupName: str = ""
description: str = ""
parameterSchema: dict[str, Any] = Field(
default_factory=dict,
description="OpenAI-style JSON Schema object with type, properties, required",
)


class BedrockAgentDiscoveryItem(BaseModel):
"""One Bedrock Agent with alias metadata for LISA discovery UI."""

agentId: str
agentName: str
agentStatus: str
description: str = ""
updatedAt: datetime | None = None
latestAgentVersion: str | None = None
suggestedAliasId: str | None = None
aliases: list[BedrockAgentAliasSummary] = Field(default_factory=list)
invokeReady: bool = False
actionTools: list[BedrockAgentActionTool] = Field(default_factory=list)


class InvokeBedrockAgentRequest(BaseModel):
"""Body for invoking an agent via LISA (server-side InvokeAgent)."""

agentId: str = Field(min_length=1)
agentAliasId: str = Field(min_length=1)
inputText: str | None = Field(
default=None,
description="Natural-language turn for the agent orchestrator (omit when using functionName)",
)
sessionId: str | None = Field(
default=None,
description="Bedrock agent session id for multi-turn; generated if omitted",
)
functionName: str | None = Field(
default=None,
description="When set, LISA builds inputText so the agent should run this action-group function",
)
actionGroupId: str | None = Field(default=None, description="Required when functionName is set")
actionGroupName: str | None = Field(default=None, description="Optional; improves orchestration prompt")
parameters: dict[str, Any] | None = Field(
default=None,
description="Parameter values for functionName (object); may be empty",
)

@model_validator(mode="after")
def validate_invoke_mode(self) -> Self:
has_fn = bool(self.functionName and self.functionName.strip())
has_text = bool(self.inputText and str(self.inputText).strip())
if has_fn and has_text:
raise ValueError("Provide either inputText or functionName, not both")
if not has_fn and not has_text:
raise ValueError("Provide inputText or functionName (with optional parameters)")
if has_fn and not (self.actionGroupId and str(self.actionGroupId).strip()):
raise ValueError("actionGroupId is required when functionName is set")
return self
Loading
Loading