Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 254 additions & 22 deletions src/praisonai/praisonai/cli/commands/unified.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
import atexit
import signal
import json
from pathlib import Path
from types import FrameType
from typing import Set, TextIO
Expand Down Expand Up @@ -76,6 +77,218 @@ def _unregister_cleanup_handlers(original_sigint, original_sigterm):
# so we keep the atexit handler but check if process list is empty


def _spawn_service(service_name: str, service_port: int, host: str, log_handle):
"""Spawn a single service with proper command configuration."""
# Service configuration: [command, extra_flags]
service_config = {
"flow": ["flow", ["--no-open"]],
"claw": ["claw", []],
"ui": ["ui", []]
}

if service_name not in service_config:
raise ValueError(f"Unknown service: {service_name}")

command, extra_flags = service_config[service_name]
argv = [sys.executable, "-m", "praisonai", command, "--port", str(service_port), "--host", host] + extra_flags

return subprocess.Popen(argv, stdout=log_handle, stderr=subprocess.STDOUT)


def _wait_for_service_ready(service_name: str, proc: subprocess.Popen, service_port: int, host: str, log_file: Path, console, timeout: int = 15) -> bool:
"""Wait for service to be ready with proper readiness check."""
check_host = _resolve_check_host(host)
deadline = time.time() + timeout

while time.time() < deadline:
# Check if process crashed
if proc.poll() is not None:
console.print(f"[red]✗ {service_name} crashed during startup (exit code: {proc.returncode})[/red]")
console.print(f"[dim]Check log: {log_file}[/dim]")
return False

# Check if service is accepting connections
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
if sock.connect_ex((check_host, service_port)) == 0:
return True
except OSError:
pass
finally:
sock.close()

time.sleep(0.5)

console.print(f"[red]✗ {service_name} did not become ready within {timeout} seconds[/red]")
console.print(f"[dim]Check log: {log_file}[/dim]")
return False


def _auto_start_services(console, host: str) -> bool:
"""Auto-start PraisonAI services with proper readiness checks."""
services = [
("flow", 7860),
("claw", 8082),
("ui", 8081)
]

success_count = 0

for service_name, service_port in services:
# Check if service is already running
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
check_host = _resolve_check_host(host)
try:
connection_result = sock.connect_ex((check_host, service_port))
if connection_result == 0:
console.print(f"[yellow]✓ {service_name} already running on port {service_port}[/yellow]")
success_count += 1
sock.close()
continue
except OSError:
pass
finally:
sock.close()

# Start the service
console.print(f"[cyan]Starting {service_name} on port {service_port}...[/cyan]")
log_handle = None
proc = None
try:
# Create log directory for troubleshooting
log_dir = Path.home() / ".praisonai" / "unified" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"{service_name}.log"

log_handle = open(log_file, "a", encoding="utf-8")

# Spawn service using shared logic
proc = _spawn_service(service_name, service_port, host, log_handle)

# Wait for service to be ready
if _wait_for_service_ready(service_name, proc, service_port, host, log_file, console):
# Track process for cleanup only after successful startup
_ACTIVE_PROCESSES.add(proc)
_PROCESS_LOG_HANDLES[proc] = log_handle
console.print(f"[green]✓ {service_name} started successfully[/green]")
success_count += 1
log_handle = None # Don't close, it's now tracked
else:
# Clean up failed process
if proc and proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
if log_handle:
log_handle.close()
log_handle = None

except Exception as e:
console.print(f"[red]✗ Failed to start {service_name}: {e}[/red]")
if proc:
try:
proc.terminate()
except Exception:
pass
if log_handle:
log_handle.close()

return success_count == len(services)


def _run_aiui_dashboard(port: int, host: str, console) -> bool:
"""Run the aiui dashboard interface using proper CLI entrypoint."""
console.print("[bold green]🦞 Starting aiui Dashboard...[/bold green]")

try:
import tempfile
import os

# Check if aiui is available first
result = subprocess.run([
sys.executable, "-c", "import praisonaiui"
], capture_output=True, text=True)

if result.returncode != 0:
console.print("[red]Error: aiui package not installed.[/red]")
console.print("[yellow]Install with: pip install aiui[/yellow]")
return False

