Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 57 additions & 15 deletions src/cli_agent_orchestrator/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import termios
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated, Dict, List, Optional
from typing import Annotated, Dict, List, Optional, cast

from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect, status
from fastapi import FastAPI, HTTPException, Query, Request, WebSocket, WebSocketDisconnect, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from pydantic import BaseModel, Field, field_validator
Expand All @@ -37,8 +37,9 @@
TERMINAL_LOG_DIR,
)
from cli_agent_orchestrator.models.flow import Flow
from cli_agent_orchestrator.models.inbox import MessageStatus
from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType
from cli_agent_orchestrator.models.terminal import Terminal, TerminalId
from cli_agent_orchestrator.plugins import PluginRegistry
from cli_agent_orchestrator.providers.manager import provider_manager
from cli_agent_orchestrator.services import (
flow_service,
Expand Down Expand Up @@ -127,6 +128,9 @@ async def lifespan(app: FastAPI):
logger.info("Starting CLI Agent Orchestrator server...")
setup_logging()
init_db()
registry = PluginRegistry()
await registry.load()
app.state.plugin_registry = registry

# Run cleanup in background
asyncio.create_task(asyncio.to_thread(cleanup_old_data))
Expand All @@ -136,7 +140,7 @@ async def lifespan(app: FastAPI):

# Start inbox watcher
inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL)
inbox_observer.schedule(LogFileHandler(), str(TERMINAL_LOG_DIR), recursive=False)
inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False)
inbox_observer.start()
logger.info("Inbox watcher started (PollingObserver)")

Expand All @@ -154,9 +158,16 @@ async def lifespan(app: FastAPI):
except asyncio.CancelledError:
pass

await registry.teardown()
logger.info("Shutting down CLI Agent Orchestrator server...")


def get_plugin_registry(request: Request) -> PluginRegistry:
"""Return the plugin registry stored on the FastAPI application state."""

return cast(PluginRegistry, request.app.state.plugin_registry)


app = FastAPI(
title="CLI Agent Orchestrator",
description="Simplified CLI Agent Orchestrator API",
Expand Down Expand Up @@ -289,6 +300,7 @@ async def get_skill_content(name: str) -> SkillContentResponse:

@app.post("/sessions", response_model=Terminal, status_code=status.HTTP_201_CREATED)
async def create_session(
request: Request,
provider: str,
agent_profile: str,
session_name: Optional[str] = None,
Expand All @@ -300,13 +312,13 @@ async def create_session(
# Parse comma-separated allowed_tools string into list
allowed_tools_list = allowed_tools.split(",") if allowed_tools else None

result = terminal_service.create_terminal(
result = session_service.create_session(
provider=provider,
agent_profile=agent_profile,
session_name=session_name,
new_session=True,
working_directory=working_directory,
allowed_tools=allowed_tools_list,
registry=get_plugin_registry(request),
)
return result

Expand Down Expand Up @@ -344,9 +356,9 @@ async def get_session(session_name: str) -> Dict:


@app.delete("/sessions/{session_name}")
async def delete_session(session_name: str) -> Dict:
async def delete_session(request: Request, session_name: str) -> Dict:
try:
result = session_service.delete_session(session_name)
result = session_service.delete_session(session_name, registry=get_plugin_registry(request))
return {"success": True, **result}
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
Expand All @@ -363,6 +375,7 @@ async def delete_session(session_name: str) -> Dict:
status_code=status.HTTP_201_CREATED,
)
async def create_terminal_in_session(
request: Request,
session_name: str,
provider: str,
agent_profile: str,
Expand All @@ -383,6 +396,7 @@ async def create_terminal_in_session(
new_session=False,
working_directory=working_directory,
allowed_tools=allowed_tools_list,
registry=get_plugin_registry(request),
)
return result
except ValueError as e:
Expand Down Expand Up @@ -438,9 +452,24 @@ async def get_terminal_working_directory(terminal_id: TerminalId) -> WorkingDire


@app.post("/terminals/{terminal_id}/input")
async def send_terminal_input(terminal_id: TerminalId, message: str) -> Dict:
async def send_terminal_input(
request: Request,
terminal_id: TerminalId,
message: str,
sender_id: Optional[str] = None,
orchestration_type: Optional[OrchestrationType] = None,
) -> Dict:
try:
success = terminal_service.send_input(terminal_id, message)
if sender_id is None or orchestration_type is None:
Comment thread
patricka3125 marked this conversation as resolved.
Outdated
success = terminal_service.send_input(terminal_id, message)
else:
success = terminal_service.send_input(
terminal_id,
message,
registry=get_plugin_registry(request),
sender_id=sender_id,
orchestration_type=orchestration_type,
)
return {"success": success}
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
Expand Down Expand Up @@ -493,10 +522,12 @@ async def exit_terminal(terminal_id: TerminalId) -> Dict:


@app.delete("/terminals/{terminal_id}")
async def delete_terminal(terminal_id: TerminalId) -> Dict:
async def delete_terminal(request: Request, terminal_id: TerminalId) -> Dict:
"""Delete a terminal."""
try:
success = terminal_service.delete_terminal(terminal_id)
success = terminal_service.delete_terminal(
terminal_id, registry=get_plugin_registry(request)
)
return {"success": success}
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
Expand All @@ -509,11 +540,20 @@ async def delete_terminal(terminal_id: TerminalId) -> Dict:

@app.post("/terminals/{receiver_id}/inbox/messages")
async def create_inbox_message_endpoint(
receiver_id: TerminalId, sender_id: str, message: str
request: Request,
receiver_id: TerminalId,
sender_id: str,
message: str,
orchestration_type: OrchestrationType = "send_message",
) -> Dict:
"""Create inbox message and attempt immediate delivery."""
try:
inbox_msg = create_inbox_message(sender_id, receiver_id, message)
inbox_msg = create_inbox_message(
sender_id,
receiver_id,
message,
orchestration_type=orchestration_type,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except Exception as e:
Expand All @@ -527,7 +567,9 @@ async def create_inbox_message_endpoint(
# the terminal becomes idle. Delivery failures must not cause the API
# to report an error — the message was already persisted above.
try:
inbox_service.check_and_send_pending_messages(receiver_id)
inbox_service.check_and_send_pending_messages(
receiver_id, registry=get_plugin_registry(request)
)
except Exception as e:
logger.warning(f"Immediate delivery attempt failed for {receiver_id}: {e}")

Expand Down
35 changes: 33 additions & 2 deletions src/cli_agent_orchestrator/clients/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cli_agent_orchestrator.constants import DATABASE_URL, DB_DIR, DEFAULT_PROVIDER
from cli_agent_orchestrator.models.flow import Flow
from cli_agent_orchestrator.models.inbox import InboxMessage, MessageStatus
from cli_agent_orchestrator.models.inbox import InboxMessage, MessageStatus, OrchestrationType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,6 +39,7 @@ class InboxModel(Base):
sender_id = Column(String, nullable=False)
receiver_id = Column(String, nullable=False)
message = Column(String, nullable=False)
orchestration_type = Column(String, nullable=False, default="send_message")
status = Column(String, nullable=False) # MessageStatus enum value
created_at = Column(DateTime, default=datetime.now)

Expand Down Expand Up @@ -69,6 +70,7 @@ def init_db() -> None:
"""Initialize database tables and apply schema migrations."""
Base.metadata.create_all(bind=engine)
_migrate_add_allowed_tools()
_migrate_add_inbox_orchestration_type()


def _migrate_add_allowed_tools() -> None:
Expand All @@ -90,6 +92,27 @@ def _migrate_add_allowed_tools() -> None:
logger.warning(f"Migration check for allowed_tools failed: {e}")


def _migrate_add_inbox_orchestration_type() -> None:
Comment thread
patricka3125 marked this conversation as resolved.
Outdated
"""Add orchestration_type column to inbox table if missing."""
import sqlite3

from cli_agent_orchestrator.constants import DATABASE_FILE

try:
conn = sqlite3.connect(str(DATABASE_FILE))
cursor = conn.execute("PRAGMA table_info(inbox)")
columns = {row[1] for row in cursor.fetchall()}
if "orchestration_type" not in columns:
conn.execute(
"ALTER TABLE inbox ADD COLUMN orchestration_type TEXT NOT NULL DEFAULT 'send_message'"
)
conn.commit()
logger.info("Migration: added orchestration_type column to inbox table")
conn.close()
except Exception as e:
logger.warning(f"Migration check for inbox orchestration_type failed: {e}")


def create_terminal(
terminal_id: str,
tmux_session: str,
Expand Down Expand Up @@ -209,13 +232,19 @@ def delete_terminals_by_session(tmux_session: str) -> int:
return deleted


def create_inbox_message(sender_id: str, receiver_id: str, message: str) -> InboxMessage:
def create_inbox_message(
Comment thread
patricka3125 marked this conversation as resolved.
Outdated
sender_id: str,
receiver_id: str,
message: str,
orchestration_type: OrchestrationType = "send_message",
) -> InboxMessage:
"""Create inbox message with status=MessageStatus.PENDING."""
with SessionLocal() as db:
inbox_msg = InboxModel(
sender_id=sender_id,
receiver_id=receiver_id,
message=message,
orchestration_type=orchestration_type,
status=MessageStatus.PENDING.value,
)
db.add(inbox_msg)
Expand All @@ -226,6 +255,7 @@ def create_inbox_message(sender_id: str, receiver_id: str, message: str) -> Inbo
sender_id=inbox_msg.sender_id,
receiver_id=inbox_msg.receiver_id,
message=inbox_msg.message,
orchestration_type=inbox_msg.orchestration_type,
status=MessageStatus(inbox_msg.status),
created_at=inbox_msg.created_at,
)
Expand Down Expand Up @@ -263,6 +293,7 @@ def get_inbox_messages(
sender_id=msg.sender_id,
receiver_id=msg.receiver_id,
message=msg.message,
orchestration_type=msg.orchestration_type,
status=MessageStatus(msg.status),
created_at=msg.created_at,
)
Expand Down
29 changes: 22 additions & 7 deletions src/cli_agent_orchestrator/mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from cli_agent_orchestrator.constants import API_BASE_URL, DEFAULT_PROVIDER
from cli_agent_orchestrator.mcp_server.models import HandoffResult
from cli_agent_orchestrator.models.inbox import OrchestrationType
from cli_agent_orchestrator.models.terminal import TerminalStatus
from cli_agent_orchestrator.utils.terminal import generate_session_name, wait_until_terminal_status

Expand Down Expand Up @@ -177,18 +178,26 @@ def _create_terminal(
return terminal["id"], provider


def _send_direct_input(terminal_id: str, message: str) -> None:
def _send_direct_input(
terminal_id: str, message: str, orchestration_type: OrchestrationType
) -> None:
"""Send input directly to a terminal (bypasses inbox).

Args:
terminal_id: Terminal ID
message: Message to send
orchestration_type: Orchestration mode for plugin event emission

Raises:
Exception: If sending fails
"""
response = requests.post(
f"{API_BASE_URL}/terminals/{terminal_id}/input", params={"message": message}
f"{API_BASE_URL}/terminals/{terminal_id}/input",
params={
"message": message,
"sender_id": os.environ.get("CAO_TERMINAL_ID", "supervisor"),
"orchestration_type": orchestration_type,
},
)
response.raise_for_status()

Expand All @@ -211,7 +220,7 @@ def _send_direct_input_handoff(terminal_id: str, provider: str, message: str) ->
else:
handoff_message = message

_send_direct_input(terminal_id, handoff_message)
_send_direct_input(terminal_id, handoff_message, "handoff")
Comment thread
patricka3125 marked this conversation as resolved.
Outdated


def _send_direct_input_assign(terminal_id: str, message: str) -> None:
Expand All @@ -224,10 +233,12 @@ def _send_direct_input_assign(terminal_id: str, message: str) -> None:
f"When done, send results back to terminal {sender_id} using send_message]"
)

_send_direct_input(terminal_id, message)
_send_direct_input(terminal_id, message, "assign")


def _send_to_inbox(receiver_id: str, message: str) -> Dict[str, Any]:
def _send_to_inbox(
receiver_id: str, message: str, orchestration_type: OrchestrationType = "send_message"
) -> Dict[str, Any]:
"""Send message to another terminal's inbox (queued delivery when IDLE).

Args:
Expand All @@ -247,7 +258,11 @@ def _send_to_inbox(receiver_id: str, message: str) -> Dict[str, Any]:

response = requests.post(
f"{API_BASE_URL}/terminals/{receiver_id}/inbox/messages",
params={"sender_id": sender_id, "message": message},
params={
"sender_id": sender_id,
"message": message,
"orchestration_type": orchestration_type,
},
)
response.raise_for_status()
return response.json()
Expand Down Expand Up @@ -583,7 +598,7 @@ def _send_message_impl(receiver_id: str, message: str) -> Dict[str, Any]:
"Use send_message MCP tool for any follow-up work.]"
)

return _send_to_inbox(receiver_id, message)
return _send_to_inbox(receiver_id, message, "send_message")
except Exception as e:
return {"success": False, "error": str(e)}

Expand Down
7 changes: 7 additions & 0 deletions src/cli_agent_orchestrator/models/inbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

from datetime import datetime
from enum import Enum
from typing import Literal

from pydantic import BaseModel, Field

OrchestrationType = Literal["send_message", "handoff", "assign"]


class MessageStatus(str, Enum):
"""Message status enumeration."""
Expand All @@ -21,5 +24,9 @@ class InboxMessage(BaseModel):
sender_id: str = Field(..., description="Sender terminal ID")
receiver_id: str = Field(..., description="Receiver terminal ID")
message: str = Field(..., description="Message content")
orchestration_type: OrchestrationType = Field(
default="send_message",
description="The orchestration mode that caused the message delivery",
)
status: MessageStatus = Field(..., description="Message status")
created_at: datetime = Field(..., description="Creation timestamp")
Loading