# Create a temporary script for aiui dashboard based on claw pattern
aiui_script = '''import os
import praisonaiui as aiui
from praisonai.ui._aiui_datastore import PraisonAISessionDataStore

# Set up datastore bridge
aiui.set_datastore(PraisonAISessionDataStore())

# Configure dashboard style
aiui.set_style("dashboard")
aiui.set_branding(title="PraisonAI Unified Dashboard", logo="🌟")

# Set up pages for unified dashboard
aiui.set_pages([
"chat", "agents", "memory", "knowledge",
"skills", "sessions", "usage", "config", "logs"
])

# Register a simple welcome message
@aiui.welcome
async def on_welcome():
await aiui.say("🌟 Welcome to PraisonAI Unified Dashboard!")
await aiui.say("Access all your AI services from one interface.")

# Register basic reply handler
@aiui.reply
async def on_reply(message: str, settings: dict | None = None):
await aiui.think("Processing...")
await aiui.say(f"Unified Dashboard received: {message}")
await aiui.say("Use the sidebar to access Flow, Agents, Memory, and more!")
'''

# Create temp file safely
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(aiui_script)
temp_script = f.name

try:
console.print(f"[green]✓ Starting aiui dashboard on {host}:{port}[/green]")

# Use aiui CLI to run the dashboard (NOT create_app)
# Try aiui command first, fallback to python -m
try:
result = subprocess.run([
"aiui", "run", temp_script, "--port", str(port), "--host", host
], check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fallback to python -m praisonaiui
result = subprocess.run([
sys.executable, "-m", "praisonaiui.cli", "run",
temp_script, "--port", str(port), "--host", host
], check=True)

return True

except subprocess.CalledProcessError as e:
console.print(f"[red]aiui dashboard exited with code {e.returncode}[/red]")
return False
finally:
# Clean up temp file
try:
os.unlink(temp_script)
except (OSError, FileNotFoundError):
pass

except subprocess.CalledProcessError as e:
console.print(f"[red]aiui dashboard failed with code {e.returncode}[/red]")
return False
except Exception as e:
console.print(f"[red]Error running aiui dashboard: {e}[/red]")
return False


def _generate_dashboard_html(host: str = "localhost") -> str:
"""Generate dashboard HTML with dynamic host configuration."""
dashboard_html = """<!DOCTYPE html>
Expand Down Expand Up @@ -393,6 +606,8 @@ def unified(
ctx: typer.Context,
port: int = typer.Option(3000, "--port", "-p", help="Port to run unified dashboard on"),
host: str = typer.Option("127.0.0.1", "--host", help="Host to bind to (use 0.0.0.0 to expose remotely)"),
auto_start: bool = typer.Option(True, "--auto-start/--no-auto-start", help="Auto-start all services"),
aiui: bool = typer.Option(False, "--aiui", help="Use aiui dashboard interface (experimental)"),
):
"""
Launch the PraisonAI Unified Dashboard.
Expand All @@ -402,22 +617,43 @@ def unified(
- Claw Dashboard (Full UI) - port 8082
- Clean Chat UI - port 8081

This unified launcher allows you to:
1. Create agents visually using Flow Builder
2. Chat with agents using the Chat UI
3. Manage everything from Claw Dashboard
4. Connect external services like Telegram
This unified launcher:
1. Auto-starts all services by default (like 'praisonai up')
2. Creates agents visually using Flow Builder
3. Chats with agents using the Chat UI
4. Manages everything from Claw Dashboard
5. Connects external services like Telegram
6. Optionally uses aiui for enhanced dashboard experience

Examples:
praisonai dashboard
praisonai dashboard # Auto-start all services
praisonai dashboard --no-auto-start # Dashboard only (no auto-start)
praisonai dashboard --port 9000 --host 0.0.0.0
praisonai dashboard --aiui # Use aiui interface (experimental)
"""
if ctx.invoked_subcommand is not None:
return

from rich.console import Console
console = Console()

# Auto-start services if enabled (before aiui mode)
if auto_start:
console.print("[bold green]🚀 Auto-starting PraisonAI services...[/bold green]")
if not _auto_start_services(console, host):
console.print("[yellow]⚠️ Some services failed to start, continuing anyway...[/yellow]")
console.print("[green]✅ Auto-start complete[/green]")
console.print()

# Check for aiui mode
if aiui:
result = _run_aiui_dashboard(port, host, console)
if not result:
console.print("[red]Failed to start aiui dashboard, falling back to standard dashboard[/red]")
# Continue to standard dashboard instead of exiting
else:
return # aiui started successfully, exit

# Import optional dependencies inside function to avoid startup overhead
try:
from fastapi import FastAPI, HTTPException
Expand All @@ -429,6 +665,14 @@ def unified(
console.print(f"[dim]Error details: {exc}[/dim]")
raise typer.Abort()

# Auto-start services if not already done for aiui
if auto_start and not aiui:
console.print("[bold green]🚀 Auto-starting PraisonAI services...[/bold green]")
if not _auto_start_services(console, host):
console.print("[yellow]⚠️ Some services failed to start, dashboard will still work[/yellow]")
console.print("[green]✅ Auto-start complete[/green]")
console.print()

# Register cleanup handlers and save originals for restoration
original_handlers = _register_cleanup_handlers()

Expand Down Expand Up @@ -468,22 +712,8 @@ async def start_service(service: str):
log_file = log_dir / f"{service}.log"
log_handle = open(log_file, "a", encoding="utf-8")

if service == "flow":
# Use praisonai module entrypoint so command resolution matches CLI behavior.
proc = subprocess.Popen([
sys.executable, "-m", "praisonai", "flow",
"--port", str(service_port), "--host", host, "--no-open"
], stdout=log_handle, stderr=subprocess.STDOUT)
elif service == "claw":
proc = subprocess.Popen([
sys.executable, "-m", "praisonai", "claw",
"--port", str(service_port), "--host", host
], stdout=log_handle, stderr=subprocess.STDOUT)
elif service == "ui":
proc = subprocess.Popen([
sys.executable, "-m", "praisonai", "ui",
"--port", str(service_port), "--host", host
], stdout=log_handle, stderr=subprocess.STDOUT)
# Use shared service spawning logic
proc = _spawn_service(service, service_port, host, log_handle)

# Wait for service to start with timeout
deadline = time.time() + 15
Expand Down Expand Up @@ -554,6 +784,8 @@ async def health():
console.print()
console.print("[bold green]🌟 Starting PraisonAI Unified Dashboard[/bold green]")
console.print(f"[dim]Unified interface on {host}:{port}[/dim]")
if auto_start:
console.print("[dim]Services auto-started and dashboard ready[/dim]")
console.print("[dim]Access Flow Builder, Claw Dashboard, and Chat UI from one place[/dim]")
console.print()

Expand Down
39 changes: 39 additions & 0 deletions src/praisonai/tests/unit/cli/test_dashboard_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for dashboard (unified) CLI command flags and aiui behavior."""

import inspect
import subprocess
from unittest.mock import MagicMock, patch

from rich.console import Console

from praisonai.cli.commands.unified import _run_aiui_dashboard, unified


def test_unified_command_has_new_flags():
"""unified command should expose auto_start and aiui flags."""
params = inspect.signature(unified).parameters
assert "auto_start" in params
assert "aiui" in params


def test_run_aiui_dashboard_returns_false_when_aiui_missing():
"""Should fail gracefully when praisonaiui import check fails."""
console = Console()
with patch("subprocess.run", return_value=MagicMock(returncode=1)):
assert _run_aiui_dashboard(3000, "127.0.0.1", console) is False


def test_run_aiui_dashboard_returns_false_on_subprocess_failure():
"""Should fail gracefully when aiui subprocess exits non-zero."""
console = Console()
with patch.object(console, "print") as mock_print:
with patch(
"subprocess.run",
side_effect=[
MagicMock(returncode=0),
subprocess.CalledProcessError(1, ["python", "temp_script.py"]),
],
):
assert _run_aiui_dashboard(3000, "127.0.0.1", console) is False
printed_messages = [call.args[0] for call in mock_print.call_args_list if call.args]
assert any("exited with code 1" in message for message in printed_messages)